Selected Processors

This section describes selected Processors that have special properties and are therefore covered in more detail.

JSONataTransformJSON

The JSONataTransformJSON Processor is used to transform JSON using a JSONata script.

Queries or transformations can be executed on incoming JSON content.

The Processor has the following extra functions in IGUASU:

  • $nfUuid() generates a version 4 UUID using a cryptographically strong pseudo-random number generator.

  • Attributes from the input FlowFile can be read/processed (in addition to the input content): $nfGetAttribute(<name>)

  • Results can be written to attributes in addition to or instead of the output content: $nfSetAttribute(<name>, <value>)

  • The NiFi Expression Language can be used: $nfEl(<expression>)

  • A Lookup Service can be used if this has been defined on the Processor: $nfLookup(<key>)

The JSONataTransformJSON Processor has a specific editor that allows easy editing of the entire script.

You can quickly try out your transformation by using the "Test/run" button.

More general information about JSONata can be found at jsonata.org.

How this Processor implements JSONata transformations is slightly different from try.jsonata.org!

The extra functions are now explained in detail.

Simple transformation

Input message (also used for the other examples; taken from jsonata.org):

{
    "FirstName": "Fred",
    "Surname": "Smith",
    "Age": 28,
    "Address": {
        "Street": "Hursley Park",
        "City": "Winchester",
        "Postcode": "SO21 2JN"
    },
    "Phone": [
        {
            "type": "home",
            "number": "0203 544 1234"
        },
        {
            "type": "mobile",
            "number": "077 7700 1234"
        }
    ]
}

Transformation of some of this data into another form of address:

