Class AbstractWebsocketClient
java.lang.Object
io.fluxzero.sdk.common.websocket.AbstractWebsocketClient
- All Implemented Interfaces:
WebsocketEndpoint, AutoCloseable
- Direct Known Subclasses:
WebSocketEventStoreClient, WebsocketGatewayClient, WebsocketKeyValueClient, WebsocketSchedulingClient, WebSocketSearchClient, WebsocketTrackingClient
public abstract class AbstractWebsocketClient
extends Object
implements WebsocketEndpoint, AutoCloseable
Abstract base class for all WebSocket-based clients in the Fluxzero Java client.
This class provides robust connection management, message dispatching, result handling, batching, metrics publishing,
and ping-based health checking. It underpins core components such as WebsocketGatewayClient, providing the
shared infrastructure needed for durable, resilient WebSocket communication with the Fluxzero Runtime.
Core Responsibilities
- Establishing and maintaining WebSocket connections with automatic reconnection support
- Managing message sending and batching via
RequestandRequestBatch - Receiving and processing incoming
RequestResultandResultBatchmessages - Supporting command guarantees (e.g., SENT, STORED) with retries and backpressure handling
- Sending periodic ping frames to detect connection drops
- Integrating with the Fluxzero metrics infrastructure for custom performance telemetry
Key Features
- Session Pooling: Maintains multiple concurrent sessions to handle high-throughput scenarios
- Request Backlogs: Each session has a backlog to buffer and batch outgoing requests
- Ping Scheduling: Scheduled tasks detect broken sessions using WebSocket pings
- Auto Retry: Failed requests are retried if the session is closed unexpectedly
- Async Result Handling: Responses are handled on a separate thread pool to avoid blocking I/O
- Metrics Publishing: Optional emission of message-related metrics based on configuration
- See Also:
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static classprotected static final recordprotected static classprotected classNested classes/interfaces inherited from interface WebsocketEndpoint
WebsocketEndpoint.ReceiveTiming -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected static final Stringprotected static final Stringprotected static final intprotected static final Durationstatic com.fasterxml.jackson.databind.ObjectMapperstatic WebsocketConnectorprotected static final Stringprotected static final Stringprotected static final Stringprotected static final Stringprotected static final String -
Constructor Summary
ConstructorsConstructorDescriptionAbstractWebsocketClient(WebsocketConnector connector, URI endpointUri, WebSocketClient client, boolean allowMetrics, Duration reconnectDelay, com.fasterxml.jackson.databind.ObjectMapper objectMapper, int numberOfSessions) Constructs a WebSocket client with fine-grained control over connection setup.AbstractWebsocketClient(URI endpointUri, WebSocketClient client, boolean allowMetrics) Creates a WebSocket client using the given endpoint URI, client implementation, and a flag to allow metrics.AbstractWebsocketClient(URI endpointUri, WebSocketClient client, boolean allowMetrics, int numberOfSessions) Creates a WebSocket client with multiple parallel sessions using default settings. -
Method Summary
Modifier and TypeMethodDescriptionprotected voidabort(WebsocketSession session, String reason) booleanReturns whether this endpoint wants low-level receive timing for binary messages.voidclose()protected voidclose(boolean clearOutstandingRequests) protected WebsocketSessionconnectToServer(WebsocketConnector connector, URI endpointUri) protected RetryConfigurationcreateConnectionRetryConfiguration(URI endpointUri, Duration reconnectDelay) protected static AbstractWebsocketClient.ConnectionSetupcreateConnectionSetup(WebSocketClient.ClientConfig clientConfig) protected CompressionAlgorithmgetCompressionAlgorithm(WebsocketSession session) protected Durationprotected StringgetNegotiatedSessionId(WebsocketSession session) getRuntimeVersion(WebsocketSession session) protected WebSocketTransportFormatgetTransportFormat(WebsocketSession session) protected voidhandleClose(WebsocketSession session, WebsocketCloseReason closeReason) protected voidhandleError(WebsocketSession session, Throwable e) protected voidhandleMessage(byte[] bytes, WebsocketSession session, WebsocketEndpoint.ReceiveTiming receiveTiming) protected voidhandlePong(WebsocketSession session) protected voidhandleResult(RequestResult result, String batchId, String sessionId) protected voidhandleResult(RequestResult result, String batchId, String sessionId, io.fluxzero.sdk.common.websocket.WebsocketResultDiagnostics.ResultTiming clientResultTiming) protected voidlogConnectionRetryStatus(URI endpointUri, RetryStatus status) protected voidlogSuccessfulReconnect(URI endpointUri, RetryStatus status) protected MetadatavoidonClose(WebsocketSession session, WebsocketCloseReason closeReason) Called once when a session closes or is aborted.voidonError(WebsocketSession session, Throwable e) Called when the underlying WebSocket implementation reports an error.voidonMessage(byte[] bytes, WebsocketSession session) Called when a complete binary message has been received.voidonMessage(byte[] bytes, WebsocketSession session, WebsocketEndpoint.ReceiveTiming receiveTiming) Called when a complete binary message has been received, with low-level receive timing when available.voidonOpen(WebsocketSession session) Called after the opening handshake has completed and the session metadata is available.voidonPong(ByteBuffer message, WebsocketSession session) Called when a pong frame has been received.protected voidretryOutstandingRequests(String sessionId) protected voidretryOutstandingRequestsAsync(String sessionId) protected DurationDelay before outstanding requests from a closed session are resent on a replacement session.protected AbstractWebsocketClient.PingRegistrationschedulePing(WebsocketSession session) protected <R extends RequestResult>
CompletableFuture<R> protected <R extends RequestResult>
RsendAndWait(Request request) protected CompletableFuture<Void> sendCommand(Command command) protected voidsendPing(WebsocketSession session) toString()protected WebSocketTransportCodectransportCodec(WebsocketSession session) protected voidtryPublishMetrics(JsonType message, Metadata metadata)
-
Field Details
-
CONNECTION_TIMEOUT_FAILSAFE_GRACE
-
CONNECTION_RETRY_LOG_INTERVAL
protected static final int CONNECTION_RETRY_LOG_INTERVAL- See Also:
-
CLIENT_HANDSHAKE_CONFIGURATOR_USER_PROPERTY
-
CLIENT_SESSION_ID_USER_PROPERTY
-
RUNTIME_SESSION_ID_USER_PROPERTY
-
NEGOTIATED_SESSION_ID_USER_PROPERTY
-
RUNTIME_VERSION_USER_PROPERTY
-
SELECTED_COMPRESSION_ALGORITHM_USER_PROPERTY
-
SELECTED_TRANSPORT_FORMAT_USER_PROPERTY
-
defaultWebsocketConnector
-
defaultObjectMapper
public static com.fasterxml.jackson.databind.ObjectMapper defaultObjectMapper
-
-
Constructor Details
-
AbstractWebsocketClient
Creates a WebSocket client using the given endpoint URI, client implementation, and a flag to allow metrics. Uses a default WebSocket container, default object mapper, and a single WebSocket session.- Parameters:
endpointUri- the URI of the WebSocket endpoint to connect toclient- the client implementation that provides configuration and gateway accessallowMetrics- whether metrics should be published for each request
-
AbstractWebsocketClient
public AbstractWebsocketClient(URI endpointUri, WebSocketClient client, boolean allowMetrics, int numberOfSessions) Creates a WebSocket client with multiple parallel sessions using default settings. This constructor allows you to specify the number of WebSocket sessions to use, which is useful for increasing throughput and isolating message streams.- Parameters:
endpointUri- the URI of the WebSocket endpoint to connect toclient- the client implementation that provides configuration and gateway accessallowMetrics- whether metrics should be published for each requestnumberOfSessions- the number of WebSocket sessions to maintain concurrently
-
AbstractWebsocketClient
public AbstractWebsocketClient(WebsocketConnector connector, URI endpointUri, WebSocketClient client, boolean allowMetrics, Duration reconnectDelay, com.fasterxml.jackson.databind.ObjectMapper objectMapper, int numberOfSessions) Constructs a WebSocket client with fine-grained control over connection setup. This constructor allows you to specify a custom container, reconnect delay, object mapper, and session count. It is primarily used for advanced configuration or test scenarios.- Parameters:
connector- the WebSocket connector to use for establishing connectionsendpointUri- the WebSocket server endpointclient- the client providing config and access to the Fluxzero RuntimeallowMetrics- flag to enable or disable automatic metrics publishingreconnectDelay- the delay between reconnect attempts if the connection is lostobjectMapper- the Jackson object mapper for (de)serializing requests and responsesnumberOfSessions- the number of WebSocket sessions to establish in parallel
-
-
Method Details
-
createConnectionRetryConfiguration
protected RetryConfiguration createConnectionRetryConfiguration(URI endpointUri, Duration reconnectDelay) -
logSuccessfulReconnect
-
logConnectionRetryStatus
-
connectToServer
protected WebsocketSession connectToServer(WebsocketConnector connector, URI endpointUri) throws Exception - Throws:
Exception
-
getConnectionTimeoutFailsafeGrace
-
onOpen
Description copied from interface:WebsocketEndpointCalled after the opening handshake has completed and the session metadata is available.- Specified by:
onOpenin interfaceWebsocketEndpoint- Parameters:
session- the newly opened session
-
send
-
sendAndWait
-
sendCommand
-
onMessage
Description copied from interface:WebsocketEndpointCalled when a complete binary message has been received.- Specified by:
onMessagein interfaceWebsocketEndpoint- Parameters:
bytes- the full binary message payloadsession- the session that received the message
-
onMessage
public void onMessage(byte[] bytes, WebsocketSession session, WebsocketEndpoint.ReceiveTiming receiveTiming) Description copied from interface:WebsocketEndpointCalled when a complete binary message has been received, with low-level receive timing when available.- Specified by:
onMessagein interfaceWebsocketEndpoint- Parameters:
bytes- the full binary message payloadsession- the session that received the messagereceiveTiming- timing captured by the underlying websocket listener
-
captureReceiveTiming
public boolean captureReceiveTiming()Description copied from interface:WebsocketEndpointReturns whether this endpoint wants low-level receive timing for binary messages.- Specified by:
captureReceiveTimingin interfaceWebsocketEndpoint- Returns:
truewhen the websocket session should capture receive and dispatch timestamps
-
handleMessage
protected void handleMessage(byte[] bytes, WebsocketSession session, WebsocketEndpoint.ReceiveTiming receiveTiming) -
handleResult
-
handleResult
protected void handleResult(RequestResult result, String batchId, String sessionId, io.fluxzero.sdk.common.websocket.WebsocketResultDiagnostics.ResultTiming clientResultTiming) -
schedulePing
-
sendPing
-
onPong
Description copied from interface:WebsocketEndpointCalled when a pong frame has been received.- Specified by:
onPongin interfaceWebsocketEndpoint- Parameters:
message- the pong application datasession- the session that received the pong
-
handlePong
-
abort
-
onClose
Description copied from interface:WebsocketEndpointCalled once when a session closes or is aborted.- Specified by:
onClosein interfaceWebsocketEndpoint- Parameters:
session- the closed sessioncloseReason- the close status and optional reason
-
handleClose
-
retryOutstandingRequestsAsync
-
retryOutstandingRequests
-
retryOutstandingRequestsDelay
Delay before outstanding requests from a closed session are resent on a replacement session. -
onError
Description copied from interface:WebsocketEndpointCalled when the underlying WebSocket implementation reports an error.- Specified by:
onErrorin interfaceWebsocketEndpoint- Parameters:
session- the session that failede- the reported error
-
handleError
-
close
public void close()- Specified by:
closein interfaceAutoCloseable
-
toString
-
close
protected void close(boolean clearOutstandingRequests) -
tryPublishMetrics
-
metricsMetadata
-
createConnectionSetup
protected static AbstractWebsocketClient.ConnectionSetup createConnectionSetup(WebSocketClient.ClientConfig clientConfig) -
getNegotiatedSessionId
-
getCompressionAlgorithm
-
getTransportFormat
-
transportCodec
-
getRuntimeVersion
-