Wait in Data Integration

Description:
Routes incoming FlowFiles to the ‘wait’ relationship until a matching release signal is stored in the distributed cache by a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the ‘success’ relationship, with attributes copied from the FlowFile that produced the release signal in the Notify processor. The release signal entry is then removed from the cache. Waiting FlowFiles are routed to ‘expired’ if they exceed the Expiration Duration.

If you need to wait for more than one signal, specify the desired number of signals via the ‘Target Signal Count’ property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText.

To wait for all fragments to be processed, connect the ‘original’ relationship to a Wait processor, and the ‘splits’ relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use ‘${fragment.identifier}’ as the value of ‘Release Signal Identifier’, and specify ‘${fragment.count}’ as the value of ‘Target Signal Count’ in the Wait processor.
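
The fragment-based pattern described above can be illustrated with a small in-memory model. This is only a sketch: the `SignalCache` class and the `wait_once` function are hypothetical names, a plain dictionary stands in for the distributed cache, and attributes are copied without overwriting existing ones. It is not the processor's actual implementation.

```python
class SignalCache:
    """Toy in-memory stand-in for the distributed cache (hypothetical)."""

    def __init__(self):
        self._signals = {}

    def notify(self, signal_id, attributes=None):
        # Each notification increments the signal count and stores attributes.
        entry = self._signals.setdefault(signal_id, {"count": 0, "attributes": {}})
        entry["count"] += 1
        entry["attributes"].update(attributes or {})

    def try_release(self, signal_id, target_count):
        # Return the cached attributes and remove the entry once the target
        # count is reached; otherwise return None.
        entry = self._signals.get(signal_id)
        if entry is None or entry["count"] < target_count:
            return None
        del self._signals[signal_id]
        return entry["attributes"]


def wait_once(cache, flowfile):
    """Evaluate one waiting FlowFile (a dict of attributes) against the cache."""
    signal_id = flowfile["fragment.identifier"]   # Release Signal Identifier
    target = int(flowfile["fragment.count"])      # Target Signal Count
    cached = cache.try_release(signal_id, target)
    if cached is None:
        return "wait"
    for key, value in cached.items():
        flowfile.setdefault(key, value)  # existing attributes are kept
    return "success"
```

In this model, an 'original' FlowFile with `fragment.count` set to 3 stays in 'wait' until `notify` has been called three times for its `fragment.identifier`.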

Tags:
map, cache, wait, hold, distributed, signal, release

Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the Expression Language.

Name

Default Value

Allowable Values

Description

Release Signal Identifier

A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the release signal cache key

Supports Expression Language: true


Target Signal Count

Default Value: 1

A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the target signal count. This processor checks whether the signal count has reached this number. If Signal Counter Name is specified, this processor checks that particular counter; otherwise it checks against the total count in the signal.

Supports Expression Language: true


Signal Counter Name

A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the signal counter name. If not specified, this processor checks the total count in the signal.

Supports Expression Language: true
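
The difference between checking a named counter and checking the total count can be sketched as follows. The function names and the dictionary representation of a signal's counters are assumptions made for illustration only.

```python
def signal_count(counters, counter_name=None):
    """Return the count to compare against Target Signal Count.

    `counters` maps counter names to counts for one signal (hypothetical shape).
    """
    if counter_name:
        # Signal Counter Name is set: look at that counter only.
        return counters.get(counter_name, 0)
    # Signal Counter Name is not set: use the total across all counters.
    return sum(counters.values())


def target_reached(counters, target_count, counter_name=None):
    return signal_count(counters, counter_name) >= target_count
```

With counters `{"a": 2, "b": 1}` and a target of 3, the total count reaches the target, while counter `a` alone does not.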


Wait Buffer Count

Default Value: 1

Specifies the maximum number of incoming FlowFiles that can be buffered to check whether they can move forward. A larger buffer can provide better performance, as it reduces the number of interactions with the cache service by grouping FlowFiles by signal identifier. Only one signal identifier can be processed per processor execution.
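
One way to picture the buffering behaviour is a selection step that collects up to Wait Buffer Count FlowFiles sharing a single signal identifier. The sketch below assumes that the identifier of the first queued FlowFile is the one chosen; `select_batch` and the `signal.id` attribute key are hypothetical.

```python
def select_batch(queue, buffer_count, id_key="signal.id"):
    """Pick up to `buffer_count` FlowFiles that share one signal identifier."""
    batch = []
    target_id = None
    for flowfile in queue:
        if len(batch) >= buffer_count:
            break
        signal_id = flowfile[id_key]
        if target_id is None:
            target_id = signal_id  # assumption: the first FlowFile decides
        if signal_id == target_id:
            batch.append(flowfile)
    return target_id, batch
```

Because the whole batch shares one identifier, a single cache lookup can serve every FlowFile in it.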

Releasable FlowFile Count

Default Value: 1

A value, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the releasable FlowFile count. This specifies how many FlowFiles can be released when the signal count reaches the target signal count. Zero (0) has a special meaning: any number of FlowFiles can be released as long as the signal count matches the target.

Supports Expression Language: true
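
A minimal sketch of how Releasable FlowFile Count could partition a batch of waiting FlowFiles, assuming the signal count has already been read from the cache. `split_releasable` is a hypothetical helper, not part of the processor.

```python
def split_releasable(waiting, signal_count, target_count, releasable_count):
    """Return (released, still_waiting) for a list of waiting FlowFiles."""
    if signal_count < target_count:
        # Target not reached yet: nothing can be released.
        return [], list(waiting)
    if releasable_count == 0:
        # Zero means any number of FlowFiles may be released.
        return list(waiting), []
    return list(waiting[:releasable_count]), list(waiting[releasable_count:])
```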


Expiration Duration

Default Value: 10 min

Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship.

Distributed Cache Service

Controller Service API: AtomicDistributedMapCacheClient
Implementation: DistributedMapCacheClientService

The Controller Service that is used to check for release signals from a corresponding Notify processor.

Attribute Copy Mode

Default Value: keeporiginal

Allowable Values:
  • Replace if present: When cached attributes are copied onto released FlowFiles, they replace any matching attributes.
  • Keep original: Attributes on released FlowFiles are not overwritten by copied cached attributes.

Specifies how to handle attributes copied from FlowFiles entering the Notify processor.
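
The two copy modes amount to a difference in merge precedence, which can be sketched with plain dictionaries. The mode string `"keeporiginal"` is the default shown above; `"replace"` is an assumed value for the 'Replace if present' option, and `copy_attributes` is a hypothetical helper.

```python
def copy_attributes(flowfile_attrs, cached_attrs, mode="keeporiginal"):
    """Merge cached (Notify-side) attributes into a released FlowFile's attributes."""
    if mode == "replace":
        # Replace if present: cached values win on matching keys.
        return {**flowfile_attrs, **cached_attrs}
    if mode == "keeporiginal":
        # Keep original: the FlowFile's own values win on matching keys.
        return {**cached_attrs, **flowfile_attrs}
    raise ValueError(f"unknown attribute copy mode: {mode}")
```

In both modes, cached attributes that do not already exist on the FlowFile are added.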

Wait Mode

Default Value: wait

Allowable Values:
  • Transfer to wait relationship: Transfers a FlowFile whose release signal has not been notified yet to the 'wait' relationship. This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship.
  • Keep in the upstream connection: Returns a FlowFile whose release signal has not been notified yet to the upstream connection it came from. This mode helps keep the upstream connection full, so that the upstream source processor is not scheduled while back-pressure is active, which limits incoming FlowFiles.

Specifies how to handle a FlowFile waiting for a notify signal.

Relationships:

Name

Description

expired: A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship.
success: A FlowFile with a matching release signal in the cache will be routed to this relationship.
wait: A FlowFile with no matching release signal in the cache will be routed to this relationship.
failure: When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship.
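
The four relationships can be summarised as a single routing decision. The sketch below is a simplification: the `route` function and its parameters are hypothetical, and the order in which the conditions are checked is an assumption rather than documented behaviour.

```python
def route(signal_id, cache_reachable, waited_ms, expiration_ms,
          signal_count, target_count):
    """Pick a relationship name for one FlowFile (simplified model)."""
    if not cache_reachable or not signal_id:
        # Cache unreachable, or identifier evaluated to null or empty.
        return "failure"
    if waited_ms > expiration_ms:
        # The FlowFile has exceeded the Expiration Duration.
        return "expired"
    if signal_count >= target_count:
        # A matching release signal has reached the target count.
        return "success"
    return "wait"
```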

Reads Attributes:
None specified.
Writes Attributes:

Name

Description

wait.start.timestamp: All FlowFiles will have the attribute 'wait.start.timestamp', which records the epoch timestamp at which the FlowFile first entered this processor. This is used to determine the expiration time of the FlowFile.
wait.counter.<counterName>: If a signal exists when the processor runs, each count value in the signal is copied.
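
How 'wait.start.timestamp' relates to the Expiration Duration can be sketched as below. The sketch assumes the timestamp is stored as a string of epoch milliseconds and uses the 10 min default; `check_expired` is a hypothetical helper.

```python
DEFAULT_EXPIRATION_MS = 10 * 60 * 1000  # the 10 min default, in milliseconds


def check_expired(attrs, now_ms, expiration_ms=DEFAULT_EXPIRATION_MS):
    """Stamp the start time on first visit, then report whether it has expired."""
    # setdefault only writes the timestamp the first time the FlowFile is seen.
    start_ms = int(attrs.setdefault("wait.start.timestamp", str(now_ms)))
    return now_ms - start_ms > expiration_ms
```

Because the attribute is written only once, repeated passes through the processor measure elapsed time from the first arrival rather than from the most recent one.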

State management:
This component does not store state.

Restricted:
This component is not restricted.

Input requirement:
This component requires an incoming relationship.