How-Tos

In diesem Dokument werden verschiedene Problemstellungen mit ihren Lösungen präsentiert. Es gibt eine kompakte Beschreibung, sowie die herunterladbare IGUASU-Prozessgruppe.

Das so heruntergeladene JSON kann dann für das Erstellen einer neuen Prozessgruppe hochgeladen werden. Sie können sich das Beispiel im Produkt ansehen, erweitern etc.

In den Prozessgruppen wird oft der Prozessor GenerateFlowFile verwendet, da viele Prozessoren übermittelte FlowFiles für die Ausführung benötigt. Dieser dient nur dazu, einen Strang im Test - typischerweise über 'RUN ONCE' - zu starten.
Manchmal kann es auch sinnvoll sein, ihn für einige Zeit in 'RUNNING' zu setzen. Dies kann aber je nach Einstellung sehr viele FlowFiles erzeugen und ist dann typischerweise im Text erläutert.

Nach dem Import sind diese Prozessoren 'disabled', damit man sie nicht versehentlich startet. Der jeweilige Prozessor muss dann vor dem Start (meist 'RUN ONCE') noch auf enabled gesetzt werden.

Für die möglichen Ausgänge der Prozessoren wird oft das Funnel verbunden. Dies soll zur Verdeutlichung der verwendeten Ausgänge dienen und die Daten direkt einsehbar machen.

Database

Prozessgruppe als JSON herunterladen: Database.json

Es wird gezeigt, wie einfache Operationen auf SQL-Datenbanken in IGUASU umgesetzt werden. Die Datenbank, auf der operiert werden soll, wird in einem DBPCPConnectionPool-Service definiert. Dieser liegt an der Prozessgruppe vor ("H2 file-based Database").

Für die Testzwecke innerhalb dieses How-Tos wird eine einfache, File-basierte H2-Datenbank verwendet, da diese direkt nutzbar ist.
Laden Sie dazu die neueste Version des H2 Treibers als 'Binary JAR' herunter.

Für das Hochladen von Datenbank-Treibern ist eine entsprechende Berechtigung notwendig.

Dieses JAR laden sie dann in den Settings unter Drivers in IGUASU hoch. Sie steht danach in dem DBPCConnectionPool Service zur Verfügung.

Zusätzlich sind an der Prozessgruppe noch Services zum Lesen und Schreiben von JSON zu finden. Diese dienen dazu, für Prozessoren, die auf "Records" arbeiten, die internen Records in JSON zu überführen bzw. die Records aus JSON zu erstellen.

Es gibt in IGUASU verschiedene Möglichkeiten mit SQL-Datenbanken zu arbeiten. Hier werden nur ein paar ausgesuchte Varianten vorgestellt.

Die verschiedenen vorgestellten Operationen sind im IGUASU über Labels mit Nummern versehen:

1. Tabelle ADDRESS anlegen

Erzeugt über eine fest im ExecuteSQL-Prozessor hinterlegte CREATE-Query die ADDRESS-Tabelle. Das Feld, in dem das SQL steht ("SQL select query"), ist ein Feld, das die Nutzung der Expression Language erlaubt. Die Query könnte also auch dynamisch aufgebaut werden.

2. Tabelle ADDRESS entfernen

Wie zuvor wird ein statisches SQL verwendet, um die Tabelle zu entfernen.

3. Tabellen in der Datenbank auflisten

Der Prozessor ListDatabaseTables wird verwendet, um alle Tabellen in der Datenbank aufzulisten. Hier ist zu beachten, dass der Prozessor keine Eingänge erlaubt. Es kommt also darauf an, wie oft er läuft - dies wird unter "Run Schedule" in den Settings eingestellt. Per Default ist dieser Parameter auf "0" eingestellt, läuft also ständig. Würde er auf "0" belassen, dürfte die Belastung des Systems hoch sein, wenn er im RUNNING ist.

4. Einfügen von Daten in ADDRESS

Im GenerateFlowFile steht ein fixes JSON mit einer Adresse. Die Namen der Felder entsprechen den Namen der Felder in der Datenbank - darüber erfolgt die Zuordnung. Der PutDatabaseRecord-Prozessor ist mit einem "Record Reader" konfiguriert, der das Lesen eines JSON als Records erlaubt. Die konfigurierte Tabelle ist ADDRESS. Es könnten so auch viele Adressen angelegt werden, da ein JSON Array gelesen wird.

5. Lesen von Daten aus ADDRESS

Es werden alle Daten der Tabelle ADDRESS über ein fixes SQL im ExecuteSQLRecord gelesen. Der Unterschied des ExecuteSQLRecord zum in 1. und 2. verwendeten ExecuteSQL besteht darin, dass das Ergebnis direkt über einen RecordWriter ausgegeben wird. Hier ist ein Writer konfiguriert, der es als JSON weiterreicht.

6. Datenbank-Zugriff über Groovy

Hier wird die Möglichkeit gezeigt, direkt aus Groovy-Skripten heraus auf Datenbanken zuzugreifen. Dies sollte mit Bedacht und nur dann gemacht werden, wenn es mit den sonstigen Mitteln nicht möglich ist. Die Fehleranfälligkeit ist dabei größer.

