Description and usage of SpringContextProcessor:

A Processor that supports sending data to and receiving data from an application defined in a Spring Application Context via predefined in/out MessageChannels.

Tags:

Spring, Message, Get, Put, Integration

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values.

Name Default Value Allowable Values Description
Application Context config path The path to the Spring Application Context configuration file relative to the classpath
Application Context class path Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on the classpath of the ApplicationContext.
Send Timeout Timeout for sending data to Spring Application Context. Defaults to 0.
Receive Timeout Timeout for receiving data from Spring context. Defaults to 0.

Relationships:

Name Description
failure All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship
success All FlowFiles that are successfully received from Spring Application Context are routed to this relationship

Reads Attributes:

None specified.

Writes Attributes:

None specified.

Description:

SpringContextProcessor allows processes encapsulated in a Spring Application Context to run as a NiFi processor; the processor becomes the runtime host for an instance of that Spring Application Context.

Communication between NiFi and the process encapsulated within the Spring Application Context is accomplished via Spring Messaging (one of the core modules of Spring Framework) and supports three usage modes:

  • Headless - no interaction with NiFi, meaning nothing is sent to it and nothing is received from it (e.g., a monitoring app). In this case NiFi simply plays the role of the runtime host.

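  • One way (NiFi -> Spring or Spring -> NiFi) - This depends on the existence of a pre-defined message channel in the Spring Application Context. For NiFi -> Spring, the channel should be named “fromNiFi” and be of type org.springframework.messaging.MessageChannel. For Spring -> NiFi, the channel should be named “toNiFi” and be of type org.springframework.messaging.PollableChannel.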

  • Bi-directional (NiFi -> Spring -> NiFi or Spring -> NiFi -> Spring) - This depends on the existence of two channels in the Spring Application Context. One channel, named “fromNiFi” and of type org.springframework.messaging.MessageChannel, receives messages from NiFi; the other, named “toNiFi” and of type org.springframework.messaging.PollableChannel, receives messages from Spring.

The example below shows a template Spring Application Context configuration for the bi-directional mode:

<int:channel id="fromNiFi"/>

<!--
your custom app configuration to receive messages from 'fromNiFi' channel and optionally send back to NiFi via 'toNiFi' channel. It could contain any Spring-based application (i.e., Spring Integration, Apache Camel and/or custom code). All you need to do is inject channels into your beans and send/receive messages from it.
-->

<int:channel id="toNiFi">
	<int:queue/>
</int:channel>
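
The relationship between the channels an application defines and the resulting usage mode can be summarized in a short sketch. The UsageMode enum and its resolve method are hypothetical names used only for illustration; they are not part of NiFi or Spring, and the processor's actual detection logic may differ. Only the channel names “fromNiFi” and “toNiFi” come from the description above.

```java
import java.util.Set;

// Hypothetical illustration of the usage modes described above.
// Not NiFi's implementation: only the channel names come from the documentation.
enum UsageMode {
	HEADLESS, ONE_WAY_NIFI_TO_SPRING, ONE_WAY_SPRING_TO_NIFI, BI_DIRECTIONAL;

	// Derives the mode from the channel bean names found in the application context.
	static UsageMode resolve(Set<String> channelNames) {
		boolean hasInbound = channelNames.contains("fromNiFi");  // NiFi -> Spring
		boolean hasOutbound = channelNames.contains("toNiFi");   // Spring -> NiFi
		if (hasInbound && hasOutbound) {
			return BI_DIRECTIONAL;
		}
		if (hasInbound) {
			return ONE_WAY_NIFI_TO_SPRING;
		}
		if (hasOutbound) {
			return ONE_WAY_SPRING_TO_NIFI;
		}
		return HEADLESS;
	}
}
```

Under these assumptions, a context that defines only “toNiFi” resolves to the Spring -> NiFi one-way mode, and a context that defines neither channel is treated as headless.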

The component assumes that the user has an existing Spring application encapsulated in a Spring Application Context that exposes optional in/out MessageChannels, which allow data to flow between the ApplicationContext and NiFi. Such an application is provided as a directory on the file system that contains all the resources the application needs to run. These resources are usually the JAR files that make up the application’s class path, including the JAR that holds the application itself and its configuration. Below is an example of what such a directory may contain. In this case ‘SI_DEMO-0.0.1-SNAPSHOT.jar’ is the actual application and the remaining JARs are the class-path dependencies it requires.

    deps
     -- SI_DEMO-0.0.1-SNAPSHOT.jar
     -- aopalliance-1.0.jar
     -- commons-logging-1.2.jar
     -- spring-aop-4.2.4.RELEASE.jar
     -- spring-beans-4.2.4.RELEASE.jar
     -- spring-context-4.2.4.RELEASE.jar
     -- spring-core-4.2.4.RELEASE.jar
     -- spring-expression-4.2.4.RELEASE.jar
     -- spring-integration-core-4.2.5.RELEASE.jar
     -- spring-messaging-4.2.4.RELEASE.jar
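
To make the idea of a directory acting as the application's class path more concrete, the following sketch collects the directory and every JAR directly inside it into a URLClassLoader. The ClassPathDirectory class and its method names are hypothetical, and this is only one plausible way to build such a class loader using the JDK; it is not taken from the processor's source.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: turns a directory such as 'deps' into a class loader.
final class ClassPathDirectory {

	private ClassPathDirectory() {
	}

	// Returns the directory itself (for loose configuration files)
	// followed by every *.jar file directly inside it, sorted by path.
	static List<URL> collectUrls(Path dir) throws IOException {
		List<URL> urls = new ArrayList<>();
		urls.add(dir.toUri().toURL());
		List<Path> jars;
		try (Stream<Path> entries = Files.list(dir)) {
			jars = entries
					.filter(p -> p.getFileName().toString().endsWith(".jar"))
					.sorted()
					.collect(Collectors.toList());
		}
		for (Path jar : jars) {
			urls.add(jar.toUri().toURL());
		}
		return urls;
	}

	// Builds a class loader whose parent is the class loader of this helper.
	static URLClassLoader newClassLoader(Path dir) throws IOException {
		List<URL> urls = collectUrls(dir);
		return new URLClassLoader(urls.toArray(new URL[0]),
				ClassPathDirectory.class.getClassLoader());
	}
}
```

With the example directory above, collectUrls would return the ‘deps’ directory followed by its ten JAR files; non-JAR files are reachable only through the directory entry.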

Add the processor to the flow in the usual way and then configure its properties:

  • Application Context config path [REQUIRED] - a path to the Application Context configuration. The path is relative to the class path of the application defined by the Application Context class path property.

  • Application Context class path [REQUIRED] - a path to the directory on the file system that contains the application and its dependencies.

  • Send Timeout [OPTIONAL] - the wait time for sending messages to the Spring Application Context. Only required if NiFi plans to send data to Spring. Defaults to 0 (don’t wait). FlowFiles that were successfully sent to Spring are removed from the session, while FlowFiles that could not be sent to Spring are routed to the failure relationship.

  • Receive Timeout [OPTIONAL] - the wait time for receiving messages from the Spring Application Context. Only required if NiFi plans to receive data from Spring. Defaults to 0 (don’t wait). A FlowFile is created if and only if a message is successfully received from Spring. It is then transferred to the success relationship.
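
The timeout semantics above (0 means don't wait, unsent data goes to failure, and a FlowFile is created only when a message actually arrives) can be illustrated with plain JDK queues standing in for the Spring channels. TimeoutDemo, trySend, and tryReceive are hypothetical names, and the returned strings are illustrative labels only; the real processor works with Spring's channel types rather than BlockingQueue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the channel exchange; BlockingQueue replaces the Spring channels.
final class TimeoutDemo {

	private TimeoutDemo() {
	}

	// Tries to hand data to the application, waiting at most sendTimeoutMillis.
	// Returns "sent" (the FlowFile would be removed from the session)
	// or "failure" (the FlowFile would be routed to the failure relationship).
	static String trySend(BlockingQueue<String> fromNiFi, String data, long sendTimeoutMillis)
			throws InterruptedException {
		boolean accepted = fromNiFi.offer(data, sendTimeoutMillis, TimeUnit.MILLISECONDS);
		return accepted ? "sent" : "failure";
	}

	// Waits at most receiveTimeoutMillis for a message from the application.
	// Returns the payload (a FlowFile would be created and routed to success)
	// or null when nothing arrived in time (no FlowFile is created).
	static String tryReceive(BlockingQueue<String> toNiFi, long receiveTimeoutMillis)
			throws InterruptedException {
		return toNiFi.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
	}
}
```

In this sketch a timeout of 0 makes both calls return immediately: trySend reports failure if the queue is full at that moment, and tryReceive returns null if no message is waiting.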

How to Configure?

This sample explains how to send a message from a Spring application to the Data Integration Platform and log its attributes in a log file.

Create a Spring application to send a message

Add maven dependencies:

Add the following dependencies and build plugins to the pom.xml file of the Maven application, then build the project.

<dependencies>
	<dependency>
		<groupId>org.springframework.integration</groupId>
		<artifactId>spring-integration-core</artifactId>
		<version>4.2.4.RELEASE</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<artifactId>maven-dependency-plugin</artifactId>
			<executions>
				<execution>
					<phase>install</phase>
					<goals>
						<goal>copy-dependencies</goal>
					</goals>
					<configuration>
						<outputDirectory>target/deps</outputDirectory>
					</configuration>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
			<version>2.3.1</version>
			<configuration>
				<outputDirectory>target/deps</outputDirectory>
			</configuration>
		</plugin>
	</plugins>
</build>

Creating a message:

In this step, implement a program that generates the message ‘Message from Spring!’ at random intervals of up to 2 seconds and sends it through the message channel.

SendOnlyService.java:

package oz.spring;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

// Sends 'Message from Spring!' to the injected channel at random intervals of up to 2 seconds.
public class SendOnlyService implements SmartLifecycle {

	// Injected by the application context (the 'toNiFi' channel).
	private volatile MessageChannel channel;

	private volatile boolean started;

	private volatile ScheduledExecutorService executor;

	public void setChannel(MessageChannel channel) {
		this.channel = channel;
	}

	@Override
	public void start() {
		this.executor = Executors.newSingleThreadScheduledExecutor();
		final Random r = new Random();
		this.started = true;
		this.executor.execute(new Runnable() {
			@Override
			public void run() {
				// Do not send or reschedule once the service has been stopped.
				if (!started) {
					return;
				}
				System.out.println("=> Generating message");
				Message<String> message = MessageBuilder
						.withPayload("Message from Spring!")
						.build();
				channel.send(message);
				// Run this task again after a random delay of 0 to 1999 ms.
				executor.schedule(this, r.nextInt(2000), TimeUnit.MILLISECONDS);
			}
		});
	}

	@Override
	public void stop() {
		this.started = false;
		this.executor.shutdown();
	}

	@Override
	public boolean isRunning() {
		return this.started;
	}

	@Override
	public int getPhase() {
		return 0;
	}

	// Start automatically when the application context is refreshed.
	@Override
	public boolean isAutoStartup() {
		return true;
	}

	@Override
	public void stop(Runnable callback) {
		this.stop();
		callback.run();
	}
}

Creating message channel:

In this step, create a message channel named ‘toNiFi’ through which the message is sent to the Data Integration Platform.

toNiFiOnly.xml:

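<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:int="http://www.springframework.org/schema/integration"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

	<bean class="oz.spring.SendOnlyService">
		<property name="channel" ref="toNiFi"/>
	</bean>

	<int:channel id="toNiFi">
		<int:queue/>
	</int:channel>
</beans>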

Create a workflow in the Data Integration Platform:

Step 1: Receive a message from a Spring application

Drag and drop the SpringContext processor and specify the following properties:

Application Context config path: <Your application XML file containing the channel used to communicate with NiFi>

Application Context class path: <The directory containing the application JAR and its dependencies>

After the SpringContext processor is started, a flow file is generated at random intervals of up to 2 seconds, each containing the message sent by the Spring application.

Step 2: Log flow file attributes

Drag and drop a LogAttribute processor, set its log level to ‘info’, and connect it to the ‘success’ relationship of the SpringContext processor.

Drag and drop a second LogAttribute processor, set its log level to ‘warn’, and connect it to the ‘failure’ relationship of the SpringContext processor.

After the processors are started, the message from the Spring application and the flow file attributes are logged in the Data Integration Platform logs folder.