Basic tutorial
Introduction
In this tutorial, we will create a flow for the following use case:
A provider of advertising boards would like to receive a monthly list of new train stations where he can offer his product.
- There is a REST service from the railroad company that returns all stations as CSV (comma-separated values), some of which are marked as new (the stations DB).
- A summary of all new stations in the current month is to be saved as packed XML in the file system.
- In addition, the new stations are to be sent individually as JSON to an internal REST service.
The finished flow:
An example of the completed tutorial can also be downloaded using the following link: 30min-tutorial.json.
We will put this flow together bit by bit.
We will first go through the first two points of the task and then set up the REST service for the last point.
Part 1: Retrieving, filtering and saving stations
In this part, we will pick out the new stations and save them in a packed XML file. The result (Flow for part 1) of this is only the first part of the finished flow. We will go into more detail about some important concepts and features as we go along.
First, we will create a Processor that downloads the CSV file with the stations. Since we are only interested in the new stations, we will feed the CSV file into another Processor that performs the filtering.
The new stations are then converted into an XML format with the help of further Processors: the result is provided with the current date (UpdateAttribute-Processor), compressed (CompressContent-Processor) and saved to the hard disk (PutFile-Processor). The filtering and XML conversion are carried out in a single Processor (QueryRecord).
For this tutorial, a separate Process Group is first created, in which the Processors are then created and configured.
Double-click on the created element to access the created Process Group.
1.1 InvokeHTTP Processor (load stations)
First, we will create an InvokeHTTP Processor.
The InvokeHTTP processor can execute any HTTP requests.
With a GET request to the corresponding URL, we will load the CSV file with the stations into the Processor.
The file is translated internally into a FlowFile and can be forwarded to other Processors.
A FlowFile is a container consisting of content and attributes.
The content can reference any kind of data, for example CSV data, image data, music, JSON or simply text.
The attributes contain metadata, for example the time of creation, the type of content, the file name, etc. There are standard attributes that every FlowFile contains - such as the uuid (Universally Unique Identifier), which can be used to distinguish between two FlowFiles.
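A FlowFile's attributes might, for example, look like this (illustrative values; the exact set depends on the Processor that created the FlowFile):
uuid: 16ccd27c-f088-4bb2-a7a4-5b30b02650c4
filename: D_Bahnhof_2020_alle.csv
mime.type: text/csv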
Processor configuration
Processors are configured in the properties/settings tabs of the module editor.
IGUASU validates the Processor configuration and shows in the module editor which properties/settings still need to be completed. Processors that have not yet been fully configured are marked in the Diagram with a yellow warning triangle.
Missing settings that belong to the other tab are summarized at the top. If we are in the Processor Properties tab, for example, the missing settings for the Processor Settings are summarized at the top; the screenshot shows 5 warnings in the Processor Settings that need to be fixed.
For the currently selected tab, the messages are displayed directly below the respective field, as can be seen in the screenshot above for the "Remote URL" field.
In the Processor Properties, we want to set the Remote URL to the URL of the CSV:
https://docs.virtimo.net/de/iguasu-docs/user-guide/_attachments/tutorial-files/D_Bahnhof_2020_alle.csv
The HTTP Method is already set to GET by default, so we don’t need to change anything else in the properties.
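In summary, the two relevant InvokeHTTP properties are:
HTTP Method: GET
Remote URL: https://docs.virtimo.net/de/iguasu-docs/user-guide/_attachments/tutorial-files/D_Bahnhof_2020_alle.csv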
In the Processor Settings, we set Run Schedule to "60 sec" so that we check for updates every minute (1 day would also suffice for the task - the scheduler always restarts when the Processor is set to the RUNNING state via the Start command).
In the event of a successful request (2xx status code), the Response relation is triggered and the downloaded file is sent along this path as a FlowFile. We will ignore the other cases; these relations can be automatically terminated. By default, all relations are automatically terminated; as soon as a relation is used to create a link to another element in the Diagram, this automatic termination is canceled.
Our InvokeHTTP processor is ready. It fetches the CSV file and forwards it in a response relation.
The creation of the Processor, which we connect to this response relation, is described in 1.2 QueryRecord Processor (filtering stations).
Isolated Processor execution
IGUASU offers the option of running Processors isolated from the execution in the entire flow and thus testing the configured properties.
The set configuration is used without saving it.
It can be changed as required between executions.
We now use the isolated execution of the InvokeHTTP Processor to see whether the REST service of the railroad company can be reached at the specified URL and what the data it returns looks like.
The execution in the isolated state can be started via the corresponding (orange) button in the module editor.
After the execution, the output of the Processor is displayed in the lower half of the module editor. You can now view the FlowFiles that are sent along their relation.
The Processor was able to download the CSV file successfully.
We select the response relation:
- In "Result content" we see the downloaded CSV file
- In "Result attributes" we see the attributes of the HTTP response as well as other standard attributes
We can see that the first line contains the headers and that the data is separated with a semicolon. We can also see that the "Status" column carries the information about new stations: new stations are marked with the value "neu".
We will look at another example of the test execution in the next chapter (Isolated process execution (continuation)).
1.2 QueryRecord Processor (filtering stations)
In this section, we will create a QueryRecord Processor that receives the CSV file and filters it for new stations.
From the test execution, we know what format the data is in and can now choose a suitable Processor for the next processing step.
One of the Processors we can use for this purpose is the QueryRecord-Processor.
It is useful to know what kind of Processors there are and how they work. You can look at the list of available Processors and search for keywords.
The QueryRecord-Processor selects the content of incoming FlowFiles via SQL queries. To do this, the data must be available in a format that NiFi can interpret as records - the CSV format is one of these.
You can then filter, transform or aggregate via the SQL.
The results of the queries are forwarded as FlowFiles along a self-defined relation.
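To give a feel for such queries, here is an illustrative sketch using fields from our CSV (the query we will actually use follows in SQL Query):
SELECT * FROM FLOWFILE WHERE Verkehr = 'RV'
SELECT Betreiber_Name, COUNT(*) AS cnt FROM FLOWFILE GROUP BY Betreiber_Name
The first query would filter for stations with regional traffic ("RV" in the Verkehr field), the second would count the stations per operator.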
First, we will insert the Processor into the Diagram and create the response relation from the InvokeHTTP-Processor. Next, we need to configure the QueryRecord-Processor so that it handles the incoming data as desired.
Controller Services
The content that goes into the QueryRecord-Processor can basically be arbitrary.
Therefore, a suitable record reader must be specified for the specific type.
The Record Readers are configured as a Controller Service at the Process Group.
They can then be used by any Processors within this Process Group.
The type of output of the QueryRecord-Processor is specified by a Record Writer.
As we want to examine a CSV file in our example, our QueryRecord-Processor needs a CSV Reader as Record Reader and an XML Writer as Record Writer, because the file is ultimately to be saved in XML format.
We now create these Controller Services in the Process Group settings.
In order for the Processor to have access to the respective Service, it must either be in the same or a higher Process Group.
Consequently, all Processors have access to the Services that are in the root Process Group.
To access the menu for the Process Group settings, click on the settings icon; the Process Group tree is then listed on the left-hand side. The current Process Group is always displayed last in the path.
The Process Group tree starts with the current Process Group and ends with the "[Root]". You can add a Controller Service to the Process Group using the "Plus" button, as illustrated below.
For our example, we will add a CSVReader and XMLRecordSetWriter to the current Process Group. After adding, we have the option to start/stop or configure the Services.
In the settings for the CSVReader, we must set the separator (Value Separator) to a semicolon; the remaining settings can remain at their default values.
For the XMLRecordSetWriter, we must specify a name for the root node and its entries (Root Tag & Record Tag), for example "Root" and "Record". These are optional fields, which can be shown or hidden using the star button in the toolbar. The remaining settings can be left as they are. We have now finished configuring the Record Reader/Writer and can activate them so that they can be used by other Processors.
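The two Services, summarized (only the values we changed from the defaults):
CSVReader - Value Separator: ;
XMLRecordSetWriter - Root Tag: Root, Record Tag: Record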
Each Service is started using its Start button:
The Controller Services are not executed periodically and have no relations as they are not directly involved in the flow.
They are used by Processors, Reporting Tasks or other Controller Services.
Now we will assign the CSVReader to the QueryRecord Processor as a Record Reader and the XMLRecordSetWriter as a Record Writer (Processor Properties).
The input and output data types have now been defined.
The task that remains is to write the query that searches for the new stations.
Before we write the query, we would like to show you another feature of IGUASU that will help us to create and configure Processors. The continuation of the tutorial can be found at SQL Query.
Data Flow Analyze - Inspection Panel (Excursus)
IGUASU offers the option of displaying the data run through the Processors. To demonstrate this in our example, we need to start the flow briefly in order to generate data. For this purpose, the context menu can be used by right-clicking on an empty area in the diagram to set all Processors in the current Process Group to the RUNNING state by selecting the Start command. All Processors can then also be stopped via the menu.
If the event table is switched on and a Processor that has already processed data is selected, the history of all events created by that Processor during execution is displayed below the Diagram. Such an event describes a processing step related to FlowFiles in the Processor.
Examples of such events:
- Fork: The FlowFile was derived from another FlowFile
- Route: The FlowFile is forwarded along a relation (with reason)
- Drop: The FlowFile is no longer processed
Further information can be found in the NiFi documentation.
After selecting an event, the history of the associated Processor executions is highlighted in the diagram.
The numbers represent the execution sequence: You can see which Processors were executed in which order.
Here are exactly the two Processors we have so far - in more complex scenarios, you can see which branches have been taken.
In addition, the Inspection Panel is opened after selecting an event.
This contains detailed information about the event, including the input and output data of the Processor execution.
The input data can also be used for the test execution of a Processor (see Isolated process execution (continuation)).
The data from the selection of the last event remains in the Inspection Panel and can therefore be used for the test execution of any Processors. We select the event in the QueryRecord-Processor and the CSV input is displayed in the Inspection Panel - we will use this input later (Isolated process execution (continuation)) for the test execution.
SQL Query
Now we want to filter the CSV file for new entries using an SQL command.
Two example entries from the CSV are shown below - the new stations are marked with the value "neu" in the "Status" field.
EVA_NR,DS100,IFOPT,NAME,Verkehr,Laenge,Breite,Betreiber_Name,Betreiber_Nr,Status
8002551,AELB,de:02000:11943,Hamburg Elbbrücken,RV,10.0245,53.5345,DB Station und Service AG,,neu
8000032,NS,de:09662:28058,Schweinfurt Hbf,RV,10.212919,50.035313,DB Station und Service AG,5742,
The first entry with the EVA_NR "8002551" is therefore a new station and the second with the number "8000032" is not.
We will filter for the new stations with the following query:
SELECT * FROM FLOWFILE WHERE Status = 'neu'
The queries are added as dynamic properties using the "Plus" button.
The names of the properties correspond to the names of the outgoing relations.
We create a dynamic property with the name "new_all" and the query as the value.
Now there is a new relation with the name "new_all", which forwards the result of the query as a FlowFile to other Processors.
In the Processor Settings, we again automatically terminate the remaining relations.
At this point, the configuration of QueryRecord is complete and we can move on to the next Processor.
However, it is a good idea to check whether everything has been configured correctly and the Processor is behaving as expected.
Isolated process execution (continuation)
In the following, we will check the correctness of the QueryRecord Processor with the test execution already mentioned (see Isolated Processor execution). We will see if the Processor processes the data as expected by executing it on a FlowFile and checking the result.
With InvokeHTTP, it was not necessary to provide input data (a FlowFile), as it does not require input data to retrieve the data from a URL.
To use the data from a previous execution, we select one of the events in the table below the Diagram.
If the isolated execution is then selected, the results are displayed in the configuration area.
The table appears if you select a Processor in the Diagram and it has already been executed once in the course of the flow. The isolated or test executions do not play a role here.
After execution (via the orange button), we see that the QueryRecord Processor shows the results for two relations:
- The "original" relation contains the original input data - here the Processor simply passes the input file on.
- In the "new_all" relation, the stations selected by the SQL query (with Status = 'neu') are output as XML.
We have specified the record tags of the XML in the XMLRecordSetWriter. If we scroll through the "Result content", we can verify that only the new stations remain.
We can also manually adjust/enter the input data for the test. To show this, we first note that the first new station in "Result content" has the ID "8002551".
We can check whether the filtering is correct by changing the status of this station in the input data.
To do this, we switch to the data panel and change the Status of station 8002551 from "neu" to another value, for example "old".
As the file is too large in our case, we need to download the file and edit it in an editor of our choice. Once we have changed the status, we can upload the file again.
After running it again, we see that the "new_all" relation now does not contain an entry with the station ID "8002551".
We can change the query in the same way, for example to filter for all Trier stations:
SELECT * FROM FLOWFILE WHERE NAME LIKE '%Trier%'
You can therefore experiment with the Processors in IGUASU and test their output before executing them in the potentially complex flow.
1.3 UpdateAttribute-Processor (set file name)
As a reminder: Our goal is to save new stations in a compressed XML file.
The file name should correspond to the date of the current day.
Saving is carried out using the PutFile Processor.
In the documentation for the PutFile Processor, we can read that the specification of desired file names is carried out by assigning a "filename" attribute to the FlowFile.
We must therefore add the "filename" attribute with the desired value to our FlowFile.
We can use the UpdateAttribute-Processor to create and change any attributes of the FlowFiles. We will therefore use this Processor to create a "filename" attribute with the value of the date. This attribute is used in the subsequent PutFile-Processor for the location of the file that is saved.
First, we will add an UpdateAttribute-Processor to the Diagram and connect it to the "new_all" relation from the QueryRecord-Processor.
New attributes are created via the UpdateAttribute-Processor by creating a dynamic property.
The name of this dynamic property then corresponds to the name of the attribute that is to be created (or changed). The content then corresponds to the (new) value.
We want to set the content to the current date. As the date will change daily, we cannot enter the string statically, but must generate it dynamically to match the day.
This is done using the Expression Language. It can be used in any property that is marked with a '$' after the name. The functions contained therein can appropriately provide us with the current system date.
The expression ${now():format('yyyy-MM-dd')}
would result in a current date such as "2020-11-19".
As we want to save the file as XML, we can append the static ".xml" to the expression and prefix it with "Stations_":
Stations_${now():format('yyyy-MM-dd')}.xml
This then results in "Stations_2020-11-19.xml".
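The complete dynamic property of the UpdateAttribute-Processor is therefore:
filename: Stations_${now():format('yyyy-MM-dd')}.xml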
The UpdateAttribute-Processor only changes the defined attributes of the FlowFile, not the content. We will forward the resulting FlowFile with the now adjusted "filename" to the next Processor.
1.4 CompressContent-Processor (Compress)
In this step, we compress the FlowFile. To do this, we use the CompressContent-Processor. We add it to the Diagram and connect it to the relation coming from the UpdateAttribute-Processor. In the settings, we should:
- check the "Update Filename" box so that the file extension of the "filename" attribute is adjusted accordingly,
- set the compression level to 9,
- set the format to "gzip".
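Summarized as a property listing (a sketch; the property names are those of the standard NiFi CompressContent Processor):
Mode: compress
Compression Format: gzip
Compression Level: 9
Update Filename: true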
We forward the compressed FlowFile to the last Processor.
1.5 PutFile-Processor (Save)
Finally, we save the compressed file in a directory.
This Processor can only be called with the corresponding file system authorizations.
To do this, we first create a PutFile-Processor. Then we connect the CompressContent-Processor with the newly created PutFile-Processor. We enter the target directory in the settings.
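For example (the directory is hypothetical - choose one that the IGUASU server process is allowed to write to):
Directory: /data/stations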
We can terminate both outgoing relations as the process ends here.
However, we will only terminate the success relation in order to display error cases.
We forward the failure relation to a Funnel element in order to use it later for the error display.
With a suitable renaming of the Processors, the resulting flow looks like this:
The flow can now be started, which fulfills the first two points of the task. After the first run, the XML file is saved in the target directory.
During the second run, a conflict will arise with the existing file and the FlowFile will be routed to the failure relation. The conflict can be avoided by setting the "Conflict Resolution Strategy" in the PutFile Processor to ignore or replace.
Part 2: Sending data individually to REST Service
This section deals with the task of sending the new stations individually as JSON to an internal REST Service.
We will first extend our flow with a new part that sends the stations individually in JSON to an internal REST service. We will then implement this REST service itself as a flow in Part 3.
Here is the extension that we will make to our existing flow:
The "REST Service" is a Process Group that consists of another flow. However, we will deal with this last. Before that, we will prepare the data for this REST Service using the previous Processors.
First, we will reduce the data we send a little, as we do not need all fields. We will also add a new field with a date and then send the stations individually to the REST Service.
2.1 Customize QueryRecord (reduce to station number and station name)
The task did not specify which fields are relevant for storage. For a better overview, we will only save the station number (EVA_NR as NR), the station name (NAME) and the current date (CREATED).
To remove the irrelevant information, we will add a new SQL query "new" in the QueryRecord and select only the "EVA_NR" and "NAME" fields.
SELECT EVA_NR AS NR, NAME FROM FLOWFILE WHERE Status = 'neu'
During the test execution on the CSV data (see Isolated process execution (continuation)), we can verify that only "NR" and "NAME" end up in our XML entries.
The QueryRecord is now configured and we move on to the next processing step.
2.2 UpdateRecord Processor (add date)
With the UpdateRecord Processor we want to add the current date to the respective stations.
Here we must also specify a Record Reader and Record Writer.
As we receive our data in XML format from QueryRecord, we need an XMLReader.
As in the first part, we must add this as a Service to the Process Group and set the "Expect Records as Array" to "true" in the settings, as the data arrives as an array in the XML (see "Result content" Test execution of the QueryRecord Processor).
For the Record Writer, we will use a JsonRecordSetWriter with default settings. This converts the stations from XML to JSON format, as we ultimately want to send a JSON to the REST interface.
We then add a dynamic property via the "Plus" button. We again use the expression for the current date and add it to each record. The name of the dynamic property specifies the name of the additional field in the record. (This procedure is similar to the UpdateAttribute step in the first part of the tutorial, see 1.3 UpdateAttribute-Processor (set file name).)
As a record path is to be specified, the property name starts with a "/" this time and is therefore "/CREATED".
During execution, the stations are converted from XML to JSON and provided with the current date after the name "CREATED":
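A sketch of such a record (the station values are taken from the example data; the date depends on the day of execution):
{
  "NR" : 8002551,
  "NAME" : "Hamburg Elbbrücken",
  "CREATED" : "2020-11-19"
}
The UpdateRecord configuration, summarized:
Record Reader: XMLReader
Record Writer: JsonRecordSetWriter
/CREATED: ${now():format('yyyy-MM-dd')}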
2.3 SplitRecord-Processor (Split stations individually)
With the SplitRecord-Processor we will split the list of stations into individual elements so that one call of the REST Service can be made per station.
Since the data arrives in JSON format, we need a "JSONTreeReader" as a Record Reader, which we must again create as a Controller Service.
The default settings can be retained.
For the Record Writer, we can use the "JsonRecordSetWriter" that we previously created for the UpdateRecord Processor.
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Records Per Split: 1
2.4 InvokeHTTP-Processor (send stations)
With the InvokeHTTP-Processor we send the individual stations to the REST-Service.
HTTP Method: POST
Remote URL: http://localhost:3333
Once we have finished connecting and terminating relations, the finished flow should look like this:
REST flow
The image also shows which connections should be terminated.
Part 3: Implementing the REST Service
For the actual REST Service, we will create a new Process Group in the flow, as at the beginning of this tutorial.
Using the Process Group button, we create an empty Process Group by dragging and dropping it into the Diagram. We name it "REST Service" (see screenshot Completed flow from Part 2) and will create five more Processors in it, which will receive our stations and write them to the database.
First, we need to navigate to the Process Group (by double-clicking or selecting it in the toolbar).
Here is the flow (REST Service) that we will ultimately create in this Process Group:
[Image: REST Service Process Group]
The HandleHttpRequest Processor will receive the incoming stations and send them to the ValidateRecord Processor for validation.
- Non-valid data leads to a response with HTTP status code 400.
- The valid entries are transferred to the PutDatabaseRecord-Processor, where they are written to the database. After a successful write, a response is sent with HTTP status code 200.
All Processors also make use of additional Controller Services. It is a good idea to prefix the names of the Controller Services with "REST" when creating them. This prevents confusion during selection, as you can see the Controller Services of higher-level Process Groups.
3.1 Handle HTTP
The first thing we need for our REST service is a Processor that processes the HTTP requests. The HandleHttpRequest processor is suitable for this.
For the configuration, we need to set the "Listening Port" to "3333" and create a new StandardHttpContextMap Controller Service, which ensures that the HTTP session is maintained between the Processors.
The default settings can be retained in the StandardHttpContextMap Controller Service; following the naming advice above, it could be called "REST StandardHttpContextMap".
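The HandleHttpRequest configuration, summarized:
Listening Port: 3333
HTTP Context Map: REST StandardHttpContextMap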
To answer the requests, we also need two HandleHttpResponse-Processors: One for the successful case (code 200) and one for the failed case (code 400). The two Processors use the same "StandardHttpContextMap" Controller Service.
The settings for the second HandleHttpResponse Processor look the same, only the "HTTP Status Code" should be 400.
Finally, there should now be three Processors in the Diagram.
In the image, they are already named accordingly:
[Image: REST HTTP Processors]
3.2 Validating requests
Before we write to the database, we have the option of validating the incoming data.
The ValidateRecord-Processor is suitable for this.
The validation is carried out with an "Avro Schema".
This describes how the data must look.
The schema can be generated from existing JSON data.
This is how it looks for our station:
{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    { "name": "NR", "type": "int" },
    { "name": "NAME", "type": "string" },
    { "name": "CREATED", "type": "string" }
  ]
}
This schema must be entered in the "Schema Text" property. In order for our predefined schema to be used, the entry "Use Schema Text Property" must be selected in "Schema Access Strategy".
For the Record Reader and Record Writer, we need a JsonTreeReader and JsonRecordSetWriter Controller Service respectively. The settings of both Controller Services may remain at the default values.
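The key ValidateRecord properties, summarized:
Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Schema Access Strategy: Use Schema Text Property
Schema Text: (the Avro schema above)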
The resulting configuration of the ValidateRecord-Processor should ultimately look like this:
The valid entries are sent along the "valid" relation to the PutDatabaseRecord-Processor. This Processor will write the station to the Postgres database.
We need two Controller Services for the configuration: JsonTreeReader and Database Connection Pooling Service.
We only need to make one change to the JsonTreeReader: the Date Format must match our date strings (yyyy-MM-dd, as produced by the expression in 2.2). In addition, it makes sense to name the Service accordingly so that we can distinguish it from other readers when selecting it.
[Image: JsonTreeReader configuration]
3.3 Writing queries to the database
We create a DBCPConnectionPool Controller Service for the Database Connection Pooling Service. We need to set the "Database Connection URL" and the "Database Driver Class Name" in its settings:
Database Connection URL: jdbc:postgresql:bahn
Database Driver Class Name: org.postgresql.Driver
[Image: DBCP settings]
We connect the "success" relation to the HandleHttpResponse Processor with code 200, and the error cases to the one with code 400.
In the end, the following flow should result:
Now we can execute the REST service as soon as the Postgres database server is running.
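For a quick manual test of the service on its own (a sketch - it assumes the Process Group is running and the "stations" table from the next section exists), a single station can be posted with curl; the JSON array mirrors what the JsonRecordSetWriter in our flow sends:
curl -X POST -H "Content-Type: application/json" \
  -d '[{"NR": 8002551, "NAME": "Hamburg Elbbrücken", "CREATED": "2020-11-17"}]' \
  http://localhost:3333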
3.4 Install Postgres, test and display tables
With a local Postgres installation, we can test our REST service directly. The server must contain a database "bahn" and a table "stations".
For the installation, this tutorial for the macOS operating system can be used.
Start Postgres server:
pg_ctl -D /usr/local/var/postgres start && brew services start postgresql
Call Postgres command line:
psql postgres
Create a database named "bahn":
create database bahn;
Show all databases:
\l
Connect to the "bahn" database:
\c bahn
and create the "stations" table there with three fields:
CREATE TABLE stations(NR int, NAME varchar(255), CREATED DATE);
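To verify the table by hand before starting the flow, a test row can be inserted (hypothetical test data; remove it again afterwards with DELETE FROM stations;):
INSERT INTO stations VALUES (8002551, 'Hamburg Elbbrücken', '2020-11-17');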
After the flow has been executed, you can display the list of new stations:
SELECT * FROM stations;
3.5 Checking the results of the REST Service in the database
We can now test our REST Service by executing the flow (see Completed flow from Part 2). After one run, the "stations" table should be filled with the new stations:
bahn=# select * from stations;
   nr    |                name                 |  created
---------+-------------------------------------+------------
 8002535 | Halver-Oberbrügge                   | 2020-11-17
 8012273 | Leipzig Mockauer Straße             | 2020-11-17
 8012797 | Roßleben                            | 2020-11-17
 8001510 | Dornstetten-Aach                    | 2020-11-17
 8003105 | Jaderberg                           | 2020-11-17
 8001944 | Eutingen Nord                       | 2020-11-17
 8001966 | Feldolling                          | 2020-11-17
 8011404 | Donndorf(Unstrut)                   | 2020-11-17
 8003305 | Reken-Klein Reken                   | 2020-11-17
 8003074 | Ingolstadt Audi                     | 2020-11-17
 8002551 | Hamburg Elbbrücken                  | 2020-11-17
 8071096 | Rheinsberg Stechlinsee              | 2020-11-17
 8001723 | Einbeck Otto-Hahn-Straße            | 2020-11-17
 8010340 | Straßgräbchen-Bernsdorf             | 2020-11-17
 8005169 | Rosenheim Aicherpark                | 2020-11-17
 8002060 | Frankfurt(Main)-Gateway Gardens     | 2020-11-17
 8003297 | Kierspe                             | 2020-11-17
 8005933 | Stettfeld-Weiher                    | 2020-11-17
 8013241 | Waßmannsdorf                        | 2020-11-17
 8004371 | Nörvenich-Rommelsheim               | 2020-11-17
 8011605 | Bernburg-Roschwitz                  | 2020-11-17
 8013301 | Wiednitz                            | 2020-11-17
 8011201 | Airport BER - Terminal 1-2          | 2020-11-17
 8089201 | Airport BER - Terminal 1-2 (S-Bahn) | 2020-11-17
(25 rows)