Grundlagen-Tutorial

Einführung

In diesem Tutorial werden wir einen Flow für folgenden Anwendungsfall erstellen:

Ein Anbieter von Werbetafeln möchte eine monatliche Auflistung neuer Bahnhöfe erhalten, an denen er sein Produkt anbieten kann.

  • Vorhanden ist ein REST-Service der Bahn, der alle Bahnhöfe als CSV (Komma-separierte Values) zurückliefert, bei denen einige als neu gekennzeichnet sind (Bahnhöfe-DB).

  • Es soll eine Zusammenfassung aller neuen Bahnhöfe im aktuellen Monat als gepacktes XML im Filesystem gesichert werden.

  • Zusätzlich sollen die neuen Bahnhöfe einzeln als JSON an einen internen REST-Service gesendet werden.

Der fertige Flow:

Endergebnis
Abbildung 1. Endergebnis

Ein Beispiel des abgeschlossenen Tutorials kann zudem mit folgendem Link heruntergeladen werden: 30min-tutorial.json.

Wir werden diesen Flow nach und nach zusammenstellen.
Dabei gehen wir zunächst auf die ersten zwei Punkte der Aufgabenstellung ein und richten anschließend den REST-Service für den letzten Punkt ein.

Teil 1: Bahnhöfe abrufen, filtern und speichern

In diesem Teil werden wir uns die neuen Bahnhöfe heraussuchen und diese in eine gepackte XML-Datei speichern. Das Ergebnis (Flow für Teil 1) hiervon ist nur der erste Teil des fertigen Flows. Im Verlauf gehen wir auf einige wichtige Konzepte und Features näher ein.

Flow für Teil 1
Abbildung 2. Flow für Teil 1

Zunächst werden wir einen Processor anlegen der die CSV-Datei mit den Bahnhöfen herunterlädt. Da uns nur die neuen Bahnhöfe interessieren, werden wir die gelesene CSV in einen weiteren Processor leiten, der die Filterung durchführt.

Im weiteren Verlauf werden dann die neuen Bahnhöfe Mithilfe der Processors

  • in ein XML Format überführt,

  • mit dem aktuellen Datum versehen,

  • komprimiert,

  • auf die Festplatte gespeichert.

Die Filterung und XML Konvertierung wird in einem Processor durchgeführt.

Für dieses Tutorial wird zunächst eine eigene Process Group erstellt, in der die Processors und Konfigurationen vorgenommen werden.

Process Group anlegen

Durch einen Doppelklick auf das erstellte Element gelangt man nun in die erstellte Process Group.

1.1 InvokeHTTP Processor (Stationen laden)

invokeHTTP-Processor anlegen
Abbildung 3. invokeHTTP-Processor anlegen

Als Erstes werden wir einen InvokeHTTP-Processor anlegen.
Der InvokeHTTP-Processor kann beliebige HTTP-Requests ausführen. Mit einem GET-Request auf die entsprechende URL werden wir die CSV-Datei mit den Bahnhöfen in den Processor laden.

Die Datei wird intern in ein FlowFile übersetzt und kann an weitere Processors weitergeleitet werden.

Ein FlowFile ist ein Container, welcher aus dem Content und Attributen besteht.

Im Content können beliebige Informationen beliebigen Typs referenziert werden, beispielsweise CSV-Daten, Bilddaten, Musik, JSON oder einfach nur Text.

In den Attributen stehen Metadaten, beispielsweise die Zeit der Erstellung, Typ des Contents, Dateiname usw. Es gibt Standard-Attribute, die jedes FlowFile enthält - wie etwa die uuid (Universally Unique Identifier), durch die man zwei FlowFiles voneinander unterscheiden kann.

Processor Konfiguration

Die Konfiguration der Processors erfolgt in den Properties-/Settings-Tabs des Moduleditors.

tutorial db invokehttp configuration
Abbildung 4. Processor-Properties und Processor-Settings

IGUASU validiert die Processor-Konfiguration und zeigt im Moduleditor, welche Properties/Settings noch vervollständigt werden müssen. Die noch nicht fertig konfigurierten Processors werden im Diagramm mit einem gelben Warndreieck (warn icon) markiert.

Processor-Warnungen
Abbildung 5. Processor-Warnungen

Im oberen Teil werden die fehlenden Einstellungen für den jeweils anderen Tab gezeigt. Wenn wir also im Processor Properties-Tab sind, werden oben die noch fehlenden Einstellungen für Processor Settings zusammengefasst dargestellt. Auf dem Screenshot ist zu sehen, dass 5 Warnungen in den Processor Settings behoben werden müssen.

Für das derzeit ausgewählte Tab werden die Meldungen direkt unter dem Feld angezeigt, wie auf dem obigen Screenshot für das Feld "Remote URL" zu erkennen ist.

In den Processor Properties wollen wir die Remote URL auf die URL der CSV setzen:

https://docs.virtimo.net/de/iguasu-docs/user-guide/_attachments/tutorial-files/D_Bahnhof_2020_alle.csv

Der Request Type ist schon standardmäßig auf GET eingestellt, daher müssen wir in den Properties nichts weiter ändern.

