Description and usage of EnforceOrder processor:
Enforces the expected ordering of FlowFiles that belong to the same data group. Although PriorityAttributePrioritizer can be used on a connection to ensure that FlowFiles going through that connection are in priority order, FlowFiles can still get out of order depending on error handling, branching, and other flow designs. EnforceOrder can be used to restore the original ordering of those FlowFiles. [IMPORTANT] For EnforceOrder to take effect, FirstInFirstOutPrioritizer should be used on EVERY downstream connection UNTIL the order of FlowFiles is physically FIXED by an operation such as MergeContent or by being stored to the final destination.
Tags:
sort, order
Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
Name | Default Value | Allowable Values | Description |
Group Identifier | ${filename} | | EnforceOrder can handle multiple ordering groups. 'Group Identifier' determines which group a FlowFile belongs to. This property is evaluated against each incoming FlowFile. If the evaluated result is empty, the FlowFile is routed to failure. Supports Expression Language: true |
Order Attribute | | | The name of a FlowFile attribute whose value is used to enforce the order of FlowFiles within a group. If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile is routed to failure. |
Initial Order | 0 | | When the first FlowFile of a group arrives, the initial target order is computed and stored in the managed state. After that, the target order is tracked by EnforceOrder and stored in the state management store. If Expression Language is used but the evaluated result is not an integer, the FlowFile is routed to failure, and the initial order remains unknown until a subsequent FlowFile provides a valid initial order. Supports Expression Language: true |
Maximum Order | | | If specified, any FlowFile with a larger order is routed to failure. This property is computed only once for a given group. After a maximum order is computed, it is persisted in the state management store and used for other FlowFiles belonging to the same group. If Expression Language is used but the evaluated result is not an integer, the FlowFile is routed to failure, and the maximum order remains unknown until a subsequent FlowFile provides a valid maximum order. Supports Expression Language: true |
Batch Count | 1000 | | The maximum number of FlowFiles that EnforceOrder can process in a single execution. |
Wait Timeout | 10 min | | The duration after which waiting FlowFiles are routed to the 'overtook' relationship. |
Inactive Timeout | 30 min | | The duration after which the state of an inactive group is cleared from managed state. A group is considered inactive if no new FlowFile has been seen for it during the specified duration. Inactive Timeout must be longer than Wait Timeout. If a FlowFile arrives after its group has been cleared, it is treated as belonging to a brand new group, but it will never match the order because the expected preceding FlowFiles are already gone. The FlowFile will eventually time out while waiting and be routed to 'overtook'. To avoid this, group state should be kept long enough; on the other hand, a shorter duration helps when the same group identifier needs to be reused. |
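The constraint between the two timeouts can be checked with a few lines of code. The following is a minimal sketch, not NiFi's own validation logic: the function names and the small set of accepted units are assumptions made for illustration only.

```python
import re

# Assumed unit table: a small subset of duration units, for illustration only.
_UNIT_SECONDS = {"sec": 1, "min": 60, "hr": 3600}


def parse_duration(text):
    """Convert a string such as '10 min' into a number of seconds."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]+)\s*", text.lower())
    if match is None or match.group(2) not in _UNIT_SECONDS:
        raise ValueError(f"unsupported duration: {text!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def timeouts_are_valid(wait_timeout, inactive_timeout):
    """Inactive Timeout must be strictly longer than Wait Timeout."""
    return parse_duration(inactive_timeout) > parse_duration(wait_timeout)


# The default values from the table above satisfy the constraint.
print(timeouts_are_valid("10 min", "30 min"))  # True
print(timeouts_are_valid("10 min", "5 min"))   # False
```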
Relationships:
Name | Description |
overtook | A FlowFile that waited for preceding FlowFiles longer than Wait Timeout and overtook those FlowFiles will be routed to this relationship. |
skipped | A FlowFile whose order is lower than the current target order, meaning it arrived too late and was skipped, will be routed to this relationship. |
success | A FlowFile with a matching order number will be routed to this relationship. |
wait | A FlowFile with a non-matching order will be routed to this relationship. |
failure | A FlowFile that does not have the required attributes, or for which those values cannot be computed, will be routed to this relationship. |
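The routing rules in the table above can be summarized as a single decision function. This is a simplified model based only on the descriptions on this page, not the processor's actual source code; the function name `route` and its parameters are hypothetical.

```python
def route(order_value, target, max_order=None,
          waited_sec=0.0, wait_timeout_sec=600.0):
    """Return the relationship name for one FlowFile (simplified model)."""
    try:
        order = int(order_value)
    except (TypeError, ValueError):
        return "failure"       # order attribute missing or not an integer
    if max_order is not None and order > max_order:
        return "failure"       # larger than the group's Maximum Order
    if order == target:
        return "success"       # exactly the order being waited for
    if order < target:
        return "skipped"       # arrived too late, target already moved on
    if waited_sec > wait_timeout_sec:
        return "overtook"      # waited longer than Wait Timeout
    return "wait"              # ahead of the target, keep waiting


print(route("1", target=1))                    # success
print(route("3", target=1))                    # wait
print(route("3", target=1, waited_sec=601.0))  # overtook
```

In a real flow the 'wait' relationship is typically looped back to the processor, so the same FlowFile is evaluated again until it matches the target or its wait time runs out.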
Reads Attributes:
None specified.
Writes Attributes:
Name | Description |
EnforceOrder.startedAt | All FlowFiles going through this processor will have this attribute. This value is used to determine wait timeout. |
EnforceOrder.result | All FlowFiles going through this processor will have this attribute denoting which relationship it was routed to. |
EnforceOrder.detail | FlowFiles routed to 'failure' or 'skipped' relationship will have this attribute describing details. |
EnforceOrder.expectedOrder | FlowFiles routed to 'wait' or 'skipped' relationship will have this attribute denoting expected order when the FlowFile was processed. |
State management:
Scope | Description |
LOCAL | EnforceOrder uses the following states per ordering group: '**groupId**.target' is the order number that is expected to arrive next. When a FlowFile with a matching order arrives, or a FlowFile overtakes the FlowFile being waited for because of the wait timeout, the target order is updated to (FlowFile.order + 1). '**groupId**.max' is the maximum order number for a group. '**groupId**.updatedAt' is the timestamp of the last time the order of a group was updated. These managed states are removed automatically once a group is determined to be inactive; see 'Inactive Timeout' for details. |
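The per-group state keys described above can be pictured as entries in a plain key/value map. The sketch below is an illustration under stated assumptions: the state is modeled as a Python dict, timestamps are plain seconds, and the function names are hypothetical rather than part of NiFi.

```python
def record_release(state, group_id, order, now):
    """Advance the target after a FlowFile went to 'success' or 'overtook'."""
    state[f"{group_id}.target"] = order + 1
    state[f"{group_id}.updatedAt"] = now


def clear_inactive_groups(state, now, inactive_timeout_sec):
    """Remove all keys of groups not updated within the inactive timeout."""
    suffix = ".updatedAt"
    stale = [key[:-len(suffix)] for key, updated in state.items()
             if key.endswith(suffix) and now - updated > inactive_timeout_sec]
    for group_id in stale:
        for name in (".target", ".max", ".updatedAt"):
            state.pop(group_id + name, None)
    return stale


state = {}
record_release(state, "groupA", order=0, now=0.0)
state["groupA.max"] = 10
record_release(state, "groupB", order=4, now=1500.0)

# With a 30 minute (1800 s) timeout, only groupA is inactive at t = 2000 s.
print(clear_inactive_groups(state, now=2000.0, inactive_timeout_sec=1800.0))
print(state)
```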
Restricted:
This component is not restricted.
Input requirement:
This component requires an incoming relationship.
How to Configure?
Sample for EnforceOrder processor:
This sample explains how to put flow files into sequential order, merge them into a single flow file, and store the result in the local file system.
Overview:
Step 1: Generate custom files
Drag and drop the GenerateFlowFile processors to generate the custom flow files and configure them as follows.
1_GenerateFlowFile configuration:
2_GenerateFlowFile configuration:
3_GenerateFlowFile configuration:
Step 2: Set group name and priority
Drag and drop the UpdateAttribute processors to set the group name and priority, and configure them as follows.
1_UpdateAttribute configuration:
2_UpdateAttribute configuration:
3_UpdateAttribute configuration:
Step 3: Configure EnforceOrder processor
Drag and drop the EnforceOrder processor to sort the flow files based on the group name and priority attribute.
Properties and usage:
Group Identifier: Specify the group name attribute that was set in the UpdateAttribute processor.
Order Attribute: Specify the name of the attribute that holds an integer value; the flow files are sorted based on this value.
Initial Order: Specify the initial priority value from which to start sorting the flow files.
Maximum Order: Specify the maximum priority number of the flow files to process. Once computed for a group, this value is stored in the processor state automatically.
Batch Count: Specify the number of flow files to be processed at a time.
Wait timeout: Specify how long a flow file may stay in the ‘wait’ relationship. A flow file whose order is ahead of the expected order is routed to ‘wait’; this also happens when a flow file arrives after the state of its group has been cleared or has expired. If the preceding flow files do not arrive within the ‘Wait timeout’, the flow file is routed to ‘overtook’.
Inactive timeout: Specify how long the state of a group is kept when no flow files arrive for that group; after the ‘Inactive timeout’, the state expires automatically. A flow file whose order is lower than the expected order and that arrives before the state expires is routed to the ‘skipped’ relationship. Once the state has expired, a late flow file is treated as the start of a new group.
Connect the “wait” and “skipped” relationships back to the EnforceOrder processor itself.
Note: Configure the connection between EnforceOrder and MergeContent processors with FirstInFirstOutPrioritizer.
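The effect of looping the "wait" relationship back to the processor can be illustrated with a small simulation. This is a sketch only: the group name and order values are hypothetical, timeouts are ignored (so nothing is routed to 'overtook'), and flow files that would be 'skipped' are simply dropped instead of being looped back.

```python
from collections import deque


def enforce_order(arrivals, initial_order=0):
    """Release (group, order) pairs in order; waiting items are requeued."""
    targets = {}                  # group -> next expected order
    queue = deque(arrivals)
    released = []
    stalled = 0                   # consecutive items that had to wait
    # Stop once every remaining item has waited without any progress.
    while queue and stalled < len(queue):
        group, order = queue.popleft()
        target = targets.setdefault(group, initial_order)
        if order == target:       # 'success': release and advance the target
            released.append((group, order))
            targets[group] = order + 1
            stalled = 0
        elif order > target:      # 'wait': loop back to the end of the queue
            queue.append((group, order))
            stalled += 1
        else:                     # 'skipped': dropped in this sketch
            stalled = 0
    return released


# Three flow files of one group arriving out of order.
print(enforce_order([("groupA", 2), ("groupA", 0), ("groupA", 1)]))
# [('groupA', 0), ('groupA', 1), ('groupA', 2)]
```

Because the released flow files leave in order, a FirstInFirstOutPrioritizer on the connection to MergeContent is what preserves that order until the content is merged.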
Step 4: Merge content of flow files
Drag and drop the MergeContent processor to merge the content of the multiple flow files into a single file and configure it as follows.
Step 5: Store the file
Drag and drop the PutFile processor to store the merged file in the local file system and configure it as follows.
Output: