Grundlagen-Tutorial
Einführung
In diesem Tutorial werden wir einen Flow für folgenden Anwendungsfall erstellen:
Ein Anbieter von Werbetafeln möchte eine monatliche Auflistung neuer Bahnhöfe erhalten, an denen er sein Produkt anbieten kann.
-
Vorhanden ist ein REST-Service der Bahn, der alle Bahnhöfe als CSV (Komma-separierte Values) zurückliefert, bei denen einige als neu gekennzeichnet sind (Bahnhöfe-DB).
-
Es soll eine Zusammenfassung aller neuen Bahnhöfe im aktuellen Monat als gepacktes XML im Filesystem gesichert werden.
-
Zusätzlich sollen die neuen Bahnhöfe einzeln als JSON an einen internen REST-Service gesendet werden.
Der fertige Flow:
Ein Beispiel des abgeschlossenen Tutorials kann zudem mit folgendem Link heruntergeladen werden: 30min-tutorial.json.
Wir werden diesen Flow nach und nach zusammenstellen.
Dabei gehen wir zunächst auf die ersten zwei Punkte der Aufgabenstellung ein und richten anschließend den REST-Service für den letzten Punkt ein.
Teil 1: Bahnhöfe abrufen, filtern und speichern
In diesem Teil werden wir uns die neuen Bahnhöfe heraussuchen und diese in eine gepackte XML-Datei speichern. Das Ergebnis (Flow für Teil 1) hiervon ist nur der erste Teil des fertigen Flows. Im Verlauf gehen wir auf einige wichtige Konzepte und Features näher ein.
Zunächst werden wir einen Prozessor anlegen der die CSV-Datei mit den Bahnhöfen herunterlädt. Da uns nur die neuen Bahnhöfe interessieren, werden wir die gelesene CSV in einen weiteren Prozessor leiten, der die Filterung durchführt.
Im weiteren Verlauf werden dann die neuen Bahnhöfe Mithilfe der Prozessoren
-
in ein XML Format überführt,
-
mit dem aktuellen Datum versehen,
-
komprimiert,
-
auf die Festplatte gespeichert.
Die Filterung und XML Konvertierung wird in einem Prozessor durchgeführt.
Für dieses Tutorial wird zunächst eine eigene Prozessgruppe erstellt, in der die Prozessoren und Konfigurationen vorgenommen werden.
Durch einen Doppelklick auf das erstellte Element gelangt man nun in die erstellte Prozessgruppe.
1.1 InvokeHTTP Prozessor (Stationen laden)
Als Erstes werden wir einen InvokeHTTP-Prozessor anlegen.
Der InvokeHTTP-Prozessor kann beliebige HTTP-Requests ausführen.
Mit einem GET-Request auf die entsprechende URL werden wir die CSV-Datei mit den Bahnhöfen in den Prozessor laden.
Die Datei wird intern in ein FlowFile übersetzt und kann an weitere Prozessoren weitergeleitet werden.
Ein FlowFile ist ein Container, welcher aus dem Content und Attributen besteht.
Im Content können beliebige Informationen beliebigen Typs referenziert werden, beispielsweise CSV-Daten, Bilddaten, Musik, JSON oder einfach nur Text.
In den Attributen stehen Metadaten, beispielsweise die Zeit der Erstellung, Typ des Contents, Dateiname usw. Es gibt Standard-Attribute, die jedes FlowFile enthält - wie etwa die uuid (Universally Unique Identifier), durch die man zwei FlowFiles voneinander unterscheiden kann.
Prozessor Konfiguration
Die Konfiguration der Prozessoren erfolgt in den Properties-/Settings-Tabs des Moduleditors.
IGUASU validiert die Prozessor-Konfiguration und zeigt im Moduleditor, welche Properties/Settings noch vervollständigt werden müssen. Die noch nicht fertig konfigurierten Prozessoren werden im Diagramm mit einem gelben Warndreieck () markiert.
Im oberen Teil werden die fehlenden Einstellungen für den jeweils anderen Tab gezeigt. Wenn wir also im Prozessor Properties-Tab sind, werden oben die noch fehlenden Einstellungen für Prozessor Settings zusammengefasst dargestellt. Auf dem Screenshot ist zu sehen, dass 5 Warnungen in den Prozessor Settings behoben werden müssen.
Für das derzeit ausgewählte Tab werden die Meldungen direkt unter dem Feld angezeigt, wie auf dem obigen Screenshot für das Feld "Remote URL" zu erkennen ist.
In den Prozessor Properties wollen wir die Remote URL auf die URL der CSV setzen:
https://docs.virtimo.net/de/iguasu-docs/user-guide/_attachments/tutorial-files/D_Bahnhof_2020_alle.csv
Der Request Type ist schon standardmäßig auf GET eingestellt, daher müssen wir in den Properties nichts weiter ändern.
In den Prozessor Settings setzen wir Run Schedule auf "60 sec", damit wir jede Minute nach Updates prüfen (für die Aufgabenstellung würde auch "1 day" genügen - der Scheduler startet immer neu, wenn der Prozessor über das Kommando Start in den Zustand RUNNING gesetzt wird).
Im Falle eines erfolgreichen Requests (2xx Status Code), wird die Relation Response ausgelöst und die heruntergeladene Datei entlang des Pfades als FlowFile weiter geschickt. Die anderen Fälle werden wir nicht beachten, wodurch diese Relationen automatisch terminiert werden können. Standardmäßig werden alle Relationen automatisch terminiert. Wird eine Relation genutzt um eine Verknüpfung zu einem anderen Element im Diagramm zu erstellen, wird diese automatische Terminierung aufgehoben.
Unser InvokeHTTP-Prozessor ist soweit fertig. Er holt die CSV-Datei und leitet diese mit in einer Response-Relation weiter.
Die Erstellung des Prozessors, den wir mit dieser Response-Relation verbinden, ist in 1.2 QueryRecord Prozessor (Bahnhöfe filtern) beschrieben.
Isolierte Prozessor Ausführung
IGUASU bietet die Möglichkeit, Prozessoren isoliert von der Ausführung im gesamten Flows laufen zu lassen und damit die konfigurierten Eigenschaften zu testen.
Dabei wird die gesetzte Konfiguration verwendet, ohne sie zu speichern.
Zwischen den Ausführungen kann sie beliebig verändert werden.
Wir nutzen die Ausführung des InvokeHTTP-Prozessors jetzt, um zu sehen, ob der REST-Service der Bahn unter der angegebenen URL erreichbar ist und wie die von ihm zurückgegebenen Daten aussehen.
Die Ausführung im isolierten Zustand kann durch den -Button im Moduleditor gestartet werden.
Nach der Ausführung wird die Ausgabe des Prozessors in der unteren Hälfte des Moduleditors eingeblendet. Man kann nun die FlowFiles, die entlang ihrer Relation geschickt werden, betrachten.
Der Prozessor konnte die CSV-Datei erfolgreich herunterladen.
Wir wählen die Response-Relation aus:
-
In "Result content" sehen wir die heruntergeladenen CSV Datei
-
In "Result attributes" sind die Attribute des HTTP-Response sowie weitere Standard-Attribute
Wir können feststellen, dass die erste Zeile die Header enthält und die Daten mit einem Semikolon getrennt werden. Aus den Daten ist zu entnehmen, dass uns die Spalte "Status" die Information über die neuen Stationen verrät. Die neuen Stationen werden mit "neu" markiert.
Wir werden im nächsten Kapitel noch ein weiteres Beispiel für den Einsatz der Testausführung betrachten (Isolierte Prozessausführung (Fortführung)).
1.2 QueryRecord Prozessor (Bahnhöfe filtern)
In diesem Abschnitt werden wir einen QueryRecord-Prozessor anlegen, der die CSV Datei entgegennimmt und nach neuen Bahnhöfen filtert.
Wir wissen aus der Testausführung, in welchem Format die Daten vorliegen und müssen den nachfolgenden Prozessor für die Verarbeitung bestimmen.
Einer der Prozessoren, den wir für unser Anliegen verwenden können, ist der QueryRecord-Prozessor.
Es ist nützlich, zu wissen, welche Art von Prozessoren es gibt und wie sie arbeiten. Man kann sich hierfür die Liste der verfügbaren Prozessoren anschauen und nach Schlagwörtern suchen. |
Der QueryRecord-Prozessor selektiert über SQL-Queries den Content eingehender FlowFiles. Dazu müssen die Daten in einem Format vorliegen, dass NiFi als Records interpretieren kann - das CSV-Format gehört dazu.
Über das SQL kann man dann filtern, transformieren oder aggregieren.
Die Ergebnisse der Queries werden als FlowFiles entlang einer selbstdefinierten Relation weitergeschickt.
Zuerst werden wir den Prozessor ins Diagramm einfügen und die Response Relation von dem InvokeHTTP-Prozessor erstellen. Als Nächstes müssen wir den QueryRecord-Prozessor konfigurieren, damit er mit den eingehenden Daten wie gewünscht umgeht.
Controller Services
Der Content, der in den QueryRecord-Prozessor eingeht, kann grundsätzlich beliebig sein.
Daher muss man für den konkreten Typ einen passenden Record Reader spezifizieren.
Die Record Reader werden als Controller Service an der Prozessgruppe konfiguriert.
Danach können sie von beliebigen Prozessoren innerhalb dieser Prozessgruppe verwendet werden.
Der Typ der Ausgabe des QueryRecord-Prozessors wird durch einen Record Writer spezifiziert.
Da wir in unserem Beispiel eine CSV-Datei untersuchen wollen, braucht unser QueryRecord-Prozessor einen CSV Reader als Record Reader und einen XML Writer als Record Writer, weil die Datei letztendlich im XML Format gespeichert werden soll.
Wir legen jetzt diese Controller Services in den Einstellungen der Prozessgruppe an.
Damit der Prozessor Zugriff auf den jeweiligen Service bekommt, muss sich dieser entweder in derselben oder darüberliegenden Prozessgruppe befinden.
Folglich haben alle Prozessoren Zugriff auf die Services, die in der Root-Prozessgruppe liegen.
Um ins Menü für die Einstellungen der Prozessgruppen zu gelangen, muss man auf das -Icon klicken, dabei wird dann der Prozessgruppenbaum auf der linken Seite aufgelistet. Ansonsten wird die aktuelle Prozessgruppe immer zuletzt am Pfad des -Icon dargestellt.
Der Prozessgruppenbaum beginnt mit der aktuellen Prozessgruppe und endet mit dem "[Root]". Mit dem "Plus"-Button () kann man einen Controller Service zur Prozessgruppe hinzufügen, wie unten veranschaulicht.
Für unser Beispiel werden wir einen CSVReader und XMLRecordSetWriter zur aktuellen Prozessgruppe hinzufügen. Nach dem Einfügen haben wir die Möglichkeit die Services zu starten/stoppen bzw. zu konfigurieren.
In den Einstellungen für den CSVReader müssen wir das Trennzeichen (Value Separator) als Semikolon spezifizieren, die restlichen Einstellungen können auf den Standardwerten bleiben.
Bei dem XMLRecordSetWriter müssen wir einen Namen für den Root-Knoten und seine Einträge spezifizieren(Root Tag & Record Tag), beispielsweise mit "Root" und "Record". Diese Angaben sind optionale Felder, die durch den Stern-Button in der Toolbar ein- und ausgeblendet werden können. Die restlichen Einstellungen können so bleiben, wie sie sind. Wir sind also fertig mit der Konfiguration der Record Reader/Writer und können sie nun aktivieren, sodass sie von anderen Prozessoren eingesetzt werden können.
Die Services werden jeweils mit dem -Button gestartet:
Die Controller Services werden nicht periodisch ausgeführt und haben auch keine Relationen, da sie nicht direkt im Flow hängen.
Sie werden von Prozessoren, Reporting Tasks oder anderen Controller Services verwendet. |
Nun werden wir den CSVReader dem QueryRecord Prozessor als Record Reader und den XMLRecordSetWriter als Record Writer zuweisen (Processor Properties).
Die Ein- und Ausgabedatentypen wurden nun definiert.
Daher verbleibt die Aufgabe, die Query zu schreiben, welche die neuen Bahnhöfe raussucht.
Bevor wir die Query schreiben, möchten wir an dieser Stelle ein weiteres Feature von IGUASU zeigen, welches uns bei der Erstellung und Konfiguration von Prozessoren behilflich sein wird. Die Fortführung des Tutorials findet unter SQL Query statt.
Datenflow Analyze - Inspection Panel (Exkurs)
IGUASU bietet die Möglichkeit, die durch die Prozessoren gelaufenen Daten anzuzeigen. Um dies in unserem Beispiel zu demonstrieren, müssen wir den Flow kurzzeitig starten, um Daten zu generieren. Für diesen Zweck kann durch einen Rechtsklick auf einer leeren Fläche im Diagram das Kontextmenü genutzt werden, um alle Prozessoren in der aktuellen Prozessgruppe in den Zustand RUNNING zu versetzen, indem das Kommando Start ausgewählt wird. Anschließend können alle Prozessoren über das Menü ebenfalls gestoppt werden.
Wenn nun die Event-Tabelle eingeschaltet ist, wird durch das Selektieren eines Prozessors, der bereits Daten verarbeitet hat, unterhalb des Diagramms der Verlauf aller vom Prozessor in der Ausführung erstellten Events angezeigt. Solch ein Event beschreibt einen Verarbeitungsschritt bezogen auf FlowFiles im Prozessor.
Beispiele solcher Events:
Fork |
Das FlowFile wurde von einem anderen FlowFile abgeleitet |
Route |
Das FlowFile wird entlang einer Relation weitergeleitet (mit Begründung) |
Drop |
Das FlowFile wird nicht mehr weiterverarbeitet |
In der NiFi Dokumentation kann man dazu weitere Informationen finden.
Nach der Selektion eines Events wird der Verlauf der dazugehörigen Prozessor-Ausführungen im Diagram hervorgehoben.
Die Nummern stellen die Ausführungsreihenfolge dar: Man sieht, welche Prozessoren in welcher Reihenfolge ausgeführt wurden.
Hier sind es genau die zwei Prozessoren, die wir bisher haben - in komplexeren Szenarien sieht man, welche Zweige genommen wurden.
Zudem wird nach der Selektion eines Events das Inspection Panel geöffnet.
Dieses enthält detaillierte Informationen über das Event und enthält die Eingangs- und Ausgangs-Daten der Ausführung des Prozessors.
Die Eingangsdaten können auch für die Testausführung eines Prozessors verwendet werden (siehe Isolierte Prozessausführung (Fortführung)).
Die Daten der Selektion des letzten Events bleiben im Inspection Panel und können dadurch für die Testausführung beliebiger Prozessoren verwendet werden. Wir selektieren das Event im QueryRecord-Prozessor und es wird der CSV-Input im Inspection Panel angezeigt - diesen Input werden wir später (Isolierte Prozessausführung (Fortführung)) für die Testausführung verwenden.
SQL Query
Nun wollen wir die CSV-Datei Mithilfe eines SQL-Befehls nach neuen Einträgen filtern.
Im Folgenden sind zwei Beispieleinträge aus dem CSV dargestellt - die neuen Bahnhöfe werden im Feld "Status" mit dem Wert "neu" gekennzeichnet.
EVA_NR,DS100,IFOPT,NAME,Verkehr,Laenge,Breite,Betreiber_Name,Betreiber_Nr,Status
8002551,AELB,de:02000:11943,Hamburg Elbbrücken,RV,10.0245,53.5345,DB Station und Service AG,,neu
8000032,NS,de:09662:28058,Schweinfurt Hbf,RV,10.212919,50.035313,DB Station und Service AG,5742,
Der erste Eintrag mit der EVA_NR "8002551" ist somit ein neuer Bahnhof und der zweite mit der Nummer "8000032" nicht.
Mit der folgenden Query werden wir nach den neuen Bahnhöfen filtern:
SELECT * FROM FLOWFILE WHERE Status = 'neu'
Die Queries werden als Dynamic Property mit dem -Button hinzugefügt.
Die Namen der Properties entsprechen den Namen der ausgehenden Relationen.
Wir legen eine Dynamic Property mit dem Namen "neue_alle" und der Query als Wert an.
Nun gibt es eine neue Relation mit dem Namen "neue_alle", die das Ergebnis der Query als FlowFile an weitere Prozessoren weiterleitet.
In den Prozessor Settings lassen wir wieder die Relation automatisch terminieren.
An dieser Stelle ist die Konfiguration von QueryRecord abgeschlossen und wir könnten zum nächsten Prozessor übergehen.
Allerdings bietet es sich an, zu prüfen, ob alles richtig konfiguriert wurde und der Prozessor sich erwartungsgemäß verhält.
Isolierte Prozessausführung (Fortführung)
Im Folgenden werden wir die Korrektheit des QueryRecord-Prozessors mit der bereits erwähnten Test-Ausführung überprüfen (siehe Isolierte Prozessor Ausführung). Wir werden sehen, ob der Prozessor die Daten erwartungsgemäß verarbeitet, indem wir ihn auf ein FlowFile ausführen und das Ergebnis prüfen.
Beim InvokeHTTP war die Bereitstellung von Eingangsdaten (einem FlowFile) nicht nötig, da er zum Abrufen der Daten von einer URL keine Eingabedaten notwendig sind. |
Um die Daten einer früheren Ausführung zu verwenden, selektieren wir einen der Events in der Tabelle unter dem Diagramm.
Wird dann die isolierte Ausführung ausgewählt, werden die Ergebnisse im Konfigurationsbereich angezeigt.
Die Tabelle erscheint, wenn man einen Prozessor im Diagramm selektiert und dieser schon einmal im Zuge des Flows ausgeführt wurde. Die isolierten bzw. Test-Ausführungen spielen dabei keine Rolle. |
Nach der Ausführung (über den orangen Button) sehen wir, dass der QueryRecord-Prozessor für zwei Relationen die Ergebnisse zeigt:
-
In der "original"-Relation stehen die originalen Eingabedaten - hier schickt also der Prozessor einfach die Eingabedatei weiter.
-
In der "neue_alle"-Relation werden die durch die SQL-Query selektierten Stationen (mit Status='neu') als XML ausgegeben.
Die Record-Tags des XML haben wir im XMLRecordSetWriter spezifiziert. Wenn wir durch den "Result content" scrollen, können wir uns vergewissern, dass nur die neuen Bahnhöfe gefiltert worden sind.
Wir können die Eingabedaten für den Test auch manuell anpassen/eingeben. Um dies zu zeigen, merken wir uns vorerst, dass in "Result content" der erste neue Bahnhof die ID "8002551" besitzt.
Wir können überprüfen, ob korrekt gefiltert wird, wenn wir den Status dieses Bahnhofs in den Eingangsdaten auf alt ändern.
Dazu müssen wir ins Data-Panel wechseln und im Input data bei der Bahnhof ID 8002551 den Status von "neu" auf "alt" ändern.
Da in unserem Fall die Datei zu groß ist, müssen wir die Datei herunterladen und in einem Editor unserer Wahl bearbeiten. Nachdem wir den Status geändert haben, können wir die Datei wieder hochladen.
Nach erneutem Ausführen sehen wir, dass die "neue_alle"-Relation nun keinen Eintrag mit der Bahnhof-ID "8002551" enthält.
Genauso können wir die Query beliebig ändern und z.B. auf alle Trierer Bahnhöfe filtern:
SELECT * FROM FLOWFILE where NAME LIKE '%Trier%'.
Man kann also in IGUASU mit den Prozessoren experimentieren und ihre Ausgaben testen, bevor man sie im möglicherweise komplexen Flow ausführt.
1.3 UpdateAttribute-Prozessor (Dateinamen setzen)
Zur Erinnerung: Unser Ziel ist es, neue Bahnhöfe in einer komprimierten XML Datei abzuspeichern.
Der Dateiname soll dem Datum des aktuellen Tages entsprechen.
Das Abspeichern wird mit dem PutFile-Prozessor durchgeführt.
In der Dokumentation zum PutFile-Prozessor können wir nachlesen, dass die Spezifizierung gewünschter Dateinamen mit einer Zuweisung eines "filename"-Attributes zum FlowFile erfolgt.
Wir müssen also unserem FlowFile das Attribut "filename" mit dem gewünschten Wert hinzufügen.
Mit dem UpdateAttribute-Prozessor können wir beliebige Attribute der FlowFiles anlegen und ändern. Wir werden mit diesem Prozessor also ein "filename"-Attribut mit dem Wert des Datums anlegen. Dieses Attribut wird im dann folgenden PutFile-Prozessor für den Ort der Datei verwendet, die gespeichert wird.
Als Erstes werden wir einen UpdateAttribute-Prozessor zum Diagramm hinzufügen und diesen mit der "neue alle"-Relation aus dem QueryRecord-Prozessor verbinden.
Das Anlegen neuer Attribute über den UpdateAttribute-Prozessor erfolgt durch das Erstellen eines dynamischen Properties.
Der Name dieses dynamischen Properties entspricht dann dem Namen des Attributes, welches angelegt (oder geändert) werden soll. Der Inhalt entspricht dann dem (neuen) Wert.
Den Inhalt möchten wir auf das aktuelle Datum setzen. Da sich das Datum täglich ändern wird, können wir den String nicht statisch eintragen, sondern müssen diesen dynamisch passend zum Tag generieren.
Dies geschieht über die Expression Language. Sie kann in jedem Property verwendet werden, das mit einem '$' hinter dem Namen gekennzeichnet ist. Die darin enthaltenen Funktionen können uns passenderweise das aktuelle Systemdatum liefern.
Die Expression ${now():format('yyyy-MM-dd')}
würde ein aktuelles Datum wie "2020-11-19" ergeben.
Da wir die Datei als XML abspeichern wollen, können wir an die Expression noch das statische ".xml" anhängen und mit einem Prefix "Stations_" versehen: Stations_${now():format('yyyy-MM-dd')}.xml
.
Das ergibt dann "Stations_2020-11-19.xml".
Der UpdateAttribute-Prozessor ändert nur die definierten Attribute des FlowFiles, nicht den Content. Das resultierende FlowFile mit dem nun angepassten "filename" werden wir an nächsten Prozessor weiterleiten.
1.4 CompressContent-Prozessor (Komprimieren)
In diesem Schritt komprimieren wir das FlowFile. Dazu setzen wir den CompressContent-Prozessor ein. Wir müssen den Prozessor zum Diagramm hinzufügen und nur noch die Relation aus dem UpdateAttribute anlegen. In den Einstellungen sollten wir:
-
den Haken bei "Update Filename" setzen, damit die Dateiendung des Attributs "filename" entsprechend angepasst wird
-
die Kompressionsrate auf 9 setzen
-
das Format auf "gzip" setzen.
Das komprimierte FlowFile leiten wir an den letzten Prozessor weiter.
1.5 PutFile-Prozessor (Speichern)
Zum Schluss speichern wir die komprimierte Datei in ein Verzeichnis.
Dieser Prozessor kann nur mit den entsprechenden Filesystem-Berechtigungen aufgerufen werden. |
Dazu legen wir einen zunächst einen PutFile-Prozessor an. Dann verbinden wir den CompressContent-Prozessor mit dem neu angelegten PutFile-Prozessor. In den Einstellungen tragen wir das Zielverzeichnis ein.
Beide ausgehenden Relationen können wir terminieren, da der Prozess hier endet.
Um uns Fehlerfälle anzeigen zu lassen, werden wir aber nur die success-Relation terminieren.
Die failure Relation leiten wir in ein Funnel-Element weiter, um diese späterer für die Fehleranzeige einzusetzen.
Mit einer passender Umbenennung der Prozessoren sieht der resultierende Flow wie folgt aus:
Der Flow kann nun gestartet werden, womit die ersten beiden Punkte der Aufgabenstellung erfüllt sind. Nach dem ersten Durchlauf wird die XML-Datei ins Zielverzeichnis gespeichert.
Beim zweiten Durchlauf wird ein Konflikt mit der schon existierenden Datei entstehen und das FlowFile wird in die failure-Relation geleitet. Der Konflikt kann durch die "Conflict Resolution Strategy"-Einstellung im PutFile-Prozessor mit ignore oder replace aufgelöst werden.
Teil 2: Daten einzeln an REST-Service schicken
Dieser Abschnitt behandelt die Aufgabenstellung, die neuen Bahnhöfe einzeln als JSON an einen internen REST-Service zu senden.
Wir werden zunächst unseren Flow mit einem neuen Teil erweitern, welcher die Bahnhöfe einzeln in JSON an einen internen REST-Service schickt. Anschließend implementieren wir in Teil 3 auch diesen REST-Service selbst in Flow.
Hier ist die Erweiterung, die wir an unserem bestehenden Flow vornehmen werden:
Der "REST Service" ist dabei eine Prozessgruppe, die aus einem weiteren Flow besteht. Mit diesem beschäftigen wir uns aber zuletzt. Davor werden wir die Daten für diesen REST-Service durch die vorhergehenden Prozessoren vorbereiten.
Wir werden zum bestehenden Flow also einen neuen Teil hinzufügen, der die Bahnhöfe einzeln an den Rest-Service schickt. Zuerst werden wir die Daten, die wir senden, etwas reduzieren, da wir nicht alle Felder brauchen. Zudem fügen wir ein neues Feld mit einem Datum hinzu und schicken anschließend alle Bahnhöfe einzeln an den REST-Service.
2.1 QueryRecord anpassen (auf Bahnhofnummer und Bahnhofname reduzieren)
In der Aufgabe wurde nicht spezifiziert, welche Felder für die Speicherung relevant sind. Zur besseren Übersicht werden wir nur die Bahnhofnummer (EVA_NR als NR), den Bahnhofnamen (NAME) und das aktuelle Datum (CREATED) abspeichern.
Um die irrelevanten Informationen zu entfernen, werden wir eine neue SQL Query "neue" im QueryRecord hinzufügen und nur die "EVA_NR"- bzw."NAME"-Felder selektieren.
SELECT EVA_NR as NR, NAME FROM FLOWFILE
where Status = 'neu'
Bei der Testausführung auf den CSV-Daten (siehe Isolierte Prozessausführung (Fortführung)) wird "NR" und "NAME" in unseren XML-Einträgen gespeichert.
Der QueryRecord ist somit fertig konfiguriert und wir gehen zum nächsten Verarbeitungsschritt über.
2.2 UpdateRecord Prozessor (Datum hinzufügen)
Mit dem UpdateRecord-Prozessor wollen wir zu den jeweiligen Bahnhöfen das aktuelle Datum hinzufügen.
Hier müssen wir auch einen Record Reader und Record Writer spezifizieren.
Da wir unsere Daten im XML-Format von QueryRecord bekommen, brauchen wir einen XMLReader.
Diesen müssen wir, wie im ersten Teil, als Service zur Prozessgruppe hinzufügen und in den Einstellungen die "Expect Records as Array" auf "true" setzen, da die Daten als Array im XML ankommen (siehe "Result content" Testausführung des QueryRecord-Prozessors).
Für den Record Writer werden wir einen JsonRecordSetWriter mit Standardeinstellungen verwenden. Damit werden die Stationen aus dem XML- in das JSON-Format umgewandelt, da wir letztendlich ein JSON an die REST-Schnittstelle schicken wollen.
Anschließend fügen wir ein Dynamic Property über den -Button hinzu. Wir nutzen wieder die Expression für das aktuelle Datum und fügen dieses damit zu jedem Record hinzu. Der Name des Dynamic Property gibt dabei den Namen des zusätzlichen Feldes im Record vor. (Dieses Vorgehen ähnelt dem vorhergehenden Schritt mit dem UpdateAttribute-Prozessor im ersten Teil des Tutorials, siehe 1.3 UpdateAttribute-Prozessor (Dateinamen setzen).)
Da ein Record-Path Wert angegeben werden soll, fängt das Attribut diesmal mit einem "/" an und lautet demnach |
Bei der Ausführung werden die Stationen jeweils aus XML in JSON umgewandelt und mit dem aktuellen Datum hinter dem Namen "CREATED" versehen:
2.3 Splitrecord-Prozessor (Bahnhöfe einzeln aufteilen)
Mit dem Splitrecord-Prozessor werden wir die Liste der Bahnhöfe in einzelne Elemente aufteilen, sodass pro Bahnhof ein Aufruf des REST-Services erfolgen kann.
Da die Daten im JSON-Format ankommen, brauchen wir einen "JSONTreeReader" als Record Reader, den wir wieder als Controller Service erzeugen müssen.
Die Standardeinstellungen können beibehalten werden.
Für den Record Writer können wir den "JsonRecordSetWriter" verwenden, den wir vorher für den UpdateRecord-Prozessor angelegt haben.
Record Reader: JsonTreeReader Record Writer: JsonRecordSetWriter Records Per Split: 1
2.4 InvokeHTTP-Prozessor (Bahnhöfe abschicken)
Mit dem InvokeHTTP-Prozessor schicken wir die einzelnen Bahnhöfe an den REST-Service.
Post Method: POST Remote URL: http://localhost:3333
Nach fertigem Verbinden und Terminieren von Relationen sollte der fertige Flow wie folgt aussehen:
Auf dem Bild ist zusätzlich zu erkennen, welche Verbindungen terminiert werden sollten.
Teil 3: Implementierung des REST-Services
Für den eigentlichen REST-Service werden wir in dem Flow, wie zu Beginn dieses Tutorials, eine neue Prozessgruppe anlegen.
Mit dem -Button wird durch Drag-and-Drop im Diagramm eine leere Prozessorgruppe angelegt. Wir benennen sie als "REST Service" (siehe Screenshot Fertiger Flow aus Teil 2) und werden darin fünf weitere Prozessoren anlegen, die unsere Stationen empfangen und in die Datenbank schreiben.
Zuerst müssen wir in die Prozessgruppe (durch einen Doppelklick oder per Selektion in der Toolbar) navigieren.
Hier ist der Flow (REST Service), welchen wir nun in dieser Prozessgruppe letztendlich anlegen werden:
Der HandleHttpRequest-Prozessor wird die eingehenden Bahnhöfe entgegennehmen und diese an den ValidateRecord-Prozessor zur Validierung übermitteln.
-
Nicht valide Daten führen zu einer Response mit HTTP-Statuscode 400.
-
Die validen Einträge werden an den PutDatabaseRecord-Prozessor übermittelt, wo sie in die Datenbank geschrieben werden. Nach erfolgreichem Schreiben wird mit einem HTTP-Statuscode 200 geantwortet.
Alle Prozessoren machen zudem von weiteren Controller Services Gebrauch. Es bietet sich an, bei der Erstellung der Controller Services die Namen mit einem Prefix "REST" zu versehen. Somit kommt es bei der Auswahl nicht zu Verwirrung, da man die Controller Services übergeordneter Prozessgruppe sehen kann.
3.1 Handle HTTP
Das Erste, was wir für unseren REST-Service brauchen, ist ein Prozessor, der die HTTP Anfragen bearbeitet. Dafür eignet sich der HandleHttpRequest-Prozessor.
Für die Konfiguration müssen wir den "Listening Port" auf "3333" setzen und einen neuen StandardHttpContextMap Controller Service anlegen, der dafür sorgt, dass die HTTP-Session zwischen den Prozessoren beibehalten wird.
Im StandardHttpContextMap Controller Service können die Standardeinstellungen beibehalten werden, der Name könnte aber entsprechend als "REST StandardHttpContextMap" gewählt werden.
Um die Anfragen zu beantworten, benötigen wir außerdem zwei HandleHttpResponse-Prozessoren: Einen für den erfolgreichen Fall (Code 200) und einen für den fehlerhaften Fall (Code 400). Die beiden Prozessoren benutzen denselben "StandardHttpContextMap" Controller Service.
Entsprechend sehen auch die Einstellungen für den zweiten HandleHttpResponse-Prozessor aus, nur der "HTTP Status Code" sollte 400 sein.
Schließlich sollten sich nun drei Prozessoren im Diagramm befinden.
Im Abbild schon entsprechend benannt:
3.2 Anfragen validieren
Bevor wir in die Datenbank schreiben, haben wir die Möglichkeit, die eingehenden Daten zu validieren.
Dafür eignet sich der ValidateRecord-Prozessor.
Die Validierung wird mit einem "Avro Schema" durchgeführt.
Dieses beschreibt, wie die Daten aussehen müssen.
Das Schema kann aus vorhandenen JSON-Daten generiert werden.
So sieht es für unsere Station aus:
{
"name": "MyClass",
"type": "record",
"namespace": "com.acme.avro",
"fields": [
{
"name": "NR",
"type": "int"
},
{
"name": "NAME",
"type": "string"
},
{
"name": "CREATED",
"type": "string"
}
]
}
Dieses Schema muss in dem "Schema Text" Property eingetragen werden. Damit unser vordefiniertes Schema benutzt wird, muss in "Schema Access Strategy" der Eintrag "Use Schema Text Property" ausgewählt werden.
Für den Record Reader und Record Writer brauchen wir jeweils einen JsonTreeReader und JsonRecordSetWriter Controller Service. Die Einstellungen beider Controller Services dürfen auf den Standardwerten bleiben.
Die resultierende Konfiguration des ValidateRecord-Prozessors sollte schlussendlich so aussehen:
Die validen Einträge werden entlang der "valid"-Relation zum PutDatabaseRecord-Prozessor geschickt. Dieser Prozessor wird die Station in die Postgres Datenbank schreiben.
Für die Konfiguration brauchen wir zwei Controller Services: JsonTreeReader und Database Connection Pooling Service.
Wir müssen beim JsonTreeReader nur eine Änderung im Date Format vornehmen. Zusätzlich bietet es sich an, den Namen entsprechend zu setzen, damit wir ihn von anderen Readern bei der Auswahl unterscheiden können.
3.3 Anfragen in die Datenbank schreiben
Für den Database Connection Pooling Service legen wir einen DBCPConnectionPool Controller Service an. Wir müssen die "Database Connection URL" und die "Database Driver Class Name" in seinen Einstellungen setzen:
Database Connection URL: jdbc:postgresql:bahn Database Driver Class Name: org.postgresql.Driver
Die "success Relationship" verbinden wir mit dem HandleHttpResponse-Prozessor mit dem Code 200. Die Fehlerfälle verbinden wir folglich in dem anderen.
Am Ende sollte sich folgender Flow ergeben:
Nun können wir den REST-Service ausführen, sobald der Postgres-Datenbankserver läuft.
Postgres installieren, testen und Tabellen anzeigen
Mit einer lokalen Postgres-Installation können wir unseren REST-Service direkt testen. Der Server muss eine Datenbank "bahn" enthalten und eine Tabelle "stations".
Für die Installation kann dieses Tutorial für das MAC-Betriebssystem verwendet werden.
Postgres-Server starten:
pg_ctl -D /usr/local/var/postgres start && brew services start postgresql
Postgres command line aufrufen:
psql postgres
Tabelle namens "bahn" anlegen:
create database bahn;
Alle Datenbanken anzeigen:
\dt
Mit der "bahn"-Datenbank verbinden:
\dc bahn
und dort die Tabelle "stations" mit drei Feldern anlegen:
CREATE TABLE
stations(NR int, NAME varchar(255), CREATED DATE);
Nachdem der Flow ausgeführt wurde, kann man sich die Liste der neuen Bahnhöfe anzeigen lassen:
Select * from stations;
3.5 Ergebnisse des REST Service in der Datenbank prüfen
Wir können unseren REST-Service nun testen, indem wir den Flow (siehe Fertiger Flow aus Teil 2) ausführen. Nach einem Durchlauf sollte sich die "stations"-Tabelle mit den neuen Bahnhöfen füllen:
bahn=# select * from stations; nr | name | created ---------+---------------------------------------+------------ 8002535 | Halver-Oberbrügge | 2020-11-17 8012273 | Leipzig Mockauer Straße | 2020-11-17 8012797 | Roßleben | 2020-11-17 8001510 | Dornstetten-Aach | 2020-11-17 8003105 | Jaderberg | 2020-11-17 8001944 | Eutingen Nord | 2020-11-17 8001966 | Feldolling | 2020-11-17 8011404 | Donndorf(Unstrut) | 2020-11-17 8003305 | Reken-Klein Reken | 2020-11-17 8003074 | Ingolstadt Audi | 2020-11-17 8002551 | Hamburg Elbbrücken | 2020-11-17 8071096 | Rheinsberg Stechlinsee | 2020-11-17 8001723 | Einbeck Otto-Hahn-Straße | 2020-11-17 8010340 | Straßgräbchen-Bernsdorf | 2020-11-17 8005169 | Rosenheim Aicherpark | 2020-11-17 8002060 | Frankfurt(Main)-Gateway Gardens | 2020-11-17 8003297 | Kierspe | 2020-11-17 8005933 | Stettfeld-Weiher | 2020-11-17 8013241 | Waßmannsdorf | 2020-11-17 8004371 | Nörvenich-Rommelsheim | 2020-11-17 8011605 | Bernburg-Roschwitz | 2020-11-17 8013301 | Wiednitz | 2020-11-17 8011201 | Flughafen BER - Terminal 1-2 | 2020-11-17 8089201 | Flughafen BER - Terminal 1-2 (S-Bahn) | 2020-11-17 (25 rows)