Description:

Fetches messages from Apache Kafka, specifically for 0.8.x versions. The complementary NiFi processor for sending messages is PutKafka.

Tags:

Kafka, Apache, Get, Ingest, Ingress, Topic, PubSub

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values.

Name Default Value Allowable Values Description
ZooKeeper Connection String The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188
Topic Name The Kafka Topic to pull messages from
Zookeeper Commit Frequency 60 secs Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost
Batch Size 1 Specifies the maximum number of messages to combine into a single FlowFile. These messages will be concatenated together with the <Message Demarcator> string placed between the content of each message. If the messages from Kafka should not be concatenated together, leave this value at 1.
Message Demarcator \n Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, this value will be placed in between them.
Client Name NiFi-mock-processor Client Name to use when communicating with Kafka
Group ID mock-processor A Group ID is used to identify consumers that are within the same consumer group
Kafka Communications Timeout 30 secs The amount of time to wait for a response from Kafka before determining that there is a communications error
ZooKeeper Communications Timeout 30 secs The amount of time to wait for a response from ZooKeeper before determining that there is a communications error
Auto Offset Reset largest * smallest</br> * largest Automatically reset the offset to the smallest or largest offset available on the broker

Dynamic Properties:
Dynamic Properties allow the user to specify both the name and value of a property.

Name Value Description
The name of a Kafka configuration property. The value of a given Kafka configuration property. These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be overriden with warning message describing the override. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.

Relationships:

Name Description
success All FlowFiles that are created are routed to this relationship

Reads Attributes:

None specified.

Writes Attributes:

Name Description
kafka.topic The name of the Kafka Topic from which the message was received
kafka.key The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, or if the batch size is greater than 1, this attribute will not be added
kafka.partition The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1
kafka.offset The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1

Description:

This Processors polls Apache Kafka for data. When a message is received from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value of the Kafka message. If the message has a key associated with it, an attribute named kafka.key will be added to the FlowFile, with the value being the UTF-8 Encoded value of the Message’s Key.

Kafka supports the notion of a Consumer Group when pulling messages in order to provide scalability while still offering a publish-subscribe interface. Each Consumer Group must have a unique identifier. The Consumer Group identifier that is used by NiFi is the UUID of the Processor. This means that all of the nodes within a cluster will use the same Consumer Group Identifier so that they do not receive duplicate data but multiple GetKafka Processors can be used to pull from multiple Topics, as each Processor will receive a different Processor UUID and therefore a different Consumer Group Identifier.