Class DefaultTracking
java.lang.Object
io.fluxzero.sdk.tracking.DefaultTracking
- All Implemented Interfaces:
Tracking, AutoCloseable
Default implementation of the
Tracking interface that coordinates message tracking for a specific
MessageType.
This class is responsible for:
- Assigning handler objects to appropriate
ConsumerConfigurations based on declared filters - Creating and managing
Trackerinstances for those consumers and their associated topics - Ensuring messages are deserialized, dispatched, and (if applicable) responded to with proper error handling
- Invoking handlers using the provided
HandlerFactoryandHandlerInvoker - Integrating with
ResultGatewayto send back command/query/web responses when needed
Supports per-consumer batch interceptors and general batch processing logic, including:
- Functional and technical exception management with retry hooks
- Tracking exclusivity to prevent handlers from being assigned to multiple consumers simultaneously
- Internal shutdown coordination and pending message flushes via
close()
Typical Usage
This class is used internally when starting aFluxzero.registerHandlers(List) invocation
for a given MessageType, and typically shouldn't be used directly by application developers.- See Also:
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoidclose()Shuts down all started trackers and waits briefly for asynchronous results (e.g. command responses) to complete.protected Consumer<List<SerializedMessage>> createConsumer(ConsumerConfiguration config, List<Handler<DeserializingMessage>> handlers) protected List<DeserializingMessage> deserializeMessageList(List<SerializedMessage> serializedMessages, String topic, TrackingClient trackingClient, Map<String, ChunkedDeserializingMessage> activeChunkedMessages, int recoveryMaxFetchSize) protected ObjectdoHandle(DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected ObjectdoHandle(DeserializingMessage message, HandlerMethod<? super DeserializingMessage> h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected HandlerMethod<? super DeserializingMessage> getHandlerMethodOrNull(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected Optional<HandlerInvoker> getInvoker(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected Objecthandle(DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected Objecthandle(DeserializingMessage message, HandlerMethod<? super DeserializingMessage> h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected ObjectprocessError(Throwable e, DeserializingMessage message, HandlerDescriptor h, Handler<DeserializingMessage> handler, ConsumerConfiguration config, Callable<Object> retry) protected ObjectprocessError(Throwable e, DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) protected CompletionStage<Void> reportResult(Object result, HandlerDescriptor h, DeserializingMessage message, ConsumerConfiguration config) retainTrackingTopics(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> activeHandlers, List<Handler<DeserializingMessage>> addedHandlers, Fluxzero fluxzero) protected <T> voidsetThreadLocal(ThreadLocal<T> threadLocal, T value) protected booleanshouldSendResponse(HandlerDescriptor invoker, DeserializingMessage request, Object result, ConsumerConfiguration config) Starts tracking by assigning the given handlers to configured consumers and creating topic-specific or shared trackers.protected RegistrationstartTracking(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> handlers, String topic, Fluxzero fluxzero) protected voidstopTracker(DeserializingMessage message, Handler<DeserializingMessage> handler, Throwable e) trackingTopics(List<Handler<DeserializingMessage>> handlers) protected CompletionStage<Void> tryHandle(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config, boolean reportResult) protected <T> TwithHandlerContext(DeserializingMessage message, Fluxzero fluxzero, Tracker tracker, User user, Supplier<T> task)
-
Constructor Details
-
DefaultTracking
public DefaultTracking()
-
-
Method Details
-
start
Starts tracking by assigning the given handlers to configured consumers and creating topic-specific or shared trackers.Throws a
TrackingExceptionif handlers can't be matched to consumers.- Specified by:
startin interfaceTracking- Parameters:
fluxzero- the owningFluxzeroinstancehandlers- the handler instances to assign and activate- Returns:
- a
Registrationthat can be used to stop all created trackers - Throws:
TrackingException- if no consumer is found for a handler
-
retainTrackingTopics
protected List<String> retainTrackingTopics(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> activeHandlers, List<Handler<DeserializingMessage>> addedHandlers, Fluxzero fluxzero) -
trackingTopics
-
startTracking
protected Registration startTracking(ConsumerConfiguration configuration, List<Handler<DeserializingMessage>> handlers, String topic, Fluxzero fluxzero) -
createConsumer
protected Consumer<List<SerializedMessage>> createConsumer(ConsumerConfiguration config, List<Handler<DeserializingMessage>> handlers) -
deserializeMessageList
protected List<DeserializingMessage> deserializeMessageList(List<SerializedMessage> serializedMessages, String topic, TrackingClient trackingClient, Map<String, ChunkedDeserializingMessage> activeChunkedMessages, int recoveryMaxFetchSize) -
tryHandle
protected CompletionStage<Void> tryHandle(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config, boolean reportResult) -
getHandlerMethodOrNull
protected HandlerMethod<? super DeserializingMessage> getHandlerMethodOrNull(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
getInvoker
protected Optional<HandlerInvoker> getInvoker(DeserializingMessage message, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
handle
protected Object handle(DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
handle
protected Object handle(DeserializingMessage message, HandlerMethod<? super DeserializingMessage> h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
doHandle
protected Object doHandle(DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
doHandle
protected Object doHandle(DeserializingMessage message, HandlerMethod<? super DeserializingMessage> h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
withHandlerContext
protected <T> T withHandlerContext(DeserializingMessage message, Fluxzero fluxzero, Tracker tracker, User user, Supplier<T> task) -
setThreadLocal
-
processError
protected Object processError(Throwable e, DeserializingMessage message, HandlerInvoker h, Handler<DeserializingMessage> handler, ConsumerConfiguration config) -
processError
protected Object processError(Throwable e, DeserializingMessage message, HandlerDescriptor h, Handler<DeserializingMessage> handler, ConsumerConfiguration config, Callable<Object> retry) -
reportResult
protected CompletionStage<Void> reportResult(Object result, HandlerDescriptor h, DeserializingMessage message, ConsumerConfiguration config) -
shouldSendResponse
protected boolean shouldSendResponse(HandlerDescriptor invoker, DeserializingMessage request, Object result, ConsumerConfiguration config) -
stopTracker
protected void stopTracker(DeserializingMessage message, Handler<DeserializingMessage> handler, Throwable e) -
close
public void close()Shuts down all started trackers and waits briefly for asynchronous results (e.g. command responses) to complete.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceTracking
-