GITBOOK-1169: Reporting - Default Connector changes
Lalith Kota authored and gitbook-bot committed Nov 3, 2024
1 parent 9531a73 commit e2fbba6
Showing 2 changed files with 68 additions and 23 deletions.
File: connector-creation-guide.md
@@ -43,8 +43,10 @@ Follow the [Installation guide](installation-and-troubleshooting.md) to install/
"topic.prefix": "${DB_PREFIX_INDEX}",
"table.include.list": "",
"heartbeat.interval.ms": "${DEFAULT_DEBEZIUM_CONNECTOR_HEARTBEAT_MS}",
"decimal.handling.mode": "double"
}
"decimal.handling.mode": "double",
"tombstones.on.delete": false
},
"wait_after_init_secs": 20
}
```

@@ -76,6 +78,7 @@ Each `$` in the json file will be treated as an environment variable. Environmen
"connection.password": "${OPENSEARCH_PASSWORD}",
"tasks.max": "1",
"topics": "${DB_PREFIX_INDEX}.public.res_partner",
"index.write.method": "upsert",
"key.ignore": "false",
"schema.ignore": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
@@ -93,7 +96,7 @@ Each `$` in the json file will be treated as an environment variable. Environmen
"transforms.keyExtId.field": "id",

"transforms.valExt.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.valExt.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
"transforms.valExt.expr": ".payload.after + (if .payload.after then {source_ts_ms: .payload.source.ts_ms} else null end)",

"transforms.tsconvert01.type": "org.openg2p.reporting.kafka.connect.transforms.TimestampConverterAdv${dollar}Value",
"transforms.tsconvert01.field": "source_ts_ms",
@@ -148,19 +151,7 @@ Each `$` in the json file will be treated as an environment variable. Environmen
"transforms.join02.es.username": "${OPENSEARCH_USERNAME}",
"transforms.join02.es.password": "${OPENSEARCH_PASSWORD}",
```
* If you want to add data/fields from one connector to another index on OpenSearch, use the DynamicNewFieldInsertBack transform. For example, NATIONAL IDs of registrants are saved in the g2p\_reg\_id table. But if that field data is needed on the res\_partner index (main registrant data table), the following can be done on the g2p\_reg\_id connector. (The following adds the `reg_id_NATIONAL_ID` field into the res\_partner index, from the g2p\_reg\_id connector, into the document whose ID is taken from the `partner_id` field):

```json
"transforms.insertBack1.type": "org.openg2p.reporting.kafka.connect.transforms.DynamicNewFieldInsertBack${dollar}Value",
"transforms.insertBack1.id.expr": ".partner_id",
"transforms.insertBack1.condition": ".id_type_name == \"NATIONAL ID\"",
"transforms.insertBack1.value": "{reg_id_NATIONAL_ID: .value}",
"transforms.insertBack1.es.index": "${DB_PREFIX_INDEX}.public.res_partner",
"transforms.insertBack1.es.url": "${OPENSEARCH_URL}",
"transforms.insertBack1.es.security.enabled": "${OPENSEARCH_SECURITY_ENABLED}",
"transforms.insertBack1.es.username": "${OPENSEARCH_USERNAME}",
"transforms.insertBack1.es.password": "${OPENSEARCH_PASSWORD}",
```
* If you want to add data/fields from one connector to another index on OpenSearch, use the DynamicNewFieldInsertBack transform. For example, see the `insertBack` section in [this sample connector](https://github.com/OpenG2P/openg2p-reporting/blob/develop/scripts/social-registry/opensearch-connectors/14.reg\_id.json).
* If you wish to apply a [Jq filter](https://jqlang.github.io/jq/manual/) on the record, use the ApplyJq transform. The current record will be replaced with the result of applying Jq. Example (an illustrative sketch; the transform name `myJq` and the expression are placeholders):

```json
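"transforms.myJq.type": "org.openg2p.reporting.kafka.connect.transforms.ApplyJq${dollar}Value",
"transforms.myJq.expr": ".payload.after",
```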
@@ -196,9 +187,9 @@ Each `$` in the json file will be treated as an environment variable. Environmen
* For detailed transform configuration, refer to [Apache Kafka Connect Transformations](https://kafka.apache.org/documentation/#connect\_transforms) doc.
* For a list of all available SMTs and their configs, refer to [Reporting Kafka Connect Transforms](../kafka-connect-transform-reference.md).

#### Capturing Change History
#### Capturing Record History

* If you also wish to record all the changes that are made to the records of a table, create a new OpenSearch connector for the same topic as given in [this section](connector-creation-guide.md#opensearch-connector-creation) and change the following properties.
* If you also wish to record the history of how records of a table have changed, create a new OpenSearch connector for the same topic as given in [this section](connector-creation-guide.md#opensearch-connector-creation) and change the following properties.

```json
{
@@ -214,7 +205,12 @@
}
}
```
* With this configuration, you will have two OpenSearch connectors. One that tracks the latest data of a table. And one that tracks all the changes. Correspondingly you have two indexes on OpenSearch (one with `_history` and one with regular data).
* Remove this line from the configuration.

```json
"index.write.method": "upsert",
```
* With this configuration, you will have two OpenSearch connectors: one that tracks the latest data of a table, and one that tracks the history. Correspondingly, you will have two indexes on OpenSearch (one with `_history` in its name and one with the regular data).

## OpenSearch dashboard creation

File: installation-and-troubleshooting.md
@@ -22,18 +22,67 @@ To ensure that all Kafka connectors are working login into Kafka UI (domain name

## Update Connectors

This procedure doesn't update the data present in OpenSearch, it only updates the connector configs, so only the new and incoming data is affected.
This procedure doesn't update the data already present in OpenSearch; it only updates the connector configs, so only new and incoming data is affected. (If you need the data in OpenSearch/Kafka to be refreshed, first follow the [Cleanup](installation-and-troubleshooting.md#cleanup-and-uninstall) guide, and then continue with this guide.)

### Upgrade base method

Follow this method if you wish to upgrade to the latest version of reporting (and reporting-init scripts) and to upgrade connector configs.

* After making changes to connectors/dashboards in your GitHub repo, go to the Installed Apps section on Rancher and upgrade your module (SR/PBMS, etc.) without changing any helm values. (An equivalent CLI sketch is given after this list.)
* When the upgrade finishes, the new reporting connector changes are automatically applied to the connectors. Log in to Kafka UI and check whether the connector config has been updated.
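
If you prefer the command line over the Rancher UI, the equivalent upgrade is roughly the following. This is a minimal sketch: it assumes the release is named `social-registry`, lives in `namespace`, and that `openg2p/social-registry` is the chart reference that was used at install time.

```bash
# Re-run the release upgrade reusing the existing values, so only the chart
# (and its reporting connector configs) changes; adjust release, namespace and chart names.
helm -n namespace upgrade social-registry openg2p/social-registry --reuse-values
```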

### Rerun initialize method

Follow this method if you don't wish to upgrade the reporting version, but only want to update connector configurations, etc. (Useful mainly during development and testing.)

* Note down your OpenG2P module's installation name on Kubernetes (for example, `social-registry`).
* Copy the reporting-init Job yaml from the output of the following command into a temporary file, say `reporting-init.yaml`.

```bash
helm -n namespace get hooks social-registry
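# Optional sketch: extract only the reporting-init Job into the temporary file in one step.
# This assumes yq (v4) is available and that the Job is named <release>-reporting-init,
# matching the name used in the delete command below.
helm -n namespace get hooks social-registry | yq 'select(.metadata.name == "social-registry-reporting-init")' > reporting-init.yaml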
```
* Delete the reporting-init job. (This can also be done from the Rancher UI.)

```bash
kubectl -n namespace delete job social-registry-reporting-init
```
* After the deletion, apply the `reporting-init.yaml` saved above.

```bash
kubectl -n namespace apply -f reporting-init.yaml
```
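* To verify that the reporting-init job re-ran successfully, check its pods and follow its logs (a sketch; the job name follows the `<release>-reporting-init` pattern used above).

```bash
# List the pods created by the re-created job
kubectl -n namespace get pods -l job-name=social-registry-reporting-init

# Follow the job logs until it completes
kubectl -n namespace logs -f job/social-registry-reporting-init
```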

## Cleanup and uninstall

This describes steps to clean up the connectors and the data so that fresh connectors can be installed again.
{% hint style="danger" %}
Warning: All data on OpenSearch will be lost. This is especially important if you are capturing [the history of the records](connector-creation-guide.md#capturing-record-history), as you will not get that history back.
{% endhint %}

This describes steps to clean up the connectors and the data so fresh connectors can be installed again.

* Log in to Kafka UI -> Kafka Connect Section, and delete all the connectors.
* Delete all the topics related to the connectors as well.
* Delete all the topics related to the connectors as well. Also, delete the Kafka _consumer offsets_ (IMPORTANT). TODO: give a guide on how to clear Kafka consumer offsets; a tentative CLI sketch is given after this list.
* Log in to OpenSearch -> Index Management, and delete all the relevant indices.
* Delete _replication slots_ and _publication_ on Postgres.
* Delete _replication slots_ and the _publication_ on Postgres using the following commands.
* List all replication slots:

```sql
select * from pg_replication_slots;
```
* Delete a replication slot:
```sql
select pg_drop_replication_slot('replication_slot_name_from_above');
```
* List all publications:
```sql
select * from pg_publication;
```
* Delete a publication:
```sql
drop publication publication_name_from_above;
```
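
A tentative sketch for clearing the Kafka _consumer offsets_ mentioned above, using the standard Kafka CLI. It assumes the broker is reachable at `kafka:9092` from where the command is run, and that the sink connectors use the default `connect-<connector-name>` consumer group naming; verify the group names in your deployment before deleting anything.

```bash
# List consumer groups and identify the ones belonging to the deleted sink connectors
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

# Delete a connector's consumer group so that freshly installed connectors
# re-read their topics from the beginning
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --group connect-<connector-name>
```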
If you want to install the connectors again, follow the [Update](installation-and-troubleshooting.md#update-connectors) guide.