In den Processor Settings setzen wir Run Schedule auf "60 sec", damit wir jede Minute nach Updates prüfen (für die Aufgabenstellung würde auch "1 day" genügen - der Scheduler startet immer neu, wenn der Processor über das Kommando Start in den Zustand RUNNING gesetzt wird).

Im Falle eines erfolgreichen Requests (2xx Status Code), wird die Relation Response ausgelöst und die heruntergeladene Datei entlang des Pfades als FlowFile weiter geschickt. Die anderen Fälle werden wir nicht beachten, wodurch diese Relationen automatisch terminiert werden können. Standardmäßig werden alle Relationen automatisch terminiert. Wird eine Relation genutzt um eine Verknüpfung zu einem anderen Element im Diagramm zu erstellen, wird diese automatische Terminierung aufgehoben.

Alle Relationen terminieren

Unser InvokeHTTP-Processor ist soweit fertig. Er holt die CSV-Datei und leitet diese mit in einer Response-Relation weiter.

Die Erstellung des Processors, den wir mit dieser Response-Relation verbinden, ist in 1.2 QueryRecord Processor (Bahnhöfe filtern) beschrieben.

Isolierte Processor Ausführung

IGUASU bietet die Möglichkeit, Processors isoliert von der Ausführung im gesamten Flows laufen zu lassen und damit die konfigurierten Eigenschaften zu testen.
Dabei wird die gesetzte Konfiguration verwendet, ohne sie zu speichern. Zwischen den Ausführungen kann sie beliebig verändert werden.

Wir nutzen die Ausführung des InvokeHTTP-Processors jetzt, um zu sehen, ob der REST-Service der Bahn unter der angegebenen URL erreichbar ist und wie die von ihm zurückgegebenen Daten aussehen.

Die Ausführung im isolierten Zustand kann durch den play orange-Button im Moduleditor gestartet werden.

Nach der Ausführung wird die Ausgabe des Processors in der unteren Hälfte des Moduleditors eingeblendet. Man kann nun die FlowFiles, die entlang ihrer Relation geschickt werden, betrachten.

InvokeHTTP-Testausführung
Abbildung 6. InvokeHTTP-Testausführung

Der Processor konnte die CSV-Datei erfolgreich herunterladen.
Wir wählen die Response-Relation aus:

  • In "Result content" sehen wir die heruntergeladenen CSV Datei

  • In "Result attributes" sind die Attribute des HTTP-Response sowie weitere Standard-Attribute

Wir können feststellen, dass die erste Zeile die Header enthält und die Daten mit einem Semikolon getrennt werden. Aus den Daten ist zu entnehmen, dass uns die Spalte "Status" die Information über die neuen Stationen verrät. Die neuen Stationen werden mit "neu" markiert.

Wir werden im nächsten Kapitel noch ein weiteres Beispiel für den Einsatz der Testausführung betrachten (Isolierte Prozessausführung (Fortführung)).

1.2 QueryRecord Processor (Bahnhöfe filtern)

In diesem Abschnitt werden wir einen QueryRecord-Processor anlegen, der die CSV Datei entgegennimmt und nach neuen Bahnhöfen filtert.
Wir wissen aus der Testausführung, in welchem Format die Daten vorliegen und müssen den nachfolgenden Processor für die Verarbeitung bestimmen.

Einer der Processors, den wir für unser Anliegen verwenden können, ist der QueryRecord-Processor.

Es ist nützlich, zu wissen, welche Art von Processors es gibt und wie sie arbeiten. Man kann sich hierfür die Liste der verfügbaren Processors anschauen und nach Schlagwörtern suchen.

Der QueryRecord-Processor selektiert über SQL-Queries den Content eingehender FlowFiles. Dazu müssen die Daten in einem Format vorliegen, dass NiFi als Records interpretieren kann - das CSV-Format gehört dazu.

Über das SQL kann man dann filtern, transformieren oder aggregieren.

InvokeProcessor

Die Ergebnisse der Queries werden als FlowFiles entlang einer selbstdefinierten Relation weitergeschickt.

Zuerst werden wir den Processor ins Diagramm einfügen und die Response Relation von dem InvokeHTTP-Processor erstellen. Als Nächstes müssen wir den QueryRecord-Processor konfigurieren, damit er mit den eingehenden Daten wie gewünscht umgeht.

Relation zwischen Http-Processor und QueryRecord-Processor anlegen

Controller Services

Der Content, der in den QueryRecord-Processor eingeht, kann grundsätzlich beliebig sein. Daher muss man für den konkreten Typ einen passenden Record Reader spezifizieren.
Die Record Reader werden als Controller Service an der Process Group konfiguriert. Danach können sie von beliebigen Processors innerhalb dieser Process Group verwendet werden.

Der Typ der Ausgabe des QueryRecord-Processors wird durch einen Record Writer spezifiziert.

Processor Properties
Abbildung 7. Processor Properties

Da wir in unserem Beispiel eine CSV-Datei untersuchen wollen, braucht unser QueryRecord-Processor einen CSV Reader als Record Reader und einen XML Writer als Record Writer, weil die Datei letztendlich im XML Format gespeichert werden soll.

