Description and usage of ExecuteGroovyScript processor:

Experimental Extended Groovy script processor. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.

Tags:

script, groovy, groovyx

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the Expression Language Guide.

Name

Default Value

Allowable Values

Description

Script File Path to script file to execute. Only one of Script File or Script Body may be used

Supports Expression Language: true (will be evaluated using variable registry only)


Script Body Body of script to execute. Only one of Script File or Script Body may be used

Failure strategy

rollback * rollback
* transfer to failure
What to do with unhandled exceptions. If you want to manage exception by code then keep the default value `rollback`. If `transfer to failure` selected and unhandled exception occurred then all flowFiles received from incoming queues in this session will be transferred to `failure` relationship with additional attributes set: ERROR_MESSAGE and ERROR_STACKTRACE. If `rollback` selected and unhandled exception occurred then all flowFiles received from incoming queues will be penalized and returned. If the processor has no incoming connections then this parameter has no effect.
Additional classpath Classpath list separated by semicolon. You can use masks like `*`, `*.jar` in file name.

Supports Expression Language: true (will be evaluated using variable registry only)


Dynamic Properties:

Dynamic Properties allow the user to specify both the name and value of a property.

Name

Value

Description

A script engine property to update The value to set it to Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value. Use `CTL.` to access any controller services.

Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)


Relationships:

Name

Description

success FlowFiles that were successfully processed
failure FlowFiles that failed to be processed

Reads Attributes:

None specified.

Writes Attributes:

None specified.

State management:

This component does not store state.

Restricted:

Required Permission

Explanation

execute code Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.

Input requirement:

This component requires an incoming relationship.

System Resource Considerations:

None specified.

Summary

This is a grooviest groovy script :)

Script Bindings:

variable

type

description

session org.apache.nifi.processor.ProcessSession the session that is used to get, change, and transfer input files
context org.apache.nifi.processor.ProcessContext the context (almost unused)
log org.apache.nifi.logging.ComponentLog the logger for this processor instance
REL_SUCCESS org.apache.nifi.processor.Relationship the success relationship
REL_FAILURE org.apache.nifi.processor.Relationship the failure relationship
CTL java.util.HashMap<String,ControllerService> Map populated with controller services defined with `CTL.*` processor properties. 
The `CTL.` prefixed properties could be linked to controller service and provides access to this service from a script without additional code.
SQL java.util.HashMap<String,groovy.sql.Sql> Map populated with `groovy.sql.Sql` objects connected to corresponding database defined with `SQL.*` processor properties. 
The `SQL.` prefixed properties could be linked only to DBCPService.
Dynamic processor properties org.apache.nifi.components.PropertyDescriptor All processor properties not started with `CTL.` or `SQL.` are bound to script variables

SQL map details

Example: if you defined property SQL.mydb and linked it to any DBCPService, then you can access it from code SQL.mydb.rows(‘select * from mytable’)

The processor automatically takes connection from dbcp service before executing script and tries to handle transaction: 
database transactions automatically rolled back on script exception and committed on success. 
Or you can manage transaction manually. 

NOTE: Script must not disconnect connection. 

Data Integration Processors Execute Groovy Script

Data Integration Processors file extension

SessionFile - flow file extension

The (org.apache.nifi.processors.groovyx.flow.SessionFile) is an actual object returned by session in Extended Groovy processor.
This flow file is a container that references session and the real flow file.

This allows to use simplified syntax to work with file attributes and content:

set new attribute value

flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE 
flowFile.'mime.type' = 'text/xml' 
flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
//the same as
flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)

remove attribute

flowFile.ATTRIBUTE_NAME = null
//equals to
flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")

get attribute value

String a = flowFile.ATTRIBUTE_NAME

write content

flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
flowFile.write("UTF-8"){writer-> 
  do something with java.io.Writer...
}
flowFile.write{outStream-> 
  do something with output stream...
}
flowFile.write{inStream, outStream-> 
  do something with input and output streams... 
}

get content

InputStream i = flowFile.read()
def json = new groovy.json.JsonSlurper().parse( flowFile.read() )
String text = flowFile.read().getText("UTF-8")

transfer flow file to success relation

REL_SUCCESS << flowFile 
flowFile.transfer(REL_SUCCESS)
//the same as:
session.transfer(flowFile, REL_SUCCESS)

work with dbcp

import groovy.sql.Sql
//define property named `SQL.db` connected to a DBCPConnectionPool controller service
//for this case it's an H2 database example

//read value from the database with prepared statement 
//and assign into flowfile attribute `db.yesterday`
def daysAdd = -1
def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE
//to work with BLOBs and CLOBs in the database 
//use parameter casting using groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)

//write content of the flow file into database blob
flowFile.read{ rawIn->
  def parms = [
    p_id   : flowFile.ID as Long, //get flow file attribute named `ID`
    p_data : Sql.BLOB( rawIn ),   //use input stream as BLOB sql parameter
]
  SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}

Handling processor start & stop

In the extended groovy processor you can catch start and stop events by providing corresponding static methods:

import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong
class Const{
  static Date startTime = null;
  static AtomicLong triggerCount = null;
}

static onStart(ProcessContext context){
  Const.startTime = new Date()
  Const.triggerCount = new AtomicLong(0)
  println "onStart $context ${Const.startTime}"
}

static onStop(ProcessContext context){
  def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
  println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}


flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile