How-Tos

In diesem Dokument werden verschiedene Problemstellungen mit ihren Lösungen präsentiert. Es gibt eine kompakte Beschreibung, sowie die herunterladbare IGUASU-Process Group.

Das so heruntergeladene JSON kann dann für das Erstellen einer neuen Process Group hochgeladen werden. Sie können sich das Beispiel im Produkt ansehen, erweitern etc.

In den Process Groups wird oft der Processor GenerateFlowFile verwendet, da viele Processors übermittelte FlowFiles für die Ausführung benötigt. Dieser dient nur dazu, einen Strang im Test - typischerweise über RUN ONCE - zu starten.

Manchmal kann es auch sinnvoll sein, ihn über das Kommando Start für einige Zeit in den Zustand RUNNING zu setzen. Dies kann aber je nach Einstellung sehr viele FlowFiles erzeugen und ist dann typischerweise im Text erläutert.

Nach dem Import sind diese Processors disabled, damit man sie nicht versehentlich startet. Der jeweilige Processor muss dann vor dem Start (meist RUN ONCE) noch auf enabled gesetzt werden.

Für die möglichen Ausgänge der Processors wird oft das Funnel verbunden. Dies soll zur Verdeutlichung der verwendeten Ausgänge dienen und die Daten direkt einsehbar machen.

Database

Process Group als JSON herunterladen: Database.json

Es wird gezeigt, wie einfache Operationen auf SQL-Datenbanken in IGUASU umgesetzt werden. Die Datenbank, auf der operiert werden soll, wird in einem DBPCPConnectionPool-Service definiert. Dieser liegt an der Process Group vor ("H2 file-based Database").

Für die Testzwecke innerhalb dieses How-Tos wird eine einfache, File-basierte H2-Datenbank verwendet, da diese direkt nutzbar ist.
Laden Sie dazu die neueste Version des H2 Treibers als 'Binary JAR' herunter.

Für das Hochladen von Datenbank-Treibern ist eine entsprechende Berechtigung notwendig.

Dieses JAR laden sie dann in den Settings unter Drivers in IGUASU hoch. Sie steht danach in dem DBPCConnectionPool Service zur Verfügung.

Zusätzlich sind an der Process Group noch Services zum Lesen und Schreiben von JSON zu finden. Diese dienen dazu, für Processors, die auf "Records" arbeiten, die internen Records in JSON zu überführen bzw. die Records aus JSON zu erstellen.

Es gibt in IGUASU verschiedene Möglichkeiten mit SQL-Datenbanken zu arbeiten. Hier werden nur ein paar ausgesuchte Varianten vorgestellt.

Die verschiedenen vorgestellten Operationen sind im IGUASU über Labels mit Nummern versehen:

1. Tabelle ADDRESS anlegen

Erzeugt über eine fest im ExecuteSQL-Processor hinterlegte CREATE-Query die ADDRESS-Tabelle. Das Feld, in dem das SQL steht ("SQL select query"), ist ein Feld, das die Nutzung der Expression Language erlaubt. Die Query könnte also auch dynamisch aufgebaut werden.

2. Tabelle ADDRESS entfernen

Wie zuvor wird ein statisches SQL verwendet, um die Tabelle zu entfernen.

3. Tabellen in der Datenbank auflisten

Der Processor ListDatabaseTables wird verwendet, um alle Tabellen in der Datenbank aufzulisten. Hier ist zu beachten, dass der Processor keine Eingänge erlaubt. Es kommt also darauf an, wie oft er läuft - dies wird unter "Run Schedule" in den Settings eingestellt. Per Default ist dieser Parameter auf "0" eingestellt, läuft also ständig. Würde er auf "0" belassen, dürfte die Belastung des Systems hoch sein, wenn er im Zustand RUNNING ist.

4. Einfügen von Daten in ADDRESS

Im GenerateFlowFile steht ein fixes JSON mit einer Adresse. Die Namen der Felder entsprechen den Namen der Felder in der Datenbank - darüber erfolgt die Zuordnung. Der PutDatabaseRecord-Processor ist mit einem "Record Reader" konfiguriert, der das Lesen eines JSON als Records erlaubt. Die konfigurierte Tabelle ist ADDRESS. Es könnten so auch viele Adressen angelegt werden, da ein JSON Array gelesen wird.

5. Lesen von Daten aus ADDRESS

Es werden alle Daten der Tabelle ADDRESS über ein fixes SQL im ExecuteSQLRecord gelesen. Der Unterschied des ExecuteSQLRecord zum in 1. und 2. verwendeten ExecuteSQL besteht darin, dass das Ergebnis direkt über einen RecordWriter ausgegeben wird. Hier ist ein Writer konfiguriert, der es als JSON weiterreicht.

6. Datenbank-Zugriff über Groovy

Hier wird die Möglichkeit gezeigt, direkt aus Groovy-Skripten heraus auf Datenbanken zuzugreifen. Dies sollte mit Bedacht und nur dann gemacht werden, wenn es mit den sonstigen Mitteln nicht möglich ist. Die Fehleranfälligkeit ist dabei größer.