Wir legen jetzt diese Controller Services in den Einstellungen der Process Group an.
Damit der Processor Zugriff auf den jeweiligen Service bekommt, muss sich dieser entweder in derselben oder darüberliegenden Process Group befinden. Folglich haben alle Processors Zugriff auf die Services, die in der Root-Process Group liegen.

Um ins Menü für die Einstellungen der Process Groups zu gelangen, muss man auf das Service-Icon klicken, dabei wird dann der Process Groupsbaum auf der linken Seite aufgelistet. Ansonsten wird die aktuelle Process Group immer zuletzt am Pfad des home icon-Icon dargestellt.

Controller-Service hinzufügen

Der Process Groupsbaum beginnt mit der aktuellen Process Group und endet mit dem "[Root]". Mit dem "Plus"-Button (plus icon) kann man einen Controller Service zur Process Group hinzufügen, wie unten veranschaulicht.

Controller-Service hinzufügen
Abbildung 8. Controller Service hinzufügen

Für unser Beispiel werden wir einen CSVReader und XMLRecordSetWriter zur aktuellen Process Group hinzufügen. Nach dem Einfügen haben wir die Möglichkeit die Services zu starten/stoppen bzw. zu konfigurieren.

CSVReader Controller Service in die Root-Process Group hinzufügen
Abbildung 9. CSVReader Controller Service in die Root-Process Group hinzufügen

In den Einstellungen für den CSVReader müssen wir das Trennzeichen (Value Separator) als Semikolon spezifizieren, die restlichen Einstellungen können auf den Standardwerten bleiben.

Bei dem XMLRecordSetWriter müssen wir einen Namen für den Root-Knoten und seine Einträge spezifizieren(Root Tag & Record Tag), beispielsweise mit "Root" und "Record". Diese Angaben sind optionale Felder, die durch den Stern-Button in der Toolbar ein- und ausgeblendet werden können. Die restlichen Einstellungen können so bleiben, wie sie sind. Wir sind also fertig mit der Konfiguration der Record Reader/Writer und können sie nun aktivieren, sodass sie von anderen Processors eingesetzt werden können.

Die Services werden jeweils mit dem bolt icon-Button gestartet:

CSVReader-Service aktivieren
Abbildung 10. CSVReader-Service aktivieren

Die Controller Services werden nicht periodisch ausgeführt und haben auch keine Relationen, da sie nicht direkt im Flow hängen. Sie werden von Processors, Reporting Tasks oder anderen Controller Services verwendet.
Ein sinnvoller Einsatz für einen Kontroll-Service ist beispielweise ein Pool von Datenbank-Connections, der von vielen Processors genutzt werden kann.

Nun werden wir den CSVReader dem QueryRecord Processor als Record Reader und den XMLRecordSetWriter als Record Writer zuweisen (Processor Properties).
Die Ein- und Ausgabedatentypen wurden nun definiert. Daher verbleibt die Aufgabe, die Query zu schreiben, welche die neuen Bahnhöfe raussucht.

Bevor wir die Query schreiben, möchten wir an dieser Stelle ein weiteres Feature von IGUASU zeigen, welches uns bei der Erstellung und Konfiguration von Processors behilflich sein wird. Die Fortführung des Tutorials findet unter SQL Query statt.

Datenflow Analyze - Inspection Panel (Exkurs)

IGUASU bietet die Möglichkeit, die durch die Processors gelaufenen Daten anzuzeigen. Um dies in unserem Beispiel zu demonstrieren, müssen wir den Flow kurzzeitig starten, um Daten zu generieren. Für diesen Zweck kann durch einen Rechtsklick auf einer leeren Fläche im Diagram das Kontextmenü genutzt werden, um alle Processors in der aktuellen Process Group in den Zustand RUNNING zu versetzen, indem das Kommando Start ausgewählt wird. Anschließend können alle Processors über das Menü ebenfalls gestoppt werden.

IGUASU kurzzeitig starten
Abbildung 11. IGUASU kurzzeitig starten

Wenn nun die Event-Tabelle eingeschaltet ist, wird durch das Selektieren eines Processors, der bereits Daten verarbeitet hat, unterhalb des Diagramms der Verlauf aller vom Processor in der Ausführung erstellten Events angezeigt. Solch ein Event beschreibt einen Verarbeitungsschritt bezogen auf FlowFiles im Processor.

Beispiele solcher Events:

Fork

Das FlowFile wurde von einem anderen FlowFile abgeleitet

Route

Das FlowFile wird entlang einer Relation weitergeleitet (mit Begründung)

Drop

Das FlowFile wird nicht mehr weiterverarbeitet

In der NiFi Dokumentation kann man dazu weitere Informationen finden.

Events

Nach der Selektion eines Events wird der Verlauf der dazugehörigen Processor-Ausführungen im Diagram hervorgehoben. Die Nummern stellen die Ausführungsreihenfolge dar: Man sieht, welche Processors in welcher Reihenfolge ausgeführt wurden.
Hier sind es genau die zwei Processors, die wir bisher haben - in komplexeren Szenarien sieht man, welche Zweige genommen wurden.