{
    "name": FirstName & " " & Surname,
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

Result:

{
    "name": "Fred Smith",
    "mobile": "077 7700 1234",
    "address": "Winchester"
}

Write result to attributes

If you want to put the same results in attributes instead of in the output content, you can use the following function:

  • $nfSetAttribute(<name>, <value>)

In addition, you can deactivate the Processor property that writes the result of the script to the output content:

Write Output: false

The content is therefore left untouched, which makes sense in this case, as only the attributes are to be created.

The script now looks like this:

$nfSetAttribute("name", FirstName & " " & Surname) &
$nfSetAttribute("mobile", Phone[type = "mobile"].number) &
$nfSetAttribute("city", Address.City)

The attributes then appear in the result:

name: Fred Smith
mobile: 077 7700 1234
city: Winchester

There is also a property on the Processor to write the entire result of the transformation to an attribute:

Write to Attribute: <name of attribute>

Read Attributes

If, for example, you want to read the attribute filename in order to set it as an ID in the result, the script looks like this:

{
    "id": $nfGetAttribute("filename"),
    "name": FirstName & " " & Surname,
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

Using the NiFi Expression Language

If you want to use the NiFi Expression Language within a JSONata script, this can be done using the corresponding function $nfEl(<expression>).

In the following example, the NiFi Expression Language is used to check with a regular expression whether the name is valid (i.e. contains only permitted characters).

{
    "name": FirstName & " " & Surname,
    "isValidName": $nfEl("${literal('" & FirstName & " " & Surname & "'):matches('^[\\p{L} \\p{Nd}_]+$')}"),
    "mobile": Phone[type = "mobile"].number,
    "address": Address.City
}

In addition to the expression, the function can include any number of name/value pairs. These are provided to the Expression Language for execution as temporary attributes. This means that, unlike $nfSetAttribute(<name>,<value>), they are not set beyond the execution. This can be used, for example, to provide values from the input for the $nfEl() execution as attributes.

The following could therefore also be written instead of the literal in the last example:

{
  ...
  "isValidName": $nfEl("${name:matches('^[\\p{L} \\p{Nd}_]+$')}", "name", FirstName & " " & Surname )
  ...
}

TransformXml

The TransformXml Processor is used to transform XML using an XSLT script.

The Processor has the following special features in IGUASU:

  • The latest version of the Saxon XSLT Processor with XSLT 3.0/XPath 3.1 is supported

  • The licensed Saxon EE, including its extended features, is included

  • The XSLT script can be saved directly in a property (in addition to the variants of the external file or the Lookup Service) - this facilitates use and deployment

  • The direct processing of JSON by fn:json-to-xml() or fn:xml-to-json() is facilitated by the option of embedding the incoming JSON in an XML root tag

  • result documents (xsl:result-document) can be used to:

    • Create relations/outputs of the Processor

    • Create attributes in the success/failure output (for this, the href name must start with a:)

  • Use of the NiFi Expression Language in XPath expressions

    • For this, the namespace xmlns:nf="http://nifi.org" must be declared

    • The function to be called is el() - e.g. <xsl:value-of select="nf:el('${UUID()}')"/>

The TransformXml Processor has a specific editor that allows easy editing of the entire script.

The functions are explained in detail below.

Using result documents

Prerequisite:

Support result documents: true

In XSLT this looks like this:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0">
  <xsl:output method="xml" name="xml" indent="yes"/>
  <xsl:output method="text" name="text"/>
  <xsl:template match="/">
    <xsl:result-document href="relationOne" format="xml">
      <resultOne><xsl:copy-of select="/"/></resultOne>
    </xsl:result-document>
    <xsl:result-document href="relationTwo" format="text">
      number of nodes: <xsl:value-of select="count(//*)"/>
    </xsl:result-document>
    <xsl:result-document href="a:attributeOne" format="text">something</xsl:result-document>
    </xsl:template>
</xsl:stylesheet>

The results of the result-documents of relationOne and relationTwo are written to the corresponding relations (outputs) of the Processor. These become available by creating the result-document tags in the script and then saving the script.

The result of result-document from a:attributeOne is written as an attribute in the success/failure relation due to the prefix a:.

NiFi Expression Language

The NiFi Expression Language can be used both when passing Parameters via Dynamic Properties and within XPath expressions.

EL in Parameters

By adding any Dynamic Property (via the button), the content of this property is passed to the XSLT script as a Parameter (xsl:param). The Expression Language may be used within the value. This can, for example, also access the attributes of the incoming FlowFile:

testParam: the filename is ${filename}

This can then be used in the XSLT:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0">
  <xsl:output method="text"/>
  <xsl:param name="testParam"/>
  <xsl:template match="/">
    <xsl:value-of select="$testParam"/>
  </xsl:template>
</xsl:stylesheet>

As the filename in NiFi is typically a UUID, the result is:

the filename is 8ec0e87a-56dc-425f-b4c5-1de7f515ddea

EL in XPath expressions

In order to use the NiFi Expression Language within XPath, this must first be switched on using the corresponding property:

Allow NiFi EL in XPath: true

The namespace must also be declared in the XSLT script (xmlns:nf="http://nifi.org"). Then the function (nf:el()) can be called wherever XPath expressions are permitted:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="3.0"
xmlns:nf="http://nifi.org">
  <xsl:output method="text"/>
  <xsl:template match="/">
    <xsl:value-of select="nf:el('${UUID()}')"/>
  </xsl:template>
</xsl:stylesheet>

The result is:

2560fc8c-3581-4732-8862-6bb191eb0dcc

JSON processing

To be able to read JSON directly, the corresponding property must be set:

Surround input with <xml> tag: true

This embeds the incoming JSON in an XML root tag so that it can be processed as XML; the XPath 3.1 function fn:json-to-xml() can then be applied to it:

Input JSON:

{
  "name": "Harry",
  "age": 23,
  "address": {
    "city": "London"
  }
}

XSLT script:

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                xmlns:fn="http://www.w3.org/2005/xpath-functions"
                exclude-result-prefixes="fn" version="3.0">
  <xsl:output indent="yes"/>
  <xsl:template match="/">
    <xsl:copy-of select="fn:json-to-xml(.)"/>
  </xsl:template>
</xsl:stylesheet>

Result:

<map xmlns="http://www.w3.org/2005/xpath-functions">
   <string key="name">Harry</string>
   <number key="age">23</number>
   <map key="address">
      <string key="city">London</string>
   </map>
</map>

To turn such an XML structure back into JSON, you can use fn:xml-to-json().

ListenBPCFlowStarter

The ListenBPCFlowStarter Processor enables the seamless linking of IGUASU with a Virtimo Business Process Center instance. The BPC Services in IGUASU mentioned at the beginning are used to establish a connection based on the configurations. The ListenBPCFlowStarter then acts as the listener and starting point of a flow to which the data and input of the BPC user are transferred.

The Processor has the following special features in IGUASU:

  • The selected BPC Listener Base Path is displayed as an ID in the BPC under the IGUASU settings.

  • To better distinguish the stored ListenBPCFlowStarter Processors, the Flow Starter Name and Flow Starter Desc. are also displayed in the BPC.

  • By using different HybridRESTServerController Services, the ListenBPCFlowStarter Processors can be grouped into different components in the BPC.

Further information on connecting IGUASU and BPC can be found in the BPC Connection Tutorial.

PutBPCProcessLog

The PutBPCProcessLog Processor enables the creation of BPC Process Logs, which are transmitted from the IGUASU Processor to the desired BPC instance. For this purpose, a BPC Controller, which contains the BPC URL and the created API key, and the desired BPC Logger must be selected. It is also possible to change the Input Type and thereby determine whether the content of the FlowFile or the value of the BPC Entries JSON property should be logged.

This is the format of the expected log data:

{
  "entries": [
    {
      "parent": {
        "process-id": "448f1867-b4ef-4fb8-9db6-cf0f26acc384",
        ...
      },
      "childs": [
        {
          "process-id": "448f1867-b4ef-4fb8-9db6-cf0f26acc384",
          "child-id": "a18b56f8-5312-3c81-779d-4c08bd4ee29f",
          ...
        }
      ]
    }
  ]
}

The Processor has the following special features in IGUASU:

  • The loggers that were previously created in the BPC are stored in the selection options of the Choose BPC Logger property. If the BPC instance cannot be reached, the ID of the logger can also be specified.

Further information on connecting IGUASU and BPC can be found in the BPC Connection Tutorial and in the BPC documentation.

PutBPCAuditLog

The PutBPCAuditLog Processor is used to write data to the BPC audit log. The BPC audit level, audit originator and action can be selected for configuration. The HybridRESTClientController Service, which stores the BPC URL and the generated BPC API key, is used to link the IGUASU flow with a BPC instance.


Further information on connecting IGUASU and BPC can be found in the BPC Connection Tutorial and in the BPC documentation.

Metro Processors

Metro Processors can be used to create intermediate results of the processed FlowFiles that are to be used again later in a data flow. A distinction is made here between the GetMetro, PutMetro, MergeMetro and ExitMetro Processors, which are described below.

An application example of the Processors described can also be found in the section How-Tos under Metro.

PutMetro

FlowFiles can be cached with the PutMetro Processor.
A MetroLineController Service, via which the Metro Processors communicate, is required to configure the Processor. PutMetro Processors are also displayed differently from other Processors and therefore provide a better overview of the caching process.

GetMetro

The stored FlowFiles can be retrieved later in the data flow by a GetMetro Processor. Here it is important to select the same Metro Controller that was used when the data was stored by the PutMetro Processor.
In addition, the correlation attribute correlation must be set; it is created as a dynamic property. An individual ID can be used as its value to distinguish the FlowFiles within the Metro connection.

The following is an example of the intermediate storage and later retrieval of the data:

GetMetro

If the cached FlowFiles have already been retrieved by a GetMetro Processor, they are no longer available. This can result in errors during further access attempts.

ExitMetro

If several PutMetro Processors are used for caching, ExitMetro Processors can also be used to obtain all FlowFiles available in the Metro. No correlation attributes are required here, as the query does not refer to individual FlowFiles, but to all of them.

One use case for the ExitMetro Processor is when FlowFiles that are forwarded by many different Processors have to be handled in a common process (e.g. error handling). The simplest solution would be to route the matching relations (e.g. error relations) of all Processors to a single Processor, which takes over the FlowFiles and initiates the higher-level process.

However, especially in complex flows, this can lead to many long connections being drawn across the canvas, making the flow confusing. To work around this problem, you can instead use several PutMetro Processors and one ExitMetro Processor, which transfers the FlowFiles to the Processor that takes care of the error handling.

ExitMetro
Illustration 1. The dashed lines illustrate how FlowFiles are transferred from the PutMetro Processors to the ExitMetro; in the actual flow, these connections are not visible.

MergeMetro

The MergeMetro Processor can be used to merge a FlowFile with one or more FlowFiles that have been temporarily stored by PutMetro Processors. The FlowFiles are matched using an attribute defined in the "Correlation Attribute Name" property.

MergeMetro

To merge XMLs, it is easiest if the XML content does not contain an XML declaration. The desired declaration for the merged XML can be added via the "Header" property.

To merge JSON objects in a JSON array, you can set the "Header", "Footer" and "Demarcator" properties to [, ] and , respectively.

Merge-Processors

Different Processors are available to merge independent or previously separated FlowFiles.

MergeContent-Processor example

Depending on the Processor, different strategies and formats are offered, which can be customized according to individual requirements. This section provides an overview of some Processors that can be used to merge FlowFiles.

MergeContent and MergeRecord

The two Processors MergeContent and MergeRecord have many setting options for merging FlowFiles. Most of the options are available in both Processors, although there are small differences. For example, a Reader and a Writer can additionally be defined for record-oriented processing by the MergeRecord Processor, which means that a format conversion can also take place during merging. The configuration options and how they work are described below:

  • Merge Strategy
    The Merge Strategy can be used to define the criteria according to which the FlowFiles are to be combined. Two different procedures are available that can be selected for this purpose.

    • Bin-Packing Algorithm
      The Bin-Packing Algorithm is the strategy selected by default. FlowFiles are collected in individual containers (bins) until the defined threshold values are reached. The Parameter Minimum Number of Entries defines how many FlowFiles must be present before they are combined into one FlowFile. If Maximum Group Size is not defined, the size of the generated bins is not restricted, and each time the Minimum Number of Entries threshold is reached, all FlowFiles in the Processor’s queue are combined into one FlowFile.

      In addition to defining the desired size of the bins quantitatively, the merging of FlowFiles can also be triggered by time. With the option Max Bin Age, a maximum age can be defined as a positive integer with a time unit (seconds, minutes or hours), after which a bin is merged regardless of the other thresholds.

      In addition, a Correlation Attribute Name can be defined to group the FlowFiles in the queue by the value of that attribute. This makes it possible to exclude independent FlowFiles from the combination and to merge only thematically related FlowFiles. Note, however, that only one correlation attribute can be defined, and that FlowFiles which do not have the defined attribute are not processed and therefore remain in the queue.

    • Defragment
      The second strategy offered for combining individual FlowFiles is the Defragment strategy, in which specific attributes are used for merging. It applies when the individual FlowFiles were previously contained in a single FlowFile and were separated during the flow, for example by a Split Processor. The attributes generated by splitting - fragment.identifier, fragment.count and fragment.index - are used to merge the associated FlowFiles again. This strategy therefore works well together with Split Processors (e.g. SplitJson), which add the required attributes automatically (an illustration follows at the end of this section).

  • Merge Format
    The Merge Format setting can be used to specify the format in which the individual FlowFiles are to be merged. By default, this option is set to Binary Concatenation, which combines different FlowFiles into a single one. Optionally, the data can also be merged in ZIP, TAR or other formats.

  • Attribute Strategy
    The Processors also offer the option of selecting a strategy for the existing attributes. Two options are available here, which are described below:

    • Keep All Unique Attributes
      With this strategy, all non-conflicting attributes are retained in the result FlowFile. An attribute is adopted even if not all FlowFiles have it, as long as every FlowFile that does have it carries the same value.

      If the FlowFiles have different values for a common attribute, this attribute is not adopted.
    • Keep Only Common Attributes
      Attributes that are to be kept must be present in all individual FlowFiles being combined. If an attribute is missing in one of the FlowFiles, it is not included in the result.