How-Tos
This document presents various problems and their solutions. Each How-To consists of a compact description and a downloadable IGUASU Process Group.
The JSON downloaded in this way can then be uploaded to create a new Process Group. You can view the example in the product, extend it, etc.
The Processor GenerateFlowFile is often used in the Process Groups, as many Processors require an incoming FlowFile in order to execute.
It is only used to start a flow in the test, typically via RUN ONCE.
Sometimes it can also be useful to set it to the RUNNING state for some time using the Start command. However, depending on the settings, this can generate a large number of FlowFiles; where this applies, it is explained in the text.
After import, these Processors are disabled so that they are not started by mistake.
The respective Processor must then be set to enabled before starting (usually RUN ONCE).
Funnels are often connected to the possible outputs of the Processors. This makes it clear which outputs are used and makes the data directly visible.
Database
Download Process Group as JSON: Database.json
This How-To shows how simple operations on SQL databases are implemented in IGUASU.
The database to be operated on is defined in a DBCPConnectionPool Service.
This Service is available on the Process Group ("H2 file-based Database").
A simple, file-based H2 database is used for the test purposes within this How-To, as this can be used directly.
Download the latest version of the H2 driver as a 'Binary JAR'.
A corresponding authorization is required for uploading database drivers.
Then upload this JAR in IGUASU in the settings under Drivers.
It is then available in the DBCPConnectionPool Service.
Services for reading and writing JSON can also be found in the Process Group. These are used to convert the internal records into JSON or to create the records from JSON for Processors that work on "Records".
There are various ways of working with SQL databases in IGUASU. Only a few selected variants are presented here.
The various operations presented are assigned numbers in IGUASU via Labels:
1. Create ADDRESS table
The ADDRESS table is created via a static CREATE statement stored in the ExecuteSQL Processor.
The field containing the SQL ("SQL select query") supports the Expression Language.
The statement could therefore also be constructed dynamically.
2. Remove ADDRESS table
As before, a static SQL statement is used to remove the table.
3. List tables in the database
The Processor ListDatabaseTables is used to list all tables in the database.
Note that this Processor does not accept any incoming connections.
How often it runs therefore depends on the "Run Schedule" in its settings.
By default, this parameter is set to "0", so the Processor runs constantly.
If it is left at "0", the load on the system is likely to be high while the Processor is in the RUNNING state.
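Steps 1 to 3 can be pictured with the following Python sketch. It is only an illustration outside of IGUASU: the standard-library sqlite3 module with an in-memory database stands in for the H2 database and for the ExecuteSQL and ListDatabaseTables Processors, and the column names are hypothetical, since the actual table definition is only contained in the downloadable Process Group.

```python
import sqlite3

# Hypothetical column layout; the real definition lives in the Process Group.
CREATE_SQL = "CREATE TABLE ADDRESS (ID INTEGER PRIMARY KEY, NAME TEXT, CITY TEXT)"
DROP_SQL = "DROP TABLE ADDRESS"


def table_names(conn):
    """Return the names of all user tables in the SQLite database."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")  # stand-in for the file-based H2 database

conn.execute(CREATE_SQL)            # step 1: static CREATE statement
tables_after_create = table_names(conn)  # step 3: list the tables

conn.execute(DROP_SQL)              # step 2: static DROP statement
tables_after_drop = table_names(conn)

print(tables_after_create, tables_after_drop)
```

In IGUASU itself, the two statements are simply entered in the "SQL select query" field of the respective ExecuteSQL Processor.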
4. Insert data into ADDRESS
GenerateFlowFile contains a fixed JSON with an address.
The field names in the JSON correspond to the column names in the database; this is used for the mapping.
The PutDatabaseRecord Processor is configured with a "Record Reader", which allows a JSON to be read as a record.
The configured table is ADDRESS.
Multiple addresses could also be created in this way, as a JSON array is read.
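The name-based mapping described in step 4 can be sketched roughly as follows. This is not how PutDatabaseRecord is implemented; it is a minimal Python illustration using sqlite3 as a stand-in database. The table layout, the sample addresses and the helper `insert_records` are all hypothetical.

```python
import json
import sqlite3

# Hypothetical table layout and sample data, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ADDRESS (NAME TEXT, STREET TEXT, CITY TEXT)")

flowfile_content = """
[
  {"NAME": "Ada", "STREET": "Main Street 1", "CITY": "Berlin"},
  {"NAME": "Alan", "STREET": "Side Road 2", "CITY": "Hamburg"}
]
"""


def insert_records(conn, table, records):
    """Insert each record, using its JSON field names as column names."""
    for record in records:
        # Names are interpolated directly; acceptable only for trusted demo data.
        columns = ", ".join(record)
        placeholders = ", ".join("?" for _ in record)
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        conn.execute(sql, list(record.values()))


insert_records(conn, "ADDRESS", json.loads(flowfile_content))
row_count = conn.execute("SELECT COUNT(*) FROM ADDRESS").fetchone()[0]
print(row_count)
```

Because the input is a JSON array, one FlowFile leads to one row per array element.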
5. Read data from ADDRESS
All data from the ADDRESS table is read via a fixed SQL statement in ExecuteSQLRecord.
The difference between ExecuteSQLRecord and the ExecuteSQL Processor used in 1. and 2. is that the result is output directly via a RecordWriter.
A Writer is configured here that outputs the result as JSON.
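The idea of reading rows and passing them on as JSON can be illustrated with a short Python sketch. It again uses sqlite3 as a stand-in; the table layout and the sample row are hypothetical, and the exact JSON produced by the Writer configured in IGUASU may differ.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can be converted to dicts by column name

# Hypothetical table and sample row, for illustration only.
conn.execute("CREATE TABLE ADDRESS (NAME TEXT, CITY TEXT)")
conn.execute("INSERT INTO ADDRESS VALUES ('Ada', 'Berlin')")

# Fixed query, comparable to the static SQL in ExecuteSQLRecord.
rows = conn.execute("SELECT * FROM ADDRESS").fetchall()

# "Record writer": turn each row into a record and serialize all of them as JSON.
records = [dict(row) for row in rows]
result_json = json.dumps(records)
print(result_json)
```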
6. Database access via Groovy
This shows how databases can be accessed directly from Groovy scripts. This should be done with caution and only if the task cannot be solved by other means, as it is more prone to errors.
The special feature here is that dynamic properties with defined prefixes can be configured on the ExecuteGroovyScript Processor; these prefixes serve special functions.
If the prefix SQL.* is used, the accessible DBCPConnectionPool Services are available for selection there.
The selected Service can then be used directly in the script.
The description of these and other properties can be found in the "Additional Details…", which are linked from the description of the Processor in the Properties view.
Enrichment
Download Process Group as JSON: Enrichment.json
A typical challenge when creating flows is that data is to be further processed and enriched or converted on the one hand, but is also required later in its original form.
IGUASU has the two Processors ForkEnrichment and JoinEnrichment for this purpose.
They form a pair, in that ForkEnrichment has the outputs original and enrichment.
These represent the two branches, which are merged again in JoinEnrichment.
JoinEnrichment uses records and corresponding readers/writers for this purpose.
For example, it can be used to merge JSON or XML (other record formats include Avro, CEF, CSV, Parquet).
There are various options for merging - in the simplest case, the documents of the two branches are simply merged completely into a new document.
In the download example, a simple JSON with the content "data": "value one" is passed in.
The length of the value of data is then counted in the enrichment branch.
The result is another JSON with the content "count": 9.
JoinEnrichment then merges these two JSONs into one document so that both the data and the calculated length can be accessed.
Instead of simply determining the length, there could also be a call to a Web Service, a database query or similar to collect further data that you want to process together with the original.
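The fork/enrich/join pattern of the download example can be sketched in a few lines of Python. This is only a conceptual illustration: the functions `fork`, `enrich` and `join` are hypothetical names, and the flat merge of both documents is an assumption about the simplest merge strategy; the actual output structure depends on how JoinEnrichment is configured.

```python
import json


def fork(content):
    """Return the content twice: for the 'original' and the 'enrichment' branch."""
    return content, content


def enrich(content):
    """Enrichment branch: count the length of the value of 'data'."""
    record = json.loads(content)
    return json.dumps({"count": len(record["data"])})


def join(original, enrichment):
    """Assumed simplest merge: combine the fields of both documents."""
    merged = {**json.loads(original), **json.loads(enrichment)}
    return json.dumps(merged)


original, enrichment_input = fork('{"data": "value one"}')
joined = join(original, enrich(enrichment_input))
print(joined)  # {"data": "value one", "count": 9}
```

Replacing the body of `enrich` with a Web Service call or a database lookup leaves the rest of the pattern unchanged.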
You can also use MergeContent to achieve the same result.
However, there is much more to configure and consider in that case, for example specific attributes that must be set via UpdateAttribute or similar.
Nevertheless, there may be more complex scenarios in which MergeContent is also suitable for data enrichment.
Metro
Download Process Group as JSON: Metro.json
The Process Group contains three examples that illustrate how the Metro Processors work. While the individual Metro Processors are described in detail in the Metro Processors section, this How-To explains how to use them.
The three examples that can be imported using the JSON file are described below.
1. Caching and retrieving FlowFiles
In the first example, FlowFiles are generated, cached and retrieved later.
However, in order for the generated data to be cached, a FlowFile attribute must first be defined for the correlation, which can be used to retrieve the data later.
Once the attribute has been defined with the UpdateAttribute Processor, the FlowFile is cached with the PutMetro Processor and further processed in the data flow.
During further processing, the existing content is replaced with new content so that it is clear that the metro connection was successful when the cached data is retrieved.
This retrieval of the cached data takes place in the last step with the GetMetro Processor, in which the previously defined attribute is used to reload the desired data.
The PutMetro and GetMetro Processors are connected via the MetroLineController Service U1.
2. Multiple retrieval of data in the Metro
The second example shows a data flow very similar to the first, except that two GetMetro Processors try to access the same data. With the Metro Processors, cached data can only be retrieved once. In this example, the first GetMetro Processor therefore receives the data successfully, while the second can no longer find any and routes to the failure relation.
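The "retrieve only once" behavior can be pictured with a small Python model. The class `MetroLine` is a hypothetical toy stand-in for a MetroLineController Service, not its real implementation, and the correlation value is made up for the example.

```python
class MetroLine:
    """Toy stand-in for a MetroLineController Service: an in-memory cache."""

    def __init__(self):
        self._cache = {}

    def put(self, correlation_id, content):
        """Cache content under the correlation attribute (like PutMetro)."""
        self._cache[correlation_id] = content

    def get(self, correlation_id):
        """Return and remove the cached content (like GetMetro)."""
        # pop() removes the entry, so each cached item can be retrieved only once.
        return self._cache.pop(correlation_id, None)


u1 = MetroLine()
u1.put("order-42", "original content")  # hypothetical correlation value

first = u1.get("order-42")   # first GetMetro: data found
second = u1.get("order-42")  # second GetMetro: nothing left, comparable to "failure"

print(first, second)
```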
3. Exiting the metro line
In addition to retrieving the cached data via the GetMetro Processor, the ExitMetro Processor can be used to obtain data at a central location. FlowFiles that have been cached via the same MetroLineController Service can be brought together via the ExitMetro Processor, for example to handle a specific error message. This example demonstrates this functionality: two FlowFiles sent via different PutFile Processors are received at the same ExitMetro Processor.