Zudem wird nach der Selektion eines Events das Inspection Panel geöffnet. Dieses enthält detaillierte Informationen über das Event und enthält die Eingangs- und Ausgangs-Daten der Ausführung des Processors.
Die Eingangsdaten können auch für die Testausführung eines Processors verwendet werden (siehe Isolierte Prozessausführung (Fortführung)).

Event Selektieren
Abbildung 12. Event selektieren - Inspection Panel

Die Daten der Selektion des letzten Events bleiben im Inspection Panel und können dadurch für die Testausführung beliebiger Processors verwendet werden. Wir selektieren das Event im QueryRecord-Processor und es wird der CSV-Input im Inspection Panel angezeigt - diesen Input werden wir später (Isolierte Prozessausführung (Fortführung)) für die Testausführung verwenden.

SQL Query

Nun wollen wir die CSV-Datei Mithilfe eines SQL-Befehls nach neuen Einträgen filtern.

Im Folgenden sind zwei Beispieleinträge aus dem CSV dargestellt - die neuen Bahnhöfe werden im Feld "Status" mit dem Wert "neu" gekennzeichnet.

Zwei Beispiel-Einträge aus der CSV-Datei
EVA_NR,DS100,IFOPT,NAME,Verkehr,Laenge,Breite,Betreiber_Name,Betreiber_Nr,Status
8002551,AELB,de:02000:11943,Hamburg Elbbrücken,RV,10.0245,53.5345,DB Station und Service AG,,neu
8000032,NS,de:09662:28058,Schweinfurt Hbf,RV,10.212919,50.035313,DB Station und Service AG,5742,

Der erste Eintrag mit der EVA_NR "8002551" ist somit ein neuer Bahnhof und der zweite mit der Nummer "8000032" nicht.

Mit der folgenden Query werden wir nach den neuen Bahnhöfen filtern:

SELECT * FROM FLOWFILE WHERE Status = 'neu'

Die Queries werden als Dynamic Property mit dem plus round icon-Button hinzugefügt. Die Namen der Properties entsprechen den Namen der ausgehenden Relationen.
Wir legen eine Dynamic Property mit dem Namen "neue_alle" und der Query als Wert an.

QueryRecord-Einstellungen
Abbildung 13. QueryRecord-Einstellungen

Nun gibt es eine neue Relation mit dem Namen "neue_alle", die das Ergebnis der Query als FlowFile an weitere Processors weiterleitet.
In den Processor Settings lassen wir wieder die Relation automatisch terminieren.

An dieser Stelle ist die Konfiguration von QueryRecord abgeschlossen und wir könnten zum nächsten Processor übergehen.
Allerdings bietet es sich an, zu prüfen, ob alles richtig konfiguriert wurde und der Processor sich erwartungsgemäß verhält.

Isolierte Prozessausführung (Fortführung)

Im Folgenden werden wir die Korrektheit des QueryRecord-Processors mit der bereits erwähnten Test-Ausführung überprüfen (siehe Isolierte Processor Ausführung). Wir werden sehen, ob der Processor die Daten erwartungsgemäß verarbeitet, indem wir ihn auf ein FlowFile ausführen und das Ergebnis prüfen.

Beim InvokeHTTP war die Bereitstellung von Eingangsdaten (einem FlowFile) nicht nötig, da er zum Abrufen der Daten von einer URL keine Eingabedaten notwendig sind.

Um die Daten einer früheren Ausführung zu verwenden, selektieren wir einen der Events in der Tabelle unter dem Diagramm.
Wird dann die isolierte Ausführung ausgewählt, werden die Ergebnisse im Konfigurationsbereich angezeigt.

Die Tabelle erscheint, wenn man einen Processor im Diagramm selektiert und dieser schon einmal im Zuge des Flows ausgeführt wurde. Die isolierten bzw. Test-Ausführungen spielen dabei keine Rolle.

Processor testen

Nach der Ausführung (über den orangen Button) sehen wir, dass der QueryRecord-Processor für zwei Relationen die Ergebnisse zeigt:

  • In der "original"-Relation stehen die originalen Eingabedaten - hier schickt also der Processor einfach die Eingabedatei weiter.

  • In der "neue_alle"-Relation werden die durch die SQL-Query selektierten Stationen (mit Status='neu') als XML ausgegeben.

Die Record-Tags des XML haben wir im XMLRecordSetWriter spezifiziert. Wenn wir durch den "Result content" scrollen, können wir uns vergewissern, dass nur die neuen Bahnhöfe gefiltert worden sind.

Wir können die Eingabedaten für den Test auch manuell anpassen/eingeben. Um dies zu zeigen, merken wir uns vorerst, dass in "Result content" der erste neue Bahnhof die ID "8002551" besitzt.

Wir können überprüfen, ob korrekt gefiltert wird, wenn wir den Status dieses Bahnhofs in den Eingangsdaten auf alt ändern.
Dazu müssen wir ins Data-Panel wechseln und im Input data bei der Bahnhof ID 8002551 den Status von "neu" auf "alt" ändern. Da in unserem Fall die Datei zu groß ist, müssen wir die Datei herunterladen und in einem Editor unserer Wahl bearbeiten. Nachdem wir den Status geändert haben, können wir die Datei wieder hochladen.