Das besondere ist hier, das an dem ExecuteGroovyScript-Processor dynamische Properties konfiguriert werden können, die mit definierten Prefixes besondere Funktionen ausfüllen. Wird ein Prefix SQL.* verwendet, sind dort die erreichbaren DBPCPConnectionPool-Services zur Auswahl. Ebenso einfach kann dieser Service dann im Skript verwendet werden.
Die Beschreibung dieser und weiterer Properties sind in den "Additional Details…​" zu finden, die an der Beschreibung des Processors in der Properties-Sicht verlinkt sind.

Enrichment

Process Group als JSON herunterladen: Enrichment.json

Eine typische Herausforderung bei der Erstellung von Flows ist es, dass Daten einerseits weiterverarbeitet und angereichert oder umgewandelt werden sollen, andererseits aber später auch in der Ursprungsform benötigt werden.

Hierzu gibt es in IGUASU die beiden Processors ForkEnrichment, sowie JoinEnrichment. Sie bilden ein Paar, indem ForkEnrichment die Ausgänge original und enrichment hat. Diese stellen dann die beiden Branches dar, die in JoinEnrichment wieder zusammengeführt werden.

JoinEnrichment verwendet dazu Records und entsprechende Reader/Writer. Beispielsweise kann er damit JSON oder XML zusammenfügen (weitere Record-Formate sind etwa Avro, CEF, CSV, Parquet). Zum Zusammenfügen gibt es verschiedene Optionen - im einfachsten Fall werden die Dokumente der beiden Branches einfach komplett in ein neues Dokument zusammengefügt.

In dem Download-Beispiel wird ein einfaches JSON mit dem Inhalt "data": "value one" hereingereicht. Im enrichment-Zweig wird dann gezählt, wie lang der Wert von data ist. Das Ergebnis ist ein weiteres JSON mit dem Inhalt "count": 9.
Diese beiden JSONs werden dann vom JoinEnrichment in ein Dokument überführt, sodass sowohl auf die Daten als auch auf die errechnete Länge zugegriffen werden kann.

Anstelle der einfachen Ermittlung der Länge könnte es auch den Aufruf eines Web Services, eine Datenbankanfrage oder ähnliches geben, um weitere Daten zu erheben, die man zusammen mit dem Original verarbeiten möchte.

Man kann für das gleiche Ergebnis auch MergeContent verwenden. Hier ist aber wesentlich mehr zu konfigurieren und beachten - beispielsweise spezifische Attribute, die über UpdateAttribute oder ähnliches gesetzt werden müssen. Es kann aber komplexere Szenarien geben, in denen auch MergeContent für ein Data Enrichment geeignet ist.

Metro

Process Group als JSON herunterladen: Metro.json

In der Process Group sind drei Beispiele abgebildet, in denen die Funktionsweise der Metro-Processors veranschaulicht werden. Während die einzelnen Metro-Processors umfangreich im Abschnitt Metro-Processors beschrieben sind, soll in diesem How-To die Anwendung erläutert werden.

Im Folgenden sind die drei Beispiele beschrieben, die durch die JSON-Datei importiert werden können.

1. FlowFiles zwischenspeichern und abrufen

Im ersten Beispiel werden FlowFiles generiert, zwischengespeichert und im späteren Verlauf erneut abgerufen.
Damit die generierten Daten zwischengespeichert werden können, muss allerdings zunächst ein FlowFile-Attribut für die Korrelation definiert werden, das zum späteren Abrufen der Daten genutzt werden kann. Nachdem das Attribut mit dem UpdateAttribute-Processor definiert wurde, wird der FlowFile zum einen mit dem PutMetro-Processor zwischengespeichert und zum anderen im Datenfluss weiterverarbeitet.

Bei der weiteren Verarbeitung wird der vorhandene Content durch einen neuen ersetzt, damit beim Abruf der zwischengespeicherten Daten ersichtlich ist, dass die Metro-Verbindung erfolgreich war. Dieser Abruf der zwischengespeicherten Daten erfolgt im letzten Schritt mit dem GetMetro-Processor, bei dem das zuvor definierte Attribut genutzt wird, um die gewünschten Daten erneut zu laden. Die Verbindung zwischen dem PutMetro- und dem GetMetro-Processor erfolgt hierbei über den MetroLineController-Service U1.

2. Multipler Abruf von Daten in der Metro

Im zweiten Fall ist ein sehr ähnlicher Datenfluss zum ersten Beispiel ersichtlich, wobei hier zwei GetMetro-Processors auf dieselben Daten zugreifen wollen. Der Abruf von Daten kann bei Metro-Processors nur einmal erfolgen, wodurch in diesem Beispiel der erste GetMetro-Processor erfolgreich Daten erhält, während der zweite Processor keine mehr finden kann und dadurch zur failure-Relation routet.

3. Verlassen der Metro-Linie

Zusätzlich zum Abruf der zwischengespeicherten Daten durch den GetMetro-Processor kann der ExitMetro-Processor genutzt werden, um Daten an einer zentralen Stelle zu erhalten. FlowFiles, die über denselben MetroLineController-Service zwischengespeichert wurden, können über den ExitMetro-Processor zusammengeführt werden, um beispielsweise eine bestimmte Fehlermeldung zu behandeln. Dieses Beispiel zeigt diese Funktionalität, indem zwei FlowFiles über unterschiedliche PutFile-Processors an demselben ExitMetro-Processor empfangen werden.