Basic Tutorial

Introduction

In this tutorial we will build a simple IGUASU flow. This will

  • retrieve a file over the Internet,

  • convert data between different formats,

  • offer a REST Service,

  • and then call it.

In addition, we will make use of the analysis tools available in IGUASU.

The finished flow:

Final result
Illustration 1. Final result

An example of the completed tutorial can be downloaded using the following link: 30min-tutorial.json.

We will now build this flow step by step.

Prerequisites

A basic understanding of data formats such as CSV, XML and JSON is helpful for this tutorial. Knowledge of HTTP methods (GET/POST), the concept of REST APIs and simple SQL queries is also an advantage. IGUASU-specific terms and concepts will be explained throughout the tutorial.

We often use the English terms for user interface elements (e.g. 'Properties', 'Settings', 'Run Schedule') in this tutorial, as this is how they are displayed in IGUASU. This makes it easier to follow the steps in the software.

Part 1: Retrieving, filtering and saving stations

The two most important components in IGUASU are the FlowFiles and the Processors.
A FlowFile is an object that flows along relations from one Processor to the next. It consists of Content (any data) and Attributes (metadata).
Attributes are key-value pairs that contain information such as the name of a file or the URL of a request. FlowFiles are created, modified, forwarded to other Processors or dropped by Processors.
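For illustration, the FlowFile downloaded in the next step could carry attributes like the following (names and values are indicative and depend on the Processors involved):

Example FlowFile attributes (indicative)
filename: D_Bahnhof_2020_alle.csv
mime.type: text/csv
invokehttp.status.code: 200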

In this part of the tutorial, we will first use an InvokeHTTP Processor to retrieve a CSV file with data on train stations in Germany from the Internet. This data is stored in the FlowFile content.

In the second step, the FlowFile is forwarded to the QueryRecord Processor. This selects from the content (i.e. the station data as CSV) the entries for stations marked as new and at the same time converts them from CSV to XML. In doing so, the Processor makes use of a CSVReader and an XMLRecordSetWriter Controller Service.

With the next Processor, the UpdateAttribute Processor, we set a FlowFile attribute to determine the file name for an export.

Finally, we use the CompressContent Processor to compress the FlowFile content, i.e. the XML. We will look at the compressed file at the end - in a real scenario, however, there could be a Processor here that forwards the file to an external system.

Flow for part 1
Illustration 2. Flow for part 1

First we create a new Process Group in which we then implement our flow using Processors.

Creating a Process Group
Illustration 3. Creating a Process Group

Double-clicking on the new element takes us into the Process Group.

1.1 InvokeHTTP Processor (load stations)

Creating an InvokeHTTP Processor
Illustration 4. Create InvokeHTTP Processor

First we will create an InvokeHTTP Processor.
The InvokeHTTP Processor can execute arbitrary HTTP requests. We download the CSV file with a GET request to the corresponding URL. Internally, the response is turned into a FlowFile that can be forwarded to other Processors.

Processor configuration

The Processors are configured in the Properties and Settings tabs of the module editor.

Processor Properties and Processor Settings

IGUASU validates the Processor configuration and shows in the module editor which properties/settings still need to be completed. Processors that have not yet been fully configured are marked in the Diagram with a yellow warning triangle.

Processor warnings

Errors for a field are displayed directly below the field, as can be seen in the screenshot above for the "HTTP URL" field. In the Processor Properties, we set the HTTP URL to the URL of the CSV:
https://docs.virtimo.net/de/iguasu-docs/user-guide/_attachments/tutorial-files/D_Bahnhof_2020_alle.csv

The request type is already set to GET by default, so we do not need to change anything else in the properties.
In the Processor Settings, we set Run Schedule to "60 sec" so that we check for updates every minute.
"1 day" would also suffice for the task - the Scheduler always restarts when the Processor is set to the RUNNING state via the "Start" command.

In the event of a successful request (2xx status code), the Response relation is triggered and the downloaded file is forwarded along this path as a FlowFile. By default, all relations are automatically terminated.

If a relation is used to create a link to another element in the Diagram, this automatic termination is canceled.
In this tutorial, we can leave the automatic termination of all unused relations unchanged.

Processor Relations
Illustration 5. Configuration of relations

Our InvokeHTTP Processor is ready. It fetches the CSV file and forwards it via the response relation. The creation of the Processor that we connect to this response relation is described in 1.2 QueryRecord Processor (filter stations).

Isolated Processor execution

IGUASU offers the option of running a Processor in isolation from the rest of the flow in order to test the configured properties.
The current configuration is used without being saved and can be changed as required between executions.

We now use the execution of the InvokeHTTP Processor to see whether the list of stations can be reached at the specified URL and what the returned data looks like.

The execution in the isolated state can be started by clicking the orange play button in the module editor.

After the execution, the output of the Processor is displayed in the lower half of the configuration area. You can now view the FlowFiles that are sent via the respective relation.

InvokeHTTP test execution
Illustration 6. InvokeHTTP test execution

The Processor was able to download the CSV file successfully.
We select the response relation:

  • In "Result content" we see the downloaded CSV file.

  • "Result attributes" contains the attributes of the HTTP response as well as other standard attributes.

We can see that the first line contains the headers and that the values are separated by semicolons. The "Status" column tells us which stations are new: they are marked with the value "neu".

We will look at an example of the test execution later in this tutorial (see Isolated process execution (continuation)).

1.2 QueryRecord Processor (filter stations)

In this section, we will create a Processor that receives the CSV file and filters it for new stations.
We know from the test execution what format the data is in and can now choose a suitable Processor for the next processing step.

One of the Processors we can use for this purpose is the QueryRecord Processor.

It is useful to know what kind of Processors there are and how they work. You can look at the list of available Processors and search for keywords.

The QueryRecord Processor runs SQL queries against the content of incoming FlowFiles. To do this, the data must be available in a format that NiFi can interpret as records - the CSV format is one of these.

The SQL can then be used to filter, transform or aggregate the records.
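Using the station data as an example, such queries could look like this (column names as in the CSV; the aggregate query is only illustrative and not part of this tutorial):

Example queries on the station data
Filter:    SELECT * FROM FLOWFILE WHERE Status = 'neu'
Transform: SELECT EVA_NR AS NR, NAME FROM FLOWFILE
Aggregate: SELECT Betreiber_Name, COUNT(*) AS Anzahl FROM FLOWFILE GROUP BY Betreiber_Name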

Create QueryRecord Processors
Illustration 7. Create QueryRecord Processor

The results of the queries are forwarded as FlowFiles along a self-defined relation.

First we will add the Processor to the Diagram and connect it to the Response Relation of the InvokeHTTP Processor.

Create relation between Http Processor and QueryRecord Processor
Illustration 8. Connecting the Processors

Next, we need to configure the QueryRecord Processor so that it handles the incoming data as desired.

Controller Services

Before we can fully configure the QueryRecord Processor, we need to deal with Controller Services. Controller Services are reusable components in IGUASU that provide central services for processors or other services. Examples include reading/writing certain data formats or managing database connections.

They are configured at Process Group level and can then be used by all Processors within this Process Group or in subordinate Process Groups. The advantage lies in the central administration and reusability of these configurations.

The QueryRecord Processor itself is independent of the input and output format of the data. It therefore requires two auxiliary components:

  1. A Record Reader Controller Service:
    This reads the incoming data (e.g. CSV) and prepares it for the Processor.

  2. A Record Writer Controller Service:
    This receives the processed data and writes it to the desired target format (e.g. XML).

The input format of the Record Reader and the output format of the Record Writer do not have to be identical. This means that a Processor with Record Reader and Record Writer, such as QueryRecord, is also suitable for converting data from one format to another in addition to its actual function.

The QueryRecord Processor receives data in CSV format from the InvokeHTTP Processor, as we saw in the test execution. A CSVReader Controller Service is suitable as the Record Reader for reading in this data.
In the following step, however, we want the filtered data to be available as XML, so we use an XMLRecordSetWriter Controller Service as the Record Writer.

Processor Properties
Illustration 9. Processor Properties

We now create these Controller Services in the Process Group settings.
In order for a Processor to have access to a Service, the Service must be located either in the same or a higher-level Process Group. Consequently, all Processors have access to the Services in the Root Process Group.

To access the Process Group's configuration menu, we click on the Services icon. The Process Groups tree is then listed on the left-hand side.
Otherwise, the current Process Group is always displayed as the last element in the path next to the home icon.

Services overview
Illustration 10. Overview of Services

The Process Groups tree starts at the top with the current Process Group and ends with the "[Root]". We can add a Controller Service to the Process Group via the corresponding icon:

Adding a Controller Service
Illustration 11. Controller Service

For our example, we will add a CSVReader and XMLRecordSetWriter to the current Process Group. After adding, we have the option of starting/stopping or configuring the Services.

Add CSVReader Controller Service to the Root Process Group
Illustration 12. Adding the CSVReader Controller Service to the Root Process Group

In the configuration of the CSVReader, we must set the separator ("Value Separator") to a semicolon; the remaining settings can remain at the default values.

In the XMLRecordSetWriter, we must specify a name for the root node and its entries (root tag & record tag), for example with "Root" and "Record". These entries are optional fields that can be shown or hidden using the star icon in the toolbar.
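With "Root" and "Record", the XMLRecordSetWriter would produce output of roughly the following shape (fields follow the CSV headers; shortened here):

Example XML output (shortened)
<Root>
  <Record>
    <EVA_NR>8002551</EVA_NR>
    <NAME>Hamburg Elbbrücken</NAME>
    <Status>neu</Status>
  </Record>
  ...
</Root>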

The remaining settings can remain unchanged.
We have therefore finished configuring the Record Reader/Writer and can now activate them so that they can be used by other Processors.

The Services are each started with the bolt icon button:

Activate CSVReader Service
Illustration 13. CSVReader service

The Controller Services are not executed periodically and do not have any relations as they are not directly involved in the flow. They are used by Processors, Reporting Tasks or other Controller Services.
A useful application for a Controller Service is, for example, a pool of database Connections that can be used by many Processors.
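As a sketch, such a Service could be configured roughly as follows (a hypothetical DBCPConnectionPool; URL, driver and user are placeholders):

DBCPConnectionPool (hypothetical example)
Database Connection URL: jdbc:postgresql://localhost:5432/stations
Database Driver Class Name: org.postgresql.Driver
Database User: iguasu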

Now we will assign the CSVReader to the QueryRecord Processor as a Record Reader and the XMLRecordSetWriter as a Record Writer (see Processor Properties).
The input and output data types are now defined; what remains is to write the query that determines the new stations.

Before we write the query, we would like to take a look at another feature of IGUASU that will help us create and configure Processors. The continuation of the tutorial can be found at SQL Query.

Data Flow Analysis - Inspection Panel (Excursus)

IGUASU offers the option of analyzing the data flowing through the Processors. To demonstrate this in our example, we need to start the flow briefly in order to generate data.

The context menu can be used for this purpose by right-clicking on an empty area. There we can set all Processors of the current Process Group to the RUNNING state by selecting the Start command. All Processors can then also be stopped via the menu.

Start IGUASU for a short time
Illustration 14. Start IGUASU briefly

If the event table is now switched on (button in the toolbar), selecting a Processor that has already processed data will display the history of all events created by the Processor during execution below the Diagram. Such an event describes a processing step related to FlowFiles in the Processor.

Examples of such events:

  • Fork: The FlowFile was derived from another FlowFile

  • Route: The FlowFile is forwarded along a relation (with reason)

  • Drop: The FlowFile is no longer processed

Event table
Illustration 15. Event table

After selecting an event, the history of the associated Processor executions is highlighted in the diagram. The numbers represent the execution sequence - you can see which Processors were executed in which order.
Here, these are exactly the two Processors we have so far. In more complex scenarios, you can also see which branches have been taken.

In addition, the Inspection Panel is opened after selecting an event. This contains detailed information about the event as well as the input and output data of the Processor’s execution.
The input data can also be used for the test execution of a Processor (see Isolated process execution (continuation)).

Select event - Inspection Panel
Illustration 16. Select event - Inspection Panel

The data of the selected event remains stored in the Inspection Panel and can be used for test executions. We select the event in the QueryRecord Processor and the CSV input is displayed in the Inspection Panel - we will use this input later (Isolated process execution (continuation)) for the test execution.

SQL Query

Now we want to filter the CSV file for new entries using an SQL command.

Two example entries from the CSV are shown below - the new stations are marked with the value "neu" in the "Status" field.

Example entries from the CSV file
EVA_NR;DS100;IFOPT;NAME;Verkehr;Laenge;Breite;Betreiber_Name;Betreiber_Nr;Status
8002551;AELB;de:02000:11943;Hamburg Elbbrücken;RV;10.0245;53.5345;DB Station und Service AG;;neu
8000032;NS;de:09662:28058;Schweinfurt Hbf;RV;10.212919;50.035313;DB Station und Service AG;5742;

The first entry with the EVA_NR "8002551" is therefore a new station and the second with the number "8000032" is not.

We will filter for the new stations with the following query:

SELECT * FROM FLOWFILE WHERE Status = 'neu'

The queries are added as dynamic properties using the corresponding button. The names of the outgoing relations correspond to the names of the properties.
We create a dynamic property with the name "new_all" and the query as the value.

QueryRecord configuration
Illustration 17. QueryRecord settings

Now there is a new relation with the name "new_all", which forwards the result of the query as a FlowFile to other Processors.
In the Processor Settings, we again set the relations that are not required to terminate automatically.

At this point, the configuration of QueryRecord is complete and we can move on to the next Processor.
However, it is a good idea to check whether everything has been configured correctly and the Processor is behaving as expected.

Isolated process execution (continuation)

In the following, we will check the correctness of the QueryRecord Processor with the test execution already mentioned (see Isolated Processor execution). We will see whether the Processor processes the data as expected by executing it on a FlowFile and checking the result.

The InvokeHTTP Processor did not need any input data (a FlowFile), as it retrieves the data from a URL on its own.

To use the data from a previous execution, we select one of the events in the table below the Diagram.
If the isolated execution is then selected, the results are displayed in the configuration area.

The table appears if you select a Processor in the Diagram that has already been executed at least once in the course of the flow. Isolated test executions do not count here.

Test execution with event data
Illustration 18. Test execution with event data

After execution via the orange play button, we see that the QueryRecord Processor shows the results for two relations:

  • The "original" relation contains the original input data - so here the Processor simply sends the input file on.

  • In the "new_all" relation, the stations selected by the SQL query (with status='new') are output as XML.

We have specified the record tags of the XML in the XMLRecordSetWriter. If we scroll through the Result content, we can make sure that only the new stations have been filtered.

We can also manually adjust/enter the input data for the test. To show this, we first note that the first new station in Result content has the ID "8002551".

We can check that the filtering is correct by changing the status of this station in the input data.
To do this, we switch to the Data Panel and change the status of station 8002551 in the Input data from "neu" to another value (e.g. "alt"). As the file is too large in our case, we need to download it, edit it in an editor of our choice and then upload it again.

Data Panel
Illustration 19. Data Panel

After running it again, we see that the "new_all" relation no longer contains an entry with the station ID "8002551".
In the same way, we can change the query as required and, for example, filter for all Trier stations:

SELECT * FROM FLOWFILE WHERE NAME LIKE '%Trier%'

You can therefore experiment with the Processors in IGUASU and test their output before executing them in the potentially complex flow.

1.3 UpdateAttribute Processor (set file name)

Our goal for part 1 is to save a compressed XML with information about all new stations. We could then transfer this compressed XML as a file to external systems via various Processors. For example, we could send the file as an attachment using the PutEmail Processor or store it on a file server using PutSFTP. In many cases, we transfer the content as a file.

We can determine the name of this file by setting the "filename" attribute, which is then read by the Processors. Even if we only download the content via the IGUASU user interface in this tutorial, the name of the download file is also set by this attribute.
We can use the UpdateAttribute Processor to create and change any attributes of the FlowFiles.

We add an UpdateAttribute Processor to the Diagram and connect it to the "new_all" relation from the QueryRecord Processor.

Previous flow
Illustration 20. Previous Flow

New attributes are created in the UpdateAttribute Processor by adding a dynamic property.
The name of the dynamic property corresponds to the name of the attribute to be created (or changed); its value corresponds to the (new) attribute value.

We want the file name to contain the current date. As the date changes daily, we cannot enter the string statically, but must generate it dynamically to match the day.

To do this, we use the Expression Language, with which we can enter dynamic functions in expressions for properties.

The expression ${now():format('yyyy-MM-dd')} would result in a current date such as "2020-11-19". As we want to save the file as XML, we can append the static ".xml" to the expression and prefix it with "Stations_": Stations_${now():format('yyyy-MM-dd')}.xml.
This then results in "Stations_2020-11-19.xml".
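The dynamic property of the UpdateAttribute Processor therefore looks like this:

Dynamic property for the file name
filename: Stations_${now():format('yyyy-MM-dd')}.xml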

UpdateAttribute Processor configuration
Illustration 21. UpdateAttribute Processor settings

The UpdateAttribute Processor only changes the defined attributes of the FlowFile, not the content. We will forward the resulting FlowFile with the now adjusted "filename" to the next Processor.

1.4 CompressContent Processor (Compress)

In this step, we compress the FlowFile content. We use the CompressContent Processor for this. We add the Processor to the Diagram and connect it to the relation coming from the UpdateAttribute Processor.

In the settings, we should:

  • check the "Update Filename" box so that the file extension of the "filename" attribute is adjusted accordingly

  • set the Compression Level to 9

  • set the format to "gzip".

We forward the compressed FlowFile to the last Processor.

1.5 Testing the process

We can imagine that the compressed XML file will be forwarded to an external system or saved for backup in the next step. We simulate this step by using the IGUASU analysis tools to download the content, unpack the file and finally view the XML.
To do this, we run the FlowFiles into a Funnel element to view them later.

A Funnel is a simple element in IGUASU. It is used to collect FlowFiles from one or more incoming connections, but terminates the data flow at this point. This is useful for terminating branches of a flow or for 'catching' FlowFiles specifically for manual inspection via the user interface without another Processor processing them.

By right-clicking in the Diagram and selecting Start, we can start all Processors in the Process Group. If we have done everything correctly, we should see after a short time that a FlowFile has run through our Processors and is now waiting in front of the Funnel.

Executed flow from part 1
Illustration 22. Executed Flow from Part 1

We click on this waiting FlowFile to see, on the right-hand side, the FlowFile and an overview of the queue on this relation.

We can download the content via the icon in the content area.

Download FlowFile content
Illustration 23. Download FlowFile content

When we download the file, we should see that the name is "Stations_[yyyy-MM-dd].xml.gz" - as previously determined by us. The file extension ".gz" indicates a file compressed with gzip. If we unzip the file and open the resulting XML in an editor, we can now view the new stations as XML.

Part 2: Implementing the REST Service

To maintain the REST Service separately, we create a new Process Group as a subgroup of the current one. We do this by dragging the Process Group button from the bar at the top of the diagram area in edit mode. We can open the new Process Group, which we call "Validation via REST", by double-clicking on the corresponding element.

Our goal in this part is to create a REST endpoint that accepts information about stations as JSON, checks the format and sends back a response accordingly.

To realize this, we first need a HandleHttpRequest Processor that listens on a specific Port and accepts the requests.
The received requests are forwarded to a ValidateJson Processor, which checks whether the received message corresponds to a defined format.
Finally, we send a response back with one of two HandleHttpResponse Processors, depending on whether or not there were errors in one of the previous Processors.

Validation via REST Process Group
Illustration 24. Validation via REST Process Group

2.1 Processing HTTP requests

First, we need a Processor that receives and forwards the HTTP requests. The HandleHttpRequest Processor is suitable for this.
In the configuration, we set the listening port to "3333". This is the port on which the Processor will listen for requests.

We also need a StandardHttpContextMap Controller Service, which ensures that the HTTP session is maintained across the Processors.

As we know from part 1, we create this Controller Service, but this time within the current Process Group "Validation via REST".
We could also create the Controller Service in the parent Process Group "Deutsche Bahn Tutorial" or even in the Root Process Group, as the Controller Services of a Process Group are also available to all of its Sub-Process Groups. However, we avoid this here, as we only want to use the Controller Service in this Process Group.

When creating this Controller Service, it is advisable to prefix the name with "REST" to avoid confusion with other StandardHttpContextMap Controller Services.

We fill the optional Allowed Paths property with "/validate".
As a result, the Processor only accepts requests with this path in the URL. All requests that do not go to localhost:3333/validate are automatically answered with a 404 (Not Found) code.

Because we offer a REST Service to which data should be sent for verification, the client should use the "POST" HTTP method when sending the data. We therefore uncheck the "Allow GET", "Allow PUT" and "Allow DELETE" settings so that only "Allow POST" is activated.

HandleHttpRequest Processor configuration
Illustration 25. Configuration of the HandleHttpRequest Processor

To answer the requests, we also need two additional HandleHttpResponse Processors.
The first should return a code 200 to the client as a success message if validation is successful. The other, in turn, should inform the client with a code 400 if an error occurs.

In a real scenario, it would be appropriate to answer the different error cases (no valid JSON transferred, JSON schema validation failed) differently and also provide further information about the error. For this tutorial, we will limit ourselves to the response with the code 400 for both error cases.

We create two HandleHttpResponse Processors and set our "REST StandardHttpContextMap" Controller Service in the HTTP Context Map field in both cases.

Previous flow in part 2
Illustration 26. Previous flow in part 2

We set the HTTP Status Code field to 200 and 400 respectively.

HandleHttpResponse Processor configuration (code 200)
Illustration 27. HandleHttpResponse Processor configuration (code 200)

2.2 Validate requests

In the next step, we want to check the JSON data received for the correct format. The ValidateJson Processor, which can validate JSONs using JSON schemas, is suitable for this.

A JSON schema is itself a JSON document that specifies the structure and content of other JSON documents. The ValidateJson Processor checks whether our JSON meets the specifications of the JSON schema or not.

With the following JSON schema, we define a JSON object that represents an entry for a station. It must contain three entries:

  • "NR", an integer

  • "NAME", a string

  • "CREATED", a string that must also be formatted as a date

{
  "type": "object",
  "properties": {
    "NR": {
      "type": "integer"
    },
    "NAME": {
      "type": "string"
    },
    "CREATED": {
      "type": "string",
      "format": "date"
    }
  },
  "required": ["NR", "NAME", "CREATED"]
}

This schema must be entered in the "JSON Schema" property. Otherwise, the Processor’s default settings can be retained.
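A request body like the following would therefore be routed to the "valid" relation (values taken from the station data; the date is only an example):

Example of a valid request body
{
  "NR": 8002551,
  "NAME": "Hamburg Elbbrücken",
  "CREATED": "2020-11-19"
}

If, on the other hand, one of the three entries is missing - for example "CREATED" - or "NR" is sent as a string, the FlowFile is routed to the "invalid" relation.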

The resulting configuration of the ValidateJson Processor should ultimately look like this:

ValidateJson Processor configuration
Illustration 28. Configuration of the ValidateJson Processor

By clicking on the corresponding symbol, we can call up the options bar for the window with the JSON schema. This allows us to choose between this object view and the text view of the JSON schema, for example.

The valid entries are sent along the "valid" relation to the HandleHttpResponse Processor with the successful response code 200, while in the event of a validation error, the other HandleHttpResponse Processor sends back the response code 400.

In the end, the following flow should result:

Finished flow part 2
Illustration 29. Finished flow for part 2

Our REST Service implemented as a subordinate Process Group for checking the data is now ready and can be started.
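Once started, the endpoint can also be tested from outside IGUASU, for example with curl (assuming IGUASU runs locally and port 3333 is reachable):

Testing the REST Service with curl (example)
curl -X POST http://localhost:3333/validate \
  -H "Content-Type: application/json" \
  -d '{"NR": 8002551, "NAME": "Hamburg Elbbrücken", "CREATED": "2020-11-19"}'

A valid body is answered with status code 200, an invalid one with 400.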

Part 3: Sending data individually to REST Service

In this part, we take the existing data for new stations and add today’s date as a property to each entry using the UpdateRecord Processor. At the same time, we use the Processor to convert the data from XML to JSON.

The FlowFile is forwarded to a SplitRecord Processor. This packs each entry for a station into a separate FlowFile and forwards it so that the next Processor, an InvokeHTTP Processor, receives several FlowFiles with one entry (record) each instead of one FlowFile. Each FlowFile is treated independently of the others.

The task of this Processor is to pass the JSON to the REST Service created in Part 2: Implementing the REST Service, which checks the entry and returns a response depending on the result of the check.

In this tutorial, we use our own REST Service to demonstrate both sides of the communication. In real scenarios, you would usually call an external REST Service.

Finished Flow Part 3
Illustration 30. Finished Flow for Part 3

3.1 Customize QueryRecord (reduce to station number and station name)

In this part, we want to keep only the station number (EVA_NR as NR) and the station name (NAME) from the CSV data of the stations for a better overview. We can use the previously created QueryRecord Processor to remove all other information and at the same time, as before, transfer the data into an XML format.
To do this, we create a new Dynamic Property with the name "new" and set the appropriate SQL statement as the value.

SQL Query for "new" relation
SELECT EVA_NR as NR, NAME FROM FLOWFILE
where Status = 'neu'
New QueryRecord settings with two Dynamic Properties
Illustration 31. New QueryRecord settings with two Dynamic Properties

Both "new" and "new_all" are now outputs of the QueryRecord Processor. To check the function, we can do a test execution with the existing CSV data (see Isolated process execution (continuation)).

Test execution of the QueryRecord Processor
Illustration 32. Test execution of the QueryRecord Processor

3.2 UpdateRecord Processor (add date)

We want both part 1 and part 2 to be run every time the flow is executed. Therefore, we will not change part 1, but let both parts exist and run in parallel.

To do this, we connect our next new Processor, the UpdateRecord Processor, to the newly created "new" output of the QueryRecord Processor. The QueryRecord Processor thus forwards two different FlowFiles along two different relations, which are handled independently of each other.

Configuration of the Connection
Illustration 33. Configuration of the Connection

We now want to add the current date to each entry in addition to the number and name of the station. This shows that the information reflects today's state. We use the UpdateRecord Processor for this.

As with the QueryRecord Processor in part 1, we need to specify a Record Reader Controller Service and a Record Writer Controller Service. As we receive our data in XML format from QueryRecord, we need an XMLReader.

As in the first part, we need to add this as a Service to the Process Group and set the Expect Records as Array setting to "true" in the configuration, as the data arrives as an array in XML (see "Result content" at Test execution of the QueryRecord Processor).

XMLReader configuration
Illustration 34. XMLReader configuration

We will use a JsonRecordSetWriter Controller Service with default settings for the Record Writer. This converts the stations from XML to JSON format, as we ultimately want to send a JSON to the REST interface.

We then add a dynamic property using the corresponding button. We use the expression for the current date again and add it to each record. The name of the dynamic property specifies the name of the additional field in the record.
(This procedure is similar to the previous step with the UpdateAttribute Processor in the first part of the tutorial, see 1.3 UpdateAttribute Processor (set file name).)

As the value is specified via a Record Path, the name of the Dynamic Property begins with a slash ('/'), e.g. /CREATED.
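Putting this together, the dynamic property looks like this (the Replacement Value Strategy remains at its default "Literal Value"; the Expression Language is evaluated in the value):

Dynamic property for the date field
/CREATED: ${now():format('yyyy-MM-dd')}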

UpdateRecord Processor Settings
Illustration 35. UpdateRecord Processor Settings

During execution, the stations are converted from XML to JSON, and each record is given the current date under the name "CREATED":

Input and output of the UpdateRecord Processor
Illustration 36. Input and output of the UpdateRecord Processor
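For the example station from part 1, a record in the output would then look roughly like this (the date depends on the day of execution):

Example output record (shortened)
[ {
  "NR" : 8002551,
  "NAME" : "Hamburg Elbbrücken",
  "CREATED" : "2020-11-19"
}, ... ]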

3.3 SplitRecord Processor (split stations individually)

We now forward the FlowFile with the JSON to a SplitRecord Processor. We will use this Processor to split the list of stations into individual elements so that one call to the REST Service can be made per station.

As the data arrives in JSON format, we need a JsonTreeReader as the Record Reader, which we must again create as a Controller Service.
The default settings can be retained.

For the Record Writer, we can use the JsonRecordSetWriter that we previously created for the UpdateRecord Processor.

Settings for the SplitRecord Processor
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Records Per Split: 1
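The effect of "Records Per Split: 1" can be sketched as follows: an incoming FlowFile with several records leaves the Processor as several FlowFiles with one record each (the second station number is hypothetical):

Splitting sketch
Input (1 FlowFile):   [ {"NR": 8002551, ...}, {"NR": 8003295, ...} ]
Output (2 FlowFiles): [ {"NR": 8002551, ...} ]  and  [ {"NR": 8003295, ...} ]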

3.4 InvokeHTTP Processor (send stations)

Each FlowFile from the SplitRecord Processor is now forwarded to another InvokeHTTP Processor. We use this Processor to call the REST Service to check the JSON data and transfer the FlowFile content, i.e. the JSON with the entry for a station, as the body.

Settings for the InvokeHTTP Processor
HTTP Method: POST
HTTP URL: http://localhost:3333/validate

The flow now looks as follows:

Finished flow from part 3
Illustration 37. Finished flow from part 3

3.5 Executing the finished flow

By right-clicking in the Diagram and selecting Start, we start all Processors.

The upper InvokeHTTP Processor sends an HTTP request to receive the CSV file. As soon as it receives this with the response, the FlowFile is forwarded to the QueryRecord Processor.

Here the flow splits into two branches.
On the left, an XML of all new stations with all existing information is passed to the UpdateAttribute Processor via the "new_all" relation. The UpdateAttribute Processor is used to set the "filename" attribute, which determines the name of the file output later.

The FlowFile comprising content and attributes is passed on to the CompressContent Processor, where the XML is turned into a data stream compressed with gzip. This finally ends up in the relation to the Funnel, where we can view and download the content.

On the other branch of the flow, the QueryRecord Processor also passes an XML with data on the new stations, but only the name and number, via the "new" relation. The UpdateRecord Processor then adds further information to each of these entries, namely today’s date.

At the same time, it converts the data into the JSON format expected by the REST endpoint. This JSON with entries for stations then goes into the SplitRecord Processor, where each entry is individually packed into a FlowFile, so that now several FlowFiles leave the Processor instead of one.

Each of these FlowFiles runs individually via the "splits" relation into the InvokeHTTP Processor, which sends a request with the JSON from the content to the REST Service we have implemented for each of the FlowFiles.

Each FlowFile with valid JSON data produces a response from the REST Service with status code 200 and is routed to the "Response" relation accordingly.

Flow after successful execution
Illustration 38. Flow after successful execution

Outlook

This tutorial has shown you the basics of flow creation with IGUASU. IGUASU offers far more possibilities for real use cases.
These include, for example, advanced error handling strategies and many other specialized Processors and Controller Services that go beyond the scope of this introduction.