Consumes messages from Apache Kafka built against the Kafka 0.9.x Consumer API. The complementary NiFi processor for sending messages is PublishKafka.
Kafka, Get, Ingest, Ingress, Topic, PubSub, Consume, 0.9.x
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
| Name | Default Value | Allowable Values | Description |
|---|---|---|---|
| Kafka Brokers | localhost:9092 | | A comma-separated list of known Kafka Brokers in the format host:port. Supports Expression Language: true |
| Security Protocol | | | Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property. |
| Kerberos Service Name | | | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. Corresponds to Kafka's 'sasl.kerberos.service.name' property. It is ignored unless one of the SASL options of the 'Security Protocol' property is selected. |
| SSL Context Service | | Controller Service API: SSLContextService; Implementation: StandardSSLContextService | Specifies the SSL Context Service to use for communicating with Kafka. |
| Topic Name(s) | | | The name of the Kafka Topic(s) to pull from. More than one can be supplied as a comma-separated list. Supports Expression Language: true |
| Group ID | | | A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property. |
| | | | Determines what happens when there is no initial offset in Kafka or when the current offset no longer exists on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property. |
| Key Attribute Encoding | utf-8 | | FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of that attribute is encoded. |
| Message Demarcator | | | Since KafkaConsumer receives messages in batches, you have the option to output FlowFiles that contain all Kafka messages in a single batch for a given topic and partition. This property lets you provide a string (interpreted as UTF-8) used to separate multiple Kafka messages within such a FlowFile. This is an optional property; if it is not provided, each Kafka message received produces its own FlowFile. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on the OS. Supports Expression Language: true |
| Max Uncommitted Time | 1 secs | | Specifies the maximum amount of time allowed to pass before offsets must be committed. This value determines how often offsets are committed. Committing offsets less often increases throughput but also widens the window of potential data duplication if a rebalance or JVM restart occurs between commits. This value is also related to the maximum number of poll records and to the use of a message demarcator: with a demarcator, far more messages can be left uncommitted than without one, because there is much less to keep track of in memory. |
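The trade-off described for Max Uncommitted Time can be illustrated with a small timer that holds offsets back until a time limit has passed. This is a minimal Python sketch under stated assumptions: the class `OffsetTracker`, its methods, and the injectable `commit_fn` callback are hypothetical names for illustration, not NiFi's (Java) implementation. The one fixed convention it relies on is Kafka's: a committed offset names the next message to read, so the tracker stores the last processed offset plus one.

```python
import time


class OffsetTracker:
    """Decides when pending offsets should be committed.

    Hypothetical helper: it mimics the role of 'Max Uncommitted Time'
    and is not NiFi's actual implementation.
    """

    def __init__(self, max_uncommitted_secs=1.0, clock=time.monotonic):
        self.max_uncommitted_secs = max_uncommitted_secs  # documented default: 1 secs
        self.clock = clock  # injectable so the timing can be tested
        self.pending = {}   # (topic, partition) -> offset to commit
        self.last_commit = clock()

    def record(self, topic, partition, offset):
        # Kafka commits the offset of the *next* message to read,
        # so store the processed offset plus one.
        self.pending[(topic, partition)] = offset + 1

    def maybe_commit(self, commit_fn):
        """Call commit_fn(offsets) once the time limit has elapsed.

        Returns True if a commit happened, False otherwise.
        """
        if not self.pending:
            return False
        if self.clock() - self.last_commit < self.max_uncommitted_secs:
            return False
        commit_fn(dict(self.pending))
        self.pending.clear()
        self.last_commit = self.clock()
        return True
```

Raising `max_uncommitted_secs` means fewer `commit_fn` calls, which is where the throughput gain comes from; the cost is that every offset still sitting in `pending` when the JVM restarts or a rebalance happens will be delivered again.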
| Name | Description |
|---|---|
| success | FlowFiles received from Kafka. Depending on the demarcation strategy, this is either one FlowFile per message or a bundle of messages grouped by topic and partition. |
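The two demarcation strategies can be sketched as a function over one polled batch. This is a minimal Python illustration, assuming a hypothetical `Record` tuple (topic, partition, offset, key, value, with the value as bytes) in place of a real Kafka consumer record; the function name `to_flowfile_contents` is also hypothetical. Without a message demarcator each message becomes its own FlowFile content; with one, values from the same topic and partition are joined using the demarcator encoded as UTF-8.

```python
from collections import namedtuple

# Hypothetical record shape, loosely mirroring a polled Kafka message.
Record = namedtuple("Record", "topic partition offset key value")


def to_flowfile_contents(records, demarcator=None):
    """Return a list of (topic, partition, content) tuples, one per FlowFile."""
    if demarcator is None:
        # One FlowFile per message: the content is the raw message value.
        return [(r.topic, r.partition, r.value) for r in records]
    sep = demarcator.encode("utf-8")  # the demarcator is interpreted as UTF-8
    groups = {}  # dicts preserve insertion order (Python 3.7+)
    for r in records:
        groups.setdefault((r.topic, r.partition), []).append(r.value)
    # One bundled FlowFile per (topic, partition) seen in the batch.
    return [(t, p, sep.join(values)) for (t, p), values in groups.items()]
```

For example, a batch holding messages `a` and `c` from partition 0 and `b` from partition 1 yields three contents without a demarcator, but only two with `"\n"`: `a\nc` for partition 0 and `b` for partition 1.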
| Name | Description |
|---|---|
| kafka.count | The number of messages written, if more than one |
| kafka.key | The key of the message, if a key is present and the FlowFile holds a single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property. |
| kafka.offset | The offset of the message in the partition of the topic. |
| kafka.partition | The partition of the topic the message or message bundle is from |
| kafka.topic | The topic the message or message bundle is from |
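The rules for which attributes appear can be captured in a short function. This is a minimal Python sketch, not NiFi's code: `Record` and `flowfile_attributes` are hypothetical names, and two details are assumptions because the table does not specify them. For a bundle, `kafka.offset` is taken from the first message. Besides the documented default `utf-8`, a hypothetical `"hex"` option is shown to illustrate what a non-UTF-8 key encoding could look like.

```python
from collections import namedtuple

# Hypothetical record shape, loosely mirroring a polled Kafka message.
Record = namedtuple("Record", "topic partition offset key value")


def flowfile_attributes(records, key_encoding="utf-8"):
    """Build kafka.* attributes for a FlowFile holding records from one
    topic and partition."""
    first = records[0]
    attrs = {
        "kafka.topic": first.topic,
        "kafka.partition": str(first.partition),
        # Assumption: a bundle reports the offset of its first message.
        "kafka.offset": str(first.offset),
    }
    if len(records) > 1:
        # kafka.count is only written when more than one message is bundled.
        attrs["kafka.count"] = str(len(records))
    elif first.key is not None:
        # kafka.key is only written for a single message that has a key.
        if key_encoding == "hex":  # hypothetical alternative encoding
            attrs["kafka.key"] = first.key.hex()
        else:
            attrs["kafka.key"] = first.key.decode("utf-8")
    return attrs
```

A single message with key `b"k1"` therefore gets `kafka.key` set to `k1` under `utf-8` and `6b31` under the hypothetical hex option, while a two-message bundle gets `kafka.count` of `2` and no `kafka.key`.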
This component does not store state.
This Processor polls Apache Kafka for data using the KafkaConsumer API available with Kafka 0.9.x. When a message is received from Kafka, this Processor emits a FlowFile whose content is the value of the Kafka message.
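Because the processor acts as a KafkaConsumer client, its connection properties end up as standard Kafka consumer configuration keys. The following minimal Python sketch shows that translation under stated assumptions: the helper names `to_consumer_config` and `split_topics` and their parameter names are hypothetical. The dotted keys are Kafka's own configuration names: `bootstrap.servers` for the broker list and `sasl.kerberos.service.name` for the Kerberos service name. Following the documented behaviour, the Kerberos setting is only passed on when a SASL protocol is selected.

```python
def to_consumer_config(brokers, group_id, security_protocol="PLAINTEXT",
                       kerberos_service_name=None, offset_reset=None):
    """Translate processor-style settings into Kafka consumer properties.

    Hypothetical helper for illustration; parameter names are not NiFi's.
    """
    # Normalise the comma-separated broker list: trim spaces, drop empties.
    servers = [b.strip() for b in brokers.split(",") if b.strip()]
    config = {
        "bootstrap.servers": ",".join(servers),
        "group.id": group_id,
        # Assumption: PLAINTEXT default, matching Kafka's client default.
        "security.protocol": security_protocol,
    }
    # The Kerberos service name is ignored unless a SASL protocol is used.
    if kerberos_service_name and security_protocol.startswith("SASL_"):
        config["sasl.kerberos.service.name"] = kerberos_service_name
    if offset_reset is not None:
        config["auto.offset.reset"] = offset_reset
    return config


def split_topics(topic_names):
    """Split a comma-separated 'Topic Name(s)' value into topic names."""
    return [t.strip() for t in topic_names.split(",") if t.strip()]
```

For instance, `"host1:9092, host2:9092"` with `SASL_PLAINTEXT` and service name `kafka` yields `bootstrap.servers` of `host1:9092,host2:9092` plus a `sasl.kerberos.service.name` entry, whereas the same call with `SSL` omits the Kerberos key.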