Description:
Fetches messages from Apache Kafka, specifically for 0.8.x versions. The complementary NiFi processor for sending messages is PutKafka.
Tags:
Kafka, Apache, Get, Ingest, Ingress, Topic, PubSub
Properties:
In the table below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values.
Name | Default Value | Allowable Values | Description |
ZooKeeper Connection String | | | The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations, for example host1:2181,host2:2181,host3:2188 |
Topic Name | | | The Kafka Topic to pull messages from |
Zookeeper Commit Frequency | 60 secs | | Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period results in better overall performance but can result in more duplicated data if a NiFi node is lost |
Batch Size | 1 | | Specifies the maximum number of messages to combine into a single FlowFile. These messages are concatenated, with the <Message Demarcator> string placed between the content of each message. If the messages from Kafka should not be concatenated, leave this value at 1. |
Message Demarcator | \n | | Specifies the characters used to demarcate multiple messages from Kafka. If the <Batch Size> property is set to 1, this value is ignored. Otherwise, this value is placed between each pair of consecutive messages in the batch. |
Client Name | NiFi-mock-processor | | Client Name to use when communicating with Kafka |
Group ID | mock-processor | | A Group ID is used to identify consumers that are within the same consumer group |
Kafka Communications Timeout | 30 secs | | The amount of time to wait for a response from Kafka before determining that there is a communications error |
ZooKeeper Communications Timeout | 30 secs | | The amount of time to wait for a response from ZooKeeper before determining that there is a communications error |
Auto Offset Reset | largest | smallest, largest | Automatically reset the offset to the smallest or largest offset available on the broker |
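The interaction between Batch Size and Message Demarcator can be sketched in a few lines of Python. This is a simplified model, not NiFi's implementation: `batch_messages` is a hypothetical helper, and it assumes the pending messages are already available as a list of byte strings, whereas the processor pulls them from Kafka as they arrive.

```python
from typing import List


def batch_messages(messages: List[bytes], batch_size: int, demarcator: bytes = b"\n") -> List[bytes]:
    """Hypothetical model of Batch Size / Message Demarcator.

    Groups messages into FlowFile contents of at most batch_size messages each.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    contents = []
    for start in range(0, len(messages), batch_size):
        chunk = messages[start:start + batch_size]
        # bytes.join places the demarcator only between consecutive messages,
        # never before the first or after the last. For a one-message chunk it
        # adds nothing, which is why the demarcator is irrelevant at batch size 1.
        contents.append(demarcator.join(chunk))
    return contents


msgs = [b"alpha", b"beta", b"gamma"]
print(batch_messages(msgs, 1))  # [b'alpha', b'beta', b'gamma']
print(batch_messages(msgs, 2))  # [b'alpha\nbeta', b'gamma']
```

Because the demarcator is only inserted between messages, a downstream processor can split the FlowFile content on the same demarcator to recover the original messages, provided the demarcator never occurs inside a message.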
Dynamic Properties:
Dynamic Properties allow the user to specify both the name and value of a property.
Name | Value | Description |
The name of a Kafka configuration property. | The value of a given Kafka configuration property. | These properties are added to the Kafka configuration after any provided configuration properties are loaded. If a dynamic property sets a property that was already set as part of the static properties, its value is overridden and a warning message describing the override is emitted. For the list of available Kafka properties, refer to http://kafka.apache.org/documentation.html#configuration. |
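The override rule for dynamic properties can be illustrated with a small merge function. This is a sketch under assumptions: `merge_kafka_config` and the logger name are hypothetical, and the mapping of the static properties onto Kafka keys such as `zookeeper.connect`, `group.id` and `auto.offset.reset` is assumed for illustration only. Those keys, along with `fetch.message.max.bytes`, are Kafka 0.8 consumer settings.

```python
import logging
from typing import Dict

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("getkafka.sketch")


def merge_kafka_config(static_props: Dict[str, str], dynamic_props: Dict[str, str]) -> Dict[str, str]:
    """Apply dynamic properties on top of the static configuration.

    Returns a new dict; the input dicts are left unchanged.
    """
    merged = dict(static_props)
    for name, value in dynamic_props.items():
        if name in merged:
            # Overriding a statically set value is allowed, but a warning is emitted.
            logger.warning("Dynamic property %s overrides static value %r with %r",
                           name, merged[name], value)
        merged[name] = value
    return merged


# Assumed static configuration, mirroring defaults from the property table.
static = {
    "zookeeper.connect": "host1:2181,host2:2181",
    "group.id": "mock-processor",
    "auto.offset.reset": "largest",
}
dynamic = {"auto.offset.reset": "smallest", "fetch.message.max.bytes": "2097152"}
config = merge_kafka_config(static, dynamic)
print(config["auto.offset.reset"])  # smallest
```

Copying the static dict before applying overrides keeps the original configuration intact, so the same static settings can be reused if the dynamic properties change.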
Relationships:
Name | Description |
success | All FlowFiles that are created are routed to this relationship |
Reads Attributes:
None specified.
Writes Attributes:
Name | Description |
kafka.topic | The name of the Kafka Topic from which the message was received |
kafka.key | The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, or if the batch size is greater than 1, this attribute will not be added |
kafka.partition | The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1 |
kafka.offset | The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1 |
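The conditions in the attribute table can be summarized as a function that computes a FlowFile's attributes from the messages it contains. This is a hypothetical sketch: `KafkaMessage` and `flowfile_attributes` are illustrative names, not NiFi classes. It follows the table in keying the partition, offset and key attributes on the Batch Size property rather than on how many messages actually arrived, and it decodes the key as UTF-8.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class KafkaMessage:
    """Hypothetical stand-in for a consumed Kafka message."""
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: bytes


def flowfile_attributes(batch: List[KafkaMessage], batch_size: int) -> Dict[str, str]:
    """Attributes for a FlowFile built from `batch` under the given Batch Size."""
    if not batch:
        raise ValueError("batch must contain at least one message")
    # kafka.topic is always written; a processor reads a single topic.
    attrs = {"kafka.topic": batch[0].topic}
    if batch_size == 1:
        msg = batch[0]
        attrs["kafka.partition"] = str(msg.partition)
        attrs["kafka.offset"] = str(msg.offset)
        if msg.key is not None:
            # The key is exposed as a UTF-8 string.
            attrs["kafka.key"] = msg.key.decode("utf-8")
    return attrs


msg = KafkaMessage("events", 3, 42, b"user-1", b"payload")
print(flowfile_attributes([msg], 1))
print(flowfile_attributes([msg, msg], 10))  # {'kafka.topic': 'events'}
```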
Description:
This Processor polls Apache Kafka for data. When a message is received from Kafka, this Processor emits a FlowFile whose content is the value of the Kafka message. If the message has a key associated with it, an attribute named kafka.key is added to the FlowFile, with the value being the UTF-8 encoded value of the message's key.
Kafka supports the notion of a Consumer Group when pulling messages, in order to provide scalability while still offering a publish-subscribe interface. Each Consumer Group must have a unique identifier. The Consumer Group identifier used by NiFi is the UUID of the Processor. As a result, all of the nodes within a cluster use the same Consumer Group identifier, so they do not receive duplicate data. Multiple GetKafka Processors can still be used to pull from multiple Topics, because each Processor has a different UUID and therefore a different Consumer Group identifier.
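Why a shared Consumer Group prevents duplicates can be shown with a simplified model of partition assignment. Within one group, every partition of a topic is owned by exactly one consumer, so the cluster nodes split the topic between them. A different group (for example, a second GetKafka Processor with a different UUID) gets its own full assignment. The sketch below approximates Kafka's range-style assignment; `assign_partitions` and the node names are hypothetical, and real assignment happens inside Kafka's consumer, not in NiFi.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified range-style assignment within a single consumer group.

    Each partition is given to exactly one consumer; earlier consumers
    (in sorted order) receive one extra partition when the split is uneven.
    """
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    ordered_consumers = sorted(consumers)
    ordered_partitions = sorted(partitions)
    per_consumer, extra = divmod(len(ordered_partitions), len(ordered_consumers))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, consumer in enumerate(ordered_consumers):
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = ordered_partitions[start:start + count]
        start += count
    return assignment


# Three cluster nodes sharing one Consumer Group over a 4-partition topic.
group = assign_partitions(list(range(4)), ["node-1", "node-2", "node-3"])
print(group)  # {'node-1': [0, 1], 'node-2': [2], 'node-3': [3]}
```

Because no partition appears under two nodes, no message is delivered to more than one node of the group. With more consumers than partitions, some consumers simply receive no partitions.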