Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Common Jsons: add flag to apply flatten to arrays #20993

Merged
merged 25 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
25d955c
add flag to apply flatten to arrays
marcosmarxm Jun 3, 2022
56d79f4
run format change docstirng
marcosmarxm Jun 3, 2022
a2838a2
fix recursion
adam-bloom Sep 27, 2022
cf38f73
add additional unit test cases for array flattening
adam-bloom Sep 27, 2022
52d05c8
add backward compatibility function
adam-bloom Sep 27, 2022
e81a5f1
bump dest-redshift version and add changelog
adam-bloom Sep 27, 2022
682c55e
format
marcosmarxm Oct 14, 2022
8954484
Merge branch 'master' into correct-flatten-json-function
marcosmarxm Dec 8, 2022
ddfe66b
bump connector version
marcosmarxm Dec 8, 2022
a23d9bc
fix pmd test
marcosmarxm Dec 8, 2022
f4de2eb
Merge branch 'master' into correct-flatten-json-function
marcosmarxm Dec 8, 2022
319c636
Merge branch 'master' into correct-flatten-json-function
davinchia Dec 23, 2022
55b9bd3
Merge branch 'master' into correct-flatten-json-function
grishick Dec 28, 2022
0e55dd5
use original method signature
adam-bloom Dec 29, 2022
9f6f01e
Merge remote-tracking branch 'upstream/master' into correct-flatten-j…
adam-bloom Jan 3, 2023
8f4da88
Merge branch 'master' into correct-flatten-json-function
grishick Jan 3, 2023
95b2fbf
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
da2475d
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
87f695f
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
b86ed1d
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
3ea9f68
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
c70cbec
Merge branch 'master' into greg/flatten-json-function
grishick Jan 3, 2023
f16f8f9
Merge branch 'master' into greg/flatten-json-function
grishick Jan 4, 2023
e1c5cc4
Merge branch 'master' into greg/flatten-json-function
grishick Jan 4, 2023
856e7bc
Merge branch 'master' into greg/flatten-json-function
grishick Jan 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,29 @@ public static int getIntOrZero(final JsonNode json, final List<String> keys) {
}

