How to create a custom processor in Data Integration Platform

Data Integration Platform supports custom processors for custom or user-specific requirements. This document explains the steps involved in developing a custom processor and deploying it into the Data Integration Platform.

Follow the steps below to create a custom processor.

Steps Involved

Prerequisites

  • Maven 3.3.x or later
  • Java 8 or later


Project Creation

To keep the example simple, we are going to create a processor that adds two numbers and writes the result into the flow file.

Step 1: Choose / Create a working directory

Create a directory (folder) in which to create the custom processor project, for example:

E:\CustomProcessor

Step 2: Generate Maven Archetype

Run the following command from a command prompt in the directory you created:

Command

mvn archetype:generate

maven archetype command

Step 3: Input Parameters

You will be prompted for more parameters while the archetype is generated.

To find the org.apache.nifi:nifi-processor-bundle-archetype project, type nifi at the filter prompt as follows, then note the archetype number listed next to it.

Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): 1057: nifi

maven input parameters

Step 4: Choosing Archetype

Choose the archetype used to create the custom processor by entering the number shown next to it in the filtered list (1 in this example).

Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1

archetype selection

Step 5: Version Selection

  • Select the version.
  • For Data Integration Platform 2.3.0.x, choose version 1.3.0
  • For Data Integration Platform 2.4.0.x, choose version 1.4.0

Choose a number: 25: 25

version selection

Step 6: Group ID, Artifact ID and Artifact Base Name

Enter the groupId, artifactId, version and artifactBaseName.

Define value for property ‘groupId’: org.apache.nifi

Define value for property ‘artifactId’: sampleaddition

Define value for property ‘version’ 1.0-SNAPSHOT: : 1.4.0

Define value for property ‘artifactBaseName’: custom

Define value for property ‘package’ org.apache.nifi.processors.custom: : (optional; enter a package name if required, otherwise leave it empty to accept the default shown)

Entering groupId, artifactId, version and artifactBaseName

Step 7: Open Project in IDE

Now you can find the project files under

E:\CustomProcessor\sampleaddition

project folder

Open the project in NetBeans or Eclipse. The screenshot below is taken from NetBeans.

Importing project to NetBeans IDE

Step 8: IDE Maven Settings

Set Maven Home in NetBeans to your installed Maven location:

  1. Open Tools > Options > Java > Maven
  2. Choose Maven Home
  3. Select “Skip Tests”

IDE Maven Settings

Step 9: Modules

The project contains two modules:

  • nifi-custom-processors
  • nifi-custom-nar

Open the nifi-custom-processors project

Project modules

Implementation


Project File MyProcessor.java

The module contains MyProcessor.java. Rename the file, and the class it declares, as required (for example, to SampleAddition.java) and implement the code that performs the required operation.

  • In the class, declare each property that should appear on the Properties tab of the custom processor as a field of type PropertyDescriptor.

    Code Implementation

      // First operand, shown on the Properties tab as "Input Value 1".
      public static final PropertyDescriptor NUMBER_1 = new PropertyDescriptor.Builder()
              .name("Input Value 1")
              .displayName("Input Value 1")
              .description("Enter the input value 1 to perform addition operation")
              .required(true)
              .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
              .build();
      // Second operand, shown on the Properties tab as "Input Value 2".
      public static final PropertyDescriptor NUMBER_2 = new PropertyDescriptor.Builder()
              .name("Input Value 2")
              .displayName("Input Value 2")
              .description("Enter the input value 2 to perform addition operation")
              .required(true)
              .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
              .build();
    

    Custom Processor input values

  • Declare each relationship of the processor as a field of type Relationship. Relationships appear on the Settings tab of the processor.

    Setting the Relationship

      public static final Relationship REL_SUCCESS = new Relationship.Builder()
                  .name("Success")
                  .description("All created FlowFiles are routed to this relationship")
                  .build();
    

    Processor Info

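  • Register all the property descriptors and relationships in the init() function. In the archetype-generated class, this means adding NUMBER_1 and NUMBER_2 to the descriptors list and REL_SUCCESS to the relationships set.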

    Setting description on init function

  • In the onTrigger function, read the input values and implement the required logic. Write the result into the flowfile and transfer it to the success relationship.

    onTrigger image

      // Needs imports for java.io.InputStream, java.io.OutputStream, java.io.IOException,
      // java.nio.charset.StandardCharsets, org.apache.commons.io.IOUtils,
      // org.apache.nifi.flowfile.FlowFile and org.apache.nifi.processor.io.StreamCallback.
      @Override
      public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
          // Read and parse the two configured input values.
          final int num1 = Integer.parseInt(context.getProperty(NUMBER_1).getValue());
          final int num2 = Integer.parseInt(context.getProperty(NUMBER_2).getValue());
          final String output = String.valueOf(num1 + num2); // Result of the addition.

          // session.get() returns null when no flowfile is queued (for example, when the
          // processor has no incoming connection), so create a new flowfile in that case.
          FlowFile flowFile = session.get();
          if (flowFile == null) {
              flowFile = session.create();
          }

          // Replace the flowfile content with the result.
          flowFile = session.write(flowFile, new StreamCallback() {
              @Override
              public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
                  IOUtils.write(output, outputStream, StandardCharsets.UTF_8);
              }
          });

          // Transfer the output flowfile to the success relationship.
          session.transfer(flowFile, REL_SUCCESS);
      }
    


  • Add the following Maven dependency, which provides the IOUtils.write() function, to the pom.xml of the nifi-custom-processors module (E:\CustomProcessor\sampleaddition\nifi-custom-processors)
    Maven Dependency

          <dependency>
              <groupId>commons-io</groupId>
              <artifactId>commons-io</artifactId>
              <version>2.6</version>
          </dependency>
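
The addition logic used in onTrigger can be tried outside the platform before the project is packaged. The following is a minimal sketch in plain Java, not part of the archetype; the class name AdditionSketch and its two methods are hypothetical and use only the JDK. It also shows one way to reject non-numeric input with a clear message, since Integer.parseInt throws NumberFormatException for values such as "abc".

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone helper; it does not depend on any platform classes.
final class AdditionSketch {

    /** Parses both property values as integers and returns their sum as text. */
    public static String add(String value1, String value2) {
        try {
            int num1 = Integer.parseInt(value1.trim());
            int num2 = Integer.parseInt(value2.trim());
            // Math.addExact throws ArithmeticException instead of silently overflowing.
            return String.valueOf(Math.addExact(num1, num2));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Both input values must be integers: '" + value1 + "', '" + value2 + "'", e);
        }
    }

    /** Writes the result as UTF-8 text, as the processor does with the flowfile content. */
    public static void writeResult(String result, OutputStream out) throws IOException {
        out.write(result.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeResult(add("2", "3"), out);
        System.out.println(out.toString("UTF-8")); // prints 5
    }
}
```

Keeping the arithmetic in a small static method like this makes it easy to unit test without starting the platform; the processor would then only read the properties and write the returned text.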
    


Resource file changes

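Also, change the Java class name listed in nifi-custom-processors/Other Sources/src/main/resources/META-INF.services/org.apache.nifi.processor.Processor (on disk, src/main/resources/META-INF/services/org.apache.nifi.processor.Processor) so that it contains the fully qualified name of the renamed class.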

Change Java Class Name
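
The file above follows the standard Java service provider format: one fully qualified class name per line, with # starting a comment. As a quick check after renaming the class, the following sketch parses such content and reports whether a given class is listed. It is an illustration only; the class ServiceFileSketch and its methods are hypothetical, and the package org.apache.nifi.processors.custom is the default offered in Step 6, so adjust it if you entered a different package.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reads provider-configuration file content.
final class ServiceFileSketch {

    /** Returns the class names listed in the content, skipping comments and blank lines. */
    public static List<String> listedClasses(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String line : fileContent.split("\\R")) {
            int hash = line.indexOf('#'); // '#' starts a comment that runs to the end of the line
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** Returns true when the given fully qualified class name is listed. */
    public static boolean lists(String fileContent, String className) {
        return listedClasses(fileContent).contains(className);
    }

    public static void main(String[] args) {
        String content = "# processors provided by this bundle\n"
                + "org.apache.nifi.processors.custom.SampleAddition\n";
        System.out.println(lists(content, "org.apache.nifi.processors.custom.SampleAddition")); // prints true
        System.out.println(lists(content, "org.apache.nifi.processors.custom.MyProcessor"));    // prints false
    }
}
```

If the file still lists the old class name after the rename, the processor class cannot be located from this entry, so it is worth confirming the entry before building.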

Packaging

Now clean and build the Maven project to generate the required jar and nar files.

Maven clean build

Once the build succeeds, you can find the nar file under

E:\CustomProcessor\sampleaddition\nifi-custom-nar\target\nifi-custom-nar-1.4.0.nar
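
In this example the nar file name follows from the values entered in Step 6: the module is nifi-custom-nar (artifactBaseName custom) and the file ends in the version 1.4.0. The sketch below builds the expected location from those values so that a script can check whether the build produced it. The naming pattern is inferred from this example only, and the class NarPathSketch and its method are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the naming pattern is an assumption inferred from the example above.
final class NarPathSketch {

    /** Builds <projectDir>/nifi-<base>-nar/target/nifi-<base>-nar-<version>.nar. */
    public static Path expectedNar(Path projectDir, String artifactBaseName, String version) {
        String module = "nifi-" + artifactBaseName + "-nar";
        return projectDir.resolve(module)
                .resolve("target")
                .resolve(module + "-" + version + ".nar");
    }

    public static void main(String[] args) {
        // Relative project directory used for illustration; pass your own location instead.
        Path nar = expectedNar(Paths.get("sampleaddition"), "custom", "1.4.0");
        System.out.println(nar.getFileName()); // prints nifi-custom-nar-1.4.0.nar
        System.out.println(Files.exists(nar) ? "nar file found" : "nar file not built yet");
    }
}
```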

Deployment

Copy the nar file into “C:\Syncfusion\DataIntegration\<version>\SDK\NIFI\lib” and restart the Data Integration service using Data Integration Service Manager.
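
The copy step can also be scripted. The following is a minimal sketch using only java.nio.file; the class DeployNarSketch is hypothetical, and both paths are supplied by the caller because the <version> part of the lib directory depends on your installation. It only copies the file; the Data Integration service still has to be restarted afterwards as described above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper that copies a built nar file into a lib directory.
final class DeployNarSketch {

    /** Copies the nar file into libDir, replacing an older copy with the same name. */
    public static Path deploy(Path narFile, Path libDir) throws IOException {
        if (!Files.isRegularFile(narFile)) {
            throw new IOException("nar file not found: " + narFile);
        }
        if (!Files.isDirectory(libDir)) {
            throw new IOException("lib directory not found: " + libDir);
        }
        Path target = libDir.resolve(narFile.getFileName());
        return Files.copy(narFile, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("usage: DeployNarSketch <nar-file> <lib-directory>");
            return;
        }
        Path copied = deploy(Paths.get(args[0]), Paths.get(args[1]));
        System.out.println("Copied to " + copied);
    }
}
```

Replacing an existing file of the same name keeps repeated deployments of the same version from failing; a nar built with a different version has a different file name, so an older one may need to be removed by hand.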

Workflow Creation

  • After restarting the Data Integration Platform, you can find the custom processor listed under the Others category of the component toolbar. Drag and drop the custom processor SampleAddition onto the canvas and configure the processor.

    Custom processor in canvas

    Processor configuration

  • After configuring the processor, start it. The addition operation is performed and the output is written into the flowfile.

    Flow file

    Output of the processor