Status ändern

Nach erneutem Ausführen sehen wir, dass die "neue_alle"-Relation nun keinen Eintrag mit der Bahnhof-ID "8002551" enthält.

Genauso können wir die Query beliebig ändern und z.B. auf alle Trierer Bahnhöfe filtern:

SELECT * FROM FLOWFILE where NAME LIKE '%Trier%'.

Man kann also in IGUASU mit den Processors experimentieren und ihre Ausgaben testen, bevor man sie im möglicherweise komplexen Flow ausführt.

1.3 UpdateAttribute-Processor (Dateinamen setzen)

Zur Erinnerung: Unser Ziel ist es, neue Bahnhöfe in einer komprimierten XML Datei abzuspeichern.
Der Dateiname soll dem Datum des aktuellen Tages entsprechen. Das Abspeichern wird mit dem PutFile-Processor durchgeführt. In der Dokumentation zum PutFile-Processor können wir nachlesen, dass die Spezifizierung gewünschter Dateinamen mit einer Zuweisung eines "filename"-Attributes zum FlowFile erfolgt. Wir müssen also unserem FlowFile das Attribut "filename" mit dem gewünschten Wert hinzufügen.

Mit dem UpdateAttribute-Processor können wir beliebige Attribute der FlowFiles anlegen und ändern. Wir werden mit diesem Processor also ein "filename"-Attribut mit dem Wert des Datums anlegen. Dieses Attribut wird im dann folgenden PutFile-Processor für den Ort der Datei verwendet, die gespeichert wird.

Als Erstes werden wir einen UpdateAttribute-Processor zum Diagramm hinzufügen und diesen mit der "neue alle"-Relation aus dem QueryRecord-Processor verbinden.

UpdateAttribut-Processor hinzufügen

Das Anlegen neuer Attribute über den UpdateAttribute-Processor erfolgt durch das Erstellen eines dynamischen Properties.

Der Name dieses dynamischen Properties entspricht dann dem Namen des Attributes, welches angelegt (oder geändert) werden soll. Der Inhalt entspricht dann dem (neuen) Wert.

Den Inhalt möchten wir auf das aktuelle Datum setzen. Da sich das Datum täglich ändern wird, können wir den String nicht statisch eintragen, sondern müssen diesen dynamisch passend zum Tag generieren.

Dies geschieht über die Expression Language. Sie kann in jedem Property verwendet werden, das mit einem '$' hinter dem Namen gekennzeichnet ist. Die darin enthaltenen Funktionen können uns passenderweise das aktuelle Systemdatum liefern.

Die Expression ${now():format('yyyy-MM-dd')} würde ein aktuelles Datum wie "2020-11-19" ergeben. Da wir die Datei als XML abspeichern wollen, können wir an die Expression noch das statische ".xml" anhängen und mit einem Prefix "Stations_" versehen: Stations_${now():format('yyyy-MM-dd')}.xml.
Das ergibt dann "Stations_2020-11-19.xml".

UpdateAttribute-Processor Einstellungen
Abbildung 14. UpdateAttribute-Processor Einstellungen

Der UpdateAttribute-Processor ändert nur die definierten Attribute des FlowFiles, nicht den Content. Das resultierende FlowFile mit dem nun angepassten "filename" werden wir an nächsten Processor weiterleiten.

1.4 CompressContent-Processor (Komprimieren)

In diesem Schritt komprimieren wir das FlowFile. Dazu setzen wir den CompressContent-Processor ein. Wir müssen den Processor zum Diagramm hinzufügen und nur noch die Relation aus dem UpdateAttribute anlegen. In den Einstellungen sollten wir:

  • den Haken bei "Update Filename" setzen, damit die Dateiendung des Attributs "filename" entsprechend angepasst wird

  • die Kompressionsrate auf 9 setzen

  • das Format auf "gzip" setzen.

Das komprimierte FlowFile leiten wir an den letzten Processor weiter.

1.5 PutFile-Processor (Speichern)

Zum Schluss speichern wir die komprimierte Datei in ein Verzeichnis.

Dieser Processor kann nur mit den entsprechenden Filesystem-Berechtigungen aufgerufen werden.

Dazu legen wir einen zunächst einen PutFile-Processor an. Dann verbinden wir den CompressContent-Processor mit dem neu angelegten PutFile-Processor. In den Einstellungen tragen wir das Zielverzeichnis ein.

Beide ausgehenden Relationen können wir terminieren, da der Prozess hier endet. Um uns Fehlerfälle anzeigen zu lassen, werden wir aber nur die success-Relation terminieren.
Die failure Relation leiten wir in ein Funnel-Element weiter, um diese späterer für die Fehleranzeige einzusetzen.

Mit einer passender Umbenennung der Processors sieht der resultierende Flow wie folgt aus:

Fertiger Flow aus Teil 1
Abbildung 15. Fertiger Flow aus Teil 1

Der Flow kann nun gestartet werden, womit die ersten beiden Punkte der Aufgabenstellung erfüllt sind. Nach dem ersten Durchlauf wird die XML-Datei ins Zielverzeichnis gespeichert.

