Description and usage of ValidateRecord:

Validates the Records of an incoming FlowFile against a given schema. All records that adhere to the schema are routed to the “valid” relationship, while records that do not adhere to the schema are routed to the “invalid” relationship. It is therefore possible for a single incoming FlowFile to be split into two individual FlowFiles if some records are valid according to the schema and others are not. Any FlowFile that is routed to the “invalid” relationship will emit a ROUTE Provenance Event with the Details field populated to explain why records were invalid. For a more detailed explanation of why records were invalid, DEBUG-level logging can be enabled for the “org.apache.nifi.processors.standard.ValidateRecord” logger.
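
The routing described above can be sketched in a few lines of Python. This is a minimal illustration and not the processor's implementation: the SCHEMA mapping, the is_valid and partition_records helpers, and the sample records are all hypothetical, and validation is reduced to checking that each schema field is present with the expected Python type.

```python
# Hypothetical schema: field name -> expected Python type (an assumption for illustration).
SCHEMA = {"id": int, "name": str}


def is_valid(record, schema):
    """Return True when every schema field is present with the expected type."""
    return all(
        field in record and isinstance(record[field], expected)
        for field, expected in schema.items()
    )


def partition_records(records, schema):
    """Split one batch of records into (valid, invalid) lists, preserving order."""
    valid, invalid = [], []
    for record in records:
        (valid if is_valid(record, schema) else invalid).append(record)
    return valid, invalid


incoming = [
    {"id": 1, "name": "alice"},
    {"id": "two", "name": "bob"},  # wrong type for "id"
    {"name": "carol"},             # missing "id"
]
valid, invalid = partition_records(incoming, SCHEMA)
print(len(valid), "valid;", len(invalid), "invalid")
```

One incoming batch yields two outgoing groups here, which mirrors how a single FlowFile can be split across the two relationships.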

Tags:

record, schema, validate

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Name

Default Value

Allowable Values

Description

Record Reader

Controller Service API:


RecordReaderFactory

Implementations:


CSVReader
GrokReader
AvroReader
JsonTreeReader
JsonPathReader
ScriptedReader
Specifies the Controller Service to use for reading incoming data

Record Writer

Controller Service API:


RecordSetWriterFactory

Implementations:


JsonRecordSetWriter
FreeFormTextRecordSetWriter
AvroRecordSetWriter
ScriptedRecordSetWriter
CSVRecordSetWriter
Specifies the Controller Service to use for writing out the records

Schema Access Strategy

reader-schema * Use Reader's Schema
* Use Schema Name Property
* Use Schema Text Property 
Specifies how to obtain the schema that should be used to validate records

Schema Registry

Controller Service API:


SchemaRegistry

Implementations:

AvroSchemaRegistry
HortonworksSchemaRegistry
ConfluentSchemaRegistry
Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to "Use 'Schema Name' Property".

Schema Name

${schema.name}
Specifies the name of the schema to look up in the Schema Registry property

Supports Expression Language: true

Schema Text

${avro.schema}
The text of an Avro-formatted Schema

Supports Expression Language: true

Allow Extra Fields

true * true
* false
If the incoming data has fields that are not present in the schema, this property determines if the Record is valid. If true, the Record is still valid. If false, the Record will be invalid due to the extra fields.

Strict Type Checking

true * true
* false
If the incoming data has a Record where a field is not of the correct type, this property determines how to handle the Record. If true, the Record will be considered invalid. If false, the Record will be considered valid and the field will be coerced into the correct type (if possible, according to the type coercion supported by the Record Writer).
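
The effect of Allow Extra Fields and Strict Type Checking can be sketched as follows. This is a simplified model under stated assumptions, not the processor's code: the validate function and its parameter names are hypothetical, the schema is reduced to a mapping of field name to Python type, a missing field is treated as invalid, and a value that cannot be coerced is also treated as invalid.

```python
def validate(record, schema, allow_extra_fields=True, strict_type_checking=True):
    """Return (is_valid, record_out) for one record.

    schema maps field name -> expected Python type (a simplification of a real schema).
    """
    # Fields not named in the schema make the record invalid only when extras are disallowed.
    if not allow_extra_fields and set(record) - set(schema):
        return False, record

    result = dict(record)
    for field, expected in schema.items():
        if field not in record:
            return False, record  # assumption: missing fields are invalid
        value = record[field]
        if isinstance(value, expected):
            continue
        if strict_type_checking:
            return False, record  # wrong type, no coercion attempted
        try:
            result[field] = expected(value)  # attempt coercion, e.g. int("42")
        except (TypeError, ValueError):
            return False, record  # assumption: failed coercion is invalid
    return True, result


schema = {"id": int}
print(validate({"id": "42"}, schema, strict_type_checking=True))
print(validate({"id": "42"}, schema, strict_type_checking=False))
print(validate({"id": 1, "note": "x"}, schema, allow_extra_fields=False))
```

With strict checking the string "42" makes the record invalid; without it the value is coerced to an integer and the record passes.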

Relationships:

Name

Description

valid Records that are valid according to the schema will be routed to this relationship
invalid Records that are not valid according to the schema will be routed to this relationship
failure If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship

Reads Attributes:

None specified.

Writes Attributes:

Name

Description

mime.type Sets the mime.type attribute to the MIME Type specified by the Record Writer.
record.count The number of records in the FlowFile routed to a relationship.
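
As a rough illustration of the two attributes above, the sketch below builds the attribute map for each outgoing FlowFile. The build_outgoing function is hypothetical, the "application/json" value is only an assumed example of what a Record Writer might report, and the choice to emit nothing for an empty group is an assumption of this sketch. Attribute values are strings, so the count is stored as text.

```python
def build_outgoing(valid, invalid, writer_mime_type="application/json"):
    """Return {relationship: attributes} for each non-empty group of records."""
    outgoing = {}
    for relationship, records in (("valid", valid), ("invalid", invalid)):
        if records:  # assumption: no FlowFile is produced for an empty group
            outgoing[relationship] = {
                "mime.type": writer_mime_type,
                "record.count": str(len(records)),
            }
    return outgoing


print(build_outgoing([{"id": 1}, {"id": 2}], [{"id": "x"}]))
```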

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component requires an incoming relationship.

Sample Workflow:

This sample workflow shows how to use the ValidateRecord processor in the Data Integration platform.

List of processors used in this sample:

Processor

Comments

GetFile Reads content from the given file and generates a flowfile from it.
UpdateAttribute Adds the schema name as an attribute to the flowfile.
ValidateRecord Converts the CSV flowfile contents to JSON and writes the data to the ‘Valid’ and ‘Invalid’ relationships.
LogAttribute Receives the valid and invalid data.


Workflow screenshot

Overall workflow

Step 1: Configure GetFile processor

Drag and drop the GetFile processor to the canvas area. Configure the Input Directory and other required properties in the configuration dialog as shown in the following screenshots.

Drag and drop GetFile processor

Configuration of GetFile processor

Step 2: Configure UpdateAttribute processor

Drag and drop the UpdateAttribute processor to the canvas area. The UpdateAttribute processor can be used to add the schema name as an attribute to the flowfile. Add a new property ‘schema.name’ with the required file name as its value in the configuration dialog. Also connect GetFile to UpdateAttribute using the ‘Success’ relationship.

Configuration of UpdateAttribute processor
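
The purpose of the ‘schema.name’ attribute is that the Schema Name property of ValidateRecord defaults to ${schema.name}, which is resolved against the flowfile's attributes. The sketch below imitates only that plain attribute lookup. The resolve function is hypothetical, the attribute values "people.csv" and "people" are made up for illustration, and real Expression Language supports far more than this.

```python
import re


def resolve(expression, attributes):
    """Replace each ${name} reference with the matching attribute value.

    Only plain attribute references are handled; unknown names become an
    empty string in this sketch.
    """
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda match: attributes.get(match.group(1), ""),
        expression,
    )


flowfile_attributes = {"filename": "people.csv"}   # hypothetical attributes from GetFile
flowfile_attributes["schema.name"] = "people"      # what UpdateAttribute adds in this step
print(resolve("${schema.name}", flowfile_attributes))
```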

Step 3: Configure ValidateRecord processor

Drag and drop the ValidateRecord processor to the canvas area. It converts the CSV flowfile contents to JSON and writes the data to:

  • "Valid" relationship for records that adhere to the schema.
  • "Invalid" relationship for records that do not adhere to the schema.

Also connect UpdateAttribute to ValidateRecord using the ‘Success’ relationship.
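
What this step does to the data can be approximated outside the platform with Python's standard csv and json modules. This is a sketch under assumptions, not the processor's behavior in detail: the column names "id" and "name", the sample CSV text, and the csv_to_routed_json helper are hypothetical, and a row counts as valid only when "id" parses as an integer.

```python
import csv
import io
import json

# Hypothetical CSV content; the second data row has a non-numeric id.
CSV_TEXT = "id,name\n1,alice\nx,bob\n"


def csv_to_routed_json(csv_text):
    """Read CSV rows and return (valid_json, invalid_json) as JSON array strings."""
    valid, invalid = [], []
    for row in csv.DictReader(io.StringIO(csv_text)):
        try:
            valid.append({"id": int(row["id"]), "name": row["name"]})
        except (KeyError, TypeError, ValueError):
            invalid.append(dict(row))  # keep the row as read for inspection
    return json.dumps(valid), json.dumps(invalid)


valid_json, invalid_json = csv_to_routed_json(CSV_TEXT)
print("valid:  ", valid_json)
print("invalid:", invalid_json)
```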

Configuration of ValidateRecord processor

1. Configure AvroSchemaRegistry controller service:

Set the Schema Registry property to the AvroSchemaRegistry controller service, which defines the schema name, and then enable the AvroSchemaRegistry controller service.

AvroSchemaRegistry controller service Configuration
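
Conceptually, the registry maps a schema name to the text of an Avro schema, and ValidateRecord looks the schema up by the name carried in the flowfile. The sketch below models that lookup with a plain dictionary. The "people" schema, its fields, and the lookup_schema helper are hypothetical and serve only to show the shape of an Avro record schema.

```python
import json

# Hypothetical Avro record schema; the record and field names are illustrative only.
PEOPLE_SCHEMA = {
    "type": "record",
    "name": "people",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
}

# Registry modelled as schema name -> Avro schema text.
registry = {"people": json.dumps(PEOPLE_SCHEMA)}


def lookup_schema(schema_name):
    """Return the parsed schema registered under schema_name, or raise KeyError."""
    return json.loads(registry[schema_name])


schema = lookup_schema("people")
print([field["name"] for field in schema["fields"]])
```

The name used as the registry key must match the value of the ‘schema.name’ attribute set earlier in the workflow, otherwise the lookup fails.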

2. Configure CSVReader controller service:

Select the View Configuration button next to the CSVReader controller service and configure the required properties in the configuration dialog as shown in the following screenshot.

CSVReader controller service Configuration

3. Configure JsonRecordSetWriter controller service:

Select the View Configuration button next to the JsonRecordSetWriter controller service and configure the required properties in the configuration dialog as shown in the following screenshot.

JsonRecordSetWriter controller service Configuration

Step 4: Configure LogAttribute processor

Drag and drop the LogAttribute processor to the canvas area. The LogAttribute processor can be used to collect the valid and invalid data, so that the contents and provenance of the queued flowfiles can be inspected. Configure the required properties in the configuration dialog as shown in the following screenshot. Also connect ValidateRecord to one LogAttribute processor using the ‘valid’ relationship and to another using the ‘invalid’ relationship.

Log Attribute Configuration

Step 5: Starting workflow

Once all processors are configured, start the workflow. You can see the data flow through the processors.

Workflow running