Replication
Dieser Dienst repliziert Daten aus einem RDBMS nach Elasticsearch.
Automatische Migration der Replikationseinstellungen Wird die BPC Version >= 3.1 zum ersten Mal gestartet, dann werden evtl. vorhandene Replikationseinstellungen automatisch für das neue Modul ‘Replication’ migriert. Wenn alle Replikationsjobs korrekt übernommen wurden, dann bitte - um Verwirrungen und Missverständnisse zu vermeiden - die beiden alten Replikations-Einstellungen (Core Services → Einstellungen) “Replication_Jobs” und “Replication_Threads” löschen. Diese sind dort mit orangen Balken markiert. |
Auswirkung auf die System-Performance Die Replikation hat direkten Einfluss auf die System-Performance. Dabei spielen insbesondere die Anzahl der Threads (Replication → Einstellungen → Replication_Threads) und der Intervall (replicationInterval) ein Rolle. Je mehr Jobs in kurzer Zeit laufen, desto höher ist die CPU Auslastung. Das gilt insbesondere, wenn viele Daten repliziert werden. Die replicationBlockSize wirkt sich zwar positiv auf die Replikationsdauer aus, hat aber auch direkten Einfluss auf den Speicherbedarf von Apache Karaf. |
Vorbereitung zur Datensatzaktualität
Vor der Einrichtung der Replikationen sollten ein paar Voraussetzungen geschaffen werden. Die Replikation benötigt für jede Tabelle (Log wie auch Childlog) eine Spalte mit ausschließlich anwachsenden Timestamps, da dieser Timestamp verwendet wird, um anschließend neuere/aktualisierte Datensätze zu finden. Loggen unterschiedliche Prozesse, Server, … also parallel in die DB und somit eventuell (auch nur um Millisekunden) zeitversetzt, kann es passieren, dass einzelne Datensätze nicht im BPC angezeigt werden, da diese mit einem Timestamp älter als dem neuesten replizierten committet werden.
Es empfiehlt sich daher:
-
nicht die Serverzeit des loggenden, sondern die des DB-Servers zu loggen, da unterschiedliche Prozess-Server eventuell leicht abweichende Uhrzeiten haben
-
sicherzustellen, dass wirklich bei jedem insert oder update diese Spalte neu geschrieben wird
Beides kann einfach über eine zusätzliche (versteckte) Spalte mit Default-Value und zugehörigem Trigger erfolgen. Das hat den Vorteil, dass sich für den Loggingprozess überhaupt nichts ändert)
Hinzufügen einer technischen TIMESTAMP Spalte für Replikation
Oracle
Für PM-Log und Childlog (ohne Prefix) unter Oracle sieht das Vorgehen zum Anlegen dann beispielsweise so aus:
--LOG:
--Spalte (mit 6Nachkommastellen Präzission) ergänzen, Defaultvalue für Inserts auf die aktuelle Uhrzeit in UTC setzen (damit ist keine Anpassung im Loggingprozess nötig)
ALTER TABLE LOG ADD (DB_UPDATE_TS TIMESTAMP DEFAULT SYSTIMESTAMP AT TIME ZONE 'UTC' NOT NULL);
--Bestehende Logeinsträge wieder "verteilen", sonst kommt die Replikation nicht gut klar:
--Entweder alle in einem Block (!besser nicht mit Tabellen > 1mio Einträge!):
UPDATE LOG SET DB_UPDATE_TS = TIMESTAMP;
Commit;
--oder bei großen Datenmengen und Livesystemen, die auch noch was mit den Tabellen machen wollen, als anonymer PL/SQL-Block mit wenig Undo-Tablespacebedarf:
--hierfür sollte zusätzlich ein Index auf der Timestamp-Spalte liegen um FullTableScans zu vermeiden!
declare begin
FOR counter IN 0 .. 3650 LOOP
--dbms_output.put_line(to_char(to_date('2010-01-01', 'YYYY-MM-DD') + counter, 'YYYY-MM-DD') || ' - ' || to_char(to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter, 'YYYY-MM-DD'));
--LOG:
update log set DB_UPDATE_TS = TIMESTAMP where TIMESTAMP between to_date('2010-01-01', 'YYYY-MM-DD') + counter and to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter;
commit;
--CHILDLOG:
update childlog set DB_UPDATE_TS = TIMESTAMP where TIMESTAMP between to_date('2010-01-01', 'YYYY-MM-DD') + counter and to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter;
commit;
END LOOP;
end;
--Einen Trigger anlegen, der die Spalte bei jedem Update neu setzt (damit ist keine Anpassung im Loggingprozess nötig)
CREATE OR REPLACE
TRIGGER LOG_DB_UPDATE_TS
BEFORE UPDATE ON LOG
REFERENCING NEW AS NEW OLD AS OLD
FOR EACH ROW
DECLARE
BEGIN
:NEW.DB_UPDATE_TS := SYSTIMESTAMP AT TIME ZONE 'UTC';
END;
/
--Index erstellen:
CREATE INDEX IDX_LOG_DBLU ON LOG(DB_UPDATE_TS) COMPUTE STATISTICS;
--Das ganze Geraffel noch mal für Childlog:
ALTER TABLE CHILDLOG ADD (DB_UPDATE_TS TIMESTAMP DEFAULT SYSTIMESTAMP AT TIME ZONE 'UTC' NOT NULL);
--Siehe ggf. PLSQL-Block oben!
UPDATE CHILDLOG SET DB_UPDATE_TS = TIMESTAMP;
commit;
CREATE OR REPLACE
TRIGGER CHILDLOG_DB_UPDATE_TS
BEFORE UPDATE ON CHILDLOG
REFERENCING NEW AS NEW OLD AS OLD
FOR EACH ROW
DECLARE
BEGIN
:NEW.DB_UPDATE_TS := SYSTIMESTAMP AT TIME ZONE 'UTC';
END;
/
CREATE INDEX IDX_CHILDLOG_DBLU ON CHILDLOG(DB_UPDATE_TS) COMPUTE STATISTICS;
--Fertig
MSSQL
|
Der Trigger für Childlog fehlt. Normalerweise wird dieser aber nicht benötigt, da die Einträge immer nur hinzugefügt und nicht aktualisiert werden. |
/* Spalten hinzufügen. DATETIME2 für bessere Auflösung als DATETIME. Offensichtlich ist MSSQL nicht millisekundengenau, daher statt current_timestamp die Funktion SYSDATETIME() verwenden, diese ist nanosekundengenau. */
ALTER TABLE [LOG] ADD DB_UPDATE_TS DATETIME2 DEFAULT SYSDATETIME() NOT NULL;
GO
ALTER TABLE [CHILDLOG] ADD DB_UPDATE_TS DATETIME2 DEFAULT SYSDATETIME() NOT NULL;
GO
/* Spalten initial füllen */
BEGIN
UPDATE [LOG] SET DB_UPDATE_TS = [timestamp];
END
GO
BEGIN
UPDATE [CHILDLOG] SET DB_UPDATE_TS = [timestamp];
END
GO
/* Trigger */
CREATE TRIGGER LOG_DB_UPDATE_TS
ON [LOG]
AFTER UPDATE
AS
BEGIN
IF NOT UPDATE(DB_UPDATE_TS)
BEGIN
UPDATE t
SET t.DB_UPDATE_TS = SYSDATETIME()
FROM [LOG] AS t
INNER JOIN inserted AS i
ON t.PROCESSID = i.PROCESSID;
END
END
GO
/* Indizes */
CREATE INDEX IDX_LOG_DBLU ON LOG (DB_UPDATE_TS);
CREATE INDEX IDX_CHILDLOG_DBLU ON CHILDLOG (DB_UPDATE_TS);
MySQL
und für MySQL ganz einfach:
# Fügt der Tabelle einen von der DB verwalteten ausreichend genauen Timestamp hinzu
ALTER TABLE LOG ADD DB_UPDATE_TS TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
ALTER TABLE CHILDLOG ADD DB_UPDATE_TS TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
PostgreSQL
und für PostgreSQL auch über Trigger:
CREATE OR REPLACE FUNCTION update_db_update_ts_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.db_update_ts = timezone('UTC', now());
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_tc_log_db_update_ts BEFORE INSERT OR UPDATE ON tc_log FOR EACH ROW EXECUTE PROCEDURE update_db_update_ts_column();
CREATE TRIGGER update_tc_childlog_db_update_ts BEFORE INSERT OR UPDATE ON tc_childlog FOR EACH ROW EXECUTE PROCEDURE update_db_update_ts_column();
Konfiguration
Datenquellen
Datenquellen sind Verbindungen zu einzelnen Datenbanken. Die einzelnen Replikationsjobs verwenden/referenzieren diese dann. Diese werden unter Backend Connections vom Typ "data_source" eingerichtet und anhand der Komponenten ID referenziert.
Oberfläche
Für das Einrichten der einzelnen Replikation-Jobs steht eine eigene Oberfläche unter Einstellungen → Replication → Komponenten → Editor bereit. Über die Oberfläche können Einträge erzeugt, gelöscht, dupliziert und auch einzeln aktiviert/deaktiviert werden.
Konfigurationsparameter des Replication Moduls
Folgend werden die verschiedenen Parameter und damit verknüpften Funktionen beschrieben. Diese sind unter BPC Administration → Replication → Allgemein zu finden.
Setting (Key) | Datentyp | Beschreibung |
---|---|---|
Replication_Threads |
int |
Anzahl der Threads die zur gleichzeitigen Ausführung der Replication Jobs verwendet werden. |
Replication_Templates |
JSON |
Zur Verfügungstellung von Vorlagen, welche bei den einzelnen Replikationsjobs dann ausgewählt werden können ("Template laden"). Um darüber dann zum Beispiel die Felder "Index Settings", "Index Mappings" und "Dynamic Templates" zu füllen. |
Logs_Enabled |
boolean |
Legt fest, ob die Durchläufe der Replication-Jobs (wenn dort aktiviert) in den speziellen Elasticsearch Index 'bpc-replicationjobs-log' geloggt werden sollen oder nicht. |
Logs_BulkWritePeriodInSeconds |
int |
Angabe in Sekunden. Nach jedem Replikationsdurchlauf diese zu loggen würde sich negativ auf die Gesamtperformance des Systems auswirken. Deshalb werden die Logeinträge gesammelt und nach dieser Zeit im Bulk geschrieben. |
Logs_CleanupPeriodInMinutes |
int |
Angabe in Minuten. Intervall in dem die Bereinigung des 'bpc-replicationjobs-log' Index erfolgen soll. |
Logs_DeleteEntriesOlderThan |
text |
Angabe, wie lange Dokumente im 'bpc-replicationjobs-log' Index enthalten sein sollen.
Kommt bei der Bereinigung zum Einsatz.
Beispiel: |
Konfigurationsparameter eines Replication Jobs
Folgend werden die verschiedenen Parameter und damit verknüpften Funktionen beschrieben. Diese sind unter BPC Administration → Replication → Komponenten zu finden. Es wird empfohlen, die spezialisierte Oberfläche zu verwenden: BPC Administration → Replication → Editor
Grundeinstellungen
Setting (Key) | Datentyp | Beschreibung |
---|---|---|
Replication_Enabled |
boolean |
Den Replikationsjob aktivieren/deaktivieren |
Replication_StartDate |
String |
Replikation von Datensätzen die neuer sind als dieser Zeitpunkt. oder als relativer Wert im Format:
Beispiele
|
Quelle / Source
Setting (Key) | Datentyp | Beschreibung | ||
---|---|---|---|---|
Source_DataSource |
String |
Zu verwendende Datenquelle (ID der Backend Connections vom Typ |
||
Source_Table |
String |
Tabellenname der Quelle |
||
Source_Timezone |
String |
Zeitzone der in der Quell-Datenbanktabelle verwendeten Datum-Felder (verwendet intern TimeZone.getTimeZone). Wird nur auf die eigentlichen Daten angewandt und nicht auf die der 'lastUpdateColumn'-Spalte. Beispiele
|
||
Source_IdColumns |
String |
Spalten für die Bildung eines eindeutigen Schlüssels im ElasticSearch.
Zum Beispiel: |
||
Source_LastUpdateColumn |
String |
Diese Spalte wird für die Ermittlung des Alters des Datensatzes herangezogen
|
||
Source_LastUpdateColumnTimezone |
String |
Zeitzone, welche in der Quell-Datenbanktabelle für die Daten in der
|
||
Source_QueryTimeoutInSeconds |
Integer |
Legt fest, wie lange der JDBC-Treiber auf eine Rückantwort der DB wartet. Siehe auch JDBC java.sql.Statement.setQueryTimeout() Beispiele
|
Ziel / Target
Setting (Key) | Datentyp | Beschreibung |
---|---|---|
Target_Index |
String |
Ziel-Index im Elasticsearch. |
Target_CaseSensitivityOfFields |
String |
Legt fest, wie die Felder in Elasticsearch angelegt werden sollen (Groß-/Kleinschreibung).
|
Target_IndexCreationSettings |
JSON |
Dem Ziel-Index bei Erstellung abweichende Settings vergeben.
Index Sorting Beispiel. Es wurden die Einstellungen von Core Services Einstellung → Core_IndexCreationSettings als Grundlage verwendet und mit den Index Sorting Einstellungen angereichert.
Wird das Index Sorting verwendet, dann müssen für die angegebenen Sortierungsfelder (LASTUPDATE in dem Beispiel) auch gleich ES Mappings angelegt werden (siehe Target_IndexMappings) Vollständiges Beispiel
|
Target_IndexMappings |
JSON |
Dem Ziel-Index bei Erstellung ein Mapping vergeben. Sollte nur in bestimmten Fällen notwendig sein. Das Mapping für das Index Sorting Beispiel von oben.
|
Target_IndexDynamicTemplates |
JSON |
Dem Ziel-Index ein maßgeschneidertes Mapping zuweisen. Ist dies gesetzt, dann wird das Globale (siehe Core Einstellung → Core_IndexDynamicTemplates) nicht verwendet. In der Elasticsearch Dokumentation gibt es mehr Infos zu den Möglichkeiten der Dynamic Templates. Bei dem folgenden Beispiel werden alle Felder die Elasticsearch als Textfelder (Strings) erkennt mit einem Mapping versehen ("alle_textfelder") bei dem der Inhalt nicht analysiert wird (spart Speicherplatz und die Daten können trotzdem noch angezeigt werden). Plus einer Ausnahme ("spezialfall"): Für alle Textfelder welche den Namenspostfix 'name' haben wird unser Standard Mapping verwendet. Beispiel
|
Erweiterte Einstellungen / Advanced
Schattenkopien
Bei einer Schattenkopie werden zu dem festgelegten Zeitpunkt alle Dokumente (die nach dem festgelegten 'replicationStartDate' liegen) des Elasticsearch Index (siehe 'targetIndex') in einen neuen Index kopiert. Am Ende wird der Alias auf den neuen Index umgebogen und der bisherige Index gelöscht. Dies kann zum Beispiel einmal in der Woche durchgeführt werden, um wieder an einen schlanken Index zu kommen.
Zusatzhinweis: Die in Elasticsearch als gelöscht markierten Dokumente (entweder selbst manuell gelöscht oder durch den Tail Sync) werden natürlich auch nicht mit in den neuen Index übernommen. Achtung: Der zugehörige Replikationslauf wird solange ausgesetzt, bis die Schattenkopie erstellt wurde.
Setting (Key) | Datentyp | Beschreibung |
---|---|---|
ShadowCopy_Enabled |
boolean |
Erstellung von Schattenkopien aktivieren (shadow copy). |
ShadowCopy_CronPattern |
String |
Cron like Pattern zur Festlegung des Zeitpunktes wann die Schattenkopien erstellt werden sollen. Beispiele
Weitere Beispiele und die Dokumentation sind auf der Quartz-Scheduler Webseite zu finden. |
ShadowCopy_KeepCopiesCount |
Integer |
Anzahl der Schattenkopien die vorgehalten werden sollen.
|
Tail Sync
Der 'Tail Sync' arbeitet auf dem aktuellen Index (siehe 'targetIndex') und synchronisiert älterer Daten (neue/geänderte/gelöschte Datenbanksätze) mit Elasticsearch, sodass diese wieder mit der Datenbank übereinstimmen. Dies kann z.B. jede Nacht einmal durchgeführt werden. Dokumente, die älter als das Startdatum der Replikation sind (siehe 'replicationStartDate'), werden erst einmal aus dem Elasticsearch Index gelöscht. Danach geht er Blockweise (Default 10 Tages-Schritte) die Datenbank durch und gleicht diese mit Elasticsearch ab. Dabei werden neue/geänderte Datenbanksätze in den Elasticsearch Index übernommen und Dokumente, die nicht mehr in der Datenbank existieren, werden in Elasticsearch gelöscht. Der Tail Sync arbeitet nur mit den festgelegten 'idColumns' und 'lastUpdateColumn' Feldern (Hint: passender DB-Index hilft Wunder!). Die kompletten Daten werden nur bei neuen/geänderten Datenbanksätzen ausgelesen.
Der Sync läuft nicht bis zum aktuellen Datum (siehe |
Setting (Key) | Datentyp | Beschreibung | ||
---|---|---|---|---|
TailSync_Enabled |
boolean |
Den 'Tail Sync' aktivieren |
||
TailSync_CronPattern |
String |
Cron like Pattern zur Festlegung des Zeitpunktes wann der Tail Sync gestartet werden soll. Beispiele
Weitere Beispiele und die Dokumentation sind auf der Quartz-Scheduler Webseite zu finden. |
||
TailSync_BlockSize |
Integer |
Blockgröße des Datenbanktreibers.
Beachte den Hinweis zu der Option |
||
TailSync_RelativeStartDate |
String |
Das relativ angegebene Startdatum. Ab diesem Zeitpunkt soll dann synchronisiert werden. Daten, die vor dem Startdatum der Replikation liegen, werden auch weiterhin gelöscht. Dieses Datum wird nur verwendet, wenn es nach dem regulären Startdatum und vor dem Enddatum liegt.
|
||
TailSync_RelativeEndDate |
String |
Das relativ angegebene Enddatum. Bis zu diesem Zeitpunkt soll dann synchronisiert werden und nicht weiter.
|
||
TailSync_BlockDayRange |
Integer |
Anzahl der Tage (Block) in denen die Daten abgearbeitet werden sollen.
Siehe auch |
||
TailSync_RelativeDeleteOlderThanDate |
String |
Das relativ anzugebende Löschdatum. Alle Dokumente, die älter sind, werden gelöscht. Wird diese Option nicht gesetzt, dann wird sie auch nicht ausgeführt. |
Consistency Check
Nach den Replikationsläufen wird ein einfacher Konsistenzcheck durchgeführt. Dabei werden die Anzahl Dokumente welche sich in der Quelle und dem Ziel befinden verglichen. Und zwar in dem Zeitraum 'replicationStartDate' und dem letzten Datum welches sich im Ziel (Elasticsearch) befindet.
Bei vielen Daten (zig Millionen von Sätzen) kann es zu erhöhter Last auf dem System kommen und eine initiale Replikation extrem verlangsamt werden. Aktivieren Sie den Consistency Check nur, wenn Sie sicher sind, dass die Daten vollständig repliziert werden. |
Setting (Key) | Datentyp | Beschreibung |
---|---|---|
ConsistencyCheck_Frequency |
Integer |
Die Frequenz, mit welcher der Konsistenzcheck durchgeführt wird. Beispiele:
|
Lookup Joins
Können verwendet werden, um zu replizierende Dokumente mit zusätzlichen Daten anzureichern. Wenn z.B. in den zu replizierenden Daten nur eine Partner-ID steht und noch der Name des Partners etc. im Monitor benötigt wird.
Man verwendet unter Elasticsearch eine Denormalisierung, das heißt, diese Daten werden mit in das Dokument übernommen und stehen wie alle anderen Daten zur Verfügung (performante Suche; Aggregation, …). Voraussetzung: Die Lookup-"Tabellen" müssen als eigenständige Indexe zur Verfügung stehen und können z.B. über eine zusätzliche Replikation von einer DB-Tabelle übernommen werden.
Die Lookup-Daten können manuell ( BPC-Einstellungen → Übersicht → Status → Replikation → Jobs → Job → Lookup Joins synchronisieren ) sowie automatisch mithilfe unseres Elasticsearch BPC Plugin (es-bpc-plugin) aktualisiert werden.
Es können mehrere Lookup-Tabellen referenziert werden, unten werden die möglichen Werte eines EINTRAGs beschrieben. Aufbau: "join": [ { EINTRAG }, { EINTRAG }, … ]
Damit der Wertevergleich funktioniert, müssen die Spaltentypen in den Datenbanktabellen identisch sein. Wenn es sich um String-Felder handelt, sollte "*.raw" als lookupKeyField verwendet werden, bei Zahlfeldern hingegen ohne das ".raw" |
[
{
"keyField": "PARTNER",
"lookupIndex": "lookup-partner",
"lookupKeyField": "ID.raw",
"resultFieldsPrefix": "partner_",
"resultFieldsExcluded": [ "ID", "LASTUPDATE" ]
},
{
"keyField": "MESSAGETYPE",
"lookupIndex": "lookup-messagetype",
"lookupKeyField": "ID.raw",
"resultFieldsPrefix": "messagetype_",
"resultFieldsExcluded": [ "ID", "LASTUPDATE" ]
}
]
Feld | Datentyp | Beschreibung |
---|---|---|
keyField |
String |
Das Schlüsselfeld in der zu replizierenden Tabelle. Über den Wert des Feldes werden die Daten aus dem Lookup-Index übernommen. Beispiel: PARTNER_ID |
lookupIndex |
String |
Der Elasticsearch Index mit den Lookup-Daten. Beispiel: lookup-partner |
lookupKeyField |
String |
Beispiel: ID.raw |
resultFieldsPrefix |
String |
Die zu übernehmenden Felder aus dem Lookup-Index müssen mit einem eindeutigen Prefix versehen werden, sodass es zu keinem Konflikt mit existierenden Feldern kommt. Beispiel: partner_ |
resultFieldsIncluded |
String Array |
Sollen nicht alle Felder aus dem Lookup-Index übernommen werden, können die zu übernehmenden Felder hier festgelegt werden. Beispiel: [ "FIRST_NAME", "LAST_NAME" ] |
resultFieldsExcluded |
String Array |
Sollen fast alle Felder übernommen werden, dann können hier die auszuschließenden festgelegt werden. Beispiel: [ "ID", "UPDATED" ] |
Replication Jobs Logging
Die Durchläufe der Replication Jobs können geloggt und zum Beispiel über den automatisch angelegten Monitor “Replication Jobs Monitor” angezeigt werden. Dieser Monitor wird beim Start angelegt, wenn er nicht vorhanden sein sollte. Er kann also nicht permanent gelöscht werden. Ebenso wird der verwendete Index ‘bpc-replicationjobs-log’ automatisch angelegt, wenn ein Durchlauf geloggt werden muss.
Das Logging kann global sowie je Replication Job an- und ausgeschalten werden (siehe entsprechende Einstellungen oben). Im Auslieferungszustand ist das Logging global aktiviert und das der Replication Jobs deaktiviert. Es wird also erst einmal nichts geloggt!
Die Log-Einträge werden im Bulk nach einiger Zeit in den erwähnten Index geschrieben. Das hat wenig Einfluss auf die Gesamtperformance des Systems. Allerdings können sehr viel Einträge in kürzester Zeit erzeugt werden (Millionen in ein paar Stunden), deshalb den Cleanup vielleicht nicht zu selten ausführen und ein Auge auf die Anzahl der darin enthaltenen Dokumente behalten: Einstellungen → Core Services → Indizes