Beim zweiten Durchlauf wird ein Konflikt mit der schon existierenden Datei entstehen und das FlowFile wird in die failure-Relation geleitet. Der Konflikt kann durch die "Conflict Resolution Strategy"-Einstellung im PutFile-Processor mit ignore oder replace aufgelöst werden.

Teil 2: Daten einzeln an REST-Service schicken

Dieser Abschnitt behandelt die Aufgabenstellung, die neuen Bahnhöfe einzeln als JSON an einen internen REST-Service zu senden.

Wir werden zunächst unseren Flow mit einem neuen Teil erweitern, welcher die Bahnhöfe einzeln in JSON an einen internen REST-Service schickt. Anschließend implementieren wir in Teil 3 auch diesen REST-Service selbst in Flow.

Hier ist die Erweiterung, die wir an unserem bestehenden Flow vornehmen werden:

Fertiger Flow aus Teil 2
Abbildung 16. Fertiger Flow aus Teil 2

Der "REST Service" ist dabei eine Process Group, die aus einem weiteren Flow besteht. Mit diesem beschäftigen wir uns aber zuletzt. Davor werden wir die Daten für diesen REST-Service durch die vorhergehenden Processors vorbereiten.

Wir werden zum bestehenden Flow also einen neuen Teil hinzufügen, der die Bahnhöfe einzeln an den Rest-Service schickt. Zuerst werden wir die Daten, die wir senden, etwas reduzieren, da wir nicht alle Felder brauchen. Zudem fügen wir ein neues Feld mit einem Datum hinzu und schicken anschließend alle Bahnhöfe einzeln an den REST-Service.

2.1 QueryRecord anpassen (auf Bahnhofnummer und Bahnhofname reduzieren)

In der Aufgabe wurde nicht spezifiziert, welche Felder für die Speicherung relevant sind. Zur besseren Übersicht werden wir nur die Bahnhofnummer (EVA_NR als NR), den Bahnhofnamen (NAME) und das aktuelle Datum (CREATED) abspeichern.

Um die irrelevanten Informationen zu entfernen, werden wir eine neue SQL Query "neue" im QueryRecord hinzufügen und nur die "EVA_NR"- bzw."NAME"-Felder selektieren.

SQL Query für "neue" Relation
SELECT EVA_NR as NR, NAME FROM FLOWFILE
where Status = 'neu'
Neue QueryRecord-Einstellungen mit zwei Dynamic Properties
Abbildung 17. Neue QueryRecord-Einstellungen mit zwei Dynamic Properties

Bei der Testausführung auf den CSV-Daten (siehe Isolierte Prozessausführung (Fortführung)) wird "NR" und "NAME" in unseren XML-Einträgen gespeichert.

Testausführung des QueryRecord-Processors
Abbildung 18. Testausführung des QueryRecord-Processors

Der QueryRecord ist somit fertig konfiguriert und wir gehen zum nächsten Verarbeitungsschritt über.

2.2 UpdateRecord Processor (Datum hinzufügen)

Mit dem UpdateRecord-Processor wollen wir zu den jeweiligen Bahnhöfen das aktuelle Datum hinzufügen.
Hier müssen wir auch einen Record Reader und Record Writer spezifizieren. Da wir unsere Daten im XML-Format von QueryRecord bekommen, brauchen wir einen XMLReader. Diesen müssen wir, wie im ersten Teil, als Service zur Process Group hinzufügen und in den Einstellungen die "Expect Records as Array" auf "true" setzen, da die Daten als Array im XML ankommen (siehe "Result content" Testausführung des QueryRecord-Processors).

XMLReader Einstellungen
Abbildung 19. XMLReader Einstellungen

Für den Record Writer werden wir einen JsonRecordSetWriter mit Standardeinstellungen verwenden. Damit werden die Stationen aus dem XML- in das JSON-Format umgewandelt, da wir letztendlich ein JSON an die REST-Schnittstelle schicken wollen.

Anschließend fügen wir ein Dynamic Property über den plus round icon-Button hinzu. Wir nutzen wieder die Expression für das aktuelle Datum und fügen dieses damit zu jedem Record hinzu. Der Name des Dynamic Property gibt dabei den Namen des zusätzlichen Feldes im Record vor. (Dieses Vorgehen ähnelt dem vorhergehenden Schritt mit dem UpdateAttribute-Processor im ersten Teil des Tutorials, siehe 1.3 UpdateAttribute-Processor (Dateinamen setzen).)

Da ein Record-Path Wert angegeben werden soll, fängt das Attribut diesmal mit einem "/" an und lautet demnach /CREATED.

UpdateRecord-Processor Settings
Abbildung 20. UpdateRecord-Processor Settings

Bei der Ausführung werden die Stationen jeweils aus XML in JSON umgewandelt und mit dem aktuellen Datum hinter dem Namen "CREATED" versehen:

Ein- und Ausgabe des UpdateRecord-Processors
Abbildung 21. Ein- und Ausgabe des UpdateRecord-Processors

2.3 Splitrecord-Processor (Bahnhöfe einzeln aufteilen)

