Description:

Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.

Tags:

flume, Hadoop, put, sink

Properties:

In the table below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values.

Name | Default Value | Allowable Values | Description
Sink Type | | | The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. HDFS). For others, it is the fully-qualified name of the Sink class. See the Flume User Guide for details.
Agent Name | tier1 | | The name of the agent used in the Flume sink configuration
Sink Name | sink-1 | | The name of the sink used in the Flume sink configuration
Flume Configuration | | | The Flume configuration for the sink, copied from the flume.properties file

Relationships:

Name Description
success
failure

Reads Attributes:

None specified.

Writes Attributes:

None specified.

Data Model

This processor executes an Apache Flume sink. FlowFiles are wrapped in Flume’s Event interface. The content of the FlowFile becomes the body of the Event and the attributes of the FlowFile become Event headers. The following special headers are also set:

Flume Event Header | Source (FlowFile method)
nifi.entry.date | FlowFile#getEntryDate()
nifi.id | FlowFile#getId()
nifi.last.queue.date | FlowFile#getLastQueueDate()
nifi.lineage.identifiers.${i} | FlowFile#getLineageIdentifiers()[i]
nifi.lineage.start.date | FlowFile#getLineageStartDate()
nifi.size | FlowFile#getSize()
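The mapping above can be sketched in Java as follows. This is not the processor's source code: FlowFileView and SimpleEvent are hypothetical stand-ins for NiFi's FlowFile and Flume's Event, and rendering the numeric values with Long.toString, as well as letting a special header overwrite a same-named attribute, are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlowFileToFlumeEvent {

    // Hypothetical stand-in for the FlowFile fields used here
    // (NOT the real org.apache.nifi.flowfile.FlowFile interface).
    record FlowFileView(long id, long entryDate, long lastQueueDate,
                        long lineageStartDate, long size,
                        List<String> lineageIdentifiers,
                        Map<String, String> attributes) {}

    // Hypothetical stand-in for a Flume Event: a byte[] body plus String headers.
    record SimpleEvent(byte[] body, Map<String, String> headers) {}

    static Map<String, String> toHeaders(FlowFileView ff) {
        // Ordinary FlowFile attributes become Event headers.
        Map<String, String> headers = new LinkedHashMap<>(ff.attributes());
        // Special headers (assumed: numbers rendered with Long.toString,
        // and written after the attributes, so they win on a name clash).
        headers.put("nifi.entry.date", Long.toString(ff.entryDate()));
        headers.put("nifi.id", Long.toString(ff.id()));
        headers.put("nifi.last.queue.date", Long.toString(ff.lastQueueDate()));
        for (int i = 0; i < ff.lineageIdentifiers().size(); i++) {
            headers.put("nifi.lineage.identifiers." + i, ff.lineageIdentifiers().get(i));
        }
        headers.put("nifi.lineage.start.date", Long.toString(ff.lineageStartDate()));
        headers.put("nifi.size", Long.toString(ff.size()));
        return headers;
    }

    static SimpleEvent toEvent(FlowFileView ff, byte[] content) {
        // The whole FlowFile content becomes the Event body as a single array.
        return new SimpleEvent(content, toHeaders(ff));
    }

    public static void main(String[] args) {
        FlowFileView ff = new FlowFileView(7L, 1000L, 2000L, 900L, 5L,
                List.of("abc-123"), Map.of("filename", "a.txt"));
        SimpleEvent event = toEvent(ff, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(event.headers());
    }
}
```

Note that a FlowFile with several lineage identifiers produces one numbered header per identifier, starting at index 0 in this sketch.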

Warning
In NiFi, the content of a FlowFile is accessed via a stream, but in Flume an Event's body is stored in a byte array. This means the full content of each FlowFile is loaded into memory when it is processed by the ExecuteFlumeSink processor. When setting NiFi’s heap size, consider the typical size of the FlowFiles you’ll process and the batch size, if any, that your sink is configured with.
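As a rough, back-of-the-envelope aid, the sketch below multiplies the batch size by a typical FlowFile size. It assumes (hypothetically) that the bodies of an entire batch can be held in memory at the same time and treats a sink without batching as a batch of one; the class name and the numbers in main are illustrative only.

```java
public class FlumeSinkHeapEstimate {

    // Rough upper bound on Event-body bytes held at once, under the assumption
    // that every FlowFile in a batch has been copied into its own byte[] body.
    static long worstCaseBodyBytes(long batchSize, long typicalFlowFileBytes) {
        long effectiveBatch = Math.max(1L, batchSize); // no batching: one Event at a time
        return Math.multiplyExact(effectiveBatch, typicalFlowFileBytes);
    }

    public static void main(String[] args) {
        long batchSize = 100;                    // hypothetical sink batch size
        long typicalBytes = 10L * 1024 * 1024;   // hypothetical 10 MiB FlowFiles
        long bytes = worstCaseBodyBytes(batchSize, typicalBytes);
        System.out.println(bytes / (1024 * 1024) + " MiB"); // prints "1000 MiB"
    }
}
```

This figure covers only the Event bodies; NiFi itself and any other processors need heap on top of it.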

Configuration Details

This processor is designed to execute arbitrary Flume sinks. Most of the details of configuring the sink are deferred to Flume’s built-in configuration system. For details on the available settings for each sink type, refer to the Flume User Guide (http://flume.apache.org/FlumeUserGuide.html#flume-sinks). Configuring the Flume sink is a four-step process:

  1. Set the Sink Type property to a valid Flume sink type.

  2. Set the Agent Name property to the name of the agent in your Flume configuration. This is the prefix of the properties in the Flume configuration file. Example: tier1

  3. Set the Sink Name property to the name of the sink in your Flume configuration. If Agent Name is tier1, then the Sink Name is the value of the tier1.sinks property. Example: sink-1

  4. Copy and paste the configuration for the sink from your Flume configuration file into the Flume Configuration property. Assuming you’re using the same Agent Name and Sink Name as in the examples above, these are all of the properties that start with tier1.sinks.sink-1. Do not copy the tier1.sinks.sink-1.type property (its value belongs in the Sink Type property) or the tier1.sinks.sink-1.channel property.
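The steps above can also be scripted. The following Java sketch reads an existing Flume configuration and returns the value for the Sink Type property plus the lines to paste into the Flume Configuration property. The class, record, and method names are hypothetical, and for simplicity the sketch only understands key = value lines (not the ':' or whitespace separators that .properties files also allow) and ignores line continuations.

```java
import java.util.ArrayList;
import java.util.List;

public class FlumeSinkConfigExtractor {

    // Hypothetical holder for the two values copied into ExecuteFlumeSink.
    record Extracted(String sinkType, String flumeConfiguration) {}

    static Extracted extract(String flumeFile, String agentName, String sinkName) {
        // All of the sink's own properties share this prefix (step 4).
        String prefix = agentName + ".sinks." + sinkName + ".";
        String sinkType = null;
        List<String> kept = new ArrayList<>();
        for (String raw : flumeFile.split("\\R")) {
            String line = raw.strip();
            int eq = line.indexOf('=');
            if (line.isEmpty() || line.startsWith("#") || eq < 0) {
                continue; // skip blank lines, comments and lines without "="
            }
            String key = line.substring(0, eq).strip();
            if (!key.startsWith(prefix)) {
                continue; // belongs to another source, sink or channel
            }
            String suffix = key.substring(prefix.length());
            if (suffix.equals("type")) {
                // Step 1: this value goes into Sink Type, not Flume Configuration.
                sinkType = line.substring(eq + 1).strip();
            } else if (!suffix.equals("channel")) {
                kept.add(line); // step 4: keep everything except type and channel
            }
        }
        return new Extracted(sinkType, String.join("\n", kept));
    }

    public static void main(String[] args) {
        String flumeFile = """
                a1.sinks = k1
                a1.sinks.k1.type = HDFS
                a1.sinks.k1.channel = c1
                a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
                a1.sinks.k1.hdfs.filePrefix = events-
                a1.channels.c1.type = memory
                """;
        Extracted result = extract(flumeFile, "a1", "k1");
        System.out.println("Sink Type: " + result.sinkType());
        System.out.println("Flume Configuration:");
        System.out.println(result.flumeConfiguration());
    }
}
```

The Agent Name and Sink Name passed to extract are the same values entered in steps 2 and 3.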

Usage Example

Assuming you had the following existing Flume configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = HDFS
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Then you’d configure the ExecuteFlumeSink as follows:

Property | Value
Sink Type | HDFS
Agent Name | a1
Sink Name | k1
Flume Configuration | the following five lines:
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
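The Flume Configuration value keeps the full a1.sinks.k1. prefix because the Agent Name and Sink Name are used to pick out this sink's settings. The sketch below shows one plausible way to do that: parse the value as standard .properties text and strip the prefix, leaving keys such as hdfs.path that the HDFS sink understands. Flume's Context.getSubProperties performs this kind of prefix stripping; whether ExecuteFlumeSink follows exactly this path is an assumption, and the class and method names here are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class FlumeSinkContextSketch {

    // Keeps only this sink's settings, with "<agent>.sinks.<sink>." removed.
    static Map<String, String> sinkSettings(String flumeConfiguration,
                                            String agentName, String sinkName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(flumeConfiguration)); // .properties syntax
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected to happen.
            throw new UncheckedIOException(e);
        }
        String prefix = agentName + ".sinks." + sinkName + ".";
        Map<String, String> settings = new TreeMap<>(); // sorted for readable output
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                settings.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        String flumeConfiguration = """
                a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
                a1.sinks.k1.hdfs.filePrefix = events-
                a1.sinks.k1.hdfs.round = true
                a1.sinks.k1.hdfs.roundValue = 10
                a1.sinks.k1.hdfs.roundUnit = minute
                """;
        System.out.println(sinkSettings(flumeConfiguration, "a1", "k1"));
    }
}
```

Under this reading, a line whose prefix does not match the Agent Name and Sink Name properties would simply be ignored, which is one reason to keep those two properties consistent with the pasted configuration.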