Class DefaultTrackingStrategy
- All Implemented Interfaces:
TrackingStrategy, Closeable, AutoCloseable
Message segments are determined by the clients that publish the messages (usually based on the consistent hash of some routing key, like the value of a user id).
If a client joins or leaves the cluster the segment range mapped to each client is recalculated so messages may get routed differently than before.
Clients can safely join or leave the cluster at any time. The strategy guarantees that a message is not consumed by more than one client.
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected static classprotected class -
Constructor Summary
ConstructorsModifierConstructorDescriptionDefaultTrackingStrategy(MessageStore source, PositionStore positionStore) DefaultTrackingStrategy(MessageStore source, PositionStore positionStore, TaskScheduler scheduler) protectedDefaultTrackingStrategy(MessageStore source, PositionStore positionStore, TaskScheduler scheduler, int segments) -
Method Summary
Modifier and TypeMethodDescriptionprotected intadjustMaxSize(Tracker tracker, int maxSize) claimSegment(Tracker tracker) Claims one or more message segments for the given tracker.protected voidclaimSegment(Tracker tracker, DefaultTrackingStrategy.TrackerRequest<ClaimResult> request) protected int[]claimSegmentRange(Tracker tracker) voidclose()Closes the tracking strategy and releases any underlying resources.disconnectTrackers(Predicate<Tracker> predicate, boolean sendFinalEmptyBatch) Disconnects trackers that match the provided filter.protected SerializedMessageensureMessageSegment(SerializedMessage message) protected List<SerializedMessage> filter(List<SerializedMessage> messages, int[] segmentRange, Position position, Tracker tracker) protected Predicate<SerializedMessage> filterPredicate(int[] segmentRange, Position position, Tracker tracker) protected List<SerializedMessage> Requests a new batch of messages for the given tracker.protected voidgetBatch(Tracker tracker, DefaultTrackingStrategy.TrackerRequest<MessageBatch> request) protected voidonClusterUpdate(TrackerCluster cluster) protected voidonUpdate(List<SerializedMessage> messages) protected Positionprotected voidpurgeCeasedTrackers(Duration delay) protected MessageStoreBatchscanBatch(int[] segment, Position position, int batchSize, long maxBytes, Predicate<? super SerializedMessage> filter) protected voidwaitForMessages(Tracker tracker, MessageBatch emptyBatch) protected voidwaitForMessages(Tracker tracker, MessageBatch emptyBatch, DefaultTrackingStrategy.TrackerRequest<MessageBatch> request) protected voidwaitForUpdate(Tracker tracker, MessageBatch emptyBatch, Runnable followUp) protected voidwaitForUpdate(Tracker tracker, MessageBatch emptyBatch, Runnable followUp, DefaultTrackingStrategy.TrackerRequest<?> request)
-
Constructor Details
-
DefaultTrackingStrategy
-
DefaultTrackingStrategy
public DefaultTrackingStrategy(MessageStore source, PositionStore positionStore, TaskScheduler scheduler) -
DefaultTrackingStrategy
protected DefaultTrackingStrategy(MessageStore source, PositionStore positionStore, TaskScheduler scheduler, int segments)
-
-
Method Details
-
getBatch
Description copied from interface:TrackingStrategyRequests a new batch of messages for the given tracker.This method is typically invoked by the
Trackerwhen it is ready to handle more messages. Depending on the strategy, this method may:- Fetch messages directly from a
MessageStoreand complete the result (e.g. for log tailing), or - Suspend the result until messages become available
- Specified by:
getBatchin interfaceTrackingStrategy- Parameters:
tracker- the tracker requesting a batch- Returns:
- future completed with the requested batch
- Fetch messages directly from a
-
getBatch
protected void getBatch(Tracker tracker, DefaultTrackingStrategy.TrackerRequest<MessageBatch> request) -
claimSegment
Description copied from interface:TrackingStrategyClaims one or more message segments for the given tracker.This method is invoked when segment-based partitioning is enabled. It ensures that each segment is only claimed by a single tracker at a time and may release conflicting claims if necessary.
- Specified by:
claimSegmentin interfaceTrackingStrategy- Parameters:
tracker- the tracker attempting to claim a segment- Returns:
- future completed with the claimed segment and current position
-
claimSegment
protected void claimSegment(Tracker tracker, DefaultTrackingStrategy.TrackerRequest<ClaimResult> request) -
getBatch
-
scanBatch
protected MessageStoreBatch scanBatch(int[] segment, Position position, int batchSize, long maxBytes, Predicate<? super SerializedMessage> filter) -
waitForMessages
-
waitForMessages
protected void waitForMessages(Tracker tracker, MessageBatch emptyBatch, DefaultTrackingStrategy.TrackerRequest<MessageBatch> request) -
waitForUpdate
-
waitForUpdate
protected void waitForUpdate(Tracker tracker, MessageBatch emptyBatch, Runnable followUp, DefaultTrackingStrategy.TrackerRequest<?> request) -
position
-
filter
protected List<SerializedMessage> filter(List<SerializedMessage> messages, int[] segmentRange, Position position, Tracker tracker) -
filterPredicate
protected Predicate<SerializedMessage> filterPredicate(int[] segmentRange, Position position, Tracker tracker) -
ensureMessageSegment
-
adjustMaxSize
-
claimSegmentRange
-
onUpdate
-
onClusterUpdate
-
disconnectTrackers
Description copied from interface:TrackingStrategyDisconnects trackers that match the provided filter.This is typically used during client shutdown, reconfiguration, or error handling to forcibly remove trackers from the strategy's internal registry.
- Specified by:
disconnectTrackersin interfaceTrackingStrategy- Parameters:
predicate- filter for matching trackers to disconnectsendFinalEmptyBatch- iftrue, a final empty batch should be sent to each disconnected tracker to allow graceful termination- Returns:
- the trackers that were disconnected (empty if no trackers matched the filter)
-
purgeCeasedTrackers
-
close
public void close()Description copied from interface:TrackingStrategyCloses the tracking strategy and releases any underlying resources.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable- Specified by:
closein interfaceTrackingStrategy
-