Skip to content

Commit

Permalink
Merge pull request #18 from aiven/hash-encoding
Browse files Browse the repository at this point in the history
Encodes hashes hexadecimally instead of Base64
  • Loading branch information
willyborankin authored Jun 10, 2020
2 parents 747b4d6 + b4af4ab commit c5a1eb4
Show file tree
Hide file tree
Showing 5 changed files with 892 additions and 25 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/aiven/kafka/connect/transforms/Hash.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

Expand All @@ -32,6 +31,8 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import io.aiven.kafka.connect.transforms.utils.Hex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -178,8 +179,9 @@ private Optional<Object> getNewValueWithoutFieldName(final String recordStr,
}

private String hashString(final String string) {
// We don't call reset() here because digest() does resetting afterwards.
final byte[] digest = messageDigest.digest(string.getBytes());
return Base64.getEncoder().encodeToString(digest);
return Hex.encode(digest);
}

public static class Key<R extends ConnectRecord<R>> extends Hash<R> {
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/aiven/kafka/connect/transforms/utils/Hex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms.utils;

public class Hex {
private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();

/**
* Encodes a byte array as a hexadecimal string.
* @implNote https://stackoverflow.com/a/9855338/1781549
*/
public static String encode(final byte[] bytes) {
final char[] hexChars = new char[bytes.length * 2];
for (int j = 0; j < bytes.length; j++) {
final int v = bytes[j] & 0xFF;
hexChars[j * 2] = HEX_ARRAY[v >>> 4]; // hi nibble
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; // lo nibble
}
return new String(hexChars);
}
}
64 changes: 41 additions & 23 deletions src/test/java/io/aiven/kafka/connect/transforms/HashTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package io.aiven.kafka.connect.transforms;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -40,7 +37,32 @@ abstract class HashTest {

private static final String FIELD = "email";
private static final String EMPTY_FIELD_VALUE = "";
private static final String NON_EMPTY_FIELD_VALUE = "jerry@all_your_bases.com";
private static final String NON_EMPTY_FIELD_VALUE = "jerry@big-corp.com";

private static final Map<String, Map<String, String>> HASHED_VALUES = new HashMap<>();

static {
HASHED_VALUES.put("md5", new HashMap<>());
// echo -n "" | md5sum -t
HASHED_VALUES.get("md5").put(EMPTY_FIELD_VALUE, "d41d8cd98f00b204e9800998ecf8427e");
// echo -n "jerry@big-corp.com" | md5sum -t
HASHED_VALUES.get("md5").put(NON_EMPTY_FIELD_VALUE, "10e5756d5d4c9c1cadd5e1b952071378");

HASHED_VALUES.put("sha1", new HashMap<>());
// echo -n "" | sha1sum -t
HASHED_VALUES.get("sha1").put(EMPTY_FIELD_VALUE, "da39a3ee5e6b4b0d3255bfef95601890afd80709");
// echo -n "jerry@big-corp.com" | sha1sum -t
HASHED_VALUES.get("sha1").put(NON_EMPTY_FIELD_VALUE, "dd9ab6e93603bf618db0894a82da64f1623a94b6");

HASHED_VALUES.put("sha256", new HashMap<>());
// echo -n "" | sha256sum -t
HASHED_VALUES.get("sha256").put(EMPTY_FIELD_VALUE,
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855");
// echo -n "jerry@big-corp.com" | sha256sum -t
HASHED_VALUES.get("sha256").put(NON_EMPTY_FIELD_VALUE,
"20e85b05e7349963fc64746fbc7f3f4fdf31507921360847ebef333b229cf2d6");
}

private static final String DEFAULT_HASH_FUNCTION = HashConfig.HashFunction.SHA256.toString();
private static final String UNAFFECTED_FIELD = "name";
private static final String UNAFFECTED_FIELD_VALUE = "jerry";
Expand Down Expand Up @@ -232,6 +254,20 @@ void fieldName_EmptyStringValue(final String hashFunction) {
assertEquals(setNewValue(originalRecord, newValue), result);
}

@ParameterizedTest
@ValueSource(strings = {"md5", "sha1", "sha256"})
void sameValueSameHash(final String hashFunction) {
final Schema schema = SchemaBuilder.STRING_SCHEMA;
final Hash<SinkRecord> transform = transformation(null, false, hashFunction);

for (int i = 0; i < 10; i++) {
final SinkRecord originalRecord = record(schema, NON_EMPTY_FIELD_VALUE);
final SinkRecord result = transform.apply(originalRecord);
final String newValue = hash(hashFunction, NON_EMPTY_FIELD_VALUE);
assertEquals(setNewValue(originalRecord, newValue), result);
}
}

private Hash<SinkRecord> transformation(
final String fieldName,
final boolean skipMissingOrNull,
Expand Down Expand Up @@ -277,24 +313,6 @@ private SinkRecord setNewValue(final SinkRecord record, final Object newValue) {
}

private String hash(final String function, final String value) {
try {
final MessageDigest md;
switch (function) {
case "md5":
md = MessageDigest.getInstance("MD5");
break;
case "sha1":
md = MessageDigest.getInstance("SHA1");
break;
case "sha256":
md = MessageDigest.getInstance("SHA-256");
break;
default:
throw new IllegalArgumentException(function);
}
return Base64.getEncoder().encodeToString(md.digest(value.getBytes()));
} catch (final NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
return HASHED_VALUES.get(function).get(value);
}
}
68 changes: 68 additions & 0 deletions src/test/java/io/aiven/kafka/connect/transforms/utils/HexTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms.utils;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class HexTest {
@Test
void testEncodeEmpty() {
final byte[] bytes = new byte[0];
assertEquals("", Hex.encode(bytes));
}

@Test
void testEncodeSingleByte() {
final byte[] bytes = new byte[1];
for (int i = 0; i < 256; i++) {
final byte b = (byte) i;
bytes[0] = b;
assertEquals(String.format("%02x", b), Hex.encode(bytes));
}
}

@Test
void testEncodeFromStrings() throws IOException, URISyntaxException {
final URL resource = getClass().getClassLoader().getResource("blns.txt");
final List<String> strings = Files.readAllLines(Paths.get(resource.toURI()));
for (final String s : strings) {
// Use the string as a byte array and hex-encode it.
final byte[] bytes = s.getBytes(Charset.defaultCharset());
final String encoded = Hex.encode(bytes);
assertEquals(bytes.length * 2, encoded.length());

// Decode the string back and compare to the original.
final char[] encodedChars = encoded.toCharArray();
final byte[] decodedBytes = new byte[bytes.length];
for (int i = 0; i < encoded.length(); i += 2) {
final String s1 = new String(encodedChars, i, 2);
decodedBytes[i / 2] = (byte) Integer.parseInt(s1, 16);
}
assertEquals(new String(decodedBytes, Charset.defaultCharset()), s);
}
}
}
Loading

0 comments on commit c5a1eb4

Please sign in to comment.