Das besondere ist hier, das an dem ExecuteGroovyScript-Prozessor dynamische Properties konfiguriert werden können, die mit definierten Prefixes besondere Funktionen ausfüllen. Wird ein Prefix SQL.* verwendet, sind dort die erreichbaren DBPCPConnectionPool-Services zur Auswahl. Ebenso einfach kann dieser Service dann im Skript verwendet werden.
Die Beschreibung dieser und weiterer Properties sind in den "Additional Details…​" zu finden, die an der Beschreibung des Prozessors in der Properties-Sicht verlinkt sind.

Enrichment

Prozessgruppe als JSON herunterladen: Enrichment.json

Eine typische Herausforderung bei der Erstellung von Flows ist es, dass Daten einerseits weiterverarbeitet und angereichert oder umgewandelt werden sollen, andererseits aber später auch in der Ursprungsform benötigt werden.

Hierzu gibt es in IGUASU die beiden Prozessoren ForkEnrichment, sowie JoinEnrichment. Sie bilden ein Paar, indem ForkEnrichment die Ausgänge original und enrichment hat. Diese stellen dann die beiden Branches dar, die in JoinEnrichment wieder zusammengeführt werden.

JoinEnrichment verwendet dazu Records und entsprechende Reader/Writer. Beispielsweise kann er damit JSON oder XML zusammenfügen (weitere Record-Formate sind etwa Avro, CEF, CSV, Parquet). Zum Zusammenfügen gibt es verschiedene Optionen - im einfachsten Fall werden die Dokumente der beiden Branches einfach komplett in ein neues Dokument zusammengefügt.

In dem Download-Beispiel wird ein einfaches JSON mit dem Inhalt "data": "value one" hereingereicht. Im enrichment-Zweig wird dann gezählt, wie lang der Wert von data ist. Das Ergebnis ist ein weiteres JSON mit dem Inhalt "count": 9.
Diese beiden JSONs werden dann vom JoinEnrichment in ein Dokument überführt, sodass sowohl auf die Daten als auch auf die errechnete Länge zugegriffen werden kann.

Anstelle der einfachen Ermittlung der Länge könnte es auch den Aufruf eines Web Services, eine Datenbankanfrage oder ähnliches geben, um weitere Daten zu erheben, die man zusammen mit dem Original verarbeiten möchte.

Man kann für das gleiche Ergebnis auch MergeContent verwenden. Hier ist aber wesentlich mehr zu konfigurieren und beachten - beispielsweise spezifische Attribute, die über UpdateAttribute oder ähnliches gesetzt werden müssen. Es kann aber komplexere Szenarien geben, in denen auch MergeContent für ein Data Enrichment geeignet ist.

Metro

Prozessgruppe als JSON herunterladen: Metro.json

In der Prozessgruppe sind drei Beispiele abgebildet, in denen die Funktionsweise der Metro-Prozessoren veranschaulicht werden. Während die einzelnen Metro-Prozessoren umfangreich im Abschnitt selected-elements/selected-processors/selected-processors.adoc#metro beschrieben sind, soll in diesem How-To die Anwendung erläutert werden.

Im Folgenden sind die drei Beispiele beschrieben, die durch die JSON-Datei importiert werden können.

1. FlowFiles zwischenspeichern und abrufen

Im ersten Beispiel werden FlowFiles generiert, zwischengespeichert und im späteren Verlauf erneut abgerufen.
Damit die generierten Daten zwischengespeichert werden können, muss allerdings zunächst ein FlowFile-Attribut für die Korrelation definiert werden, das zum späteren Abrufen der Daten genutzt werden kann. Nachdem das Attribut mit dem UpdateAttribute-Prozessor definiert wurde, wird der FlowFile zum einen mit dem PutMetro-Prozessor zwischengespeichert und zum anderen im Datenfluss weiterverarbeitet.

Bei der weiteren Verarbeitung wird der vorhandene Content durch einen neuen ersetzt, damit beim Abruf der zwischengespeicherten Daten ersichtlich ist, dass die Metro-Verbindung erfolgreich war. Dieser Abruf der zwischengespeicherten Daten erfolgt im letzten Schritt mit dem GetMetro-Prozessor, bei dem das zuvor definierte Attribut genutzt wird, um die gewünschten Daten erneut zu laden. Die Verbindung zwischen dem PutMetro- und dem GetMetro-Prozessor erfolgt hierbei über den MetroLineController-Service U1.

2. Multipler Abruf von Daten in der Metro

Im zweiten Fall ist ein sehr ähnlicher Datenfluss zum ersten Beispiel ersichtlich, wobei hier zwei GetMetro-Prozessoren auf dieselben Daten zugreifen wollen. Der Abruf von Daten kann bei Metro-Prozessoren nur einmal erfolgen, wodurch in diesem Beispiel der erste GetMetro-Prozessor erfolgreich Daten erhält, während der zweite Prozessor keine mehr finden kann und dadurch zur failure-Relation routet.

3. Verlassen der Metro-Linie

Zusätzlich zum Abruf der zwischengespeicherten Daten durch den GetMetro-Prozessor kann der ExitMetro-Prozessor genutzt werden, um Daten an einer zentralen Stelle zu erhalten. FlowFiles, die über denselben MetroLineController-Service zwischengespeichert wurden, können über den ExitMetro-Prozessor zusammengeführt werden, um beispielsweise eine bestimmte Fehlermeldung zu behandeln. Dieses Beispiel zeigt diese Funktionalität, indem zwei FlowFiles über unterschiedliche PutFile-Prozessoren an demselben ExitMetro-Prozessor empfangen werden.