Mit dem Splitrecord-Processor werden wir die Liste der Bahnhöfe in einzelne Elemente aufteilen, sodass pro Bahnhof ein Aufruf des REST-Services erfolgen kann.

Da die Daten im JSON-Format ankommen, brauchen wir einen "JSONTreeReader" als Record Reader, den wir wieder als Controller Service erzeugen müssen.
Die Standardeinstellungen können beibehalten werden.

Für den Record Writer können wir den "JsonRecordSetWriter" verwenden, den wir vorher für den UpdateRecord-Processor angelegt haben.

Splitrecord Processor Einstellungen
    Record Reader: JsonTreeReader
    Record Writer: JsonRecordSetWriter
Records Per Split: 1

2.4 InvokeHTTP-Processor (Bahnhöfe abschicken)

Mit dem InvokeHTTP-Processor schicken wir die einzelnen Bahnhöfe an den REST-Service.

InvokeHTTP-Processor Einstellungen
Post Method: POST
 Remote URL: http://localhost:3333

Nach fertigem Verbinden und Terminieren von Relationen sollte der fertige Flow wie folgt aussehen:

REST-Flow

Auf dem Bild ist zusätzlich zu erkennen, welche Verbindungen terminiert werden sollten.

Teil 3: Implementierung des REST-Services

Für den eigentlichen REST-Service werden wir in dem Flow, wie zu Beginn dieses Tutorials, eine neue Process Group anlegen.

Mit dem sitemap icon-Button wird durch Drag-and-Drop im Diagramm eine leere Process Group angelegt. Wir benennen sie als "REST Service" (siehe Screenshot Fertiger Flow aus Teil 2) und werden darin fünf weitere Processors anlegen, die unsere Stationen empfangen und in die Datenbank schreiben.

Zuerst müssen wir in die Process Group (durch einen Doppelklick oder per Selektion in der Toolbar) navigieren.

Hier ist der Flow (REST Service), welchen wir nun in dieser Process Group letztendlich anlegen werden:

REST Service Process Group
Abbildung 22. REST Service Process Group

Der HandleHttpRequest-Processor wird die eingehenden Bahnhöfe entgegennehmen und diese an den ValidateRecord-Processor zur Validierung übermitteln.

  • Nicht valide Daten führen zu einer Response mit HTTP-Statuscode 400.

  • Die validen Einträge werden an den PutDatabaseRecord-Processor übermittelt, wo sie in die Datenbank geschrieben werden. Nach erfolgreichem Schreiben wird mit einem HTTP-Statuscode 200 geantwortet.

Alle Processors machen zudem von weiteren Controller Services Gebrauch. Es bietet sich an, bei der Erstellung der Controller Services die Namen mit einem Prefix "REST" zu versehen. Somit kommt es bei der Auswahl nicht zu Verwirrung, da man die Controller Services übergeordneter Process Group sehen kann.

Präfix Controller Services

3.1 Handle HTTP

Das Erste, was wir für unseren REST-Service brauchen, ist ein Processor, der die HTTP Anfragen bearbeitet. Dafür eignet sich der HandleHttpRequest-Processor.

Für die Konfiguration müssen wir den "Listening Port" auf "3333" setzen und einen neuen StandardHttpContextMap Controller Service anlegen, der dafür sorgt, dass die HTTP-Session zwischen den Processors beibehalten wird.
Im StandardHttpContextMap Controller Service können die Standardeinstellungen beibehalten werden, der Name könnte aber entsprechend als "REST StandardHttpContextMap" gewählt werden.

HandleHttpRequest-Processor Einstellungen
Abbildung 23. HandleHttpRequest-Processor Einstellungen

Um die Anfragen zu beantworten, benötigen wir außerdem zwei HandleHttpResponse-Processors: Einen für den erfolgreichen Fall (Code 200) und einen für den fehlerhaften Fall (Code 400). Die beiden Processors benutzen denselben "StandardHttpContextMap" Controller Service.

rest httpresponse settings
Abbildung 24. HandleHttpResponse Processor Einstellungen (200 Code)

Entsprechend sehen auch die Einstellungen für den zweiten HandleHttpResponse-Processor aus, nur der "HTTP Status Code" sollte 400 sein.

Schließlich sollten sich nun drei Processors im Diagramm befinden.
Im Abbild schon entsprechend benannt:

REST-HTTP-Processors

3.2 Anfragen validieren

Bevor wir in die Datenbank schreiben, haben wir die Möglichkeit, die eingehenden Daten zu validieren. Dafür eignet sich der ValidateRecord-Processor. Die Validierung wird mit einem "Avro Schema" durchgeführt. Dieses beschreibt, wie die Daten aussehen müssen. Das Schema kann aus vorhandenen JSON-Daten generiert werden.
So sieht es für unsere Station aus:

Schema Text im ValidateRecord Processor
{
    "name": "MyClass",
    "type": "record",
    "namespace": "com.acme.avro",
    "fields": [
        {
            "name": "NR",
            "type": "int"
        },
        {
            "name": "NAME",
            "type": "string"
        },
        {
            "name": "CREATED",
            "type": "string"
        }
    ]
}

