Annotation Interface Consumer
Declares a Consumer within a Fluxzero application.
A consumer represents an isolated group of handlers that independently track and process messages from one or more
message logs. It can be applied at the class or package level to group handlers together. Handlers that do not
explicitly declare a Consumer are assigned according to the application's configuration, as defined via
FluxzeroBuilder.addConsumerConfiguration(ConsumerConfiguration, MessageType...). If no specific
configuration is provided, the handler will be assigned to the application's default consumer.
A consumer consists of one or more trackers—individual threads or processes that fetch and process message
segments. Each tracker is responsible for a disjoint segment of the message log, allowing for parallel consumption.
By default, messages are sharded into 128 segments; a consumer with threads = 2 will assign 64 segments to
each tracker.
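The segment arithmetic above can be sketched as follows. This is a minimal illustration that assumes segments are divided into contiguous, near-equal ranges; the class and method names are hypothetical and the Fluxzero Runtime's actual assignment strategy may differ.

```java
import java.util.ArrayList;
import java.util.List;

class SegmentAssignmentSketch {
    // Default number of segments, as stated in the text above.
    static final int TOTAL_SEGMENTS = 128;

    /** Returns one {startInclusive, endExclusive} pair per tracker (assumed contiguous split). */
    static List<int[]> assign(int threads) {
        List<int[]> ranges = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            int start = (int) ((long) i * TOTAL_SEGMENTS / threads);
            int end = (int) ((long) (i + 1) * TOTAL_SEGMENTS / threads);
            ranges.add(new int[]{start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // With threads = 2, each tracker ends up with 64 of the 128 segments.
        for (int[] range : assign(2)) {
            System.out.println("segments [" + range[0] + ", " + range[1] + "): "
                    + (range[1] - range[0]) + " segments");
        }
    }
}
```

When the thread count does not divide 128 evenly (for example threads = 3), this sketch hands out ranges that differ in size by at most one segment.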
This annotation offers fine-grained control over message processing characteristics including concurrency, batching, backpressure, result publication, and handler exclusivity.
Terminology
- Consumer: Named group of handlers with isolated message tracking state.
- Tracker: A processing thread assigned to a disjoint set of segments of the message log.
- Handler: Method annotated with @HandleEvent, @HandleCommand, etc., which processes messages.
Example:
    @Consumer(name = "audit", threads = 3, passive = true)
    class AuditHandler {
        @HandleCommand
        void on(AuthenticateUser command) {
            // log for auditing; result will not be published due to passive = true
        }
    }
Required Element Summary
Required Elements
- String name: The unique name of the consumer.
Optional Element Summary
Optional Elements
- Class<? extends BatchInterceptor>[] batchInterceptors: Interceptors applied at the batch level across all messages in a poll cycle.
- boolean clientControlledIndex: If true, the consumer will not rely on Fluxzero's internal tracking index.
- Class<? extends DispatchInterceptor>[] dispatchInterceptors: Dispatch interceptors that become active while this consumer is processing a batch or handler.
- durationUnit: Unit for maxWaitDuration().
- Class<? extends ErrorHandler> errorHandler: Error handler invoked when a message processing error occurs.
- boolean exclusive: Determines whether handlers assigned to this consumer are excluded from other consumers.
- boolean exclusiveAfterMaxIndex: Whether this consumer should remain exclusive for shared handlers from maxIndexExclusive() onward.
- boolean exclusiveBeforeMinIndex: Whether this consumer should remain exclusive for shared handlers before minIndex() is reached.
- boolean filterMessageTarget: If true, only messages explicitly targeted at this application instance will be processed.
- Class<? extends FlowRegulator> flowRegulator: Regulates message flow and backpressure behavior.
- Class<? extends HandlerInterceptor>[] handlerInterceptors: Interceptors applied to individual handler method invocations.
- boolean ignoreSegment: If true, this consumer will bypass the default segment-based sharding applied by the Fluxzero Runtime and attempt to process all message segments.
- int maxFetchSize: Maximum number of messages to fetch in a batch.
- long maxIndexExclusive: Optional exclusive upper bound for message processing.
- long maxWaitDuration: Maximum time to wait before fetching a new batch, when none are available.
- long minIndex: Optional minimum message index from which this consumer should begin processing.
- String namespace: Specifies the namespace under which the consumer tracks messages.
- boolean passive: Indicates that this consumer should process messages without publishing result messages.
- boolean singleTracker: If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.
- boolean storePositionManually: Whether this consumer is taking manual control over storing its position in the log.
- int threads: The number of tracker threads to allocate for this consumer.
- String typeFilter: Optional regular expression used to filter message payload types on the Fluxzero Runtime.
Element Details
name
String name
The unique name of the consumer. Required. This isolates its tracking tokens from other consumers.
threads
int threads
The number of tracker threads to allocate for this consumer. Each thread processes a unique segment of the message log. Default is 1.
Default: 1
maxFetchSize
int maxFetchSize
Maximum number of messages to fetch in a batch. Default is 1024.
Default: 1024
maxWaitDuration
long maxWaitDuration
Maximum time to wait before fetching a new batch, when none are available. See durationUnit() for the time unit. Default is 60 (seconds).
Default: 60L
durationUnit
Unit for maxWaitDuration().
handlerInterceptors
Class<? extends HandlerInterceptor>[] handlerInterceptors
Interceptors applied to individual handler method invocations.
These interceptors are ordered within the consumer using @Order. Global handler interceptors configured via FluxzeroBuilder keep their existing position in the chain; consumer interceptors are only sorted relative to other interceptors declared on the same consumer.
Default: {}
batchInterceptors
Class<? extends BatchInterceptor>[] batchInterceptors
Interceptors applied at the batch level across all messages in a poll cycle.
These interceptors are ordered within the consumer using @Order. Their relative position with respect to globally registered batch interceptors is preserved: consumer-specific negative orders run before global built-ins for that consumer, while zero, positive, or missing values run after them.
Default: {}
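The ordering rule for batch interceptors can be modelled roughly as below. This is only a sketch: Entry is a hypothetical stand-in for an interceptor class plus its @Order value, the interceptor names are invented, and two points are assumptions not stated above (a missing @Order is treated like 0, and lower values run earlier).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class BatchInterceptorOrderSketch {
    /** Hypothetical stand-in: interceptor name plus its @Order value (null when absent). */
    static final class Entry {
        final String name;
        final Integer order;

        Entry(String name, Integer order) {
            this.name = name;
            this.order = order;
        }

        // Assumption: a missing @Order behaves like order 0.
        int effectiveOrder() {
            return order == null ? 0 : order;
        }
    }

    /** Builds the chain: negative consumer orders, then globals, then the remaining consumer interceptors. */
    static List<String> chain(List<Entry> consumerInterceptors, List<String> globalInterceptors) {
        List<Entry> sorted = new ArrayList<>(consumerInterceptors);
        // List.sort is stable, so entries with equal order keep their declaration order.
        sorted.sort(Comparator.comparingInt(Entry::effectiveOrder));
        List<String> result = new ArrayList<>();
        for (Entry e : sorted) {
            if (e.effectiveOrder() < 0) result.add(e.name);
        }
        result.addAll(globalInterceptors); // globals keep their own relative order
        for (Entry e : sorted) {
            if (e.effectiveOrder() >= 0) result.add(e.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> consumer = List.of(
                new Entry("Metrics", 10), new Entry("Unordered", null), new Entry("Tracing", -5));
        System.out.println(chain(consumer, List.of("GlobalA", "GlobalB")));
    }
}
```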
dispatchInterceptors
Class<? extends DispatchInterceptor>[] dispatchInterceptors
Dispatch interceptors that become active while this consumer is processing a batch or handler.
These interceptors are ordered within the consumer using @Order and are applied through AdhocDispatchInterceptor. They therefore affect dispatches performed by this consumer without changing the global dispatch chain. If ad hoc dispatch interceptors are disabled globally, these consumer-scoped dispatch interceptors are inactive. Globally service-loaded dispatch interceptors remain part of the normal global dispatch chain.
Default: {}
errorHandler
Class<? extends ErrorHandler> errorHandler
Error handler invoked when a message processing error occurs. Default is LoggingErrorHandler, which logs errors and allows message tracking and processing to continue.
Default: io.fluxzero.sdk.tracking.LoggingErrorHandler.class
flowRegulator
Class<? extends FlowRegulator> flowRegulator
Regulates message flow and backpressure behavior. Default is NoOpFlowRegulator.
Default: io.fluxzero.sdk.tracking.NoOpFlowRegulator.class
filterMessageTarget
boolean filterMessageTarget
If true, only messages explicitly targeted at this application instance will be processed. This is typically used for tracking Result or WebResponse messages, so that the consumer only receives results intended for this instance.
Default: false
ignoreSegment
boolean ignoreSegment
If true, this consumer will bypass the default segment-based sharding applied by the Fluxzero Runtime and attempt to process all message segments.
By default, Fluxzero shards messages across consumers using a routing key present in the message payload, or the message ID if no routing key is specified. However, some handlers may require a custom sharding strategy, for instance sharding based on a different property in the payload.
Setting ignoreSegment = true allows such handlers to override Fluxzero's internal routing and apply their own logic. A common pattern is to use the @RoutingKey annotation on a handler method to specify a custom property:
    @HandleEvent
    @RoutingKey("organisationId")
    void handle(CreateUser event) {
        // process based on organisationId instead of the default routing key
    }
Default: false
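To illustrate what sharding on a custom property amounts to, the sketch below maps a routing key value to one of the 128 segments and checks whether a tracker owning a given segment range would handle it. The hash used here (String.hashCode() modulo the segment count) is a stand-in chosen for illustration; the hash function Fluxzero actually applies is not specified above, and all names in the sketch are hypothetical.

```java
class RoutingKeySegmentSketch {
    static final int TOTAL_SEGMENTS = 128;

    // Hypothetical hash: String.hashCode() is used purely for illustration.
    static int segmentFor(String routingKey) {
        return Math.floorMod(routingKey.hashCode(), TOTAL_SEGMENTS);
    }

    /** A tracker that owns [start, endExclusive) only handles keys whose segment falls in that range. */
    static boolean owns(int start, int endExclusive, String routingKey) {
        int segment = segmentFor(routingKey);
        return segment >= start && segment < endExclusive;
    }

    public static void main(String[] args) {
        String organisationId = "org-42"; // hypothetical value of the custom routing property
        System.out.println(organisationId + " -> segment " + segmentFor(organisationId));
        System.out.println("tracker [0, 64) owns it: " + owns(0, 64, organisationId));
        System.out.println("tracker [64, 128) owns it: " + owns(64, 128, organisationId));
    }
}
```

Because the mapping is deterministic, every message carrying the same organisationId lands in the same segment, and exactly one of the two trackers in this example claims it.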
singleTracker
boolean singleTracker
If true, designates a single tracker within this consumer as the "main" tracker, responsible for processing all messages across all segments.
Although multiple tracker threads may be configured (via threads()), only one tracker will be assigned all segments. Other trackers will remain idle and receive no segment assignments.
This setting is useful when:
- Messages must be processed strictly in global index order by a single process.
- No suitable routing key exists for meaningful partitioning.
- Handler logic requires a holistic or stateful view of all messages across the log.
In contrast to regular segmented consumers, this mode disables concurrent processing across trackers but ensures strict ordering.
Default: false
clientControlledIndex
boolean clientControlledIndex
If true, the consumer will not rely on Fluxzero's internal tracking index. Instead, the application itself is responsible for determining which messages to process.
This is typically used in combination with ignoreSegment() set to true to ensure that all application instances receive every message, rather than a sharded subset.
This mode is useful for scenarios where message delivery must be broadcast to all instances. For example, a WebSocket endpoint that pushes updates to connected clients may need to observe the full message stream, ensuring that each client sees every relevant update.
When false (the default), Fluxzero tracks message indices and distributes segments to consumer trackers for balanced parallel processing.
Default: false
storePositionManually
boolean storePositionManually
Whether this consumer is taking manual control over storing its position in the log.
When true, the consumer is responsible for explicitly storing its position after processing one or more message batches. This allows for greater control, for example when handling long-running workflows that span multiple batches, or when committing the position should be deferred until post-processing is complete.
When false (the default), the position is automatically updated after each message batch is processed, ensuring progress is recorded and avoiding reprocessing on restart.
Note: Even with manual position tracking enabled, the consumer will continue to receive "new" messages as long as the tracking process remains active. However, its persisted position will not be updated unless explicitly stored.
Default: false
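The difference between the two modes can be pictured with a toy model that keeps an in-memory cursor separate from a persisted position. Nothing here is SDK code: the class, its fields, and its methods are hypothetical and only mirror the behaviour described above.

```java
class ManualPositionSketch {
    long persisted = -1L; // last index stored durably (-1: nothing stored yet)
    long cursor = -1L;    // in-memory progress of the running tracker
    final boolean storePositionManually;

    ManualPositionSketch(boolean storePositionManually) {
        this.storePositionManually = storePositionManually;
    }

    /** Processes one batch; in automatic mode the position is persisted right after the batch. */
    void processBatch(long... indices) {
        for (long index : indices) {
            cursor = Math.max(cursor, index);
        }
        if (!storePositionManually) {
            persisted = cursor;
        }
    }

    /** Explicit store, needed only in manual mode. */
    void storePosition() {
        persisted = cursor;
    }

    /** After a restart the tracker resumes from whatever was persisted. */
    void restart() {
        cursor = persisted;
    }

    public static void main(String[] args) {
        ManualPositionSketch manual = new ManualPositionSketch(true);
        manual.processBatch(0L, 1L, 2L);
        manual.processBatch(3L, 4L); // the tracker keeps receiving new messages
        System.out.println("cursor=" + manual.cursor + ", persisted=" + manual.persisted);
        manual.storePosition();      // position is recorded only now
        System.out.println("cursor=" + manual.cursor + ", persisted=" + manual.persisted);
    }
}
```

In this model, restarting a manual consumer before storePosition() is called would rewind the cursor to the last persisted value, so the batches processed in between would be delivered again.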
exclusive
boolean exclusive
Determines whether handlers assigned to this consumer are excluded from other consumers.
If true (default), a handler will only be active in this consumer. If false, the same handler may be active in multiple consumers simultaneously. This enables advanced scenarios such as parallel replays alongside live processing.
Default: true
passive
boolean passive
Indicates that this consumer should process messages without publishing result messages.
When true, return values from request handlers (e.g., @HandleCommand, @HandleQuery, @HandleWebRequest) are ignored and not appended to the result log. This is useful for secondary consumers that perform side-effects or projections without impacting the result flow.
Default: false
minIndex
long minIndex
Optional minimum message index from which this consumer should begin processing.
If set to a non-negative value, only messages at or above this index will be processed. If negative (the default), the consumer will start processing from the current end of the message log; that is, it will only receive new messages from this point forward.
Default: -1L
maxIndexExclusive
long maxIndexExclusive
Optional exclusive upper bound for message processing. Messages at or above this index will not be processed. Ignored if negative.
Default: -1L
exclusiveBeforeMinIndex
boolean exclusiveBeforeMinIndex
Whether this consumer should remain exclusive for shared handlers before minIndex() is reached.
Set to false to allow another consumer to handle the same shared handler before this consumer's minimum index takes effect.
This is mainly useful during a consumer split, where a new consumer should take over from a specific index onward, while an existing consumer remains responsible for older messages. In that scenario, the new consumer typically sets minIndex() and exclusiveBeforeMinIndex = false.
In regular single-consumer setups this should usually remain true.
Default: true
exclusiveAfterMaxIndex
boolean exclusiveAfterMaxIndex
Whether this consumer should remain exclusive for shared handlers from maxIndexExclusive() onward.
Set to false to allow another consumer to take over shared handler invocation once this consumer's exclusive upper bound has been reached.
This is mainly useful during a consumer merge or handover, where an existing consumer should keep handling messages only up to a certain point, after which another consumer takes over. In that scenario, the retiring consumer typically sets maxIndexExclusive() and exclusiveAfterMaxIndex = false.
In regular single-consumer setups this should usually remain true.
Default: true
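A handover between two consumers can be reasoned about as two adjacent index windows. The sketch below models only the bounds: Bounds is a hypothetical stand-in for the minIndex and maxIndexExclusive attributes, the consumer names and the handover index are invented, and a negative minIndex is treated here as "no explicit lower bound" because the retiring consumer is assumed to be tracking already.

```java
import java.util.ArrayList;
import java.util.List;

class ConsumerHandoverSketch {
    /** Hypothetical stand-in for the index bounds of one consumer; not an SDK type. */
    static final class Bounds {
        final String name;
        final long minIndex;          // negative: no explicit lower bound
        final long maxIndexExclusive; // negative: no upper bound

        Bounds(String name, long minIndex, long maxIndexExclusive) {
            this.name = name;
            this.minIndex = minIndex;
            this.maxIndexExclusive = maxIndexExclusive;
        }

        boolean covers(long index) {
            boolean aboveMin = minIndex < 0 || index >= minIndex;
            boolean belowMax = maxIndexExclusive < 0 || index < maxIndexExclusive;
            return aboveMin && belowMax;
        }
    }

    /** Names of the consumers whose window contains the given message index. */
    static List<String> handlersFor(long index, List<Bounds> consumers) {
        List<String> names = new ArrayList<>();
        for (Bounds bounds : consumers) {
            if (bounds.covers(index)) names.add(bounds.name);
        }
        return names;
    }

    public static void main(String[] args) {
        long handover = 1_000L; // hypothetical handover index
        List<Bounds> consumers = List.of(
                new Bounds("legacy", -1L, handover),     // retiring: maxIndexExclusive = handover
                new Bounds("successor", handover, -1L)); // new: minIndex = handover
        for (long index : new long[]{999L, 1_000L, 1_001L}) {
            System.out.println(index + " -> " + handlersFor(index, consumers));
        }
    }
}
```

Using the same index for the retiring consumer's maxIndexExclusive and the new consumer's minIndex leaves neither a gap nor an overlap, since the upper bound is exclusive and the lower bound is inclusive.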
typeFilter
String typeFilter
Optional regular expression used to filter message payload types on the Fluxzero Runtime.
When specified, this filter is applied server-side to restrict the messages delivered to the consumer based on the fully qualified type name of the payload.
If left empty (the default), all message types are delivered to the client, and filtering is performed locally by the handlers. This is typically the preferred approach, as it avoids tightly coupling consumer configuration to type naming and allows for greater flexibility.
Example: typeFilter = ".*\\.CreateUser$|.*\\.UpdateUser$" matches any CreateUser or UpdateUser message types, regardless of package. This is useful for selectively tracking a set of message types without tying the filter to specific namespaces.
Default: ""
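The example pattern can be tried locally with java.util.regex before it is put in a consumer declaration. The type names below are hypothetical, and the sketch assumes a full match against the fully qualified name; whether the Runtime uses a full or partial match is not stated above.

```java
import java.util.List;
import java.util.regex.Pattern;

class TypeFilterSketch {
    // Same expression as in the typeFilter example above.
    static final Pattern TYPE_FILTER = Pattern.compile(".*\\.CreateUser$|.*\\.UpdateUser$");

    /** Assumption: the filter must match the whole fully qualified type name. */
    static boolean accepts(String fullyQualifiedTypeName) {
        return TYPE_FILTER.matcher(fullyQualifiedTypeName).matches();
    }

    public static void main(String[] args) {
        List<String> types = List.of(
                "com.example.users.CreateUser",        // hypothetical payload types
                "org.acme.accounts.UpdateUser",
                "com.example.users.CreateUserCommand",
                "com.example.users.DeleteUser");
        for (String type : types) {
            System.out.println(type + " -> " + accepts(type));
        }
    }
}
```

Because the pattern requires a literal dot before the simple name, a type declared in the default package (plain CreateUser) would not pass this particular filter.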
namespace
String namespace
Specifies the namespace under which the consumer tracks messages. If left empty, the default namespace of the client application is used.
Returns: The namespace under which the consumer tracks messages, or an empty string if the default client namespace should be used.
Default: ""