From 2c23a5e975ba8596a4e1520129b0eff239fdc076 Mon Sep 17 00:00:00 2001
From: Mercari OSS Bot <92021976+mercari-oss-bot@users.noreply.github.com>
Date: Fri, 18 Mar 2022 14:23:51 +0900
Subject: [PATCH] Initial commit
---
.github/workflows/publish.yaml | 25 +
.github/workflows/release.yaml | 29 ++
.github/workflows/settings.xml | 42 ++
.github/workflows/test.yaml | 19 +
.gitignore | 6 +
LICENSE.txt | 201 +++++++
Makefile | 15 +
README.md | 416 +++++++++++++++
.../pom.xml | 80 +++
.../transforms/kryptonite/CipherField.java | 390 ++++++++++++++
.../transforms/kryptonite/DataKeyConfig.java | 82 +++
.../transforms/kryptonite/FieldConfig.java | 106 ++++
.../kryptonite/FieldPathMatcher.java | 29 ++
.../transforms/kryptonite/RecordHandler.java | 143 +++++
.../transforms/kryptonite/SchemaRewriter.java | 357 +++++++++++++
.../kryptonite/SchemaawareRecordHandler.java | 109 ++++
.../kryptonite/SchemalessRecordHandler.java | 81 +++
.../kryptonite/TypeSchemaMapper.java | 81 +++
.../kryptonite/serdes/KryoInstance.java | 44 ++
.../kryptonite/serdes/KryoSerdeProcessor.java | 194 +++++++
.../kryptonite/serdes/SerdeProcessor.java | 28 +
.../kryptonite/util/JsonStringReader.java | 128 +++++
.../kryptonite/util/JsonStringWriter.java | 126 +++++
.../validators/CipherDataKeysValidator.java | 74 +++
.../validators/CipherEncodingValidator.java | 47 ++
.../validators/CipherModeValidator.java | 40 ++
.../validators/CipherNameValidator.java | 49 ++
.../validators/FieldConfigValidator.java | 57 ++
.../validators/FieldModeValidator.java | 40 ++
.../validators/KeySourceValidator.java | 39 ++
.../validators/TimeUnitValidator.java | 23 +
.../kryptonite/CipherFieldTest.java | 489 ++++++++++++++++++
.../src/test/resources/logback.xml | 15 +
kryptonite/pom.xml | 65 +++
.../hpgrahsl/kryptonite/AesGcmNoPadding.java | 59 +++
.../hpgrahsl/kryptonite/CipherMode.java | 22 +
.../hpgrahsl/kryptonite/Cipherable.java | 22 +
.../kryptonite/ConfigDataKeyVault.java | 47 ++
.../hpgrahsl/kryptonite/CryptoAlgorithm.java | 24 +
.../hpgrahsl/kryptonite/DataException.java | 24 +
.../hpgrahsl/kryptonite/FieldMetaData.java | 94 ++++
.../kryptonite/GcpKmsKeyStrategy.java | 30 ++
.../kryptonite/GcpSecretManagerKeyVault.java | 90 ++++
.../hpgrahsl/kryptonite/KeyException.java | 40 ++
.../kryptonite/KeyInvalidException.java | 40 ++
.../kryptonite/KeyNotFoundException.java | 40 ++
.../hpgrahsl/kryptonite/KeyStrategy.java | 31 ++
.../github/hpgrahsl/kryptonite/KeyVault.java | 28 +
.../hpgrahsl/kryptonite/Kryptonite.java | 94 ++++
.../hpgrahsl/kryptonite/NoOpKeyStrategy.java | 25 +
kryptonite/src/test/resources/logback.xml | 15 +
pom.xml | 206 ++++++++
52 files changed, 4600 insertions(+)
create mode 100644 .github/workflows/publish.yaml
create mode 100644 .github/workflows/release.yaml
create mode 100644 .github/workflows/settings.xml
create mode 100644 .github/workflows/test.yaml
create mode 100644 .gitignore
create mode 100644 LICENSE.txt
create mode 100644 Makefile
create mode 100644 README.md
create mode 100644 kafka-connect-transform-kryptonite-gcp/pom.xml
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherField.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/DataKeyConfig.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/FieldConfig.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/FieldPathMatcher.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/RecordHandler.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/SchemaRewriter.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/SchemaawareRecordHandler.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/SchemalessRecordHandler.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/TypeSchemaMapper.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/serdes/KryoInstance.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/serdes/KryoSerdeProcessor.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/serdes/SerdeProcessor.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/util/JsonStringReader.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/util/JsonStringWriter.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/CipherDataKeysValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/CipherEncodingValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/CipherModeValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/CipherNameValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/FieldConfigValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/FieldModeValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/KeySourceValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/validators/TimeUnitValidator.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/test/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherFieldTest.java
create mode 100644 kafka-connect-transform-kryptonite-gcp/src/test/resources/logback.xml
create mode 100644 kryptonite/pom.xml
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/AesGcmNoPadding.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/CipherMode.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/Cipherable.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/ConfigDataKeyVault.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/CryptoAlgorithm.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/DataException.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/FieldMetaData.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/GcpKmsKeyStrategy.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/GcpSecretManagerKeyVault.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/KeyException.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/KeyInvalidException.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/KeyNotFoundException.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/KeyStrategy.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/KeyVault.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/Kryptonite.java
create mode 100644 kryptonite/src/main/java/com/github/hpgrahsl/kryptonite/NoOpKeyStrategy.java
create mode 100644 kryptonite/src/test/resources/logback.xml
create mode 100644 pom.xml
diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml
new file mode 100644
index 0000000..38ecbf9
--- /dev/null
+++ b/.github/workflows/publish.yaml
@@ -0,0 +1,25 @@
+name: publish
+
+on:
+ push:
+ branches:
+ - master
+
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - uses: actions/setup-java@v2
+ with:
+ java-version: '8'
+ distribution: 'adopt'
+ - name: Publish snapshot package
+ run: |
+ VERSION=$(mvn org.apache.maven.plugins:maven-help-plugin:3.2.0:evaluate -Dexpression=project.version|grep -Ev '(^\[|Download\w+:)')
+ if [[ ${VERSION} == *SNAPSHOT ]]; then
+ mvn -s ${{ github.workspace }}/.github/workflows/settings.xml --batch-mode deploy
+ fi
+ env:
+ JFROG_USERNAME: ${{ secrets.JFROG_USERNAME }}
+ JFROG_PASSWORD: ${{ secrets.JFROG_PASSWORD }}
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
new file mode 100644
index 0000000..bccc414
--- /dev/null
+++ b/.github/workflows/release.yaml
@@ -0,0 +1,29 @@
+name: release
+
+on:
+ push:
+ tags:
+ - "v*"
+
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - uses: actions/setup-java@v2
+ with:
+ java-version: '8'
+ distribution: 'adopt'
+ - name: Publish package
+ run: |
+ mvn -s ${{ github.workspace }}/.github/workflows/settings.xml --batch-mode deploy
+ rm -rf */target/original-*.jar
+ env:
+ JFROG_USERNAME: ${{ secrets.JFROG_USERNAME }}
+ JFROG_PASSWORD: ${{ secrets.JFROG_PASSWORD }}
+ - name: Release
+ uses: softprops/action-gh-release@v1
+ with:
+ files: "*/target/*.jar"
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/settings.xml b/.github/workflows/settings.xml
new file mode 100644
index 0000000..2918a7e
--- /dev/null
+++ b/.github/workflows/settings.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 https://maven.apache.org/xsd/settings-1.0.0.xsd">
+  <activeProfiles>
+    <activeProfile>mercari</activeProfile>
+  </activeProfiles>
+
+  <profiles>
+    <profile>
+      <id>mercari</id>
+
+      <repositories>
+        <repository>
+          <id>central</id>
+          <url>https://mercari.jfrog.io/mercari/libs-release</url>
+          <snapshots>
+            <enabled>false</enabled>
+          </snapshots>
+        </repository>
+        <repository>
+          <id>snapshots</id>
+          <url>https://mercari.jfrog.io/mercari/libs-snapshot</url>
+          <releases>
+            <enabled>false</enabled>
+          </releases>
+        </repository>
+      </repositories>
+    </profile>
+  </profiles>
+
+  <servers>
+    <server>
+      <id>central</id>
+      <username>${env.JFROG_USERNAME}</username>
+      <password>${env.JFROG_PASSWORD}</password>
+    </server>
+    <server>
+      <id>snapshots</id>
+      <username>${env.JFROG_USERNAME}</username>
+      <password>${env.JFROG_PASSWORD}</password>
+    </server>
+  </servers>
+</settings>
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
new file mode 100644
index 0000000..17c269f
--- /dev/null
+++ b/.github/workflows/test.yaml
@@ -0,0 +1,19 @@
+name: build
+
+on:
+ push:
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 8
+ uses: actions/setup-java@v2
+ with:
+ java-version: "8"
+ distribution: "adopt"
+ cache: maven
+ - name: Build with Maven
+ run: mvn --batch-mode --update-snapshots verify
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bde9135
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+.idea/
+target/
+*.iml
+dependency-reduced-pom.xml
+.gradle
+build
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..336b80c
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright (c) 2017. Hans-Peter Grahsl (grahslhp@gmail.com)
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..3cdda17
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,15 @@
+.PHONY: chk
+chk:
+ mvn com.coveo:fmt-maven-plugin:check
+
+.PHONY: fmt
+fmt:
+ mvn com.coveo:fmt-maven-plugin:format
+
+.PHONY: test
+test:
+ mvn verify
+
+.PHONY: jar
+jar:
+ mvn clean package
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..06f74a6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,416 @@
+# Kryptonite - An SMT for Kafka Connect
+
+Kryptonite is a turn-key ready [transformation](https://kafka.apache.org/documentation/#connect_transforms) (SMT) for [Apache Kafka®](https://kafka.apache.org/)
+to do field-level encryption/decryption of records with or without schema in data integration scenarios based on [Kafka Connect](https://kafka.apache.org/documentation/#connect).
+It uses authenticated encryption with associated data ([AEAD](https://en.wikipedia.org/wiki/Authenticated_encryption)) and in particular applies
+[AES](https://en.wikipedia.org/wiki/Advanced_Encryption_Standard) in [GCM](https://en.wikipedia.org/wiki/Galois/Counter_Mode) mode.
+
+## tl;dr
+
+### Data Records without Schema
+
+The following fictional data **record value without schema** - represented in JSON-encoded format -
+is used to illustrate a simple encrypt/decrypt scenario:
+
+```json5
+{
+ "id": "1234567890",
+ "myString": "some foo bla text",
+ "myInt": 42,
+ "myBoolean": true,
+ "mySubDoc1": {"myString":"hello json"},
+ "myArray1": ["str_1","str_2","...","str_N"],
+ "mySubDoc2": {"k1":9,"k2":8,"k3":7}
+}
+```
+
+#### Encryption of selected fields
+
+Let's assume the fields `"myString"`,`"myArray1"` and `"mySubDoc2"` of the above data record should get encrypted,
+the CipherField SMT can be configured as follows:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.cipher_mode": "ENCRYPT",
+ "transforms.cipher.cipher_data_keys": "[{\"name\":\"my-demo-secret-key\",\"version\":\"123\",\"material\":\"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=\"}]", // key materials of utmost secrecy!
+ "transforms.cipher.cipher_data_key_name": "my-demo-secret-key",
+ "transforms.cipher.cipher_data_key_version": "123",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+The result after applying this SMT is a record in which all the fields specified in the `field_config` parameter are
+**encrypted using the secret key** specified with the `cipher_data_key_name` and `cipher_data_key_version` parameters.
+
+If you specify `cipher_data_keys`, then apparently, the **configured key materials have to be treated with utmost secrecy**, for leaking any of the secret keys renders encryption useless.
+The recommended way of doing this for now is to indirectly reference secret key materials by externalizing them into a separate properties file.
+Read a few details about this [here](#externalize-configuration-parameters).
+It is also possible to use the GCP Cloud KMS and Secret Manager. Please see [here](#gcp-integrations) for details.
+
+Since the configuration parameter `field_mode` is set to 'OBJECT', complex field types are processed as a whole instead of element-wise.
+
+Below is an exemplary JSON-encoded record after the encryption:
+
+```json5
+{
+ "id": "1234567890",
+ "myString": "123#OtWbJ+VR6P6i1x9DE4FKOmsV43HOHttUjdufCjrt6SIixILy+6Bk9zBdWC4KCgeN9I2z",
+ "myInt": 42,
+ "myBoolean": true,
+ "mySubDoc1": {"myString":"hello json"},
+ "myArray1": "123#uWz9MODqJ0hyzXYaraEZ08S1e78ZOC0G4zeL8eZmISUpMiNsfBLDviBlWrCL2cQRbt3qNGlpKUys7/Lio9OIc0A=",
+ "mySubDoc2": "123#O0AHEZ8pOccnmBHT/5kJj2QQeke3ltf8i/kJzEo/alB2sOqUooFGThBKDZA0HjdC2zz9thvB8zfjw7+fbfts6/4="
+}
+```
+
+**NOTE:** Encrypted fields are always represented as **Base64-encoded strings**,
+containing the **ciphertext** of the field's original value, prefixed with the version number of the secret key and separated by **#**.
+
+#### Decryption of selected fields
+
+Provided that the secret key material used to encrypt the original data record is made available to a specific sink connector,
+the CipherField SMT can be configured to decrypt the data like so:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.cipher_mode": "DECRYPT",
+ "transforms.cipher.cipher_data_keys": "[{\"name\":\"my-demo-secret-key\",\"version\":\"123\",\"material\":\"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=\"}]", // key materials of utmost secrecy!
+ "transforms.cipher.cipher_data_key_name": "my-demo-secret-key",
+ "transforms.cipher.cipher_data_key_version": "123",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+The result after applying this SMT is a record in which all the fields specified in the `field_config` parameter are **decrypted
+using the secret key version that is specified and was used to encrypt the original data**.
+
+Below is an exemplary JSON-encoded record after the decryption, which is equal to the original record:
+
+```json5
+{
+ "id": "1234567890",
+ "myString": "some foo bla text",
+ "myInt": 42,
+ "myBoolean": true,
+ "mySubDoc1": {"myString":"hello json"},
+ "myArray1": ["str_1","str_2","...","str_N"],
+ "mySubDoc2": {"k1":9,"k2":8,"k3":7}
+}
+```
+
+### Data Records with Schema
+
+The following example is based on an **Avro value record** and used to illustrate a simple encrypt/decrypt scenario for data records
+with schema. The schema could be defined as:
+
+```json5
+{
+ "type": "record", "fields": [
+ { "name": "id", "type": "string" },
+ { "name": "myString", "type": "string" },
+ { "name": "myInt", "type": "int" },
+ { "name": "myBoolean", "type": "boolean" },
+ { "name": "mySubDoc1", "type": "record",
+ "fields": [
+ { "name": "myString", "type": "string" }
+ ]
+ },
+ { "name": "myArray1", "type": { "type": "array", "items": "string"}},
+ { "name": "mySubDoc2", "type": { "type": "map", "values": "int"}}
+ ]
+}
+```
+
+The data of one such fictional record - represented by its `Struct.toString()` output - might look as:
+
+```text
+Struct{
+ id=1234567890,
+ myString=some foo bla text,
+ myInt=42,
+ myBoolean=true,
+ mySubDoc1=Struct{myString=hello json},
+ myArray1=[str_1, str_2, ..., str_N],
+ mySubDoc2={k1=9, k2=8, k3=7}
+}
+```
+
+#### Encryption of selected fields
+
+Let's assume the fields `"myString"`,`"myArray1"` and `"mySubDoc2"` of the above data record should get encrypted,
+the CipherField SMT can be configured as follows:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.cipher_mode": "ENCRYPT",
+ "transforms.cipher.cipher_data_keys": "[{\"name\":\"my-demo-secret-key\",\"version\":\"123\",\"material\":\"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=\"}]", // key materials of utmost secrecy!
+ "transforms.cipher.cipher_data_key_name": "my-demo-secret-key",
+ "transforms.cipher.cipher_data_key_version": "123",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+The result after applying this SMT is a record in which all the fields specified in the `field_config` parameter are
+**encrypted using the secret key** specified by its id with the `cipher_data_key_name` and `cipher_data_key_version` parameters.
+
+If you specify `cipher_data_keys`, then apparently, the **configured key materials have to be treated with utmost secrecy**, for leaking any of the secret keys renders encryption useless.
+The recommended way of doing this for now is to indirectly reference secret key materials by externalizing them into a separate properties file.
+Read a few details about this [here](#externalize-configuration-parameters).
+It is also possible to use the GCP Cloud KMS and Secret Manager. Please see [here](#gcp-integrations) for details.
+
+Since the configuration parameter `field_mode` is set to 'OBJECT', complex field types are processed as a whole instead of element-wise.
+
+Below is an exemplary `Struct.toString()` output of the record after the encryption:
+
+```text
+Struct{
+ id=1234567890,
+ myString=123#OtWbJ+VR6P6i1x9DE4FKOmsV43HOHttUjdufCjrt6SIixILy+6Bk9zBdWC4KCgeN9I2z,
+ myInt=42,
+ myBoolean=true,
+ mySubDoc1=Struct{myString=hello json},
+ myArray1=123#uWz9MODqJ0hyzXYaraEZ08S1e78ZOC0G4zeL8eZmISUpMiNsfBLDviBlWrCL2cQRbt3qNGlpKUys7/Lio9OIc0A=,
+ mySubDoc2=123#O0AHEZ8pOccnmBHT/5kJj2QQeke3ltf8i/kJzEo/alB2sOqUooFGThBKDZA0HjdC2zz9thvB8zfjw7+fbfts6/4=
+}
+```
+
+**NOTE 1:** Encrypted fields are always represented as **Base64-encoded strings**,
+containing the **ciphertext** of the field's original value, prefixed with the version number of the secret key and separated by **#**.
+
+**NOTE 2:** Obviously, in order to support this **the original schema of the data record is automatically redacted such
+that any encrypted fields can be stored as strings**, even though the original data types for the fields in question were different ones.
+
+#### Decryption of selected fields
+
+Provided that the secret key material used to encrypt the original data record is made available to a specific sink connector,
+the CipherField SMT can be configured to decrypt the data like so:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.cipher_mode": "DECRYPT",
+ "transforms.cipher.cipher_data_keys": "[{\"name\":\"my-demo-secret-key\",\"version\":\"123\",\"material\":\"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE=\"}]", // key materials of utmost secrecy!
+ "transforms.cipher.cipher_data_key_name": "my-demo-secret-key",
+ "transforms.cipher.cipher_data_key_version": "123",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\",\"schema\": {\"type\": \"STRING\"}},{\"name\":\"myArray1\",\"schema\": {\"type\": \"ARRAY\",\"valueSchema\": {\"type\": \"STRING\"}}},{\"name\":\"mySubDoc2\",\"schema\": { \"type\": \"MAP\", \"keySchema\": { \"type\": \"STRING\" }, \"valueSchema\": { \"type\": \"INT32\"}}}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+**Take notice of the extended `field_config` parameter settings.** For decryption of schema-aware data, the SMT configuration expects
+that for each field to decrypt the original schema information is explicitly specified.
+This makes it possible to **redact the encrypted record's schema towards a compatible decrypted record's schema upfront,**
+such that the resulting plaintext field values can be stored in accordance with their original data types.
+
+The result after applying this SMT is a record in which all the fields specified in the `field_config` parameter are
+**decrypted using the secret key id that is specified and was used to encrypt the original data**.
+
+Below is the decrypted data - represented by its `Struct.toString()` output - which is equal to the original record:
+
+```text
+Struct{
+ id=1234567890,
+ myString=some foo bla text,
+ myInt=42,
+ myBoolean=true,
+ mySubDoc1=Struct{myString=hello json},
+ myArray1=[str_1, str_2, ..., str_N],
+ mySubDoc2={k1=9, k2=8, k3=7}
+}
+```
+
+## Configuration Parameters
+
+| name | Description | Type | Default | Valid values | Importance |
+|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------|
+| cipher_data_key_name | secret key name to be used as default data encryption key for all fields which don't refer to a field-specific secret key name | string | | non-empty string | high |
+| cipher_data_key_version | secret key version to be used as default data encryption key for all fields which don't refer to a field-specific secret key version | string | | non-empty string | high |
+| cipher_data_keys | JSON array with data key objects specifying the key name, key version and base64 encoded key bytes used for encryption / decryption. The key material is mandatory if the key_source=CONFIG | password | | JSON array holding at least one valid data key config object, e.g. if key_source=CONFIG [{"name":"my-key","version":"123","material":"dmVyeS1zZWNyZXQta2V5JA=="}] | medium |
+| cipher_data_key_cache_expiry_duration | defines the expiration duration of the secret key cache To be used if key_source is GCP_SECRET_MANAGER or GCP_SECRET_MANAGER_WITH_KMS | long | 24 | long value | low |
+| cipher_data_key_cache_expiry_duration_unit | defines the unit of expiration duration of the private key cache To be used if key_source is GCP_SECRET_MANAGER or GCP_SECRET_MANAGER_WITH_KMS | string | HOURS | NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | low |
+| cipher_mode | defines whether the data should get encrypted or decrypted | string | | ENCRYPT or DECRYPT | high |
+| field_config | JSON array with field config objects specifying which fields together with their settings should get either encrypted / decrypted (nested field names are expected to be separated by '.' per default, or by a custom 'path_delimiter' config | string | | JSON array holding at least one valid field config object, e.g. [{"name": "my-field-abc"},{"name": "my-nested.field-xyz"}] | high |
+| key_source | defines the origin of the secret key material (currently supports keys specified in the config or the GCP Secret Manager) | string | CONFIG | CONFIG or GCP_SECRET_MANAGER or GCP_SECRET_MANAGER_WITH_KMS | medium |
+| kms_key_name | The GCP Cloud KMS key name for decrypting a data encryption key (DEK), if the DEK is encrypted with a key encryption key (KEK) To be used if key_source is GCP_SECRET_MANAGER_WITH_KMS | string | | non-empty string e.g. projects/YOUR_PROJECT/locations/LOCATION/keyRings/YOUR_KEY_RING/cryptoKeys/YOUR_KEY | medium |
+| field_mode | defines how to process complex field types (maps, lists, structs), either as full objects or element-wise | string | ELEMENT | ELEMENT or OBJECT | medium |
+| cipher_algorithm | cipher algorithm used for data encryption (currently supports only one AEAD cipher: AES/GCM/NoPadding) | string | AES/GCM/NoPadding | AES/GCM/NoPadding | low |
+| cipher_text_encoding | defines the encoding of the resulting ciphertext bytes (currently only supports 'base64') | string | base64 | base64 | low |
+| path_delimiter | path delimiter used as field name separator when referring to nested fields in the input record | string | . | non-empty string | low |
+
+### Externalize configuration parameters
+
+The problem with directly specifying configuration parameters which contain sensitive data, such as secret key materials,
+is that they are exposed via Kafka Connect's REST API.
+This means for connect clusters that are shared among teams the configured secret key materials would leak, which is of course unacceptable.
+The way to deal with this for now, is to indirectly reference such configuration parameters from external property files.
+
+This approach can be used to configure any kind of sensitive data such as KMS-specific client authentication settings,
+in case the secret keys aren't sourced from the config directly but rather retrieved from an external KMS such as Azure Key Vault.
+
+Below is a quick example of how such a configuration would look like:
+
+1. Before you can make use of configuration parameters from external sources you have to customize your Kafka Connect worker configuration
+ by adding the following two settings:
+
+```
+connect.config.providers=file
+connect.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
+```
+
+2. Then you create the external properties file e.g. `classified.properties` which contains the secret key materials.
+ This file needs to be available on all your Kafka Connect workers which you want to run Kryptonite on.
+ Let's pretend the file is located at path `/secrets/kryptonite/classified.properties` on your worker nodes:
+
+```properties
+cipher_data_keys=[{"name":"my-demo-secret-key","version":"123","material":"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWE="}]
+```
+
+3. Finally, you simply reference this file and the corresponding key of the property therein, from your SMT configuration like so:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.cipher_mode": "ENCRYPT",
+ "transforms.cipher.cipher_data_keys": "${file:/secrets/kryptonite/classified.properties:cipher_data_keys}",
+ "transforms.cipher.cipher_data_key_name": "my-demo-secret-key-123",
+ "transforms.cipher.cipher_data_key_version": "123",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+In case you want to learn more about configuration parameter externalization there is e.g. this nice
+[blog post](https://debezium.io/blog/2019/12/13/externalized-secrets/) from the Debezium team showing
+how to externalize username and password settings using a docker-compose example.
+
+### GCP Integrations
+
+You can use the GCP Secret Manager to manage your secret keys.
+The CipherField SMT can be configured as follows:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.key_source": "GCP_SECRET_MANAGER",
+ "transforms.cipher.cipher_mode": "ENCRYPT",
+ "transforms.cipher.cipher_data_key_name": "projects/YOUR_PROJECT_NUMBER/secrets/YOUR_SECRET_NAME",
+ "transforms.cipher.cipher_data_key_version": "3",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+Specify `GCP_SECRET_MANAGER` for `key_source`, and specify the secret name and version of the Secret Manager to be used by default for
+`cipher_data_key_name` and `cipher_data_key_version`. It is assumed that the Secret Manager stores base64-encoded secret keys.
+It retrieves all valid versions of the secret specified for default use at startup and caches them in memory.
+Cache expiration can be set with `cipher_data_key_cache_expiry_duration` and `cipher_data_key_cache_expiry_duration_unit`.
+The default is 24 hours. When the cache expires, the secret is evicted and automatically cached again the next time it is accessed.
+When encrypting, the default secret version is used, or the matching secret version if specified in `field_config`.
+When decrypting, the secret key that matches the version prefix of the encrypted data will be used automatically.
+If there is no version number prefix, the default or the secret specified in `field_config` will be used.
+
+Rotating the secret key is simply a matter of registering a new secret version and updating the secret version used by default.
+Since the secret version is automatically selected for decryption, data encrypted with an older version of the secret key can be decrypted,
+unless the older version of the secret is disabled.
+
+Secret keys stored in the Secret Manager can also be encrypted with the Cloud KMS.
+Use the Cloud KMS for the key encryption key (KEK) and the Secret Manager for the data encryption key (DEK).
+See [Envelope encryption](https://cloud.google.com/kms/docs/envelope-encryption) for details.
+The CipherField SMT can be configured as follows:
+
+```json5
+{
+ //...
+ "transforms":"cipher",
+ "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
+ "transforms.cipher.key_source": "GCP_SECRET_MANAGER_WITH_KMS",
+ "transforms.cipher.cipher_mode": "ENCRYPT",
+ "transforms.cipher.kms_key_name": "projects/YOUR_PROJECT/locations/YOUR_LOCATION/keyRings/YOUR_KEY_RING/cryptoKeys/YOUR_KEY",
+ "transforms.cipher.cipher_data_key_name": "projects/YOUR_PROJECT_NUMBER/secrets/YOUR_SECRET_NAME",
+ "transforms.cipher.cipher_data_key_version": "3",
+ "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
+ "transforms.cipher.field_mode": "OBJECT",
+ //...
+}
+```
+
+Specify `GCP_SECRET_MANAGER_WITH_KMS` for `key_source`, and specify the name of the Cloud KMS key for `kms_key_name`.
+The basic behavior is the same as when `GCP_SECRET_MANAGER` is specified,
+but the Cloud KMS will decrypt the key when it retrieves the key stored in the Secret Manager at startup.
+
+This can be used when you do not want to store the raw secret key in the Secret Manager.
+Also, depending on the configuration, it is possible to automate the key encryption key (KEK) rotation.
+
+### Build, installation / deployment
+
+This project can be built from source via Maven, or you can download the package from the
+GitHub [release page](https://github.com/kouzoh/kafka-connect-transform-kryptonite-gcp/releases).
+
+In order to deploy it you simply put the jar into a _'plugin path'_ that is configured to be scanned by your Kafka Connect worker nodes.
+
+After that, configure Kryptonite as transformation for any of your source / sink connectors, sit back and relax!
+Happy _'binge watching'_ plenty of ciphertexts ;-)
+
+### Cipher algorithm specifics
+
+Kryptonite currently provides a single cipher algorithm, namely, AES in GCM mode.
+It offers so-called _authenticated encryption with associated data_ (AEAD).
+
+By design, every application of Kryptonite on a specific record field results in different ciphertexts for one and the same plaintext.
+This is in general not only desirable but very important to make attacks harder.
+However, in the context of Kafka Connect records this has an unfavorable consequence for source connectors.
+**Applying the SMT on a source record's key would result in a 'partition mix-up'**
+because records with the same original plaintext key would end up in different topic partitions.
+In other words, **do NOT(!) use Kryptonite for
+source record keys** at the moment. There are plans in place to do away with this restriction and extend Kryptonite with a deterministic mode.
+This could then safely support the encryption of record keys while at the same time keep topic partitioning and record ordering intact.
+
+## Contribution
+
+Please read the CLA carefully before submitting your contribution to Mercari. Under any circumstances, by submitting your contribution, you are deemed to accept and agree to be bound by the terms and conditions of the CLA.
+
+https://www.mercari.com/cla/
+
+## License Information
+
+This project is licensed according to [Apache License Version 2.0](https://www.apache.org/licenses/LICENSE-2.0)
+
+```
+Copyright (c) 2021. Hans-Peter Grahsl (grahslhp@gmail.com)
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+```
diff --git a/kafka-connect-transform-kryptonite-gcp/pom.xml b/kafka-connect-transform-kryptonite-gcp/pom.xml
new file mode 100644
index 0000000..1836f0c
--- /dev/null
+++ b/kafka-connect-transform-kryptonite-gcp/pom.xml
@@ -0,0 +1,80 @@
+
+
+ 4.0.0
+
+
+ com.github.hpgrahsl.kafka.connect
+ kafka-connect-transform-kryptonite-parent
+ 0.3.1-SNAPSHOT
+
+ kafka-connect-transform-kryptonite-gcp
+ jar
+
+ kafka-connect-transform-kryptonite-gcp
+
+
+
+ com.github.hpgrahsl.kafka.connect
+ kryptonite
+ ${project.version}
+
+
+ org.apache.kafka
+ connect-api
+ ${kafka.version}
+ provided
+
+
+ org.apache.kafka
+ connect-transforms
+ ${kafka.version}
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.esotericsoftware
+ kryo
+ ${kryo.version}
+
+
+ ch.qos.logback
+ logback-core
+ ${logback.version}
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ maven-surefire-plugin
+ ${surefire.plugin.version}
+
+
+ com.coveo
+ fmt-maven-plugin
+
+
+
+
+
diff --git a/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherField.java b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherField.java
new file mode 100644
index 0000000..f8e18ee
--- /dev/null
+++ b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/CipherField.java
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2021. Hans-Peter Grahsl (grahslhp@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.hpgrahsl.kafka.connect.transforms.kryptonite;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.serdes.KryoSerdeProcessor;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.serdes.SerdeProcessor;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.CipherDataKeysValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.CipherEncodingValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.CipherModeValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.CipherNameValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.FieldConfigValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.FieldModeValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.KeySourceValidator;
+import com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.TimeUnitValidator;
+import com.github.hpgrahsl.kryptonite.CipherMode;
+import com.github.hpgrahsl.kryptonite.ConfigDataKeyVault;
+import com.github.hpgrahsl.kryptonite.GcpSecretManagerKeyVault;
+import com.github.hpgrahsl.kryptonite.Kryptonite;
+import com.github.hpgrahsl.kryptonite.NoOpKeyStrategy;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CipherField> implements Transformation {
+
+ public enum FieldMode {
+ ELEMENT,
+ OBJECT
+ }
+
+ public enum KeySource {
+ CONFIG,
+ GCP_SECRET_MANAGER,
+ GCP_SECRET_MANAGER_WITH_KMS
+ }
+
+ public static final String OVERVIEW_DOC =
+ "Encrypt/Decrypt specified record fields with AEAD cipher."
+ + "The transformation should currently only be used for the record value ("
+ + CipherField.Value.class.getName()
+ + ")."
+ + "Future versions will support a dedicated 'mode of operation' applicable also to the record key ("
+ + CipherField.Key.class.getName()
+ + ") or value .";
+
+ public static final String FIELD_CONFIG = "field_config";
+ public static final String PATH_DELIMITER = "path_delimiter";
+ public static final String FIELD_MODE = "field_mode";
+ public static final String CIPHER_ALGORITHM = "cipher_algorithm";
+ public static final String CIPHER_DATA_KEY_NAME = "cipher_data_key_name";
+ public static final String CIPHER_DATA_KEY_VERSION = "cipher_data_key_version";
+ public static final String CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION =
+ "cipher_data_key_cache_expiry_duration";
+ public static final String CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_UNIT =
+ "cipher_data_key_cache_expiry_duration_unit";
+ public static final String CIPHER_DATA_KEYS = "cipher_data_keys";
+ public static final String CIPHER_TEXT_ENCODING = "cipher_text_encoding";
+ public static final String CIPHER_MODE = "cipher_mode";
+ public static final String KEY_SOURCE = "key_source";
+ public static final String KMS_KEY_NAME = "kms_key_name";
+
+ private static final String PATH_DELIMITER_DEFAULT = ".";
+ private static final String FIELD_MODE_DEFAULT = "ELEMENT";
+ private static final String CIPHER_ALGORITHM_DEFAULT = "AES/GCM/NoPadding";
+ private static final long CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_DEFAULT = 24L;
+ private static final String CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_UNIT_DEFAULT = "HOURS";
+ private static final String CIPHER_TEXT_ENCODING_DEFAULT = "base64";
+ private static final String KEY_SOURCE_DEFAULT = "CONFIG";
+ private static final String CIPHER_DATA_KEYS_DEFAULT = "[]";
+ private static final String KMS_KEY_NAME_DEFAULT = null;
+
+ public static final ConfigDef CONFIG_DEF =
+ new ConfigDef()
+ .define(
+ FIELD_CONFIG,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new FieldConfigValidator(),
+ Importance.HIGH,
+ "JSON array with field config objects specifying which fields together with their settings "
+ + "should get either encrypted / decrypted (nested field names are expected to be separated by '.' "
+ + "per default, or by a custom 'path_delimiter' config")
+ .define(
+ PATH_DELIMITER,
+ Type.STRING,
+ PATH_DELIMITER_DEFAULT,
+ new NonEmptyString(),
+ Importance.LOW,
+ "path delimiter used as field name separator when referring to nested fields "
+ + "in the input record")
+ .define(
+ FIELD_MODE,
+ Type.STRING,
+ FIELD_MODE_DEFAULT,
+ new FieldModeValidator(),
+ Importance.MEDIUM,
+ "defines how to process complex field types (maps, lists, structs), either as full objects "
+ + "or element-wise")
+ .define(
+ CIPHER_ALGORITHM,
+ Type.STRING,
+ CIPHER_ALGORITHM_DEFAULT,
+ new CipherNameValidator(),
+ Importance.LOW,
+ "cipher algorithm used for data encryption (currently supports only one AEAD cipher: "
+ + CIPHER_ALGORITHM_DEFAULT
+ + ")")
+ .define(
+ CIPHER_DATA_KEYS,
+ Type.PASSWORD,
+ CIPHER_DATA_KEYS_DEFAULT,
+ new CipherDataKeysValidator(),
+ Importance.MEDIUM,
+ "JSON array with data key objects specifying the key name, key version and base64 encoded "
+ + "key bytes used for encryption / decryption")
+ .define(
+ CIPHER_DATA_KEY_NAME,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new NonEmptyString(),
+ Importance.HIGH,
+ "secret key name to be used as default data encryption key for all fields which don't refer to "
+ + "a field-specific secret key name")
+ .define(
+ CIPHER_DATA_KEY_VERSION,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new NonEmptyString(),
+ Importance.HIGH,
+ "secret key version to be used as default data encryption key for all fields which don't refer to "
+ + "a field-specific secret key version")
+ .define(
+ CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION,
+ Type.LONG,
+ CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_DEFAULT,
+ Importance.LOW,
+ "defines the expiration duration of the secret key cache")
+ .define(
+ CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_UNIT,
+ Type.STRING,
+ CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_UNIT_DEFAULT,
+ new TimeUnitValidator(),
+ Importance.LOW,
+ "defines the unit of expiration duration of the private key cache")
+ .define(
+ CIPHER_TEXT_ENCODING,
+ Type.STRING,
+ CIPHER_TEXT_ENCODING_DEFAULT,
+ new CipherEncodingValidator(),
+ ConfigDef.Importance.LOW,
+ "defines the encoding of the resulting ciphertext bytes (currently only supports 'base64')")
+ .define(
+ CIPHER_MODE,
+ Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new CipherModeValidator(),
+ ConfigDef.Importance.HIGH,
+ "defines whether the data should get encrypted or decrypted")
+ .define(
+ KEY_SOURCE,
+ Type.STRING,
+ KEY_SOURCE_DEFAULT,
+ new KeySourceValidator(),
+ ConfigDef.Importance.HIGH,
+ "defines the origin of the secret key material (currently supports keys specified in the config or "
+ + "gcp secret manager)")
+ .define(
+ KMS_KEY_NAME,
+ Type.STRING,
+ KMS_KEY_NAME_DEFAULT,
+ Importance.MEDIUM,
+ "The GCP Cloud KMS key name for decrypting a data encryption key (DEK), "
+ + "if the DEK is encrypted with a key encryption key (KEK)");
+
+ private static final String PURPOSE = "(de)cipher record fields";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CipherField.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private RecordHandler recordHandlerWithSchema;
+ private RecordHandler recordHandlerWithoutSchema;
+ private SchemaRewriter schemaRewriter;
+ private Cache schemaCache;
+
+ @Override
+ public R apply(R record) {
+ if (operatingSchema(record) == null) {
+ return processWithoutSchema(record);
+ } else {
+ return processWithSchema(record);
+ }
+ }
+
+ public R processWithoutSchema(R record) {
+ LOGGER.debug("processing schemaless data");
+ Map valueMap = requireMap(operatingValue(record), PURPOSE);
+ Map updatedValueMap = new LinkedHashMap<>(valueMap);
+ recordHandlerWithoutSchema.matchFields(null, valueMap, null, updatedValueMap, "");
+ return newRecord(record, null, updatedValueMap);
+ }
+
+ public R processWithSchema(R record) {
+ LOGGER.debug("processing schema-aware data");
+ Struct valueStruct = requireStruct(operatingValue(record), PURPOSE);
+ Schema updatedSchema = schemaCache.get(valueStruct.schema());
+ if (updatedSchema == null) {
+ LOGGER.debug("adapting schema because record's schema not present in cache");
+ updatedSchema = schemaRewriter.adaptSchema(valueStruct.schema(), "");
+ schemaCache.put(valueStruct.schema(), updatedSchema);
+ }
+ Struct updatedValueStruct = new Struct(updatedSchema);
+ recordHandlerWithSchema.matchFields(
+ valueStruct.schema(), valueStruct, updatedSchema, updatedValueStruct, "");
+ return newRecord(record, updatedSchema, updatedValueStruct);
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void configure(Map props) {
+ try {
+ SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+ Map fieldPathMap =
+ OBJECT_MAPPER
+ .readValue(config.getString(FIELD_CONFIG), new TypeReference>() {})
+ .stream()
+ .collect(Collectors.toMap(FieldConfig::getName, Function.identity()));
+ Kryptonite kryptonite = configureKryptonite(config);
+ SerdeProcessor serdeProcessor = new KryoSerdeProcessor();
+ recordHandlerWithSchema =
+ new SchemaawareRecordHandler(
+ config,
+ serdeProcessor,
+ kryptonite,
+ CipherMode.valueOf(config.getString(CIPHER_MODE)),
+ fieldPathMap);
+ recordHandlerWithoutSchema =
+ new SchemalessRecordHandler(
+ config,
+ serdeProcessor,
+ kryptonite,
+ CipherMode.valueOf(config.getString(CIPHER_MODE)),
+ fieldPathMap);
+ schemaRewriter =
+ new SchemaRewriter(
+ fieldPathMap,
+ FieldMode.valueOf(config.getString(FIELD_MODE)),
+ CipherMode.valueOf(config.getString(CIPHER_MODE)),
+ config.getString(PATH_DELIMITER));
+ schemaCache = new SynchronizedCache<>(new LRUCache<>(16));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ throw new ConfigException(e.getMessage());
+ }
+ }
+
+ private Kryptonite configureKryptonite(SimpleConfig config) {
+ try {
+ KeySource keySource = KeySource.valueOf(config.getString(KEY_SOURCE));
+ switch (keySource) {
+ case CONFIG:
+ Set dataKeyConfig =
+ OBJECT_MAPPER.readValue(
+ config.getPassword(CIPHER_DATA_KEYS).value(),
+ new TypeReference>() {});
+ Map configKeyMap =
+ dataKeyConfig.stream()
+ .collect(
+ Collectors.toMap(DataKeyConfig::getIdentifier, DataKeyConfig::getKeyBytes));
+ return new Kryptonite(new ConfigDataKeyVault(configKeyMap));
+ case GCP_SECRET_MANAGER:
+ return new Kryptonite(
+ new GcpSecretManagerKeyVault(
+ config.getString(CIPHER_DATA_KEY_NAME),
+ new NoOpKeyStrategy(),
+ config.getLong(CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION),
+ config.getString(CIPHER_DATA_KEY_CACHE_EXPIRY_DURATION_UNIT)));
+ case GCP_SECRET_MANAGER_WITH_KMS:
+ return new Kryptonite(
+ new GcpSecretManagerKeyVault(
+ config.getString(CIPHER_DATA_KEY_NAME), config.getString(KMS_KEY_NAME)));
+ default:
+ throw new ConfigException(
+ "failed to configure kryptonite instance due to invalid key source");
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ throw new ConfigException(e.getMessage(), e);
+ }
+ }
+
+ protected abstract Schema operatingSchema(R record);
+
+ protected abstract Object operatingValue(R record);
+
+ protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
+
+ public static final class Key> extends CipherField {
+
+ @Override
+ protected Schema operatingSchema(R record) {
+ return record.keySchema();
+ }
+
+ @Override
+ protected Object operatingValue(R record) {
+ return record.key();
+ }
+
+ @Override
+ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+ return record.newRecord(
+ record.topic(),
+ record.kafkaPartition(),
+ updatedSchema,
+ updatedValue,
+ record.valueSchema(),
+ record.value(),
+ record.timestamp());
+ }
+ }
+
+ public static final class Value> extends CipherField {
+
+ @Override
+ protected Schema operatingSchema(R record) {
+ return record.valueSchema();
+ }
+
+ @Override
+ protected Object operatingValue(R record) {
+ return record.value();
+ }
+
+ @Override
+ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+ return record.newRecord(
+ record.topic(),
+ record.kafkaPartition(),
+ record.keySchema(),
+ record.key(),
+ updatedSchema,
+ updatedValue,
+ record.timestamp());
+ }
+ }
+}
diff --git a/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/DataKeyConfig.java b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/DataKeyConfig.java
new file mode 100644
index 0000000..e774102
--- /dev/null
+++ b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/DataKeyConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2021. Hans-Peter Grahsl (grahslhp@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.hpgrahsl.kafka.connect.transforms.kryptonite;
+
+import com.github.hpgrahsl.kryptonite.FieldMetaData;
+import java.util.Base64;
+import java.util.Objects;
+
+/**
+ * POJO for one entry of the 'cipher_data_keys' JSON setting: a named, versioned,
+ * base64-encoded secret key. Instances are deserialized by Jackson via the no-arg
+ * constructor and field access.
+ */
+public class DataKeyConfig {
+
+ // secret key name, e.g. "my-demo-secret-key"
+ private String name;
+ // secret key version, e.g. "123"; combined with name to form the identifier
+ private String version;
+ // base64-encoded key bytes; defaults to empty — per the README the material is
+ // mandatory only when key_source=CONFIG
+ private String material = "";
+
+ public DataKeyConfig() {}
+
+ public DataKeyConfig(String name, String version, String material) {
+ this.name = name;
+ this.version = version;
+ this.material = material;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public String getMaterial() {
+ return material;
+ }
+
+ /** Returns "name&lt;delimiter&gt;version", joined with the project-wide identifier delimiter. */
+ public String getIdentifier() {
+ return String.join(FieldMetaData.IDENTIFIER_DELIMITER_DEFAULT, name, version);
+ }
+
+ /** Decodes the base64 material into raw key bytes. */
+ public byte[] getKeyBytes() {
+ return Base64.getDecoder().decode(material);
+ }
+
+ // equality is based on the identifier (name + version) and the key material
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DataKeyConfig)) {
+ return false;
+ }
+ DataKeyConfig that = (DataKeyConfig) o;
+ return Objects.equals(getIdentifier(), that.getIdentifier())
+ && Objects.equals(material, that.material);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getIdentifier(), material);
+ }
+
+ // only exposes the identifier — the key material is deliberately not printed
+ @Override
+ public String toString() {
+ return "DataKeyConfig{"
+ + "identifier='"
+ + getIdentifier()
+ + "'}";
+ }
+}
diff --git a/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/FieldConfig.java b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/FieldConfig.java
new file mode 100644
index 0000000..29e860c
--- /dev/null
+++ b/kafka-connect-transform-kryptonite-gcp/src/main/java/com/github/hpgrahsl/kafka/connect/transforms/kryptonite/FieldConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2021. Hans-Peter Grahsl (grahslhp@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.github.hpgrahsl.kafka.connect.transforms.kryptonite;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class FieldConfig {
+
+ private String name;
+ private String algorithm;
+ private String keyName;
+ private String keyVersion;
+ private Map schema;
+
+ public FieldConfig() {}
+
+ public FieldConfig(
+ String name,
+ String algorithm,
+ String keyName,
+ String keyVersion,
+ Map schema) {
+ this.name = Objects.requireNonNull(name, "field config's name must not be null");
+ this.algorithm = algorithm;
+ this.keyName = keyName;
+ this.keyVersion = keyVersion;
+ this.schema = schema;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Optional getAlgorithm() {
+ return Optional.ofNullable(algorithm);
+ }
+
+ public Optional getKeyName() {
+ return Optional.ofNullable(keyName);
+ }
+
+ public Optional getKeyVersion() {
+ return Optional.ofNullable(keyVersion);
+ }
+
+ public Optional