Class InMemoryMessageStore
java.lang.Object
io.fluxzero.sdk.tracking.client.InMemoryMessageStore
- All Implemented Interfaces:
Monitored<List<SerializedMessage>>, HasMessageStore, MessageStore, AutoCloseable
- Direct Known Subclasses:
InMemoryEventStore, InMemoryScheduleStore
An in-memory implementation of the MessageStore interface for storing SerializedMessages without external persistence.
This store underpins both local tracking (via LocalTrackingClient) and local publishing (via in-memory GatewayClient) in test and development environments.
Behavior
- Messages are assigned a unique, incrementing index upon append if none is present.
- Stored messages are retained in memory using a ConcurrentSkipListMap keyed by index.
- Supports expiration via retentionTime, with periodic purging during appends.
- Supports message monitors that are notified after every append.
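The indexing and storage behavior listed above can be illustrated with a minimal sketch that uses only the Java standard library. IndexedStoreSketch and its String payloads are hypothetical stand-ins, not the SDK class. How the real store generates indexes is not stated here, so a plain counter is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for illustration only; not the SDK's InMemoryMessageStore.
class IndexedStoreSketch {
    // Entries stay sorted by index, mirroring the "keyed by index" behavior.
    private final ConcurrentSkipListMap<Long, String> messages = new ConcurrentSkipListMap<>();
    // Assumption: a simple counter is enough to produce unique, incrementing indexes.
    private final AtomicLong lastIndex = new AtomicLong();

    // Stores the payload; a null index means "none present", so one is assigned.
    synchronized long append(String payload, Long index) {
        long effectiveIndex;
        if (index == null) {
            effectiveIndex = lastIndex.incrementAndGet();
        } else {
            effectiveIndex = index;
            // Keep the counter at or ahead of explicitly supplied indexes.
            lastIndex.accumulateAndGet(index, Math::max);
        }
        messages.put(effectiveIndex, payload);
        return effectiveIndex;
    }

    // Returns the payloads in ascending index order.
    List<String> all() {
        return new ArrayList<>(messages.values());
    }
}
```

In this sketch, appending "a" without an index, then "b" with index 10, then "c" without an index yields the indexes 1, 10 and 11, and all() returns the payloads in that order.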
Thread Safety
- Append and monitor notifications are synchronized to preserve consistency across batch inserts.
- Message storage is based on concurrent data structures, safe for multi-threaded access.
- Monitors use a CopyOnWriteArraySet for thread-safe iteration and updates.
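A rough sketch of how these guarantees could fit together is shown below. MonitoredStoreSketch is a hypothetical stand-in that uses String payloads, and the Runnable it returns merely plays the role of a Registration. The actual locking strategy inside the SDK class may differ.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; not the SDK's InMemoryMessageStore.
class MonitoredStoreSketch {
    // Copy-on-write iteration lets monitors be added or removed while a notification is running.
    private final Set<Consumer<List<String>>> monitors = new CopyOnWriteArraySet<>();

    // Returns a Runnable that removes the monitor again (standing in for a Registration).
    Runnable registerMonitor(Consumer<List<String>> monitor) {
        monitors.add(monitor);
        return () -> monitors.remove(monitor);
    }

    // Storing and notifying share one lock, so monitors observe whole batches in append order.
    synchronized void append(List<String> batch) {
        // Storing the batch is omitted in this sketch.
        monitors.forEach(monitor -> monitor.accept(batch));
    }
}
```

A monitor registered on this sketch receives each appended batch as a single list until the returned Runnable is run. Later appends no longer reach it.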
Use Cases
- Unit and integration tests for consumers, handlers, and gateways
- Simulating message flow in local environments without a Fluxzero backend
- Standalone tools that mock message streams
Message Expiration
- Expired messages are purged based on wall-clock time via Fluxzero.currentTime().
- The purge logic is triggered during each call to append(List) when a retention policy is set.
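The purge-on-append pattern described above might look roughly like the following sketch. ExpiringStoreSketch is hypothetical: it takes an injected time source in place of Fluxzero.currentTime() and records an explicit timestamp per entry, which is an assumption about how message age could be determined rather than a description of the SDK's internals.

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.LongSupplier;

// Hypothetical stand-in for illustration only; not the SDK's InMemoryMessageStore.
class ExpiringStoreSketch {
    // Maps index -> append time in epoch milliseconds (payloads are omitted for brevity).
    private final ConcurrentSkipListMap<Long, Long> appendTimes = new ConcurrentSkipListMap<>();
    private final LongSupplier currentTimeMillis; // stands in for the wall clock
    private final Duration retentionTime;         // null means no retention policy
    private long lastIndex;

    ExpiringStoreSketch(LongSupplier currentTimeMillis, Duration retentionTime) {
        this.currentTimeMillis = currentTimeMillis;
        this.retentionTime = retentionTime;
    }

    // Purges first (only when a retention policy is set), then stores the new entry.
    synchronized void append() {
        if (retentionTime != null) {
            purgeExpiredMessages(retentionTime);
        }
        appendTimes.put(++lastIndex, currentTimeMillis.getAsLong());
    }

    // Removes every entry appended before (now - messageExpiration).
    void purgeExpiredMessages(Duration messageExpiration) {
        long cutoff = currentTimeMillis.getAsLong() - messageExpiration.toMillis();
        appendTimes.values().removeIf(appendedAt -> appendedAt < cutoff);
    }

    int size() {
        return appendTimes.size();
    }
}
```

Because purging only runs inside append in this sketch, expired entries remain visible until the next append. This matches the "triggered during each call to append(List)" wording above.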
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
- CompletableFuture append(List<SerializedMessage> messages): Appends a list of messages to the store.
- void close(): Default no-op close method.
- protected Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> messages)
- List<SerializedMessage> getBatch(minIndex, maxSize, inclusive): Retrieves a batch of messages starting from the given minIndex.
- protected SerializedMessage getMessage(long index)
- void notifyMonitors()
- protected void notifyMonitors(List<SerializedMessage> messages)
- protected void purgeExpiredMessages(Duration messageExpiration)
- Registration registerMonitor(Consumer<List<SerializedMessage>> monitor): Registers a monitor that will be notified when an activity of type T occurs.
- String toString()
Methods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface MessageStore
append, getBatch, getMessageStore, setRetentionTime, unwrap
-
Constructor Details
-
InMemoryMessageStore
-
-
Method Details
-
append
Description copied from interface: MessageStore
Appends a list of messages to the store.
- Specified by:
append in interface MessageStore
- Parameters:
messages - messages to append
- Returns:
a CompletableFuture that completes when the messages have been successfully appended
-
getBatch
Description copied from interface: MessageStore
Retrieves a batch of messages starting from the given minIndex.
- Specified by:
getBatch in interface MessageStore
- Parameters:
minIndex - minimum message index to start from
maxSize - maximum number of messages to retrieve
inclusive - whether to include the message at minIndex
- Returns:
a list of SerializedMessage instances
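How the three parameters interact can be sketched over a sorted map. BatchReadSketch is a hypothetical stand-in with String payloads. The parameter types (long, int, boolean) are assumptions, since the signature is not reproduced on this page.

```java
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration only; not the SDK's InMemoryMessageStore.
class BatchReadSketch {
    private final ConcurrentSkipListMap<Long, String> messages = new ConcurrentSkipListMap<>();

    void put(long index, String payload) {
        messages.put(index, payload);
    }

    // Reads up to maxSize payloads in ascending index order, starting at minIndex
    // (inclusive == true) or at the first index strictly above it (inclusive == false).
    List<String> getBatch(long minIndex, int maxSize, boolean inclusive) {
        return messages.tailMap(minIndex, inclusive).values().stream()
                .limit(maxSize)
                .collect(Collectors.toList());
    }
}
```

With entries at indexes 1 through 4, getBatch(2, 2, true) returns the payloads at indexes 2 and 3, while getBatch(2, 2, false) returns those at indexes 3 and 4. A tracker that has already processed index 2 would typically use the exclusive form to avoid reading the same message twice.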
-
notifyMonitors
public void notifyMonitors() -
notifyMonitors
-
purgeExpiredMessages
-
filterMessages
-
getMessage
-
registerMonitor
Description copied from interface: Monitored
Registers a monitor that will be notified when an activity of type T occurs.
- Specified by:
registerMonitor in interface Monitored<List<SerializedMessage>>
- Parameters:
monitor - the callback to invoke with each observed value
- Returns:
a Registration that can be used to cancel the monitoring
-
close
public void close()
Description copied from interface: MessageStore
Default no-op close method. Override if resources need cleanup.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface MessageStore
-
toString
-