Description:
Provides a mechanism for reading unstructured text data, such as log files, and structuring the data so that it can be processed. The service is configured using Grok patterns. The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. If a line in the input does not match the expected message pattern, the line of text is either considered to be part of the previous message or is skipped, depending on the configuration, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part of the previous message but is added to the ‘stackTrace’ field of the Record. If a record has no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String).
Tags:
grok, logs, logfiles, parse, unstructured, text, record, reader, regex, pattern, logstash
Properties:
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the Expression Language Guide.
Name |
Default Value |
Allowable Values |
Description |
Schema Access Strategy | string-fields-from-grok-expression |
* Use String Fields From Grok Expression * Use 'Schema Name' Property * Use 'Schema Text' Property * HWX Schema Reference Attributes * HWX Content-Encoded Schema Reference |
Specifies how to obtain the schema that is to be used for interpreting the data. |
Schema Name | ${schema.name} |
Specifies the name of the schema to lookup in the Schema Registry property Supports Expression Language: true |
|
Schema Text | ${avro.schema} |
The text of an Avro-formatted Schema Supports Expression Language: true |
|
Grok Pattern File |
Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's Additional Details for a list of pre-defined patterns. Supports Expression Language: true |
||
Grok Expression |
Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message. |
||
No Match Behavior | append-to-previous-message |
* Append to Previous Message * Skip Line |
If a line of text is encountered and it does not match the given Grok Expression, and it is not part of a stack trace, this property specifies how the text should be processed. |
State management:
This component does not store state.
Restricted:
This component is not restricted.
Summary:
The GrokReader Controller Service provides a means for parsing and structuring input that is made up of unstructured text, such as log files. Grok allows users to add a naming construct to Regular Expressions such that they can be composed in order to create expressions that are easier to manage and work with. This Controller Service consists of one Required Property and a few Optional Properties. The is named Grok Pattern File property specifies the filename of a file that contains Grok Patterns that can be used for parsing log data. If not specified, a default patterns file will be used. Its contents are provided below. There are also properties for specifying the schema to use when parsing data. The schema is not required. However, when data is parsed a Record is created that contains all of the fields present in the Grok Expression (explained below), and all fields are of type String. If a schema is chosen, the field can be declared to be a different, compatible type, such as number. Additionally, if the schema does not contain one of the fields in the parsed data, that field will be ignored. This can be used to filter out fields that are not of interest.
The Required Property is named Grok Expression and specifies how to parse each incoming record. This is done by providing a Grok Expression such as: %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} [%{DATA:thread}] %{DATA:class} %{GREEDYDATA:message}. This Expression will parse Apache NiFi log messages. This is accomplished by specifying that a line begins with the TIMESTAMP_ISO8601 pattern (which is a Regular Expression defined in the default Grok Patterns File). The value that matches this pattern is then given the name timestamp. As a result, the value that matches this pattern will be assigned to a field named timestamp in the Record that produced by this Controller Service.
If a line is encountered in the FlowFile that does not match the configured Grok Expression, it is assumed that the line is part of the previous message. If the line is the start of a stack trace, then the entire stack trace is read in and assigned to a field named STACK_TRACE. Otherwise, the line is appended to the last field defined in the Grok Expression. This is done because typically the last field is a ‘message’ type of field, which can consist of new-lines.
Schemas and Type Coercion
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
The following rules apply when attempting to coerce a field value from one data type to another:
- Any data type can be coerced into a String type.
- Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.
- Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of milliseconds since epoch Midnight GMT, January 1, 1970).
- A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format," or "Timestamp Format."
- A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value 8 can be coerced into any numeric type. However, the String value 8.2 can be coerced into a Double or Float type but not an Integer.
- A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.
- A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used and the rest of the characters are ignored.
- Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.
- Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).
- Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding property (Date Format, Time Format, Timestamp Format property).
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception will be thrown.
Examples
As an example, consider that this Controller Service is configured with the following properties:
Property Name | Property Value |
Grok Expression | %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:class} %{GREEDYDATA:message} |
Additionally, let’s consider a FlowFile whose contents consists of the following:
2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.c.l.e.CuratorLeaderElectionManager org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@1fa27ea5 has been interrupted; no longer leader for role 'Cluster Coordinator'
2016-08-04 13:26:32,474 ERROR [Leader Election Notification Thread-2] o.apache.nifi.controller.FlowController One
Two
Three
org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185)
... 12 common frames omitted
2016-08-04 13:26:35,475 WARN [Curator-Framework-0] org.apache.curator.ConnectionState Connection attempt unsuccessful after 3008 (greater than max timeout of 3000). Resetting connection and trying again with a new connection.
In this case, the result will be that this FlowFile consists of 3 different records. The first record will contain the following values:
Field Name | Field Value |
timestamp | 2016-08-04 13:26:32,473 |
level | INFO |
thread | Leader Election Notification Thread-1 |
class | o.a.n.c.l.e.CuratorLeaderElectionManager |
message | org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager$ElectionListener@1fa27ea5 has been interrupted; no longer leader for role 'Cluster Coordinator' |
STACK_TRACE | null |
The second record will contain the following values:
Field Name | Field Value |
timestamp | 2016-08-04 13:26:32,474 |
level | ERROR |
thread | Leader Election Notification Thread-2 |
class | o.apache.nifi.controller.FlowController |
message |
One Two Three |
STACK_TRACE | org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_45] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_45] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_45] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_45] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) ... 12 common frames omitted |
The third record will contain the following values:
Field Name | Field Value |
timestamp | 2016-08-04 13:26:35,475 |
level | WARN |
thread | Curator-Framework-0 |
class | org.apache.curator.ConnectionState |
message | Connection attempt unsuccessful after 3008 (greater than max timeout of 3000). Resetting connection and trying again with a new connection. |
STACK_TRACE | null |
Default Patterns
The following patterns are available in the default Grok Pattern File:
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)|FINE|FINER|FINEST|CONFIG
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US_MONTH_DAY_YEAR %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_US_YEAR_MONTH_DAY %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
DATE_US %{DATE_US_MONTH_DAY_YEAR}|%{DATE_US_YEAR_MONTH_DAY}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}