Description:
Execute a Flume sink. Each input FlowFile is converted into a Flume Event for processing by the sink.
Tags:
flume, Hadoop, put, sink
Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values.
Name | Default Value | Allowable Values | Description |
Sink Type | | | The component type name for the sink. For some sinks, this is a short, symbolic name (e.g. HDFS). For others, it's the fully-qualified name of the Sink class. See the Flume User Guide for details. |
Agent Name | tier1 | | The name of the agent used in the Flume sink configuration |
Sink Name | sink-1 | | The name of the sink used in the Flume sink configuration |
Flume Configuration | | | The Flume configuration for the sink copied from the flume.properties file |
Relationships:
Name | Description |
success | |
failure | |
Reads Attributes:
None specified.
Writes Attributes:
None specified.
Data Model
This processor executes an Apache Flume sink. FlowFiles are wrapped in Flume’s Event interface. The content of the FlowFile becomes the body of the Event and the attributes of the FlowFile become Event headers. The following special headers are also set:
Flume Event Header | FlowFile Attribute |
nifi.entry.date | FlowFile#getEntryDate() |
nifi.id | FlowFile#getId() |
nifi.last.queue.date | FlowFile#getLastQueueDate() |
nifi.lineage.identifiers.${i} | FlowFile#getLineageIdentifiers()[i] |
nifi.lineage.start.date | FlowFile#getLineageStartDate() |
nifi.size | FlowFile#getSize() |
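The mapping in the table above can be sketched in a few lines of Python. This is an illustration only, not the processor's actual implementation: the `FlowFile` dataclass and the `to_event` function are hypothetical stand-ins for the NiFi and Flume types. Rendering each value with `str()`, starting the lineage index `i` at 0, and letting the special headers overwrite same-named attributes are all assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical stand-in for a NiFi FlowFile (not the real API)."""
    id: int
    entry_date: int
    last_queue_date: int
    lineage_start_date: int
    content: bytes
    attributes: dict = field(default_factory=dict)
    lineage_identifiers: list = field(default_factory=list)


def to_event(flow_file):
    """Return (headers, body) in the shape of a Flume Event."""
    # FlowFile attributes become Event headers.
    headers = dict(flow_file.attributes)
    # Special headers from the table (string formatting is an assumption).
    headers["nifi.entry.date"] = str(flow_file.entry_date)
    headers["nifi.id"] = str(flow_file.id)
    headers["nifi.last.queue.date"] = str(flow_file.last_queue_date)
    for i, ident in enumerate(flow_file.lineage_identifiers):
        headers[f"nifi.lineage.identifiers.{i}"] = str(ident)
    headers["nifi.lineage.start.date"] = str(flow_file.lineage_start_date)
    headers["nifi.size"] = str(len(flow_file.content))
    # The FlowFile content becomes the Event body, held fully in memory.
    return headers, flow_file.content
```

Note that the body is returned as a complete `bytes` object rather than a stream, which mirrors the memory behavior described in the warning below.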
Warning
In NiFi, the content of a FlowFile is accessed via a stream, but in Flume it is stored in a byte array. This means the full content is loaded into memory when a FlowFile is processed by the ExecuteFlumeSink processor. When setting NiFi’s heap size, consider the typical size of the FlowFiles you’ll process and the batch size, if any, that your sink is configured with.
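As a rough aid for sizing, the arithmetic behind this warning can be written down directly. The sketch below assumes that every Event body in a batch is resident in memory at the same time, and it ignores headers, object overhead, and everything else NiFi keeps on the heap, so treat the result as a lower bound. The function name and the example numbers are hypothetical.

```python
def estimate_batch_bytes(typical_flowfile_bytes, batch_size=1):
    """Rough lower bound on heap held by Event bodies in one batch."""
    if typical_flowfile_bytes < 0 or batch_size < 1:
        raise ValueError("size must be >= 0 and batch_size must be >= 1")
    return typical_flowfile_bytes * batch_size


# Example: 5 MiB FlowFiles with a sink batch size of 100.
needed = estimate_batch_bytes(5 * 1024 * 1024, batch_size=100)
print(f"{needed / (1024 * 1024):.0f} MiB")  # prints "500 MiB"
```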
Configuration Details
This processor is designed to execute arbitrary Flume sinks. Most of the details of configuring the sink are deferred to Flume’s built-in configuration system. For details on the available settings for each sink type, refer to the Flume [User Guide](http://flume.apache.org/FlumeUserGuide.html#flume-sinks). Configuring the Flume sink is a four-step process:
- Set the Sink Type property to a valid Flume sink type.
- Set the Agent Name property to the name of the agent in your Flume configuration. This is the prefix of the properties in the Flume configuration file. Example: tier1
- Set the Sink Name property to the name of the sink in your Flume configuration. If Agent Name is tier1, then the Sink Name is the value of the tier1.sinks property. Example: sink-1
- Copy and paste the configuration for the sink from your Flume configuration file into the Flume Configuration property. Assuming you’re using the same Agent Name and Sink Name as in the examples above, this will be all of the properties that start with tier1.sinks.sink-1. Do not copy the tier1.sinks.sink-1.type or tier1.sinks.sink-1.channel properties.
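The last step can also be done mechanically. The following is a minimal sketch, assuming a configuration file made of simple `key = value` lines. It does not handle the full Java properties syntax (line continuations, `:` separators, escapes). The `sink_properties` helper and the sample path `/flume/events` are hypothetical and are not part of NiFi or Flume.

```python
def sink_properties(config_text, agent="tier1", sink="sink-1"):
    """Return the lines to paste into the Flume Configuration property."""
    prefix = f"{agent}.sinks.{sink}."
    # The type and channel properties must not be copied.
    excluded = {prefix + "type", prefix + "channel"}
    kept = []
    for line in config_text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and comments
        key = stripped.split("=", 1)[0].strip()
        if key.startswith(prefix) and key not in excluded:
            kept.append(stripped)
    return kept


example = """
tier1.sinks = sink-1
tier1.sinks.sink-1.type = HDFS
tier1.sinks.sink-1.channel = c1
tier1.sinks.sink-1.hdfs.path = /flume/events
"""
print(sink_properties(example))
# prints ['tier1.sinks.sink-1.hdfs.path = /flume/events']
```

Pass `agent` and `sink` explicitly when your configuration uses names other than the defaults.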
Usage Example
Assuming you had the following existing Flume configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = HDFS
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Then you’d configure the ExecuteFlumeSink as follows:
Property | Value |
Sink Type | HDFS |
Agent Name | a1 |
Sink Name | k1 |
Flume Configuration |
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute |