Class InMemoryMessageStore
java.lang.Object
io.fluxzero.sdk.tracking.client.InMemoryMessageStore
- All Implemented Interfaces:
Monitored<List<SerializedMessage>>, HasMessageStore, MessageStore, AutoCloseable
- Direct Known Subclasses:
InMemoryEventStore, InMemoryScheduleStore
An in-memory implementation of the MessageStore interface for storing SerializedMessages without external persistence.
This store underpins both local tracking (via LocalTrackingClient) and local publishing (via in-memory
GatewayClient) in test and development environments.
Behavior
- Messages are assigned a unique, incrementing index upon append if none is present.
- Stored messages are retained in memory using a ConcurrentSkipListMap keyed by index.
- Supports expiration via retentionTime, with periodic purging during appends.
- Supports message monitors that are notified after every append.
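The index assignment and ordered storage described above can be illustrated with a minimal sketch. It is not the SDK's source: SketchMessage and IndexedLogSketch are hypothetical stand-ins, and the rule that explicit indexes push the counter forward is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SerializedMessage; not the SDK type.
final class SketchMessage {
    Long index;            // null until the store assigns one
    final String payload;

    SketchMessage(Long index, String payload) {
        this.index = index;
        this.payload = payload;
    }
}

// Illustrative only: mirrors the documented behavior, not the actual implementation.
final class IndexedLogSketch {
    private final ConcurrentSkipListMap<Long, SketchMessage> log = new ConcurrentSkipListMap<>();
    private final AtomicLong nextIndex = new AtomicLong();

    synchronized void append(List<SketchMessage> messages) {
        for (SketchMessage m : messages) {
            if (m.index == null) {
                // Assign the next free index when the message has none.
                m.index = nextIndex.getAndIncrement();
            } else {
                // Assumption: keep generated indexes ahead of explicitly supplied ones.
                nextIndex.accumulateAndGet(m.index + 1, Math::max);
            }
            log.put(m.index, m);
        }
    }

    // Messages in ascending index order, as the skip list map keeps them.
    List<SketchMessage> all() {
        return new ArrayList<>(log.values());
    }
}
```

Keying the map by index keeps messages sorted, so range reads from a given index need no extra sorting step.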
Thread Safety
- Append and monitor notifications are synchronized to preserve consistency across batch inserts.
- Message storage is based on concurrent data structures, safe for multi-threaded access.
- Monitors use a CopyOnWriteArraySet for thread-safe iteration and updates.
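A rough sketch of how a synchronized append can combine with a CopyOnWriteArraySet of monitors is shown here. MonitorSketch is a hypothetical class, messages are plain strings, and a Runnable stands in for the Registration type returned by the real registerMonitor.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Illustrative only; names and types are assumptions, not the SDK's API.
final class MonitorSketch {
    private final Set<Consumer<List<String>>> monitors = new CopyOnWriteArraySet<>();

    // Returns a Runnable that cancels the monitor (stand-in for Registration).
    Runnable registerMonitor(Consumer<List<String>> monitor) {
        monitors.add(monitor);
        return () -> monitors.remove(monitor);
    }

    // Synchronized so each batch is stored and announced as one unit.
    synchronized void append(List<String> batch) {
        // ... a real store would persist the batch in memory here ...
        monitors.forEach(m -> m.accept(batch));
    }
}
```

Because CopyOnWriteArraySet iterates over a snapshot, a monitor may be registered or cancelled while a notification is in progress without causing a ConcurrentModificationException.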
Use Cases
- Unit and integration tests for consumers, handlers, and gateways
- Simulating message flow in local environments without a Fluxzero backend
- Standalone tools that mock message streams
Message Expiration
- Expired messages are purged based on wall-clock time via Fluxzero.currentTime().
- The purge logic is triggered during each call to append(List) when a retention policy is set.
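The purge-on-append behavior might look roughly like the sketch below. RetentionSketch is hypothetical: it takes a Supplier<Instant> in place of Fluxzero.currentTime() and records a storage timestamp per entry, which is an assumption about how expiry could be determined rather than a description of the SDK's internals.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Supplier;

// Illustrative only; not the SDK's implementation.
final class RetentionSketch {
    private record Entry(String payload, Instant storedAt) {}

    private final ConcurrentSkipListMap<Long, Entry> log = new ConcurrentSkipListMap<>();
    private final Supplier<Instant> currentTime; // stand-in for Fluxzero.currentTime()
    private Duration retentionTime;              // null means no retention policy
    private long nextIndex;

    RetentionSketch(Supplier<Instant> currentTime) {
        this.currentTime = currentTime;
    }

    void setRetentionTime(Duration retentionTime) {
        this.retentionTime = retentionTime;
    }

    synchronized void append(List<String> payloads) {
        // Purging piggybacks on append and only runs when a policy is set.
        if (retentionTime != null) {
            purgeExpired(retentionTime);
        }
        Instant now = currentTime.get();
        for (String payload : payloads) {
            log.put(nextIndex++, new Entry(payload, now));
        }
    }

    private void purgeExpired(Duration retention) {
        Instant threshold = currentTime.get().minus(retention);
        log.values().removeIf(e -> e.storedAt().isBefore(threshold));
    }

    int size() {
        return log.size();
    }
}
```

One consequence of purging only during appends is that expired messages stay readable until the next append arrives.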
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
- CompletableFuture append(List<SerializedMessage> messages): Appends a list of messages to the store.
- void close(): Default no-op close method.
- protected Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> messages)
- List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive): Retrieves a batch of messages starting from the given minIndex.
- List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes): Retrieves a batch of messages starting from the given minIndex, limiting both message count and serialized payload bytes.
- protected SerializedMessage getMessage(long index)
- protected Collection<SerializedMessage> messagesFrom(Long minIndex, boolean inclusive)
- void notifyMonitors()
- protected void notifyMonitors(List<SerializedMessage> messages)
- protected void purgeExpiredMessages(Duration messageExpiration)
- Registration registerMonitor(Consumer<List<SerializedMessage>> monitor): Registers a monitor that will be notified when an activity of type T occurs.
- MessageStoreBatch scanBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes, Predicate<? super SerializedMessage> filter): Scans messages starting from the given minIndex, returning messages accepted by filter and metadata about the unfiltered source scan.
- String toString()
- void truncate(): Removes all messages from this store while keeping the logical log available for future appends.
Methods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface MessageStore
append, getBatch, getBatch, getMessageStore, setRetentionTime, unwrap
-
Constructor Details
-
InMemoryMessageStore
-
-
Method Details
-
append
Description copied from interface: MessageStore
Appends a list of messages to the store.
- Specified by:
append in interface MessageStore
- Parameters:
messages - messages to append
- Returns:
a CompletableFuture that completes when the messages have been successfully appended
-
getBatch
Description copied from interface: MessageStore
Retrieves a batch of messages starting from the given minIndex.
- Specified by:
getBatch in interface MessageStore
- Parameters:
minIndex - minimum message index to start from
maxSize - maximum number of messages to retrieve
inclusive - whether to include the message at minIndex
- Returns:
a list of SerializedMessage instances
-
getBatch
public List<SerializedMessage> getBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes)
Description copied from interface: MessageStore
Retrieves a batch of messages starting from the given minIndex, limiting both message count and serialized payload bytes.
If the first available message is larger than maxBytes, it is still returned so consumers can make progress.
- Specified by:
getBatch in interface MessageStore
- Parameters:
minIndex - minimum message index to start from
maxSize - maximum number of messages to retrieve
inclusive - whether to include the message at minIndex
maxBytes - maximum number of serialized payload bytes to retrieve, or 0 for no byte limit
- Returns:
a list of SerializedMessage instances
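The count and byte limits, including the rule that an oversized first message is still returned, can be sketched as follows. BatchSketch is a hypothetical class that uses raw byte arrays in place of SerializedMessage, and treating a null minIndex as "start from the beginning" is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative only; not the SDK's implementation.
final class BatchSketch {
    // index -> serialized payload (byte[] stands in for SerializedMessage)
    final ConcurrentSkipListMap<Long, byte[]> log = new ConcurrentSkipListMap<>();

    List<byte[]> getBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes) {
        // Assumption: a null minIndex means "read from the start of the log".
        Collection<byte[]> source = minIndex == null
                ? log.values()
                : log.tailMap(minIndex, inclusive).values();
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        for (byte[] payload : source) {
            if (batch.size() >= maxSize) {
                break;
            }
            // maxBytes == 0 disables the byte limit.
            boolean overBudget = maxBytes > 0 && bytes + payload.length > maxBytes;
            if (overBudget && !batch.isEmpty()) {
                break; // only the first message may exceed the budget
            }
            batch.add(payload);
            bytes += payload.length;
        }
        return batch;
    }
}
```

Letting the first message through regardless of size avoids a consumer stalling forever on a single message larger than its byte budget.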
-
scanBatch
public MessageStoreBatch scanBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes, Predicate<? super SerializedMessage> filter)
Description copied from interface: MessageStore
Scans messages starting from the given minIndex, returning messages accepted by filter and metadata about the unfiltered source scan.
maxSize limits the number of source messages scanned. maxBytes limits the cumulative serialized payload bytes of accepted messages. If the first accepted message is larger than maxBytes, it is still returned so consumers can make progress.
- Specified by:
scanBatch in interface MessageStore
- Parameters:
minIndex - minimum message index to start from
maxSize - maximum number of source messages to scan
inclusive - whether to include the message at minIndex
maxBytes - maximum number of accepted serialized payload bytes, or 0 for no byte limit
filter - predicate deciding which messages enter the returned batch
- Returns:
accepted messages and source scan metadata
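To make the difference between the scan limit and the byte limit concrete, here is a minimal sketch. ScanSketch and its Result record are hypothetical: the fields of the real MessageStoreBatch are not listed here, so scanned and lastScannedIndex are assumed examples of "source scan metadata", and messages are plain strings measured by their UTF-8 length.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;

// Illustrative only; not the SDK's implementation.
final class ScanSketch {
    // Hypothetical result shape; the real MessageStoreBatch may differ.
    record Result(List<String> accepted, int scanned, Long lastScannedIndex) {}

    final ConcurrentSkipListMap<Long, String> log = new ConcurrentSkipListMap<>();

    Result scanBatch(Long minIndex, int maxSize, boolean inclusive, long maxBytes,
                     Predicate<? super String> filter) {
        // Assumption: a null minIndex means "scan from the start of the log".
        Map<Long, String> source = minIndex == null ? log : log.tailMap(minIndex, inclusive);
        List<String> accepted = new ArrayList<>();
        long acceptedBytes = 0;
        int scanned = 0;
        Long lastScanned = null;
        for (Map.Entry<Long, String> e : source.entrySet()) {
            if (scanned >= maxSize) {
                break; // maxSize counts source messages, accepted or not
            }
            if (filter.test(e.getValue())) {
                int size = e.getValue().getBytes(StandardCharsets.UTF_8).length;
                boolean overBudget = maxBytes > 0 && acceptedBytes + size > maxBytes;
                if (overBudget && !accepted.isEmpty()) {
                    break; // not counted as scanned, so a later scan picks it up
                }
                accepted.add(e.getValue());
                acceptedBytes += size;
            }
            scanned++;
            lastScanned = e.getKey();
        }
        return new Result(accepted, scanned, lastScanned);
    }
}
```

In this sketch a caller can resume from lastScannedIndex (exclusive) even when the filter rejected every scanned message, which is one reason scan metadata is useful alongside the accepted batch.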
-
notifyMonitors
public void notifyMonitors()
notifyMonitors
-
purgeExpiredMessages
-
filterMessages
-
messagesFrom
-
getMessage
-
truncate
public void truncate()
Description copied from interface: MessageStore
Removes all messages from this store while keeping the logical log available for future appends.
- Specified by:
truncate in interface MessageStore
-
registerMonitor
Description copied from interface: Monitored
Registers a monitor that will be notified when an activity of type T occurs.
- Specified by:
registerMonitor in interface Monitored<List<SerializedMessage>>
- Parameters:
monitor - the callback to invoke with each observed value
- Returns:
a Registration that can be used to cancel the monitoring
-
close
public void close()
Description copied from interface: MessageStore
Default no-op close method. Override if resources need cleanup.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface MessageStore
-
toString
-