Description:

Consumes messages from Apache Kafka built against the Kafka 0.9.x Consumer API. The complementary NiFi processor for sending messages is PublishKafka.

Tags:

Kafka, Get, Ingest, Ingress, Topic, PubSub, Consume, 0.9.x

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Name | Default Value | Allowable Values | Description
Kafka Brokers | localhost:9092 | | A comma-separated list of known Kafka brokers in the format host:port. Supports Expression Language: true
Security Protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.
Kerberos Service Name | | | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. Corresponds to Kafka's 'sasl.kerberos.service.name' property. It is ignored unless one of the SASL options of 'Security Protocol' is selected.
SSL Context Service | | Controller Service API: SSLContextService; Implementation: StandardSSLContextService | Specifies the SSL Context Service to use for communicating with Kafka.
Topic Name(s) | | | The name of the Kafka topic(s) to pull from. Multiple topics can be supplied as a comma-separated list. Supports Expression Language: true
Group ID | | | Identifies consumers that belong to the same consumer group. Corresponds to Kafka's 'group.id' property.
Offset Reset | latest | earliest, latest, none | Specifies what to do when there is no initial offset in Kafka or when the current offset no longer exists on the server (for example, because that data has been deleted): 'earliest' resets to the earliest available offset, 'latest' resets to the latest offset, and 'none' makes the consumer throw an exception if no previous offset is found for the group. Corresponds to Kafka's 'auto.offset.reset' property.
Key Attribute Encoding | UTF-8 | UTF-8, Hex | FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of that attribute is encoded.
Message Demarcator | | | Because KafkaConsumer receives messages in batches, you can choose to output FlowFiles that contain all Kafka messages in a single batch for a given topic and partition. This property provides a string (interpreted as UTF-8) that is placed between those messages to separate them. It is optional; if it is not provided, each Kafka message received results in its own FlowFile. To enter a special character such as a new line, use CTRL+Enter or Shift+Enter, depending on the OS. Supports Expression Language: true
Max Uncommitted Time | 1 secs | | Specifies the maximum amount of time allowed to pass before offsets must be committed. This value determines how often offsets are committed. Committing offsets less often increases throughput but also widens the window of potential data duplication if a rebalance or JVM restart occurs between commits. This value also interacts with the maximum number of records per poll and with the use of a message demarcator: when a demarcator is used, far more messages can remain uncommitted than without one, because there is much less to keep track of in memory.
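As a rough illustration of how the properties above line up with Kafka consumer settings, the following sketch builds a java.util.Properties object. The class and method names are hypothetical, not part of NiFi. Mapping Kafka Brokers to Kafka's 'bootstrap.servers', disabling 'enable.auto.commit', and using byte-array deserializers are assumptions about how such a consumer could be configured, not statements about the processor's internals. Topic Name(s) is not a configuration entry (it would be passed to KafkaConsumer.subscribe()), and the SSL settings supplied by the SSL Context Service are omitted.

```java
import java.util.Properties;

// Hypothetical helper: maps this processor's property values onto the Kafka
// consumer configuration keys named in the properties table.
final class ConsumeKafkaConfigSketch {

    static Properties toConsumerProperties(String brokers, String securityProtocol,
                                           String kerberosServiceName, String groupId,
                                           String offsetReset) {
        Properties props = new Properties();
        // Assumption: Kafka Brokers feeds Kafka's bootstrap server list.
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("security.protocol", securityProtocol); // Security Protocol
        props.setProperty("group.id", groupId);                   // Group ID
        props.setProperty("auto.offset.reset", offsetReset);      // Offset Reset

        // Kerberos Service Name is ignored unless a SASL protocol is selected.
        if (securityProtocol.startsWith("SASL_") && kerberosServiceName != null) {
            props.setProperty("sasl.kerberos.service.name", kerberosServiceName);
        }

        // Assumption: offsets are committed explicitly (see Max Uncommitted Time),
        // so Kafka's automatic commits are turned off.
        props.setProperty("enable.auto.commit", "false");

        // Assumption: keys and values are read as raw bytes.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```

With PLAINTEXT or SSL selected, the sketch simply leaves 'sasl.kerberos.service.name' unset, mirroring the rule that the Kerberos Service Name is ignored outside the SASL options.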

Relationships:

Name | Description
success | FlowFiles received from Kafka. Depending on the demarcation strategy, this is either one FlowFile per message or a bundle of messages grouped by topic and partition.
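The bundling behavior described for the success relationship can be sketched as follows. Msg and toFlowFileContents are hypothetical names, and the sketch models only FlowFile content, not NiFi's session API: without a demarcator each message yields its own content, and with one, messages sharing a topic and partition are joined with the demarcator's UTF-8 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DemarcationSketch {

    // Hypothetical stand-in for one consumed Kafka record.
    static final class Msg {
        final String topic;
        final int partition;
        final byte[] value;

        Msg(String topic, int partition, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    // Returns one content blob per would-be FlowFile, in first-seen order.
    static List<byte[]> toFlowFileContents(List<Msg> batch, String demarcator) {
        List<byte[]> out = new ArrayList<>();
        if (demarcator == null || demarcator.isEmpty()) {
            // No demarcator: one FlowFile per Kafka message.
            for (Msg m : batch) {
                out.add(m.value);
            }
            return out;
        }
        byte[] sep = demarcator.getBytes(StandardCharsets.UTF_8);
        // One buffer per topic/partition; LinkedHashMap keeps insertion order.
        Map<String, ByteArrayOutputStream> bundles = new LinkedHashMap<>();
        for (Msg m : batch) {
            String key = m.topic + "-" + m.partition;
            ByteArrayOutputStream buf = bundles.get(key);
            if (buf == null) {
                buf = new ByteArrayOutputStream();
                bundles.put(key, buf);
            } else {
                // Separator goes between messages, never before the first one.
                buf.write(sep, 0, sep.length);
            }
            buf.write(m.value, 0, m.value.length);
        }
        for (ByteArrayOutputStream buf : bundles.values()) {
            out.add(buf.toByteArray());
        }
        return out;
    }
}
```

Keeping one buffer per topic and partition is what lets a single FlowFile carry the kafka.topic and kafka.partition attributes unambiguously.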

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
kafka.count | The number of messages written, if more than one.
kafka.key | The key of the message, if present and if the FlowFile holds a single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property.
kafka.offset | The offset of the message in the partition of the topic.
kafka.partition | The partition of the topic the message or message bundle is from.
kafka.topic | The topic the message or message bundle is from.
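A minimal sketch of how these attributes could be assembled, including the two Key Attribute Encoding options. The class and method names are hypothetical. The sketch assumes uppercase hexadecimal for the Hex option and assumes that a bundle's kafka.offset is the offset passed in by the caller (for example, the first message's offset); neither detail is specified above.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class KafkaAttributesSketch {

    // Assumption: uppercase hex digits for the "Hex" encoding.
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    // Encodes a message key according to Key Attribute Encoding (UTF-8 or Hex).
    static String encodeKey(byte[] key, String encoding) {
        if ("Hex".equalsIgnoreCase(encoding)) {
            StringBuilder sb = new StringBuilder(key.length * 2);
            for (byte b : key) {
                sb.append(HEX[(b >> 4) & 0x0F]).append(HEX[b & 0x0F]);
            }
            return sb.toString();
        }
        return new String(key, StandardCharsets.UTF_8);
    }

    // Builds attributes for a FlowFile holding `count` messages from one topic
    // and partition. kafka.count appears only for bundles; kafka.key only for a
    // single message that actually has a key.
    static Map<String, String> attributes(String topic, int partition, long offset,
                                          int count, byte[] key, String keyEncoding) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("kafka.topic", topic);
        attrs.put("kafka.partition", String.valueOf(partition));
        attrs.put("kafka.offset", String.valueOf(offset));
        if (count > 1) {
            attrs.put("kafka.count", String.valueOf(count));
        } else if (key != null) {
            attrs.put("kafka.key", encodeKey(key, keyEncoding));
        }
        return attrs;
    }
}
```

Hex encoding is the safer choice when keys are arbitrary binary data, since decoding such bytes as UTF-8 can produce replacement characters.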

State management:

This component does not store state.

Summary:

This Processor polls Apache Kafka for data using the KafkaConsumer API available with Kafka 0.9.x. When a message is received from Kafka, this Processor emits a FlowFile whose content is the value of the Kafka message (or, when a Message Demarcator is set, the demarcated values of a batch from one topic and partition).
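The summary covers the poll-and-emit side; the commit side is governed by the Max Uncommitted Time property. The tracker below is a hypothetical sketch of that bookkeeping, not the processor's code: it remembers the highest emitted offset per topic and partition and reports when the configured time has elapsed since the last commit. It follows Kafka's convention that a committed offset names the next message to read, so it returns the last processed offset plus one; a real consumer would pass such values to KafkaConsumer.commitSync().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for the Max Uncommitted Time behavior.
final class UncommittedOffsetTracker {
    private final long maxUncommittedMillis;
    // Highest emitted offset per "topic-partition" since the last commit.
    private final Map<String, Long> pending = new HashMap<>();
    private long lastCommitMillis;

    UncommittedOffsetTracker(long maxUncommittedMillis, long nowMillis) {
        this.maxUncommittedMillis = maxUncommittedMillis;
        this.lastCommitMillis = nowMillis;
    }

    // Records that the message at `offset` was emitted in a FlowFile.
    void processed(String topic, int partition, long offset) {
        pending.merge(topic + "-" + partition, offset, Long::max);
    }

    // True once there is something to commit and the time limit has passed.
    boolean shouldCommit(long nowMillis) {
        return !pending.isEmpty() && nowMillis - lastCommitMillis >= maxUncommittedMillis;
    }

    // Returns next-offset-to-read per partition and resets the tracker.
    Map<String, Long> drainCommitOffsets(long nowMillis) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            toCommit.put(e.getKey(), e.getValue() + 1);
        }
        pending.clear();
        lastCommitMillis = nowMillis;
        return toCommit;
    }
}
```

A larger maxUncommittedMillis means fewer commits but more messages that may be redelivered after a rebalance or restart, which is the trade-off the Max Uncommitted Time description states.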