/**
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object.
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. When
* applyFlattenToArray is true, each element in the array will be one entry in the returned map.
* This behavior is used in the Redshift SUPER type. When it is false, the whole array will be one
* entry. This is used in the JobTracker.
*/
@SuppressWarnings("PMD.ForLoopCanBeForeach")
public static Map<String, Object> flatten(final JsonNode node) {
public static Map<String, Object> flatten(final JsonNode node, final Boolean applyFlattenToArray) {
if (node.isObject()) {
final Map<String, Object> output = new HashMap<>();
for (final Iterator<Entry<String, JsonNode>> it = node.fields(); it.hasNext();) {
final Entry<String, JsonNode> entry = it.next();
final String field = entry.getKey();
final JsonNode value = entry.getValue();
mergeMaps(output, field, flatten(value));
mergeMaps(output, field, flatten(value, applyFlattenToArray));
}
return output;
} else if (node.isArray() && applyFlattenToArray) {
final Map<String, Object> output = new HashMap<>();
final int arrayLen = node.size();
for (int i = 0; i < arrayLen; i++) {
final String field = String.format("[%d]", i);
final JsonNode value = node.get(i);
mergeMaps(output, field, flatten(value, applyFlattenToArray));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious if it is always true that nested arrays are flattened?

}
return output;
} else {
Expand All @@ -286,6 +298,15 @@ public static Map<String, Object> flatten(final JsonNode node) {
}
}

/**
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. New usage of
* this function is best to explicitly declare the intended array mode. This version is provided for
* backward compatibility.
*/
public static Map<String, Object> flatten(final JsonNode node) {
return flatten(node, false);
}

/**
* Prepend all keys in subMap with prefix, then merge that map into originalMap.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;

class JsonsTest {
Expand All @@ -31,6 +34,11 @@ class JsonsTest {
private static final String SERIALIZED_JSON2 = "{\"str\":\"abc\"}";
private static final String ABC = "abc";
private static final String DEF = "def";
private static final String GHI = "ghi";
private static final String JKL = "jkl";
private static final String MNO = "mno";
private static final String PQR = "pqr";
private static final String STU = "stu";
private static final String TEST = "test";
private static final String TEST2 = "test2";
private static final String XYZ = "xyz";
Expand Down Expand Up @@ -60,7 +68,8 @@ void testSerializeJsonNode() {
Jsons.serialize(Jsons.jsonNode(ImmutableMap.of(
TEST, ABC,
TEST2, DEF))));
// issue: 5878 add test for binary node serialization, binary data are serialized into base64
// issue: 5878 add test for binary node serialization, binary data are
// serialized into base64
assertEquals(
"{\"test\":\"dGVzdA==\"}",
Jsons.serialize(Jsons.jsonNode(ImmutableMap.of(
Expand All @@ -83,7 +92,8 @@ void testDeserializeToJsonNode() {
assertEquals(
"[{\"str\":\"abc\"},{\"str\":\"abc\"}]",
Jsons.deserialize("[{\"str\":\"abc\"},{\"str\":\"abc\"}]").toString());
// issue: 5878 add test for binary node deserialization, for now should be base64 string
// issue: 5878 add test for binary node deserialization, for now should be
// base64 string
assertEquals(
"{\"test\":\"dGVzdA==\"}",
Jsons.deserialize("{\"test\":\"dGVzdA==\"}").toString());
Expand Down Expand Up @@ -230,26 +240,27 @@ void testToPrettyString() {

@Test
void testGetOptional() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");
final JsonNode json = Jsons
.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");

assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def"));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl"));
assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno"));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu"));
assertEquals(Optional.of(Jsons.jsonNode(GHI)), Jsons.getOptional(json, ABC, DEF));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, JKL));
assertEquals(Optional.of(Jsons.jsonNode(PQR)), Jsons.getOptional(json, MNO));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, STU));
assertEquals(Optional.empty(), Jsons.getOptional(json, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, DEF, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, "jkl", XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, JKL, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, STU, XYZ));
}

@Test
void testGetStringOrNull() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }");

assertEquals("ghi", Jsons.getStringOrNull(json, ABC, DEF));
assertEquals("mno", Jsons.getStringOrNull(json, "jkl"));
assertEquals("1", Jsons.getStringOrNull(json, "pqr"));
assertEquals(GHI, Jsons.getStringOrNull(json, ABC, DEF));
assertEquals(MNO, Jsons.getStringOrNull(json, JKL));
assertEquals("1", Jsons.getStringOrNull(json, PQR));
assertNull(Jsons.getStringOrNull(json, ABC, DEF, XYZ));
assertNull(Jsons.getStringOrNull(json, XYZ));
}
Expand All @@ -260,6 +271,68 @@ void testGetEstimatedByteSize() {
assertEquals(Jsons.toBytes(json).length, Jsons.getEstimatedByteSize(json));
}

@Test
void testFlatten__noArrays() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.def", GHI},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, false));
}

@Test
void testFlatten__withArraysNoApplyFlatten() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, false));
}

@Test
void testFlatten__checkBackwardCompatiblity() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json));
}

@Test
void testFlatten__withArraysApplyFlatten() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.[0].def", "ghi"},
{"abc.[1].fed", "ihg"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, true));
}

@Test
void testFlatten__withArraysApplyFlattenNested() {
final JsonNode json = Jsons
.deserialize(
"{ \"abc\": [{ \"def\": {\"ghi\": [\"xyz\"] }}, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.[0].def.ghi.[0]", "xyz"},
{"abc.[1].fed", "ihg"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, true));
}

private static class ToClass {

@JsonProperty("str")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.52
LABEL io.airbyte.version=0.3.53
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public boolean isValidData(final JsonNode data) {

// check VARCHAR limits for VARCHAR fields within the SUPER object, if overall object is valid
if (isValid) {
final Map<String, Object> dataMap = Jsons.flatten(data);
final Map<String, Object> dataMap = Jsons.flatten(data, true);
for (final Object value : dataMap.values()) {
if (value instanceof String stringValue) {
final int stringDataSize = stringValue.getBytes(StandardCharsets.UTF_8).length;
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Fixed handling of arrays in SUPER maximum size check |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers |
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
Expand Down