Replication
This service replicates data from an RDBMS to OpenSearch.
|
Automatic migration of replication settings If the BPC version >= 3.1 is started for the first time, any existing replication settings are automatically migrated for the new 'Replication' module. If all replication jobs have been transferred correctly, please delete the two old replication settings (Core Services → Settings) "Replication_Jobs" and "Replication_Threads" to avoid confusion and misunderstandings. These are marked with orange bars. |
|
Impact on system performance Replication has a direct impact on system performance. In particular, the number of threads (Replication → Settings → Replication_Threads) and the interval (replicationInterval) play a role. The more jobs run in a short time, the higher the CPU load. This applies in particular if a lot of data is replicated. Although the replicationBlockSize has a positive effect on the replication duration, it also has a direct influence on the memory requirements of Apache Karaf. |
Preparation for data record up-to-dateness
A few prerequisites should be created before setting up the replications. Replication requires a column with exclusively incrementing timestamps for each table (log as well as childlog), as this timestamp is used to subsequently find new/updated data records. Log different processes, servers, … log into the DB in parallel and thus possibly with a time delay (even if only milliseconds), it is possible that individual data records are not displayed in the BPC because they are committed with a timestamp older than the latest replicated one.
It is therefore recommended:
-
log not the server time of the logging server, but that of the DB server, as different process servers may have slightly different times
-
Ensure that this column is actually rewritten with every insert or update
Both can be done simply via an additional (hidden) column with default value and associated trigger. This has the advantage that nothing changes at all for the logging process)
Adding a technical TIMESTAMP column for replication
Oracle
For PM log and child log (without prefix) under Oracle, the procedure for creating then looks like this, for example:
--LOG:
--Spalte (mit 6Nachkommastellen Präzission) ergänzen, Defaultvalue für Inserts auf die aktuelle Uhrzeit in UTC setzen (damit ist keine Anpassung im Loggingprozess nötig)
ALTER TABLE LOG ADD (DB_UPDATE_TS TIMESTAMP DEFAULT SYSTIMESTAMP AT TIME ZONE 'UTC' NOT NULL);
--Distribute existing log entries again, otherwise replication will not work well:
--Either all in one block (!better not with tables > 1mio entries!):
UPDATE LOG SET DB_UPDATE_TS = TIMESTAMP;
Commit;
--or for large amounts of data and live systems that also want to do something with the tables, as an anonymous PL/SQL block with little undo tablespace requirement:
--for this, there should also be an index on the timestamp column to avoid FullTableScans!
declare begin
FOR counter IN 0 .. 3650 LOOP
--dbms_output.put_line(to_char(to_date('2010-01-01', 'YYYY-MM-DD') + counter, 'YYYY-MM-DD') || ' - ' || to_char(to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter, 'YYYY-MM-DD'));
--LOG:
update log set DB_UPDATE_TS = TIMESTAMP where TIMESTAMP between to_date('2010-01-01', 'YYYY-MM-DD') + counter and to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter;
commit;
--CHILDLOG:
update childlog set DB_UPDATE_TS = TIMESTAMP where TIMESTAMP between to_date('2010-01-01', 'YYYY-MM-DD') + counter and to_date('2010-01-01', 'YYYY-MM-DD') + 1 + counter;
commit;
END LOOP;
end;
--Create a trigger that sets the column anew with every update (so that no adjustment is necessary in the logging process)
CREATE OR REPLACE
TRIGGER LOG_DB_UPDATE_TS
BEFORE UPDATE ON LOG
REFERENCING NEW AS NEW OLD AS OLD
FOR EACH ROW
DECLARE
BEGIN
:NEW.DB_UPDATE_TS := SYSTIMESTAMP AT TIME ZONE 'UTC';
END;
/
--Create index:
CREATE INDEX IDX_LOG_DBLU ON LOG(DB_UPDATE_TS) COMPUTE STATISTICS;
--The whole thing again for Childlog:
ALTER TABLE CHILDLOG ADD (DB_UPDATE_TS TIMESTAMP DEFAULT SYSTIMESTAMP AT TIME ZONE 'UTC' NOT NULL);
--See if necessary. PLSQL block above!
UPDATE CHILDLOG SET DB_UPDATE_TS = TIMESTAMP;
commit;
CREATE OR REPLACE
TRIGGER CHILDLOG_DB_UPDATE_TS
BEFORE UPDATE ON CHILDLOG
REFERENCING NEW AS NEW OLD AS OLD
FOR EACH ROW
DECLARE
BEGIN
:NEW.DB_UPDATE_TS := SYSTIMESTAMP AT TIME ZONE 'UTC';
END;
/
CREATE INDEX IDX_CHILDLOG_DBLU ON CHILDLOG(DB_UPDATE_TS) COMPUTE STATISTICS;
--Finished
MSSQL
|
|
|
Der Trigger für Childlog fehlt. Normalerweise wird dieser aber nicht benötigt, da die Einträge immer nur hinzugefügt und nicht aktualisiert werden. |
/* Spalten hinzufügen. DATETIME2 für bessere Auflösung als DATETIME. Offensichtlich ist MSSQL nicht millisekundengenau, daher statt current_timestamp die Funktion SYSDATETIME() verwenden, diese ist nanosekundengenau. */
ALTER TABLE [LOG] ADD DB_UPDATE_TS DATETIME2 DEFAULT SYSDATETIME() NOT NULL;
GO
ALTER TABLE [CHILDLOG] ADD DB_UPDATE_TS DATETIME2 DEFAULT SYSDATETIME() NOT NULL;
GO
/* Spalten initial füllen */
BEGIN
UPDATE [LOG] SET DB_UPDATE_TS = [timestamp];
END
GO
BEGIN
UPDATE [CHILDLOG] SET DB_UPDATE_TS = [timestamp];
END
GO
/* Trigger */
CREATE TRIGGER LOG_DB_UPDATE_TS
ON [LOG]
AFTER UPDATE
AS
BEGIN
IF NOT UPDATE(DB_UPDATE_TS)
BEGIN
UPDATE t
SET t.DB_UPDATE_TS = SYSDATETIME()
FROM [LOG] AS t
INNER JOIN inserted AS i
ON t.PROCESSID = i.PROCESSID;
END
END
GO
/* Indizes */
CREATE INDEX IDX_LOG_DBLU ON LOG (DB_UPDATE_TS);
CREATE INDEX IDX_CHILDLOG_DBLU ON CHILDLOG (DB_UPDATE_TS);
MySQL
and for MySQL quite simply:
# Fügt der Tabelle einen von der DB verwalteten ausreichend genauen Timestamp hinzu
ALTER TABLE LOG ADD DB_UPDATE_TS TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
ALTER TABLE CHILDLOG ADD DB_UPDATE_TS TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)
PostgreSQL
and for PostgreSQL also via trigger:
ALTER TABLE log ADD db_update_ts timestamp NOT null DEFAULT (timezone('UTC', now()));
ALTER TABLE childlog ADD db_update_ts timestamp NOT null DEFAULT (timezone('UTC', now()));
CREATE OR REPLACE FUNCTION update_db_update_ts_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.db_update_ts = timezone('UTC', now());
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_log_db_update_ts BEFORE INSERT OR UPDATE ON log FOR EACH ROW EXECUTE PROCEDURE update_db_update_ts_column();
CREATE TRIGGER update_childlog_db_update_ts BEFORE INSERT OR UPDATE ON childlog FOR EACH ROW EXECUTE PROCEDURE update_db_update_ts_column();
CREATE INDEX idx_log_dblu ON log (db_update_ts);
CREATE INDEX idx_childlog_dblu ON childlog (db_update_ts);
Konfiguration
Datenquellen
Datenquellen sind Verbindungen zu einzelnen Datenbanken. Die einzelnen Replikationsjobs verwenden/referenzieren diese dann. Diese werden unter Backend Connections vom Typ "data_source" eingerichtet und anhand der Komponenten ID referenziert.
Oberfläche
Für das Einrichten der einzelnen Replikation-Jobs steht eine eigene Oberfläche unter Einstellungen → Replication → Komponenten → Editor bereit. Über die Oberfläche können Einträge erzeugt, gelöscht, dupliziert und auch einzeln aktiviert/deaktiviert werden.
Konfigurationsparameter des Replication Moduls
Folgend werden die verschiedenen Parameter und damit verknüpften Funktionen beschrieben. Diese sind unter BPC Administration → Replication → Allgemein zu finden.
| Setting (Key) | Datentyp | Beschreibung |
|---|---|---|
Replication_Threads |
int |
Anzahl der Threads die zur gleichzeitigen Ausführung der Replication Jobs verwendet werden. |
Replication_Templates |
JSON |
Zur Verfügungstellung von Vorlagen, welche bei den einzelnen Replikationsjobs dann ausgewählt werden können ("Template laden"). Um darüber dann zum Beispiel die Felder "Index Settings", "Index Mappings" und "Dynamic Templates" zu füllen. |
Logs_Enabled |
boolean |
Legt fest, ob die Durchläufe der Replication-Jobs (wenn dort aktiviert) in den speziellen OpenSearch Index 'bpc-replicationjobs-log' geloggt werden sollen oder nicht. |
Logs_BulkWritePeriodInSeconds |
int |
Angabe in Sekunden. Nach jedem Replikationsdurchlauf diese zu loggen würde sich negativ auf die Gesamtperformance des Systems auswirken. Deshalb werden die Logeinträge gesammelt und nach dieser Zeit im Bulk geschrieben. |
Logs_CleanupPeriodInMinutes |
int |
Angabe in Minuten. Intervall in dem die Bereinigung des 'bpc-replicationjobs-log' Index erfolgen soll. |
Logs_DeleteEntriesOlderThan |
text |
Angabe, wie lange Dokumente im 'bpc-replicationjobs-log' Index enthalten sein sollen.
Kommt bei der Bereinigung zum Einsatz.
Beispiel: |
TailSync_Logs_Enabled |
boolean |
Legt fest, ob die Durchläufe der Tail Sync Durchläufe (wenn dort aktiviert) in den speziellen OpenSearch Index 'bpc-tailsync-log' geloggt werden sollen oder nicht. |
TailSync_Logs_CleanupPeriodInMinutes |
int |
Angabe in Minuten. Intervall in dem die Bereinigung des 'bpc-tailsync-log' Index erfolgen soll. |
TailSync_Logs_DeleteEntriesOlderThan |
text |
Angabe, wie lange Dokumente im 'bpc-tailsync-log' Index enthalten sein sollen.
Kommt bei der Bereinigung zum Einsatz.
Beispiel: |
Konfigurationsparameter eines Replication Jobs
Folgend werden die verschiedenen Parameter und damit verknüpften Funktionen beschrieben. Diese sind unter BPC Administration → Replication → Komponenten zu finden. Es wird empfohlen, die spezialisierte Oberfläche zu verwenden: BPC Administration → Replication → Editor
Grundeinstellungen
| Setting (Key) | Datentyp | Beschreibung |
|---|---|---|
Replication_Enabled |
boolean |
Den Replikationsjob aktivieren/deaktivieren |
Replication_StartDate |
String |
Replikation von Datensätzen die neuer sind als dieser Zeitpunkt. oder als relativer Wert im Format:
Beispiele
|
Quelle / Source
| Setting (Key) | Datentyp | Beschreibung | ||
|---|---|---|---|---|
Source_DataSource |
String |
Zu verwendende Datenquelle (ID der Backend Connections vom Typ |
||
Source_Table |
String |
Tabellenname der Quelle bzw. Name der CTE, wenn die optionale 'Source_CommonTableExpressionQuery' verwendet wird. |
||
Source_Timezone |
String |
Zeitzone der in der Quell-Datenbanktabelle verwendeten Datum-Felder (verwendet intern TimeZone.getTimeZone). Wird nur auf die eigentlichen Daten angewandt und nicht auf die der 'lastUpdateColumn'-Spalte. Beispiele
|
||
Source_IdColumns |
String |
Spalten für die Bildung eines eindeutigen Schlüssels im OpenSearch.
Zum Beispiel: |
||
Source_LastUpdateColumn |
String |
Diese Spalte wird für die Ermittlung des Alters des Datensatzes herangezogen
|
||
Source_LastUpdateColumnTimezone |
String |
Zeitzone, welche in der Quell-Datenbanktabelle für die Daten in der
Beispiele
|
||
Source_QueryTimeoutInSeconds |
Integer |
Legt fest, wie lange der JDBC-Treiber auf eine Rückantwort der DB wartet. Siehe auch JDBC java.sql.Statement.setQueryTimeout() |
||
Source_CommonTableExpressionQuery |
String |
Kann anstatt Datenbank-Views verwendet werden.
Darf nur den SELECT Statement einer Common Table Expression (CTE) enthalten.
Das |
Ziel / Target
| Setting (Key) | Datentyp | Beschreibung |
|---|---|---|
Target_Index |
String |
Ziel-Index im OpenSearch. |
Target_CaseSensitivityOfFields |
String |
Legt fest, wie die Felder in OpenSearch angelegt werden sollen (Groß-/Kleinschreibung).
|
Target_IndexCreationSettings |
JSON |
Dem Ziel-Index bei Erstellung abweichende Settings vergeben.
Beispiel mit hinzugefügtem "Index Sorting"
Soll das "Index Sorting" verwendet werden, dann müssen für die angegebenen Sortierungsfelder (LASTUPDATE in dem Beispiel) auch gleich OpenSearch-Mappings angelegt werden (siehe Target_IndexMappings). |
Target_IndexMappings |
JSON |
Dem Ziel-Index bei Erstellung ein Mapping vergeben. Sollte nur in bestimmten Fällen notwendig sein. Das Mapping für das "Index Sorting" Beispiel von oben.
|
Target_IndexDynamicTemplates |
JSON |
Dem Ziel-Index ein maßgeschneidertes Mapping zuweisen. Ist dies gesetzt, dann wird das Globale (siehe Core Einstellung → Core_IndexDynamicTemplates) nicht verwendet. In der Elasticsearch Dokumentation (Verweis bis die OpenSearch Dokumentation ebenbürtig ist) gibt es mehr Infos zu den Möglichkeiten der Dynamic Templates. Bei dem folgenden Beispiel werden alle Felder die OpenSearch als Textfelder (Strings) erkennt mit einem Mapping versehen ("alle_textfelder") bei dem der Inhalt nicht analysiert wird (spart Speicherplatz und die Daten können trotzdem noch angezeigt werden). Plus einer Ausnahme ("spezialfall"): Für alle Textfelder welche den Namenspostfix 'name' haben wird unser Standard Mapping verwendet. Beispiel
Um den OpenSearch-Typ für ein Datenbankfeld vorzugeben. Es kann ab und zu vorkommen, dass sich OpenSearch mit dem Mapping vertut und einen unpassenden Typ verwendet. Konkretes Beispiel: Die Oracle Spalte mit dem Namen ‘ZAHL’ vom Datentyp ‘NUMBER(10,2)’ wird im OpenSearch Mapping als Typ ‘long’ anstatt ‘float’ angelegt. Mit dem untenstehenden Beispiel kann dies korrigiert werden. Beispiel
|
Erweiterte Einstellungen / Advanced
Schattenkopien
Bei einer Schattenkopie werden zu dem festgelegten Zeitpunkt alle Dokumente (die nach dem festgelegten 'replicationStartDate' liegen) des OpenSearch-Index (siehe 'targetIndex') in einen neuen Index kopiert. Am Ende wird der Alias auf den neuen Index umgebogen und der bisherige Index gelöscht. Dies kann zum Beispiel einmal in der Woche durchgeführt werden, um wieder an einen schlanken Index zu kommen.
Zusatzhinweis: Die in OpenSearch als gelöscht markierten Dokumente (entweder selbst manuell gelöscht oder durch den Tail Sync) werden natürlich auch nicht mit in den neuen Index übernommen. Achtung: Der zugehörige Replikationslauf wird solange ausgesetzt, bis die Schattenkopie erstellt wurde.
| Setting (Key) | Datentyp | Beschreibung |
|---|---|---|
ShadowCopy_Enabled |
boolean |
Erstellung von Schattenkopien aktivieren (shadow copy). |
ShadowCopy_CronPattern |
String |
Cron-ähnliches Pattern (nach Quartz-Scheduler-Syntax) zur Festlegung des Zeitpunktes, wann die Schattenkopien erstellt werden sollen. Beispiele
Weitere Beispiele und die Dokumentation sind auf der Quartz-Scheduler Webseite zu finden. |
ShadowCopy_KeepCopiesCount |
Integer |
Anzahl der Schattenkopien die vorgehalten werden sollen.
|
Tail Sync
Der 'Tail Sync' arbeitet auf dem aktuellen Index (siehe 'targetIndex') und synchronisiert älterer Daten (neue/geänderte/gelöschte Datenbanksätze) mit OpenSearch, sodass diese wieder mit der Datenbank übereinstimmen. Dies kann z.B. jede Nacht einmal durchgeführt werden. Dokumente, die älter als das Startdatum der Replikation sind (siehe 'replicationStartDate'), werden erst einmal aus dem OpenSearch-Index gelöscht. Danach geht er Blockweise (Default 10 Tages-Schritte) die Datenbank durch und gleicht diese mit OpenSearch ab. Dabei werden neue/geänderte Datenbanksätze in den OpenSearch Index übernommen und Dokumente, die nicht mehr in der Datenbank existieren, werden in OpenSearch gelöscht. Der Tail Sync arbeitet nur mit den festgelegten 'idColumns' und 'lastUpdateColumn' Feldern (Hint: passender DB-Index hilft Wunder!). Die kompletten Daten werden nur bei neuen/geänderten Datenbanksätzen ausgelesen.
Alternativ bzw. zusätzlich zu einem zeitgesteuerten Tail Sync kann dieser auch manuell gestartet werden.
Dazu gibt es unter Replication → Jobs für jeden Replikationsjob einen Button, der einen Tail Sync startet.
Für das manuelle Starten haben die Einstellung replicationTailSyncEnabled und replicationTailSyncCronPattern keine Auswirkungen, alle anderen Einstellungen werden berücksichtigt.
|
Der Sync läuft nicht bis zum aktuellen Datum (siehe |
| Setting (Key) | Datentyp | Beschreibung | ||
|---|---|---|---|---|
TailSync_Enabled |
boolean |
Den 'Tail Sync' aktivieren |
||
TailSync_CronPattern |
String |
Cron-ähnliches Pattern (nach Quartz-Scheduler-Syntax) zur Festlegung des Zeitpunktes, wann der Tail Sync gestartet werden soll. Beispiele
Weitere Beispiele und die Dokumentation sind auf der Quartz-Scheduler Webseite zu finden. |
||
TailSync_BlockSize |
Integer |
Blockgröße des Datenbanktreibers.
Beachte den Hinweis zu der Option |
||
TailSync_RelativeStartDate |
String |
Das relativ angegebene Startdatum. Ab diesem Zeitpunkt soll dann synchronisiert werden. Daten, die vor dem Startdatum der Replikation liegen, werden auch weiterhin gelöscht. Dieses Datum wird nur verwendet, wenn es nach dem regulären Startdatum und vor dem Enddatum liegt.
|
||
TailSync_RelativeEndDate |
String |
Das relativ angegebene Enddatum. Bis zu diesem Zeitpunkt soll dann synchronisiert werden und nicht weiter.
|
||
TailSync_BlockDayRange |
Integer |
Number of days (block) in which the data is to be processed.
See also |
||
TailSync_RelativeDeleteOlderThanDate |
String |
The relative deletion date to be specified. All documents that are older are deleted. If this option is not set, all documents that are older than the start date of the replication are deleted (see |
||
TailSync_LoggingEnabled |
boolean |
The relative deletion date to be specified.
Specifies whether the tail sync runs are to be logged in the The global setting |
Consistency Check
A simple consistency check is carried out after the replication runs. The number of documents in the source and target are compared. This is done in the period 'replicationStartDate' and the last date which is in the target (OpenSearch).
|
If there is a lot of data (tens of millions of records), this can lead to an increased load on the system and an initial replication can be extremely slowed down. Only activate the consistency check if you are sure that the data will be fully replicated. |
| Setting (Key) | Data type | Description |
|---|---|---|
ConsistencyCheck_Frequency |
Integer |
The frequency with which the consistency check is performed. Examples:
|
Lookup Joins
Can be used to enrich documents to be replicated with additional data. If, for example the data to be replicated only contains a partner ID and the name of the partner etc. is still required in the monitor.
Denormalization is used under OpenSearch, i.e. this data is also transferred to the document and is available like all other data (high-performance search; aggregation, ….). Prerequisite: The lookup "tables" must be available as independent indexes and can, for example, be transferred from a DB table via an additional replication.
The lookup data can be updated manually ( BPC settings → Overview → Status → Replication → Jobs → Job → Synchronize lookup joins ) and automatically using our OpenSearch BPC plugin (os-bpc-plugin).
Several lookup tables can be referenced, the possible values of an ENTRY are described below. Structure: "join": [ { EINTRAG }, { EINTRAG }, … ]
|
For the value comparison to work, the column types in the database tables must be identical. In the case of string fields, "*.raw" should be used as the lookupKeyField, but without the ".raw" |
[
{
"keyField": "PARTNER",
"lookupIndex": "lookup-partner",
"lookupKeyField": "ID.raw",
"resultFieldsPrefix": "partner_",
"resultFieldsExcluded": [ "ID", "LASTUPDATE" ]
},
{
"keyField": "MESSAGETYPE",
"lookupIndex": "lookup-messagetype",
"lookupKeyField": "ID.raw",
"resultFieldsPrefix": "messagetype_",
"resultFieldsExcluded": [ "ID", "LASTUPDATE" ]
}
]
| Field | Data type | Description |
|---|---|---|
keyField |
String |
The key field in the table to be replicated. The data from the lookup index is transferred via the value of the field. Example: PARTNER_ID |
keyFieldValuesSeparator |
String |
If there are several values in the keyField that are separated by a separator, this separator can be specified here. The lookup join is then performed on the individual values. Example: The 'lookupIndex' contains the following data:
If the keyFieldValuesSeparator is '%%', the resultFieldsPrefix is 'MENGENEINHEIT_' and the transferred keyValue value is '42%%20%%30', the following field is created: QUANTITY_LONGNAME = centimeter%%gram%%kilogram |
lookupIndex |
String |
The OpenSearch index with the lookup data. Example: lookup-partner |
lookupKeyField |
String |
Example: ID.raw |
resultFieldsPrefix |
String |
The fields to be transferred from the lookup index must be provided with a unique prefix so that there is no conflict with existing fields. Example: partner_ |
resultFieldsIncluded |
String Array |
If not all fields from the lookup index are to be included, the fields to be included can be specified here. Example: [ "FIRST_NAME", "LAST_NAME" ] |
resultFieldsExcluded |
String Array |
If almost all fields are to be included, the fields to be excluded can be specified here. Example: [ "ID", "UPDATED" ] |
Logging of replications
The runs of the replication jobs can be logged and displayed, for example, via the automatically created monitor "Replication Jobs Monitor".
Similarly, runs of the tail sync can also be logged and viewed in the monitor "Tail Sync Logs Monitor".
These monitors are created at startup if they are not available.
They cannot be permanently deleted.
The indices used bpc-replicationjobs-log and bpc-tailsync-log are also created automatically if a run needs to be logged.
Logging can be switched on and off globally and for each replication job (see corresponding settings above). Logging is activated globally for both replications and tail syncs by default, but deactivated in the individual replication jobs. Therefore, no logging is performed by default.
The log entries for replication runs are written in bulk to the index mentioned above after some time. This has little impact on the overall performance of the system. However, a lot of entries can be generated in a very short time (millions in a few hours), so perhaps do not run the cleanup too often and keep an eye on the number of documents it contains: Settings → Core Services → Indices
Since tail syncs take place much less frequently, the bpc-tailsync-log index should not grow so quickly during tail sync logging, but it is still regularly cleaned of old entries (see configuration above).