StompSession

interface StompSession(source)

A coroutine-based STOMP session. This interface defines interactions with a STOMP server.

Suspension & Receipts

The STOMP protocol supports RECEIPT frames, allowing the client to know when the server has received a frame. This only happens if a receipt header is set on the client frame.

If auto-receipt is enabled, a receipt header is automatically generated and added to all client frames supporting the mechanism, and for which a receipt header is not already present. If auto-receipt is not enabled, a receipt header may still be provided manually in the parameters of some overloads.

When a receipt header is present (automatically added or manually provided), the method that is used to send the frame suspends until the corresponding RECEIPT frame is received from the server. If no RECEIPT frame is received from the server in the configured time limit, a LostReceiptException is thrown.

If no receipt is provided and auto-receipt is disabled, the method used to send the frame doesn't wait for a RECEIPT frame and never throws LostReceiptException. Instead, it returns immediately after the underlying web socket implementation is done sending the frame.

Subscriptions

The subscribe overloads are a bit unconventional because they are suspending and yet they return a Flow.

They make use of the suspension mechanism to allow callers to wait for the subscription to actually happen. This can only be guaranteed when making use of the receipts mechanism (please see the previous section). All subscribe overloads send a SUBSCRIBE frame immediately, and suspend until the corresponding RECEIPT frame is received (when applicable) or when the SUBSCRIBE frame is sent (when the receipt mechanism is not used).

The returned Flow is the flow of messages that are received as part of the subscription. This flow automatically unsubscribes by sending an UNSUBSCRIBE frame in the following situations:

  • the flow collector's job is cancelled

  • the flow collector's code throws an exception

  • the flow's consumer uses a terminal operator that ends the flow early, such as first

Because of this automatic unsubscription, the returned flow can only be collected once. Multiple collections of the returned flow (parallel or sequential) result in an unspecified behaviour.

If an error occurs upstream (e.g. STOMP ERROR frame or unexpected web socket closure), then all subscription flow collectors throw the relevant exception, but no UNSUBSCRIBE frame is sent (because the connection is failed).

Various extension functions are available to subscribe to a destination with predefined message conversions. You can also apply your own operators on the returned flows to convert/handle message frames.

Heart beats

When configured, heart beats can be used as a keep-alive to detect if the connection is lost. The StompConfig.heartBeat property should be used to configure heart beats in the StompClient.

Sending heart beats is automatically handled by StompSession implementations. If expected heart beats are not received in time, a MissingHeartBeatException is thrown and fails active subscriptions.

Inheritors

Functions

Link copied to clipboard
abstract suspend fun abort(transactionId: String)

Sends an ABORT frame with the given transactionId.

Link copied to clipboard
abstract suspend fun ack(ackId: String, transactionId: String? = null)

Sends an ACK frame with the given ackId.

Link copied to clipboard
abstract suspend fun begin(transactionId: String)

Sends a BEGIN frame with the given transactionId.

Link copied to clipboard
abstract suspend fun commit(transactionId: String)

Sends a COMMIT frame with the given transactionId.

Link copied to clipboard
abstract suspend fun disconnect()

If graceful disconnect is enabled (which is the default), sends a DISCONNECT frame to close the session, waits for the relevant RECEIPT frame, and then closes the connection. Otherwise, force-closes the connection.

Link copied to clipboard
abstract suspend fun nack(ackId: String, transactionId: String? = null)

Sends a NACK frame with the given ackId.

Link copied to clipboard
abstract suspend fun send(headers: StompSendHeaders, body: FrameBody?): StompReceipt?

Sends a SEND frame to the server with the given headers and the given body.

Link copied to clipboard
suspend fun StompSession.sendBinary(destination: String, body: ByteString?): StompReceipt?

Sends a SEND frame to the server at the given destination with the given binary body.

Link copied to clipboard
suspend fun StompSession.sendEmptyMsg(destination: String): StompReceipt?

Sends a SEND frame to the server at the given destination without body.

Link copied to clipboard
suspend fun StompSession.sendText(destination: String, body: String?): StompReceipt?

Sends a SEND frame to the server at the given destination with the given textual body.

Link copied to clipboard
abstract suspend fun subscribe(headers: StompSubscribeHeaders): Flow<StompFrame.Message>

Subscribes and returns a Flow of MESSAGE frames that unsubscribes automatically when the collector is done or cancelled. The returned flow can be collected only once.

Link copied to clipboard
suspend fun StompSession.subscribe(destination: String): Flow<StompFrame.Message>

Subscribes and returns a Flow of MESSAGE frames that unsubscribes automatically when the collector is done or cancelled. The returned flow can be collected only once.

Link copied to clipboard
suspend fun StompSession.subscribeBinary(destination: String): Flow<ByteString>

Subscribes and returns a Flow of binary message bodies that unsubscribes automatically when the collector is done or cancelled. The returned flow can be collected only once.

Link copied to clipboard
suspend fun StompSession.subscribeText(destination: String): Flow<String>

Subscribes and returns a Flow of text message bodies that unsubscribes automatically when the collector is done or cancelled. The returned flow can be collected only once.

Link copied to clipboard

Wraps this StompSession to add methods that can convert message bodies using the provided converter.

Link copied to clipboard
suspend fun <T> StompSession.withTransaction(block: suspend StompSession.(transactionId: String) -> T): T

Executes the given block as part of a transaction.