Description and usage of ValidateRecord:
Validates the Records of an incoming FlowFile against a given schema. All records that adhere to the schema are routed to the “valid” relationship, while records that do not adhere to the schema are routed to the “invalid” relationship. It is therefore possible for a single incoming FlowFile to be split into two individual FlowFiles if some records are valid according to the schema and others are not. Any FlowFile that is routed to the “invalid” relationship will emit a ROUTE Provenance Event with the Details field populated to explain why records were invalid. For further explanation of why records were invalid, DEBUG-level logging can be enabled for the “org.apache.nifi.processors.standard.ValidateRecord” logger.
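The routing described above can be pictured with a minimal Python sketch. It is not NiFi's implementation: the `SCHEMA` mapping, the `validate` and `route` helpers, and the sample records are hypothetical, and every schema field is assumed to be required. The sketch only shows how one incoming batch can yield a valid group and an invalid group, with a reason attached to each invalid record.

```python
# Hypothetical schema: field name -> expected Python type (an assumption for illustration).
SCHEMA = {"id": int, "name": str}


def validate(record, schema):
    """Return a list of reasons the record is invalid (an empty list means valid)."""
    problems = []
    for field, expected in schema.items():
        if field not in record:
            problems.append(f"missing field '{field}'")
        elif not isinstance(record[field], expected):
            problems.append(f"field '{field}' is not of type {expected.__name__}")
    return problems


def route(records, schema):
    """Split one batch of records into 'valid' and 'invalid' groups."""
    routed = {"valid": [], "invalid": []}
    for record in records:
        problems = validate(record, schema)
        if problems:
            routed["invalid"].append({"record": record, "details": problems})
        else:
            routed["valid"].append(record)
    return routed


incoming = [
    {"id": 1, "name": "alice"},
    {"id": "two", "name": "bob"},   # wrong type for 'id'
    {"name": "carol"},              # 'id' is missing
]
result = route(incoming, SCHEMA)
print(len(result["valid"]), "valid,", len(result["invalid"]), "invalid")
```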
Tags:
record, schema, validate
Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
Name | Default Value | Allowable Values | Description |
Record Reader | | Controller Service API: RecordReaderFactory. Implementations: CSVReader, GrokReader, AvroReader, JsonTreeReader, JsonPathReader, ScriptedReader | Specifies the Controller Service to use for reading incoming data |
Record Writer | | Controller Service API: RecordSetWriterFactory. Implementations: JsonRecordSetWriter, FreeFormTextRecordSetWriter, AvroRecordSetWriter, ScriptedRecordSetWriter, CSVRecordSetWriter | Specifies the Controller Service to use for writing out the records |
Schema Access Strategy | reader-schema | * Use Reader's Schema * Use Schema Name Property * Use Schema Text Property | Specifies how to obtain the schema that should be used to validate records |
Schema Registry | | Controller Service API: SchemaRegistry. Implementations: AvroSchemaRegistry, HortonworksSchemaRegistry, ConfluentSchemaRegistry | Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to "Use 'Schema Name' Property". |
Schema Name | ${schema.name} | | Specifies the name of the schema to look up in the Schema Registry property. Supports Expression Language: true |
Schema Text | ${avro.schema} | | The text of an Avro-formatted Schema. Supports Expression Language: true |
Allow Extra Fields | true | * true * false | If the incoming data has fields that are not present in the schema, this property determines if the Record is valid. If true, the Record is still valid. If false, the Record will be invalid due to the extra fields. |
Strict Type Checking | true | * true * false | If the incoming data has a Record where a field is not of the correct type, this property determines how to handle the Record. If true, the Record will be considered invalid. If false, the Record will be considered valid and the field will be coerced into the correct type (if possible, according to the type coercion supported by the Record Writer). |
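The effect of the Allow Extra Fields and Strict Type Checking properties can be sketched as follows. This is an illustration under stated assumptions, not the processor's code: the `SCHEMA` mapping and the `is_valid` helper are hypothetical, every schema field is treated as required, and a value that cannot be coerced in lenient mode is treated as invalid.

```python
# Hypothetical schema: field name -> expected Python type (illustration only).
SCHEMA = {"id": int, "name": str}


def is_valid(record, schema, allow_extra_fields=True, strict_type_checking=True):
    """Apply the two flags to one record; returns (valid, possibly coerced record)."""
    if not allow_extra_fields and set(record) - set(schema):
        return False, record  # extra fields make the record invalid

    checked = dict(record)
    for field, expected in schema.items():
        if field not in checked:
            return False, record  # assumption: every schema field is required
        value = checked[field]
        if isinstance(value, expected):
            continue
        if strict_type_checking:
            return False, record  # a wrong type is rejected outright
        try:
            checked[field] = expected(value)  # lenient mode: attempt coercion
        except (TypeError, ValueError):
            return False, record  # assumption: failed coercion means invalid
    return True, checked


row = {"id": "7", "name": "dave"}
print(is_valid(row, SCHEMA))                              # strict: id is a string
print(is_valid(row, SCHEMA, strict_type_checking=False))  # lenient: id is coerced

extra = {"id": 7, "name": "dave", "age": 30}
print(is_valid(extra, SCHEMA))                            # extra field tolerated
print(is_valid(extra, SCHEMA, allow_extra_fields=False))  # extra field rejected
```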
Relationships:
Name | Description |
valid | Records that are valid according to the schema will be routed to this relationship |
invalid | Records that are not valid according to the schema will be routed to this relationship |
failure | If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship |
Reads Attributes:
None specified.
Writes Attributes:
Name | Description |
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer. |
record.count | The number of records in the FlowFile routed to a relationship. |
State management:
This component does not store state.
Restricted:
This component is not restricted.
Input requirement:
This component requires an incoming relationship.
Sample Workflow:
This sample workflow shows how to use the ValidateRecord processor in the Data Integration platform.
List of processors used in this sample:
Processor | Comments |
GetFile | Reads content from the given file and generates a flowfile. |
UpdateAttribute | Adds the schema name as an attribute to the flowfile. |
ValidateRecord | Converts the CSV flowfile contents to JSON and writes the data to ‘Valid’ and ‘Invalid’ relationships. |
LogAttribute | Fetches the valid and invalid data. |
Workflow screenshot
Step 1: Configure GetFile processor
Drag and drop the GetFile processor to the canvas area. Configure the Input Directory and other required properties in the configuration dialog as shown in the following screenshots.
Step 2: Configure UpdateAttribute processor
Drag and drop the UpdateAttribute processor to the canvas area. The UpdateAttribute processor can be used to add the schema name as an attribute to the flowfile. Add a new property ‘schema.name’ with the required file name as its value in the configuration dialog. Also connect GetFile to UpdateAttribute with the ‘Success’ relationship.
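Once ‘schema.name’ is present as an attribute, a property such as Schema Name, whose default is `${schema.name}`, can pick it up. The sketch below is a deliberately simplified stand-in for that lookup, not the NiFi Expression Language itself, which supports functions and much more. The `resolve` helper, the attribute values, and the choice to return an empty string for unknown names are assumptions made for illustration.

```python
import re

# Attributes as UpdateAttribute might leave them; "customers" is a made-up value.
attributes = {"filename": "customers.csv", "schema.name": "customers"}


def resolve(expression, attrs):
    """Replace each ${attribute} reference with its value; unknown names become ''."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: attrs.get(m.group(1), ""), expression)


print(resolve("${schema.name}", attributes))
```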
Step 3: Configure ValidateRecord processor
Drag and drop the ValidateRecord processor to the canvas area. It converts the CSV flowfile contents to JSON and writes the data to:
- "Valid" relationship for records that adhere to the schema.
- "Invalid" relationship for records that do not adhere to the schema.
Also connect UpdateAttribute to ValidateRecord with the ‘Success’ relationship.
1. Configure AvroSchemaRegistry Controller service:
Set the Schema Registry property to the AvroSchemaRegistry controller service, which defines the schema under the schema name, and then enable the AvroSchemaRegistry controller service.
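In AvroSchemaRegistry, a schema is typically registered as a property whose name is the schema name and whose value is the Avro schema text. The sketch below models that mapping in Python. The schema name "customers", its fields, and the `lookup` helper are made up for illustration and are not taken from the sample workflow.

```python
import json

# Hypothetical registry content: schema name -> Avro schema text.
# The schema name "customers" and its fields are assumptions for illustration.
registry = {
    "customers": """
    {
      "type": "record",
      "name": "customers",
      "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null}
      ]
    }
    """
}


def lookup(schema_name):
    """Return the parsed schema registered under schema_name (KeyError if absent)."""
    return json.loads(registry[schema_name])


schema = lookup("customers")
print([field["name"] for field in schema["fields"]])
```

The value of the ‘schema.name’ attribute must match the name under which the schema is registered, otherwise the lookup cannot succeed.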
2. Configure CSVReader Controller service:
Select the View Configuration button next to the CSVReader controller service and configure the required properties in the configuration dialog as shown in the following screenshot.
3. Configure JsonRecordSetWriter Controller service:
Select the View Configuration button next to the JsonRecordSetWriter controller service and configure the required properties in the configuration dialog as shown in the following screenshot.
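Taken together, the CSV reader, the schema check, and the JSON writer in this step behave roughly like the sketch below. It uses only Python's standard library and is not how NiFi implements the conversion. The CSV content, the `convert` helper, and the rule that `id` must parse as an integer are assumptions chosen to produce one invalid row.

```python
import csv
import io
import json

# Made-up CSV content with a header line; the second data row has a non-numeric id.
csv_text = "id,name\n1,alice\nx2,bob\n3,carol\n"


def convert(text):
    """Read CSV rows, keep rows whose id parses as an integer, and emit JSON per group."""
    valid, invalid = [], []
    for row in csv.DictReader(io.StringIO(text)):
        try:
            valid.append({"id": int(row["id"]), "name": row["name"]})
        except ValueError:
            invalid.append(dict(row))  # keep the row as read, for inspection
    return json.dumps(valid), json.dumps(invalid)


valid_json, invalid_json = convert(csv_text)
print("valid:", valid_json)
print("invalid:", invalid_json)
```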
Step 4: Configure LogAttribute processor
Drag and drop two LogAttribute processors to the canvas area. The LogAttribute processor can be used to fetch the valid and invalid data, so that the contents and provenance of the queued flowfiles can be investigated. Configure the required properties in the configuration dialog as shown in the following screenshot. Also connect ValidateRecord to one LogAttribute with the ‘valid’ relationship and to the other with the ‘invalid’ relationship.
Step 5: Start the workflow
Once all processors are configured, start the workflow. You can see the data flow through the processors.