Dieses Schema muss in dem "Schema Text" Property eingetragen werden. Damit unser vordefiniertes Schema benutzt wird, muss in "Schema Access Strategy" der Eintrag "Use Schema Text Property" ausgewählt werden.

Für den Record Reader und Record Writer brauchen wir jeweils einen JsonTreeReader und JsonRecordSetWriter Controller Service. Die Einstellungen beider Controller Services dürfen auf den Standardwerten bleiben.

Die resultierende Konfiguration des ValidateRecord-Processors sollte schlussendlich so aussehen:

ValidateRecord-Processor Konfiguration
Abbildung 25. ValidateRecord-Processor Konfiguration

Die validen Einträge werden entlang der "valid"-Relation zum PutDatabaseRecord-Processor geschickt. Dieser Processor wird die Station in die Postgres Datenbank schreiben.

Für die Konfiguration brauchen wir zwei Controller Services: JsonTreeReader und Database Connection Pooling Service.
Wir müssen beim JsonTreeReader nur eine Änderung im Date Format vornehmen. Zusätzlich bietet es sich an, den Namen entsprechend zu setzen, damit wir ihn von anderen Readern bei der Auswahl unterscheiden können.

JsonTreeReader Konfiguration
Abbildung 26. JsonTreeReader Konfiguration

3.3 Anfragen in die Datenbank schreiben

Für den Database Connection Pooling Service legen wir einen DBCPConnectionPool Controller Service an. Wir müssen die "Database Connection URL" und die "Database Driver Class Name" in seinen Einstellungen setzen:

Database Connection URL: jdbc:postgresql:bahn
Database Driver Class Name: org.postgresql.Driver
DBCP Einstellungen

Die "success Relationship" verbinden wir mit dem HandleHttpResponse-Processor mit dem Code 200. Die Fehlerfälle verbinden wir folglich in dem anderen.

Am Ende sollte sich folgender Flow ergeben:

Fertiger Flow

Nun können wir den REST-Service ausführen, sobald der Postgres-Datenbankserver läuft.

Postgres installieren, testen und Tabellen anzeigen

Mit einer lokalen Postgres-Installation können wir unseren REST-Service direkt testen. Der Server muss eine Datenbank "bahn" enthalten und eine Tabelle "stations".

Für die Installation kann dieses Tutorial für das MAC-Betriebssystem verwendet werden.

Postgres-Server starten:

pg_ctl -D /usr/local/var/postgres start && brew services start postgresql

Postgres command line aufrufen:

psql postgres

Tabelle namens "bahn" anlegen:

create database bahn;

Alle Datenbanken anzeigen:

\dt

Mit der "bahn"-Datenbank verbinden:

\dc bahn

und dort die Tabelle "stations" mit drei Feldern anlegen:

CREATE TABLE
stations(NR int, NAME varchar(255), CREATED DATE);

Nachdem der Flow ausgeführt wurde, kann man sich die Liste der neuen Bahnhöfe anzeigen lassen:

Select * from stations;

3.5 Ergebnisse des REST Service in der Datenbank prüfen

Wir können unseren REST-Service nun testen, indem wir den Flow (siehe Fertiger Flow aus Teil 2) ausführen. Nach einem Durchlauf sollte sich die "stations"-Tabelle mit den neuen Bahnhöfen füllen:

Ausgabe der neuen Stationen
bahn=# select * from stations;
nr    |                 name                  |  created
---------+---------------------------------------+------------
8002535 | Halver-Oberbrügge                     | 2020-11-17
8012273 | Leipzig Mockauer Straße               | 2020-11-17
8012797 | Roßleben                              | 2020-11-17
8001510 | Dornstetten-Aach                      | 2020-11-17
8003105 | Jaderberg                             | 2020-11-17
8001944 | Eutingen Nord                         | 2020-11-17
8001966 | Feldolling                            | 2020-11-17
8011404 | Donndorf(Unstrut)                     | 2020-11-17
8003305 | Reken-Klein Reken                     | 2020-11-17
8003074 | Ingolstadt Audi                       | 2020-11-17
8002551 | Hamburg Elbbrücken                    | 2020-11-17
8071096 | Rheinsberg Stechlinsee                | 2020-11-17
8001723 | Einbeck Otto-Hahn-Straße              | 2020-11-17
8010340 | Straßgräbchen-Bernsdorf               | 2020-11-17
8005169 | Rosenheim Aicherpark                  | 2020-11-17
8002060 | Frankfurt(Main)-Gateway Gardens       | 2020-11-17
8003297 | Kierspe                               | 2020-11-17
8005933 | Stettfeld-Weiher                      | 2020-11-17
8013241 | Waßmannsdorf                          | 2020-11-17
8004371 | Nörvenich-Rommelsheim                 | 2020-11-17
8011605 | Bernburg-Roschwitz                    | 2020-11-17
8013301 | Wiednitz                              | 2020-11-17
8011201 | Flughafen BER - Terminal 1-2          | 2020-11-17
8089201 | Flughafen BER - Terminal 1-2 (S-Bahn) | 2020-11-17
(25 rows)