Description:

Creates an AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange. A message sent to an Exchange is routed based on the ‘Routing Key’.

Tags:

amqp, rabbit, put, message, send, publish

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property is considered “sensitive”, meaning that its value will be encrypted. Before entering a value in a sensitive property, ensure that the nifi.properties file has an entry for the property nifi.sensitive.props.key.

Name Default Value Allowable Values Description
Exchange Name The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.
Routing Key The name of the Routing Key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., 'myKey'). When messages are sent to the default exchange, this property corresponds to the destination queue name; otherwise a binding from the Exchange to a Queue via the Routing Key must be set (usually by the AMQP administrator).
Host Name localhost Network address of AMQP broker (e.g., localhost)
Port 5672 Numeric value identifying Port of AMQP broker (e.g., 5671)
Virtual Host Virtual Host name which segregates AMQP system for enhanced security.
User Name guest User Name used for authentication and authorization.
Password guest Password used for authentication and authorization.
Sensitive Property: true
AMQP Version 0.9.1 * 0.9.1 AMQP Version. Currently only supports AMQP v0.9.1.

Relationships:

Name Description
failure All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship
success All FlowFiles that are sent to the AMQP destination are routed to this relationship

Reads Attributes:

None specified.

Writes Attributes:

None specified.

Summary

This processor publishes the contents of the incoming FlowFile to an AMQP-based messaging system. At the time of writing this document the supported AMQP protocol version is v0.9.1.

The component is based on the RabbitMQ Client API. The following guide and tutorial may also help you brush up on some of the AMQP basics.

This processor does two things. First, it constructs an AMQP Message by extracting the FlowFile contents (both body and attributes). Second, once the message is constructed, it sends the message to an AMQP Exchange. AMQP Properties are extracted from the FlowFile attributes and converted to com.rabbitmq.client.AMQP.BasicProperties to be sent along with the message. Upon success the incoming FlowFile is transferred to the success Relationship; upon failure the FlowFile is penalized and transferred to the failure Relationship.

Where did my message go?

In a typical AMQP exchange model, a message sent to an AMQP Exchange is routed, based on the Routing Key, to its final destination in a Queue. The association between the Exchange, the Routing Key and the Queue is called a Binding. If, due to some misconfiguration, the binding is not set up, the message has no final destination and will be returned (i.e., the data will not make it to the queue). If that happens you will see a message to that effect in both the app-log and a bulletin. Fixing the binding (normally done by the AMQP administrator) will resolve the issue.
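The routing behaviour described above can be illustrated with a small in-memory model. This is only a sketch of the concept, assuming a direct-style exchange; the class ToyDirectExchange and its methods are hypothetical names invented for illustration and are not part of the RabbitMQ Client API or of this processor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a direct exchange; NOT the RabbitMQ client API.
final class ToyDirectExchange {
    // Routing key -> names of the queues bound under that key.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> message bodies delivered to that queue.
    private final Map<String, List<String>> queues = new HashMap<>();

    // Creates the queue if needed and binds it to this exchange via the routing key.
    void bind(String queue, String routingKey) {
        queues.putIfAbsent(queue, new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns true if the message reached at least one queue,
    // false if no binding matched (the message is "returned").
    boolean publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return !targets.isEmpty();
    }

    // Messages currently held by the named queue (empty if the queue is unknown).
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("orders", "myKey");
        System.out.println("bound key delivered:   " + exchange.publish("myKey", "hello"));
        System.out.println("unbound key delivered: " + exchange.publish("typoKey", "lost"));
    }
}
```

In this model, publishing with a routing key that has no binding delivers nothing, which corresponds to the situation where the message never reaches a queue until the binding is fixed.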

AMQP Properties

Attributes extracted from the FlowFile are considered candidates for AMQP properties if their names are prefixed with amqp$ (e.g., amqp$contentType=text/xml). To enrich a message with additional AMQP properties you may use an UpdateAttribute processor between the source processor and the PublishAMQP processor. The following is the list of available standard AMQP properties: “amqp$contentType”, “amqp$contentEncoding”, “amqp$headers”, “amqp$deliveryMode”, “amqp$priority”, “amqp$correlationId”, “amqp$replyTo”, “amqp$expiration”, “amqp$messageId”, “amqp$timestamp”, “amqp$type”, “amqp$userId”, “amqp$appId”, “amqp$clusterId”.
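As a rough illustration of the prefix convention, the sketch below selects the attributes whose names start with amqp$ from a plain map. The class AmqpAttributeFilter is a hypothetical helper, and stripping the prefix from the resulting key is an assumption made for readability; the processor's actual conversion to BasicProperties may work differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of NiFi or the RabbitMQ client.
final class AmqpAttributeFilter {
    static final String PREFIX = "amqp$";

    // Returns only the attributes whose names start with "amqp$",
    // keyed by the name with the prefix removed (an assumption of this sketch).
    static Map<String, String> extract(Map<String, String> attributes) {
        Map<String, String> candidates = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            String name = entry.getKey();
            if (name.startsWith(PREFIX)) {
                candidates.put(name.substring(PREFIX.length()), entry.getValue());
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("filename", "data.xml");            // ignored: no amqp$ prefix
        attributes.put("amqp$contentType", "text/xml");    // kept as "contentType"
        attributes.put("amqp$priority", "5");              // kept as "priority"
        System.out.println(extract(attributes));
    }
}
```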

Configuration Details

At the time of writing, this document defines only the essential configuration properties, which are suitable for most cases. Other properties will be defined later as this component progresses.

Configuring PublishAMQP:

  1. Exchange Name - [OPTIONAL] the name of the AMQP exchange the messages will be sent to. Usually provided by the administrator (e.g., ‘amq.direct’). If kept empty the messages will be sent to a default AMQP exchange.

  2. Routing Key - [REQUIRED] the name of the routing key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., ‘myKey’). When messages are sent to the default exchange, this property corresponds to the destination queue name; otherwise a binding from the Exchange to a Queue via the Routing Key must be set (usually by the AMQP administrator).

  3. Host Name - [REQUIRED] the name of the host where the AMQP broker is running. Usually provided by the administrator (e.g., ‘myhost.com’). Defaults to ‘localhost’.

  4. Port - [REQUIRED] the port number on which the AMQP broker is listening. Usually provided by the administrator (e.g., ‘2453’). Defaults to ‘5672’.

  5. User Name - [REQUIRED] user name to connect to AMQP broker. Usually provided by the administrator (e.g., ‘me’). Defaults to ‘guest’.

  6. Password - [REQUIRED] password to use with user name to connect to AMQP broker. Usually provided by the administrator. Defaults to ‘guest’.

  7. Virtual Host - [OPTIONAL] Virtual Host name which segregates AMQP system for enhanced security.
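
The required/optional rules and defaults listed above can be summarized in a short sketch. The class PublishAmqpSettings and its from method are hypothetical, written only to illustrate how the documented defaults apply; the map keys simply reuse the property names from this page, and representing an unset Exchange Name as an empty string is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical settings holder; not part of NiFi.
final class PublishAmqpSettings {
    final String exchange;     // empty string means the default exchange (assumption of this sketch)
    final String routingKey;   // required, no default
    final String host;         // defaults to "localhost"
    final int port;            // defaults to 5672
    final String user;         // defaults to "guest"
    final String password;     // defaults to "guest"
    final String virtualHost;  // optional, null when not set

    private PublishAmqpSettings(Map<String, String> p) {
        String key = p.get("Routing Key");
        if (key == null || key.trim().isEmpty()) {
            // Routing Key is the only required property without a default.
            throw new IllegalArgumentException("Routing Key is required");
        }
        this.routingKey = key;
        this.exchange = p.getOrDefault("Exchange Name", "");
        this.host = p.getOrDefault("Host Name", "localhost");
        this.port = Integer.parseInt(p.getOrDefault("Port", "5672"));
        this.user = p.getOrDefault("User Name", "guest");
        this.password = p.getOrDefault("Password", "guest");
        this.virtualHost = p.get("Virtual Host");
    }

    // Builds settings from user-supplied properties, applying the documented defaults.
    static PublishAmqpSettings from(Map<String, String> properties) {
        return new PublishAmqpSettings(properties);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("Routing Key", "myKey");
        PublishAmqpSettings s = PublishAmqpSettings.from(properties);
        System.out.println(s.user + "@" + s.host + ":" + s.port + " key=" + s.routingKey);
    }
}
```

With only the Routing Key supplied, every other value falls back to the defaults documented in the list above.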