Class ChunkedDeserializingMessage

java.lang.Object
  io.fluxzero.sdk.common.serialization.DeserializingMessage
    io.fluxzero.sdk.common.serialization.ChunkedDeserializingMessage
All Implemented Interfaces:
HasMetadata, HasMessage

public class ChunkedDeserializingMessage extends DeserializingMessage
Deserializing view for a message whose payload is split across multiple indexed messages.

The first chunk can be handled immediately as an InputStream. Typed payload access waits until the final chunk has been observed in the normal tracking stream.
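The split between early streaming and late typed access can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `ChunkAggregationSketch` and its methods `bytesSoFar`, `append` and `aggregatedBytes` are hypothetical names that use nothing but JDK classes, and the sketch does not reflect how the SDK class is implemented internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, not the SDK class: bytes are readable early,
// the complete payload only after the final chunk.
class ChunkAggregationSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final CompletableFuture<Void> completion = new CompletableFuture<>();

    ChunkAggregationSketch(byte[] firstChunk) {
        buffer.write(firstChunk, 0, firstChunk.length);
    }

    // Bytes received so far can be inspected at any time.
    synchronized byte[] bytesSoFar() {
        return buffer.toByteArray();
    }

    // Adds a later chunk; the final one completes the future.
    synchronized void append(byte[] chunk, boolean finalChunk) {
        buffer.write(chunk, 0, chunk.length);
        if (finalChunk) {
            completion.complete(null);
        }
    }

    CompletableFuture<Void> completion() {
        return completion;
    }

    // Blocks until the final chunk has arrived, then returns the whole payload.
    byte[] aggregatedBytes() {
        completion.join();
        return bytesSoFar();
    }

    public static void main(String[] args) {
        ChunkAggregationSketch message =
                new ChunkAggregationSketch("he".getBytes(StandardCharsets.UTF_8));
        System.out.println(message.completion().isDone()); // prints false
        message.append("llo".getBytes(StandardCharsets.UTF_8), true);
        System.out.println(new String(message.aggregatedBytes(), StandardCharsets.UTF_8)); // prints hello
    }
}
```

In the model, anything that needs the full payload goes through `aggregatedBytes()` and therefore waits, while a consumer that only needs a prefix can read `bytesSoFar()` right away.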

  • Constructor Details

    • ChunkedDeserializingMessage

      public ChunkedDeserializingMessage(SerializedMessage firstChunk, MessageType messageType, String topic, Serializer serializer)
      Creates a chunked message view starting at the first observed chunk.
      Parameters:
      firstChunk - the first chunk of the message
      messageType - the type of message being reconstructed
      topic - the topic associated with the message, if any
      serializer - the serializer used for typed payload access
  • Method Details

    • recoverFromContinuation

      public static Optional<ChunkedDeserializingMessage> recoverFromContinuation(SerializedMessage continuation, MessageType messageType, String topic, Serializer serializer, long minIndexInclusive, int pageSize, ChunkedDeserializingMessage.ChunkReader chunkReader)
      Reconstructs a chunked message when tracking observes a continuation after the original first chunk was already read by an earlier tracker run.
      Parameters:
      continuation - the observed continuation chunk
      messageType - the type of message being reconstructed
      topic - the topic associated with the message, if any
      serializer - the serializer used for typed payload access
      minIndexInclusive - the first index to inspect while looking for earlier chunks
      pageSize - the maximum number of messages to request per range read
      chunkReader - callback used to read earlier messages from the tracking stream
      Returns:
      a reconstructed chunked message when the first chunk and the required previous continuations were found; otherwise an empty Optional
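How the minIndexInclusive, pageSize and chunkReader parameters might work together can be sketched as a paged scan over earlier messages. Everything in this sketch is hypothetical: the `Chunk` record, the `RangeReader` interface and the `recover` method are illustrative stand-ins, since the actual signature of ChunkedDeserializingMessage.ChunkReader is not shown here. The sketch also does not verify that no intermediate chunk is missing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class ContinuationRecoverySketch {
    // Hypothetical chunk model; the real SerializedMessage carries this information differently.
    record Chunk(long index, String messageId, boolean first, String data) {}

    // Hypothetical reader: returns up to maxSize chunks with index >= minIndex, in index order.
    interface RangeReader {
        List<Chunk> read(long minIndex, int maxSize);
    }

    // Pages forward from minIndexInclusive up to the continuation and collects
    // the earlier chunks that belong to the same message.
    static Optional<List<Chunk>> recover(Chunk continuation, long minIndexInclusive,
                                         int pageSize, RangeReader reader) {
        List<Chunk> found = new ArrayList<>();
        long next = minIndexInclusive;
        while (next < continuation.index()) {
            List<Chunk> page = reader.read(next, pageSize);
            if (page.isEmpty()) {
                break;
            }
            for (Chunk chunk : page) {
                if (chunk.index() >= continuation.index()) {
                    break;
                }
                if (chunk.messageId().equals(continuation.messageId())) {
                    found.add(chunk);
                }
            }
            next = page.get(page.size() - 1).index() + 1;
        }
        // Without the first chunk the message cannot be reconstructed.
        if (found.isEmpty() || !found.get(0).first()) {
            return Optional.empty();
        }
        found.add(continuation);
        return Optional.of(found);
    }

    public static void main(String[] args) {
        List<Chunk> log = List.of(
                new Chunk(1, "a", true, "x"),
                new Chunk(2, "b", true, "he"),
                new Chunk(3, "b", false, "llo"));
        RangeReader reader = (min, max) -> log.stream()
                .filter(c -> c.index() >= min).limit(max).collect(Collectors.toList());
        String payload = recover(log.get(2), 1, 2, reader).orElseThrow().stream()
                .map(Chunk::data).collect(Collectors.joining());
        System.out.println(payload); // prints hello
    }
}
```

In the sketch, starting the scan at an index after the first chunk yields an empty Optional, which mirrors the documented condition that the first chunk must be found.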
    • getPayloadAs

      public <R> R getPayloadAs(Type type)
      Description copied from interface: HasMessage
      Retrieves the message payload, deserializing it if necessary and optionally converting it to the given type.

      By default, this performs a conversion of the payload using JsonUtils.

      Specified by:
      getPayloadAs in interface HasMessage
      Overrides:
      getPayloadAs in class DeserializingMessage
      Type Parameters:
      R - the expected payload type
      Parameters:
      type - the type to convert the payload to
      Returns:
      the payload converted to the given type
    • getPayload

      public <V> V getPayload()
      Description copied from interface: HasMessage
      Retrieves the message payload, deserializing if necessary, cast to the expected type.

      By default, this delegates to toMessage().getPayload().

      Specified by:
      getPayload in interface HasMessage
      Overrides:
      getPayload in class DeserializingMessage
      Type Parameters:
      V - the expected payload type
      Returns:
      the deserialized payload
    • toMessage

      public Message toMessage()
      Description copied from interface: HasMessage
      Returns the underlying Message representation of this object.
      Specified by:
      toMessage in interface HasMessage
      Overrides:
      toMessage in class DeserializingMessage
      Returns:
      the Message backing this instance
    • getMetadata

      public Metadata getMetadata()
      Description copied from interface: HasMetadata
      Returns the Metadata associated with this object.
      Specified by:
      getMetadata in interface HasMetadata
      Overrides:
      getMetadata in class DeserializingMessage
      Returns:
      metadata attached to this instance; never null
    • getPayloadClass

      public Class<?> getPayloadClass()
      Description copied from interface: HasMessage
      Returns the runtime class of the payload object, or Void.class if the payload is null.
      Specified by:
      getPayloadClass in interface HasMessage
      Overrides:
      getPayloadClass in class DeserializingMessage
      Returns:
      the payload's class
    • getSerializedObject

      public SerializedMessage getSerializedObject()
      Overrides:
      getSerializedObject in class DeserializingMessage
    • completion

      public CompletableFuture<Void> completion()
      Returns a future that completes when the final chunk has been observed.
    • getAggregatedPayloadBytes

      public byte[] getAggregatedPayloadBytes()
      Returns the fully aggregated payload bytes, waiting for the final chunk when necessary.
    • enableStreamingOnlyMode

      public void enableStreamingOnlyMode()
      Switches this message to streaming-only mode. Subsequent chunks are exposed through the input stream but are no longer retained in heap for full-payload aggregation.
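The memory trade-off of streaming-only mode can be illustrated with a small stand-alone model. The class `StreamingOnlySketch` and its helpers `nextStreamChunk` and `retainedBytes` are hypothetical, and it is an assumption of this sketch that chunks retained before the switch stay in memory; the description above only covers subsequent chunks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, not the SDK class.
class StreamingOnlySketch {
    private final Queue<byte[]> unread = new ArrayDeque<>();  // feeds the stream consumer
    private final List<byte[]> retained = new ArrayList<>();  // kept for full aggregation
    private boolean streamingOnly;

    void enableStreamingOnlyMode() {
        streamingOnly = true;
    }

    // Every chunk reaches the stream; only non-streaming mode also retains it.
    void appendChunk(byte[] chunk) {
        unread.add(chunk);
        if (!streamingOnly) {
            retained.add(chunk);
        }
    }

    // Returns the next unread chunk, or null when nothing is pending.
    byte[] nextStreamChunk() {
        return unread.poll();
    }

    int retainedBytes() {
        return retained.stream().mapToInt(chunk -> chunk.length).sum();
    }

    public static void main(String[] args) {
        StreamingOnlySketch message = new StreamingOnlySketch();
        message.appendChunk(new byte[] {1, 2});
        message.enableStreamingOnlyMode();
        message.appendChunk(new byte[] {3, 4, 5});
        System.out.println(message.retainedBytes()); // prints 2
    }
}
```

Once the mode is enabled in this model, heap use for aggregation stops growing, at the cost that the full payload can no longer be assembled from retained chunks.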
    • appendChunk

      public void appendChunk(SerializedMessage chunk)
      Appends an additional chunk to this message.
    • appendObservedContinuation

      public void appendObservedContinuation(SerializedMessage message)
      Appends a continuation chunk that was already present in the current tracker batch.
    • fail

      public void fail(Throwable error)
      Fails the stream and any pending aggregated deserialization with the given error.
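Failure propagation to pending consumers can be sketched with a plain JDK CompletableFuture. The class `FailurePropagationSketch` and its method `describeOutcome` are hypothetical, and the sketch makes no claim about how the SDK wires the error into its stream or its aggregated payload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical model, not the SDK class.
class FailurePropagationSketch {
    private final CompletableFuture<byte[]> aggregated = new CompletableFuture<>();

    // Completes the pending future exceptionally so that waiters are released.
    void fail(Throwable error) {
        aggregated.completeExceptionally(error);
    }

    // join() wraps the original error in a CompletionException.
    String describeOutcome() {
        try {
            byte[] payload = aggregated.join();
            return "payload of " + payload.length + " bytes";
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        FailurePropagationSketch message = new FailurePropagationSketch();
        message.fail(new IllegalStateException("chunk 3 missing"));
        System.out.println(message.describeOutcome()); // prints failed: chunk 3 missing
    }
}
```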
    • aggregatedMessage

      protected DeserializingMessage aggregatedMessage()
    • aggregatedSerializedMessage

      protected SerializedMessage aggregatedSerializedMessage()