Class InMemoryMessageStore

java.lang.Object
io.fluxzero.sdk.tracking.client.InMemoryMessageStore
All Implemented Interfaces:
Monitored<List<SerializedMessage>>, HasMessageStore, MessageStore, AutoCloseable
Direct Known Subclasses:
InMemoryEventStore, InMemoryScheduleStore

public class InMemoryMessageStore extends Object implements MessageStore
An in-memory implementation of the MessageStore interface for storing SerializedMessages without external persistence.

This store underpins both local tracking (via LocalTrackingClient) and local publishing (via in-memory GatewayClient) in test and development environments.

Behavior

  • Messages are assigned a unique, incrementing index upon append if none is present.
  • Stored messages are retained in memory using a ConcurrentSkipListMap keyed by index.
  • Supports expiration via retentionTime; expired messages are purged during appends.
  • Supports message monitors that are notified after every append.
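
The indexing and storage behavior listed above can be modeled with plain JDK types. The following is a minimal sketch, not the SDK's implementation: Msg, IndexedStoreSketch, and the counter-based index generator are hypothetical stand-ins, and the way caller-supplied indices interact with generated ones is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SerializedMessage: a payload plus an optional index.
final class Msg {
    Long index;            // null until the store assigns one
    final String payload;

    Msg(Long index, String payload) {
        this.index = index;
        this.payload = payload;
    }
}

// Illustrative only: models "assign an incrementing index if none is present".
final class IndexedStoreSketch {
    private final ConcurrentSkipListMap<Long, Msg> log = new ConcurrentSkipListMap<>();
    private final AtomicLong nextIndex = new AtomicLong();

    // synchronized so that a batch is indexed and stored as one unit
    synchronized void append(List<Msg> messages) {
        for (Msg m : messages) {
            if (m.index == null) {
                m.index = nextIndex.getAndIncrement();
            } else {
                // assumption: keep generated indices ahead of caller-supplied ones
                nextIndex.accumulateAndGet(m.index + 1, Math::max);
            }
            log.put(m.index, m);
        }
    }

    // Indices in ascending order, as kept by the skip list.
    List<Long> indices() {
        return new ArrayList<>(log.keySet());
    }
}
```

Because the map is sorted by key, reading messages "from index N onward" becomes a cheap range view rather than a full scan.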

Thread Safety

  • Append and monitor notifications are synchronized to preserve consistency across batch inserts.
  • Message storage is based on concurrent data structures, safe for multi-threaded access.
  • Monitors use a CopyOnWriteArraySet for thread-safe iteration and updates.
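
The combination of a synchronized append and a CopyOnWriteArraySet of monitors might look roughly like the sketch below. All names are hypothetical, String stands in for SerializedMessage, and a Runnable stands in for the Registration returned by registerMonitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Illustrative only: not the SDK's implementation.
final class MonitoredLogSketch {
    private final List<String> log = new ArrayList<>();
    private final Set<Consumer<List<String>>> monitors = new CopyOnWriteArraySet<>();

    // Returns a Runnable that removes the monitor again (stand-in for Registration).
    Runnable registerMonitor(Consumer<List<String>> monitor) {
        monitors.add(monitor);
        return () -> monitors.remove(monitor);
    }

    // Storing and notifying under one lock means monitors observe batches
    // in the same order in which they were appended.
    synchronized void append(List<String> batch) {
        log.addAll(batch);
        // Iterating a CopyOnWriteArraySet is safe even if monitors are
        // registered or removed concurrently: the loop sees a snapshot.
        for (Consumer<List<String>> monitor : monitors) {
            monitor.accept(batch);
        }
    }
}
```

A copy-on-write set suits this pattern because monitor registration is rare while iteration happens on every append.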

Use Cases

  • Unit and integration tests for consumers, handlers, and gateways
  • Simulating message flow in local environments without a Fluxzero backend
  • Standalone tools that mock message streams

Message Expiration

  • Expired messages are purged based on wall-clock time via Fluxzero.currentTime().
  • The purge logic is triggered during each call to append(List) when a retention policy is set.
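
Purge-on-append can be sketched as follows. This is an illustration under stated assumptions: RetentionSketch and its members are hypothetical, an injected java.time.Clock stands in for Fluxzero.currentTime(), and whether the real store purges before or after storing the new batch is not stated here (the sketch purges first).

```java
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative only: tracks just the storage timestamp per index.
final class RetentionSketch {
    // index -> epoch millis at which the entry was stored
    private final ConcurrentSkipListMap<Long, Long> timestamps = new ConcurrentSkipListMap<>();
    private final Duration retentionTime;   // null means "keep everything"
    private Clock clock;
    private long nextIndex;

    RetentionSketch(Duration retentionTime, Clock clock) {
        this.retentionTime = retentionTime;
        this.clock = clock;
    }

    // Lets a test move time forward without sleeping.
    void setClock(Clock clock) {
        this.clock = clock;
    }

    synchronized void append() {
        if (retentionTime != null) {
            purgeExpired(retentionTime);    // runs on every append when a policy is set
        }
        timestamps.put(nextIndex++, clock.millis());
    }

    // Drops every entry stored longer ago than the given expiration.
    void purgeExpired(Duration expiration) {
        long threshold = clock.millis() - expiration.toMillis();
        timestamps.values().removeIf(ts -> ts < threshold);
    }

    int size() {
        return timestamps.size();
    }
}
```

Since purging only happens during appends in this model, an idle store keeps expired entries in memory until the next append arrives.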
  • Constructor Details

    • InMemoryMessageStore

      public InMemoryMessageStore(MessageType messageType)
  • Method Details

    • append

      public CompletableFuture<Void> append(List<SerializedMessage> messages)
      Description copied from interface: MessageStore
      Appends a list of messages to the store.
      Specified by:
      append in interface MessageStore
      Parameters:
      messages - messages to append
      Returns:
      a CompletableFuture that completes when the messages have been successfully appended
    • getBatch

      public List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive)
      Description copied from interface: MessageStore
      Retrieves a batch of messages starting from the given minIndex.
      Specified by:
      getBatch in interface MessageStore
      Parameters:
      minIndex - minimum message index to start from
      maxSize - maximum number of messages to retrieve
      inclusive - whether to include the message at minIndex
      Returns:
      a list of SerializedMessage instances
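
The effect of the inclusive flag can be illustrated with a sorted map. The helper below is a hypothetical sketch, not the SDK's code: String payloads stand in for SerializedMessage, and treating a null minIndex as "start from the oldest message" is an assumption.

```java
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

// Illustrative only.
final class BatchSketch {
    static List<String> getBatch(ConcurrentSkipListMap<Long, String> log,
                                 Long minIndex, int maxSize, boolean inclusive) {
        // tailMap yields a live view of all entries at or after minIndex;
        // the inclusive flag decides whether minIndex itself is part of it.
        ConcurrentNavigableMap<Long, String> source =
                minIndex == null ? log : log.tailMap(minIndex, inclusive);
        return source.values().stream().limit(maxSize).collect(Collectors.toList());
    }
}
```

With entries at indices 1 to 4, getBatch(log, 2L, 2, true) yields the payloads at 2 and 3, while getBatch(log, 2L, 2, false) yields those at 3 and 4. A consumer that tracks the last index it processed would typically pass inclusive = false to avoid re-reading that message.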
    • getBatch

      public List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes)
      Description copied from interface: MessageStore
      Retrieves a batch of messages starting from the given minIndex, limiting both message count and serialized payload bytes.

      If the first available message is larger than maxBytes, it is still returned so consumers can make progress.

      Specified by:
      getBatch in interface MessageStore
      Parameters:
      minIndex - minimum message index to start from
      maxSize - maximum number of messages to retrieve
      inclusive - whether to include the message at minIndex
      maxBytes - maximum number of serialized payload bytes to retrieve, or 0 for no byte limit
      Returns:
      a list of SerializedMessage instances
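
The byte-limit rule, including the "first message is always returned" exception, can be sketched like this. ByteLimitSketch is hypothetical, raw byte arrays stand in for serialized payloads, and stopping before a message that would push the total past maxBytes (rather than after) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only.
final class ByteLimitSketch {
    // Takes payloads in order until maxSize is reached or the next payload
    // would exceed maxBytes. maxBytes == 0 disables the byte limit.
    static List<byte[]> limit(List<byte[]> candidates, int maxSize, long maxBytes) {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        for (byte[] payload : candidates) {
            if (batch.size() >= maxSize) {
                break;
            }
            boolean overBudget = maxBytes > 0 && bytes + payload.length > maxBytes;
            // An oversized first payload is still returned so the consumer can progress.
            if (overBudget && !batch.isEmpty()) {
                break;
            }
            batch.add(payload);
            bytes += payload.length;
        }
        return batch;
    }
}
```

Without the first-message exception, a single payload larger than maxBytes would block a consumer at that index indefinitely.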
    • scanBatch

      public MessageStoreBatch scanBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes, Predicate<? super SerializedMessage> filter)
      Description copied from interface: MessageStore
      Scans messages starting from the given minIndex, returning messages accepted by filter and metadata about the unfiltered source scan.

      maxSize limits the number of source messages scanned. maxBytes limits the cumulative serialized payload bytes of accepted messages. If the first accepted message is larger than maxBytes, it is still returned so consumers can make progress.

      Specified by:
      scanBatch in interface MessageStore
      Parameters:
      minIndex - minimum message index to start from
      maxSize - maximum number of source messages to scan
      inclusive - whether to include the message at minIndex
      maxBytes - maximum number of accepted serialized payload bytes, or 0 for no byte limit
      filter - predicate deciding which messages enter the returned batch
      Returns:
      accepted messages and source scan metadata
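
The difference between "scanned" and "accepted" messages can be made concrete with a small model. Everything here is hypothetical: ScanResultSketch is not MessageStoreBatch (whose fields are not shown in this page), String length stands in for serialized payload size, and leaving a message that does not fit the byte budget unscanned is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Predicate;

// Hypothetical result type carrying accepted messages plus scan metadata.
final class ScanResultSketch {
    final List<String> accepted = new ArrayList<>();
    int scanned;             // number of source messages looked at
    Long lastScannedIndex;   // index of the last source message looked at, or null
}

// Illustrative only.
final class ScanSketch {
    static ScanResultSketch scan(NavigableMap<Long, String> log, long minIndex, int maxSize,
                                 boolean inclusive, long maxBytes,
                                 Predicate<? super String> filter) {
        ScanResultSketch result = new ScanResultSketch();
        long acceptedBytes = 0;
        for (Map.Entry<Long, String> e : log.tailMap(minIndex, inclusive).entrySet()) {
            if (result.scanned >= maxSize) {
                break;  // maxSize counts scanned messages, not accepted ones
            }
            if (filter.test(e.getValue())) {
                int size = e.getValue().length();
                boolean overBudget = maxBytes > 0 && acceptedBytes + size > maxBytes;
                if (overBudget && !result.accepted.isEmpty()) {
                    break;  // leave this message for the next scan
                }
                result.accepted.add(e.getValue());
                acceptedBytes += size;
            }
            result.scanned++;
            result.lastScannedIndex = e.getKey();
        }
        return result;
    }
}
```

In this model a caller can resume from lastScannedIndex (exclusive) even when the filter rejected every message, which is why the scan metadata is reported separately from the accepted list.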
    • notifyMonitors

      public void notifyMonitors()
    • notifyMonitors

      protected void notifyMonitors(List<SerializedMessage> messages)
    • purgeExpiredMessages

      protected void purgeExpiredMessages(Duration messageExpiration)
    • filterMessages

      protected Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> messages)
    • messagesFrom

      protected Collection<SerializedMessage> messagesFrom(Long minIndex, boolean inclusive)
    • getMessage

      protected SerializedMessage getMessage(long index)
    • truncate

      public void truncate()
      Description copied from interface: MessageStore
      Removes all messages from this store while keeping the logical log available for future appends.
      Specified by:
      truncate in interface MessageStore
    • registerMonitor

      public Registration registerMonitor(Consumer<List<SerializedMessage>> monitor)
      Description copied from interface: Monitored
      Registers a monitor that will be notified when an activity of type T occurs. For this class, T is List<SerializedMessage>.
      Specified by:
      registerMonitor in interface Monitored<List<SerializedMessage>>
      Parameters:
      monitor - the callback to invoke with each observed value
      Returns:
      a Registration that can be used to cancel the monitoring
    • close

      public void close()
      Description copied from interface: MessageStore
      Default no-op close method. Override if resources need cleanup.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface MessageStore
    • toString

      public String toString()
      Overrides:
      toString in class Object