Wait in Data Integration
Description:
Routes incoming FlowFiles to the ‘wait’ relationship until a matching release signal, written by a corresponding Notify processor, is found in the distributed cache. When a matching release signal is found, a waiting FlowFile is routed to the ‘success’ relationship, with attributes copied from the FlowFile that produced the release signal in the Notify processor. The release signal entry is then removed from the cache. Waiting FlowFiles are routed to ‘expired’ if they exceed the Expiration Duration.
To wait for more than one signal, specify the desired number of signals in the ‘Target Signal Count’ property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. To wait until all fragments have been processed, connect the ‘original’ relationship to a Wait processor and the ‘splits’ relationship to a corresponding Notify processor. Configure both the Notify and Wait processors to use ‘${fragment.identifier}’ as the value of ‘Release Signal Identifier’, and set ‘Target Signal Count’ to ‘${fragment.count}’ in the Wait processor.
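The fragment scenario described above can be sketched as a small simulation. This is only an illustration of the wait/notify idea, not the processor's implementation: the in-memory `signal_cache` dictionary stands in for the distributed cache, and the `notify` and `wait` functions and the identifier value are hypothetical names chosen for this example.

```python
from collections import defaultdict

# Hypothetical in-memory stand-in for the distributed cache:
# release signal identifier -> number of signals received so far.
signal_cache = defaultdict(int)

def notify(signal_id):
    """Record one release signal, as a Notify processor would for one fragment."""
    signal_cache[signal_id] += 1

def wait(signal_id, target_count):
    """Return the relationship a waiting FlowFile would be routed to."""
    if signal_cache.get(signal_id, 0) >= target_count:
        # Target reached: remove the signal entry and release the FlowFile.
        signal_cache.pop(signal_id, None)
        return "success"
    return "wait"

# A source FlowFile split into three fragments that share one identifier,
# playing the role of ${fragment.identifier} and ${fragment.count}.
fragment_identifier = "abc-123"
fragment_count = 3

results = [wait(fragment_identifier, fragment_count)]  # no signals yet
for _ in range(fragment_count):
    notify(fragment_identifier)  # one signal per processed fragment
results.append(wait(fragment_identifier, fragment_count))
print(results)
```

The ‘original’ FlowFile stays on ‘wait’ until every fragment has produced a signal, which is why both processors must evaluate the same identifier.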
Tags:
map, cache, wait, hold, distributed, signal, release
Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the Expression Language.
Name | Default Value | Allowable Values | Description |
Release Signal Identifier | | | A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the release signal cache key. Supports Expression Language: true |
Target Signal Count | 1 | | A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the target signal count. This processor checks whether the signal count has reached this number. If Signal Counter Name is specified, the processor checks that particular counter; otherwise it checks the total count in the signal. Supports Expression Language: true |
Signal Counter Name | | | A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the signal counter name. If not specified, this processor checks the total count in the signal. Supports Expression Language: true |
Wait Buffer Count | 1 | | The maximum number of incoming FlowFiles that can be buffered and checked for release in one execution. A larger buffer can improve performance, because grouping FlowFiles by signal identifier reduces the number of interactions with the cache service. Only one signal identifier can be processed per processor execution. |
Releasable FlowFile Count | 1 | | A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the releasable FlowFile count. This specifies how many FlowFiles can be released when the signal count reaches the target signal count. Zero (0) has a special meaning: any number of FlowFiles can be released as long as the signal count matches the target. Supports Expression Language: true |
Expiration Duration | 10 min | | The duration after which waiting FlowFiles are routed to the 'expired' relationship |
Distributed Cache Service | | Controller Service API: AtomicDistributedMapCacheClient; Implementation: DistributedMapCacheClientService | The Controller Service that is used to check for release signals from a corresponding Notify processor |
Attribute Copy Mode | keeporiginal | | Specifies how to handle attributes copied from FlowFiles entering the Notify processor |
Wait Mode | wait | | Specifies how to handle a FlowFile waiting for a notify signal |
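How Signal Counter Name, Target Signal Count and Releasable FlowFile Count interact can be sketched as follows. This is a simplified reading of the property descriptions above, not the processor's actual code: the function names, the dictionary shape used for a signal's counters, and the FlowFile labels are all assumptions made for illustration, and the sketch does not model any update of the cached counters after a release.

```python
def signal_count(counters, counter_name=None):
    """Count to compare against Target Signal Count.

    counters: hypothetical dict of counter name -> count for one signal.
    If counter_name is given, only that counter is checked; otherwise
    the total across all counters is used.
    """
    if counter_name:
        return counters.get(counter_name, 0)
    return sum(counters.values())

def select_releasable(waiting, counters, target_count,
                      releasable_count=1, counter_name=None):
    """Return (released, still_waiting) lists of FlowFiles."""
    if signal_count(counters, counter_name) < target_count:
        return [], list(waiting)  # target not reached: nothing is released
    if releasable_count == 0:
        return list(waiting), []  # zero means any number may be released
    return list(waiting[:releasable_count]), list(waiting[releasable_count:])

counters = {"ok": 2, "error": 1}
waiting = ["ff1", "ff2", "ff3"]

by_total = select_releasable(waiting, counters, target_count=3)
by_counter = select_releasable(waiting, counters, target_count=3,
                               counter_name="ok")
release_all = select_releasable(waiting, counters, target_count=3,
                                releasable_count=0)
print(by_total, by_counter, release_all)
```

With the total count (3) the target is met and one FlowFile is released; checking only the "ok" counter (2) leaves everything waiting; a releasable count of 0 releases all buffered FlowFiles once the target is met.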
Relationships:
Name | Description |
expired | A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship |
success | A FlowFile with a matching release signal in the cache will be routed to this relationship |
wait | A FlowFile with no matching release signal in the cache will be routed to this relationship |
failure | When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship |
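The four relationships above amount to a routing decision, which might be sketched like this. The `route` function, its parameters, the millisecond units, the use of `ConnectionError` to model an unreachable cache, and the order in which the checks run are all assumptions of this sketch rather than documented behaviour.

```python
def route(signal_id, cache, target_count, started_ms, now_ms, expiration_ms):
    """Pick a relationship name for one FlowFile (illustrative only)."""
    if not signal_id:
        return "failure"  # identifier evaluated to null or empty
    if now_ms - started_ms > expiration_ms:
        return "expired"  # waited longer than the Expiration Duration
    try:
        count = cache.get(signal_id)
    except ConnectionError:
        return "failure"  # cache could not be reached
    if count is not None and count >= target_count:
        return "success"  # matching release signal found
    return "wait"  # no matching release signal yet

class UnreachableCache:
    """Hypothetical cache client whose lookups always fail."""
    def get(self, key):
        raise ConnectionError("cache unavailable")

TEN_MIN_MS = 10 * 60 * 1000
outcomes = [
    route("", {}, 1, 0, 1_000, TEN_MIN_MS),
    route("id", {"id": 3}, 3, 0, 1_000, TEN_MIN_MS),
    route("id", {}, 3, 0, 1_000, TEN_MIN_MS),
    route("id", {}, 3, 0, TEN_MIN_MS + 1, TEN_MIN_MS),
    route("id", UnreachableCache(), 3, 0, 1_000, TEN_MIN_MS),
]
print(outcomes)
```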
Reads Attributes:
None specified.
Writes Attributes:
Name | Description |
wait.start.timestamp | Every FlowFile is given a 'wait.start.timestamp' attribute, set to the epoch timestamp at which the FlowFile first entered this processor. It is used to determine the expiration time of the FlowFile. |
wait.counter.<counterName> | If a signal exists when the processor runs, each count value in the signal is copied. |
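The two written attributes can be illustrated with a short sketch. It assumes the timestamp is stored in epoch milliseconds as a string and that a signal's counters are available as a dictionary; both are assumptions, and the helper names `apply_wait_attributes` and `expiration_time_ms` are hypothetical.

```python
import time

def apply_wait_attributes(attributes, signal_counts=None, now_ms=None):
    """Return a copy of the attributes with the Wait attributes added."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    updated = dict(attributes)
    # Stamp the start time only on first entry; later passes keep it.
    updated.setdefault("wait.start.timestamp", str(now_ms))
    # Copy each count value of the signal, if a signal exists.
    for name, count in (signal_counts or {}).items():
        updated[f"wait.counter.{name}"] = str(count)
    return updated

def expiration_time_ms(attributes, expiration_ms):
    """Epoch time (ms) after which the FlowFile counts as expired."""
    return int(attributes["wait.start.timestamp"]) + expiration_ms

first_pass = apply_wait_attributes({"filename": "a.txt"}, now_ms=1_000)
second_pass = apply_wait_attributes(first_pass, {"ok": 2}, now_ms=5_000)
print(second_pass)
print(expiration_time_ms(second_pass, 10 * 60 * 1000))
```

Because the start timestamp is written once and then preserved, a FlowFile that loops through the 'wait' relationship repeatedly still expires relative to its first arrival.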
State management:
This component does not store state.
Restricted:
This component is not restricted.
Input requirement:
This component requires an incoming relationship.