Class CachingTrackingClient
- All Implemented Interfaces:
TrackingClient, AutoCloseable
TrackingClient implementation that wraps another client (typically a WebsocketTrackingClient)
and caches recent messages in memory to reduce redundant round trips to the Fluxzero Runtime.
This client is particularly useful in environments where multiple consumers or trackers are processing the same stream of messages. Rather than each tracker reading from the backend individually, a shared in-memory cache serves recent messages directly when possible.
Behavior
- Internally starts a special tracker that continuously appends new messages to a bounded in-memory cache.
- Trackers that read from this client are first served from the local cache when possible.
- Falls back to the delegate
TrackingClientfor uncached or missed messages. - Falls back to the delegate when the cache has no immediately available messages.
- Cache size is limited via
maxCacheSize; old messages are evicted in insertion order.
Use Cases
- Optimizing performance when many trackers are polling the same stream concurrently
- Reducing network latency and load on the Fluxzero Runtime for high-volume message types
- Minimizing end-to-end processing delay in horizontally scaled applications
Tracking Mechanics
- Uses a background tracker configured with
ignoreSegment = trueandclientControlledIndex = trueto stream all new messages into the cache. - Trackers calling
read(String, Long, ConsumerConfiguration)are served from the cache if theirlastIndexis already present and the next batch is available in memory. - If not, the delegate is queried directly to maintain completeness and runtime-side lifecycle behavior.
Thread Safety
- The cache is backed by a
ConcurrentSkipListMapfor safe concurrent access. - Eviction is synchronized to prevent race conditions.
- See Also:
-
Constructor Summary
ConstructorsConstructorDescriptionCachingTrackingClient(TrackingClient delegate, int maxCacheSize) CachingTrackingClient(WebsocketTrackingClient delegate) -
Method Summary
Modifier and TypeMethodDescriptionprotected voidcacheNewMessages(List<SerializedMessage> messages) claimSegment(String trackerId, Long lastIndex, ConsumerConfiguration config) Claims a processing segment for the given tracker.voidclose()Closes any open resources associated with this client.disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch, Guarantee guarantee) Disconnects the specified tracker from its segment with the specified delivery guarantee.protected MessageBatchdoWaitForCachedBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim, Instant deadline) protected List<SerializedMessage> filterMessages(List<SerializedMessage> messages, int[] segmentRange, Position position, ConsumerConfiguration config) protected Predicate<SerializedMessage> filterPredicate(int[] segmentRange, Position position, ConsumerConfiguration config) protected MessageBatchgetMessageBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim) Returns theMessageType(e.g., COMMAND, EVENT, QUERY) associated with this tracking client.getPosition(String consumer) Returns the current committed tracking position for the given consumer.getTopic()Returns the topic associated with this tracking client.read(String trackerId, Long lastIndex, ConsumerConfiguration config) Asynchronously reads the next availableMessageBatchfor a given tracker.readFromIndex(long minIndex, int maxSize) Fetches messages starting from the given index up to the provided max size.readFromIndex(long minIndex, int maxSize, long maxBytes) Fetches messages directly from the message log while limiting both message count and serialized payload bytes.readRange(long minIndexInclusive, long maxIndexExclusive, int maxSize) Fetches messages in the range[minIndexInclusive, maxIndexExclusive)up to the provided max size.readRange(long minIndexInclusive, long maxIndexExclusive, int maxSize, long maxBytes) Fetches messages in the range[minIndexInclusive, maxIndexExclusive)while limiting both message count and serialized payload bytes.protected voidresetPosition(String consumer, long lastIndex, Guarantee guarantee) Resets the consumer's tracking position to a given index with a specific delivery guarantee.storePosition(String consumer, int[] segment, long lastIndex, Guarantee guarantee) Stores the last successfully processed position for a consumer with a specific delivery guarantee.protected CompletableFuture<MessageBatch> waitForCachedBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim, Instant deadline) Methods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface TrackingClient
disconnectTracker, readAndWait, resetPosition, storePosition
-
Constructor Details
-
CachingTrackingClient
-
CachingTrackingClient
-
-
Method Details
-
read
public CompletableFuture<MessageBatch> read(String trackerId, Long lastIndex, ConsumerConfiguration config) Description copied from interface:TrackingClientAsynchronously reads the next availableMessageBatchfor a given tracker.- Specified by:
readin interfaceTrackingClient- Parameters:
trackerId- the unique ID for the tracker thread requesting messageslastIndex- the last index successfully handled by this trackerconfig- the full configuration for the consumer- Returns:
- a
CompletableFuturethat completes with the next batch of messages
-
waitForCachedBatch
protected CompletableFuture<MessageBatch> waitForCachedBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim, Instant deadline) -
doWaitForCachedBatch
protected MessageBatch doWaitForCachedBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim, Instant deadline) throws InterruptedException - Throws:
InterruptedException
-
getMessageBatch
protected MessageBatch getMessageBatch(ConsumerConfiguration config, long minIndex, ClaimSegmentResult claim) -
filterMessages
protected List<SerializedMessage> filterMessages(List<SerializedMessage> messages, int[] segmentRange, Position position, ConsumerConfiguration config) -
filterPredicate
protected Predicate<SerializedMessage> filterPredicate(int[] segmentRange, Position position, ConsumerConfiguration config) -
cacheNewMessages
-
removeOldMessages
protected void removeOldMessages() -
readFromIndex
Description copied from interface:TrackingClientFetches messages starting from the given index up to the provided max size.This method bypasses consumer configurations and is primarily used for diagnostics or reprocessing.
- Specified by:
readFromIndexin interfaceTrackingClient- Parameters:
minIndex- the starting index (inclusive)maxSize- the maximum number of messages to retrieve- Returns:
- a list of serialized messages starting at the given index
-
readFromIndex
Description copied from interface:TrackingClientFetches messages directly from the message log while limiting both message count and serialized payload bytes.If the first available message is larger than
maxBytes, it is still returned so consumers can make progress. AmaxBytesvalue of0disables the byte limit.- Specified by:
readFromIndexin interfaceTrackingClient- Parameters:
minIndex- the starting index (inclusive)maxSize- the maximum number of messages to retrievemaxBytes- the maximum number of serialized payload bytes to retrieve- Returns:
- a list of serialized messages starting at the given index
-
readRange
public List<SerializedMessage> readRange(long minIndexInclusive, long maxIndexExclusive, int maxSize) Description copied from interface:TrackingClientFetches messages in the range[minIndexInclusive, maxIndexExclusive)up to the provided max size.Implementations may override this to perform a bounded server-side range read. The default implementation keeps the wire protocol backward compatible by using
TrackingClient.readFromIndex(long, int)and filtering locally.- Specified by:
readRangein interfaceTrackingClient- Parameters:
minIndexInclusive- the starting index (inclusive)maxIndexExclusive- the maximum index (exclusive)maxSize- the maximum number of messages to retrieve- Returns:
- a list of serialized messages in the requested range
-
readRange
public List<SerializedMessage> readRange(long minIndexInclusive, long maxIndexExclusive, int maxSize, long maxBytes) Description copied from interface:TrackingClientFetches messages in the range[minIndexInclusive, maxIndexExclusive)while limiting both message count and serialized payload bytes.- Specified by:
readRangein interfaceTrackingClient- Parameters:
minIndexInclusive- the starting index (inclusive)maxIndexExclusive- the maximum index (exclusive)maxSize- the maximum number of messages to retrievemaxBytes- the maximum number of serialized payload bytes to retrieve- Returns:
- a list of serialized messages in the requested range
-
claimSegment
public CompletableFuture<ClaimSegmentResult> claimSegment(String trackerId, Long lastIndex, ConsumerConfiguration config) Description copied from interface:TrackingClientClaims a processing segment for the given tracker.Segments are used to partition the message log among multiple tracker threads for parallel processing.
- Specified by:
claimSegmentin interfaceTrackingClient- Parameters:
trackerId- the unique identifier of the tracker attempting to claim a segmentlastIndex- the tracker's last successfully processed indexconfig- the full consumer configuration- Returns:
- a
CompletableFutureresolving to the result of the claim
-
storePosition
public CompletableFuture<Void> storePosition(String consumer, int[] segment, long lastIndex, Guarantee guarantee) Description copied from interface:TrackingClientStores the last successfully processed position for a consumer with a specific delivery guarantee.- Specified by:
storePositionin interfaceTrackingClient- Parameters:
consumer- the name of the consumersegment- the segment the tracker is processinglastIndex- the last message index processedguarantee- delivery guarantee (e.g., STORED, SENT)- Returns:
- a future indicating completion
-
resetPosition
Description copied from interface:TrackingClientResets the consumer's tracking position to a given index with a specific delivery guarantee.- Specified by:
resetPositionin interfaceTrackingClient- Parameters:
consumer- the name of the consumerlastIndex- the new index to start fromguarantee- the delivery guarantee- Returns:
- a future indicating completion
-
getPosition
Description copied from interface:TrackingClientReturns the current committed tracking position for the given consumer.- Specified by:
getPositionin interfaceTrackingClient- Parameters:
consumer- the name of the consumer- Returns:
- the last known committed position
-
disconnectTracker
public CompletableFuture<Void> disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch, Guarantee guarantee) Description copied from interface:TrackingClientDisconnects the specified tracker from its segment with the specified delivery guarantee.- Specified by:
disconnectTrackerin interfaceTrackingClient- Parameters:
consumer- the name of the consumer grouptrackerId- the ID of the tracker thread being disconnectedsendFinalEmptyBatch- whether to send a final empty batch to commit stateguarantee- the delivery guarantee to use- Returns:
- a future indicating disconnection
-
getMessageType
Description copied from interface:TrackingClientReturns theMessageType(e.g., COMMAND, EVENT, QUERY) associated with this tracking client.- Specified by:
getMessageTypein interfaceTrackingClient- Returns:
- the message type
-
getTopic
Description copied from interface:TrackingClientReturns the topic associated with this tracking client.This is applicable only when
TrackingClient.getMessageType()isMessageType.DOCUMENTorMessageType.CUSTOM, where messages are organized into named topics beyond the standard type-based categorization.For other
MessageTypes (e.g.,COMMAND,EVENT), the concept of a topic is implicit and not required for tracking.- Specified by:
getTopicin interfaceTrackingClient- Returns:
- the topic name, or
nullif not applicable for the message type
-
close
public void close()Description copied from interface:TrackingClientCloses any open resources associated with this client.Once closed, the client should no longer be used to fetch or commit tracking state.
- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceTrackingClient
-