Skip to content

Commit

Permalink
[Kernel] Add and implement an JsonHandler API to atomically write t…
Browse files Browse the repository at this point in the history
…he data to JSON file (#2903)

## Description

Add the following API to `JsonHandler` which will be used when writing
the Delta Log actions to a delta file as part of Delta table write.

```
    /**
     * Serialize each {@code Row} in the iterator as JSON and write as a separate line in
     * destination file. This call either succeeds in creating the file with given contents or no
     * file is created at all. It won't leave behind a partially written file.
     * <p>
     * Following are the supported data types and their serialization rules. At a high-level, the
     * JSON serialization is similar to that of {@code jackson} JSON serializer.
     * <ul>
     *     <li>Primitive types: @code boolean, byte, short, int, long, float, double, string}</li>
     *     <li>{@code struct}: any element whose value is null is not written to file</li>
     *     <li>{@code map}: only a {@code map} with {@code string} key type is supported</li>
     *     <li>{@code array}: {@code null} value elements are written to file</li>
     * </ul>
     *
     * @param filePath Fully qualified destination file path
     * @param data     Iterator of {@link Row} objects where each row should be serialized as JSON
     *                 and written as separate line in the destination file.
     * @throws FileAlreadyExistsException if the file already exists.
     * @throws IOException                if any other I/O error occurs.
     */
    void writeJsonFileAtomically(String filePath, CloseableIterator<Row> data) throws IOException;
```
The default implementation makes use of the `LogStore` implementations
in `delta-storage` module.

## How was this patch tested?
Unittests
  • Loading branch information
vkorukanti authored Apr 19, 2024
1 parent 9d4e4f5 commit a1ddb8b
Show file tree
Hide file tree
Showing 6 changed files with 514 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.delta.kernel.client;

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Optional;

import io.delta.kernel.annotation.Evolving;
Expand Down Expand Up @@ -98,4 +99,26 @@ CloseableIterator<ColumnarBatch> readJsonFiles(
CloseableIterator<FileStatus> fileIter,
StructType physicalSchema,
Optional<Predicate> predicate) throws IOException;

/**
* Serialize each {@code Row} in the iterator as JSON and write as a separate line in
* destination file. This call either succeeds in creating the file with given contents or no
* file is created at all. It won't leave behind a partially written file.
* <p>
* Following are the supported data types and their serialization rules. At a high-level, the
* JSON serialization is similar to that of {@code jackson} JSON serializer.
* <ul>
* <li>Primitive types: @code boolean, byte, short, int, long, float, double, string}</li>
* <li>{@code struct}: any element whose value is null is not written to file</li>
* <li>{@code map}: only a {@code map} with {@code string} key type is supported</li>
* <li>{@code array}: {@code null} value elements are written to file</li>
* </ul>
*
* @param filePath Fully qualified destination file path
* @param data Iterator of {@link Row} objects where each row should be serialized as JSON
* and written as separate line in the destination file.
* @throws FileAlreadyExistsException if the file already exists.
* @throws IOException if any other I/O error occurs.
*/
void writeJsonFileAtomically(String filePath, CloseableIterator<Row> data) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.delta.kernel.test

import io.delta.kernel.client._
import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, Row}
import io.delta.kernel.expressions.{Expression, ExpressionEvaluator, Predicate, PredicateEvaluator}
import io.delta.kernel.types.{DataType, StructType}
import io.delta.kernel.utils.{CloseableIterator, FileStatus}
Expand Down Expand Up @@ -90,6 +90,9 @@ trait BaseMockJsonHandler extends JsonHandler {
physicalSchema: StructType,
predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] =
throw new UnsupportedOperationException("not supported in this test suite")

override def writeJsonFileAtomically(filePath: String, data: CloseableIterator[Row]): Unit =
throw new UnsupportedOperationException("not supported in this test suite")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.delta.storage.LogStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

Expand All @@ -40,6 +41,8 @@

import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
import io.delta.kernel.defaults.internal.data.DefaultRowBasedColumnarBatch;
import io.delta.kernel.defaults.internal.json.JsonUtils;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
import io.delta.kernel.defaults.internal.types.DataTypeParser;

/**
Expand All @@ -63,15 +66,16 @@ public DefaultJsonHandler(Configuration hadoopConf) {
}

@Override
public ColumnarBatch parseJson(ColumnVector jsonStringVector, StructType outputSchema,
public ColumnarBatch parseJson(
ColumnVector jsonStringVector,
StructType outputSchema,
Optional<ColumnVector> selectionVector) {
List<Row> rows = new ArrayList<>();
for (int i = 0; i < jsonStringVector.getSize(); i++) {
boolean isSelected = !selectionVector.isPresent() ||
(!selectionVector.get().isNullAt(i) && selectionVector.get().getBoolean(i));
if (isSelected && !jsonStringVector.isNullAt(i)) {
rows.add(
parseJson(jsonStringVector.getString(i), outputSchema));
rows.add(parseJson(jsonStringVector.getString(i), outputSchema));
} else {
rows.add(null);
}
Expand All @@ -93,17 +97,16 @@ public StructType deserializeStructType(String structTypeJson) {

@Override
public CloseableIterator<ColumnarBatch> readJsonFiles(
CloseableIterator<FileStatus> scanFileIter,
StructType physicalSchema,
Optional<Predicate> predicate) throws IOException {
CloseableIterator<FileStatus> scanFileIter,
StructType physicalSchema,
Optional<Predicate> predicate) throws IOException {
return new CloseableIterator<ColumnarBatch>() {
private FileStatus currentFile;
private BufferedReader currentFileReader;
private String nextLine;

@Override
public void close()
throws IOException {
public void close() throws IOException {
Utils.closeCloseables(currentFileReader, scanFileIter);
}

Expand Down Expand Up @@ -151,8 +154,7 @@ public ColumnarBatch next() {
return new DefaultRowBasedColumnarBatch(physicalSchema, rows);
}

private void tryOpenNextFile()
throws IOException {
private void tryOpenNextFile() throws IOException {
Utils.closeCloseables(currentFileReader); // close the current opened file
currentFileReader = null;

Expand All @@ -174,6 +176,40 @@ private void tryOpenNextFile()
};
}

/**
* Makes use of {@link LogStore} implementations in `delta-storage` to atomically write the data
* to a file depending upon the destination filesystem.
*
* @param filePath Destination file path
* @param data Data to write as Json
* @throws IOException
*/
@Override
public void writeJsonFileAtomically(String filePath, CloseableIterator<Row> data)
throws IOException {
Path path = new Path(filePath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
try {
logStore.write(
path,
new Iterator<String>() {
@Override
public boolean hasNext() {
return data.hasNext();
}

@Override
public String next() {
return JsonUtils.rowToJson(data.next());
}
},
false /* overwrite */,
hadoopConf);
} finally {
Utils.closeCloseables(data);
}
}

private Row parseJson(String json, StructType readSchema) {
try {
final JsonNode jsonNode = objectReaderReadBigDecimals.readTree(json);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.defaults.internal.json;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import io.delta.kernel.data.*;
import io.delta.kernel.types.*;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.defaults.internal.data.DefaultJsonRow;

/**
* Utilities method to serialize and deserialize {@link Row} objects with a limited set of data type
* values.
* <p>
* Following are the supported data types:
* {@code boolean}, {@code byte}, {@code short}, {@code int}, {@code long}, {@code float},
* {@code double}, {@code string}, {@code StructType} (containing any of the supported subtypes),
* {@code ArrayType}, {@code MapType} (only a map with string keys is supported).
* <p>
* At a high-level, the JSON serialization is similar to that of Jackson's {@link ObjectMapper}.
*/
public class JsonUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

static {
OBJECT_MAPPER.registerModule(
new SimpleModule().addSerializer(Row.class, new RowSerializer()));
}

private JsonUtils() {
}

/**
* Converts a {@link Row} to a single line JSON string. This is currently used just in tests.
* Wll be used as part of the refactoring planned in
* <a href="https://github.com/delta-io/delta/issues/2929">#2929</a>
*
* @param row the row to convert
* @return JSON string
*/
public static String rowToJson(Row row) {
try {
return OBJECT_MAPPER.writeValueAsString(row);
} catch (JsonProcessingException ex) {
throw new RuntimeException("Could not serialize row object to JSON", ex);
}
}

/**
* Converts a JSON string to a {@link Row}.
*
* @param json JSON string
* @param schema to read the JSON according the schema
* @return {@link Row} instance with given schema.
*/
public static Row rowFromJson(String json, StructType schema) {
try {
final JsonNode jsonNode = OBJECT_MAPPER.readTree(json);
return new DefaultJsonRow((ObjectNode) jsonNode, schema);
} catch (JsonProcessingException ex) {
throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex);
}
}

public static class RowSerializer extends StdSerializer<Row> {
public RowSerializer() {
super(Row.class);
}

@Override
public void serialize(Row row, JsonGenerator gen, SerializerProvider provider)
throws IOException {
writeRow(gen, row, row.getSchema());
}

private void writeRow(JsonGenerator gen, Row row, StructType schema) throws IOException {
gen.writeStartObject();
for (int columnOrdinal = 0; columnOrdinal < schema.length(); columnOrdinal++) {
StructField field = schema.at(columnOrdinal);
if (!row.isNullAt(columnOrdinal)) {
gen.writeFieldName(field.getName());
writeValue(gen, row, columnOrdinal, field.getDataType());
}
}
gen.writeEndObject();
}

private void writeStruct(JsonGenerator gen, ColumnVector vector, StructType type, int rowId)
throws IOException {
gen.writeStartObject();
for (int columnOrdinal = 0; columnOrdinal < type.length(); columnOrdinal++) {
StructField field = type.at(columnOrdinal);
ColumnVector childVector = vector.getChild(columnOrdinal);
if (!childVector.isNullAt(rowId)) {
gen.writeFieldName(field.getName());
writeValue(gen, childVector, rowId, field.getDataType());
}
}
gen.writeEndObject();
}

private void writeArrayValue(JsonGenerator gen, ArrayValue arrayValue, ArrayType arrayType)
throws IOException {
gen.writeStartArray();
ColumnVector arrayElems = arrayValue.getElements();
for (int i = 0; i < arrayValue.getSize(); i++) {
if (arrayElems.isNullAt(i)) {
// Jackson serializes the null values in the array, but not in the map
gen.writeNull();
} else {
writeValue(gen, arrayValue.getElements(), i, arrayType.getElementType());
}
}
gen.writeEndArray();
}

private void writeMapValue(JsonGenerator gen, MapValue mapValue, MapType mapType)
throws IOException {
assertSupportedMapType(mapType);
gen.writeStartObject();
ColumnVector keys = mapValue.getKeys();
ColumnVector values = mapValue.getValues();
for (int i = 0; i < mapValue.getSize(); i++) {
if (!values.isNullAt(i)) {
gen.writeFieldName(keys.getString(i));
writeValue(gen, values, i, mapType.getValueType());
}
}
gen.writeEndObject();
}

private void writeValue(JsonGenerator gen, Row row, int columnOrdinal, DataType type)
throws IOException {
checkArgument(!row.isNullAt(columnOrdinal), "value should not be null");
if (type instanceof BooleanType) {
gen.writeBoolean(row.getBoolean(columnOrdinal));
} else if (type instanceof ByteType) {
gen.writeNumber(row.getByte(columnOrdinal));
} else if (type instanceof ShortType) {
gen.writeNumber(row.getShort(columnOrdinal));
} else if (type instanceof IntegerType) {
gen.writeNumber(row.getInt(columnOrdinal));
} else if (type instanceof LongType) {
gen.writeNumber(row.getLong(columnOrdinal));
} else if (type instanceof FloatType) {
gen.writeNumber(row.getFloat(columnOrdinal));
} else if (type instanceof DoubleType) {
gen.writeNumber(row.getDouble(columnOrdinal));
} else if (type instanceof StringType) {
gen.writeString(row.getString(columnOrdinal));
} else if (type instanceof StructType) {
writeRow(gen, row.getStruct(columnOrdinal), (StructType) type);
} else if (type instanceof ArrayType) {
writeArrayValue(gen, row.getArray(columnOrdinal), (ArrayType) type);
} else if (type instanceof MapType) {
writeMapValue(gen, row.getMap(columnOrdinal), (MapType) type);
} else {
// `binary` type is not supported according the Delta Protocol
throw new UnsupportedOperationException("unsupported data type: " + type);
}
}

private void writeValue(JsonGenerator gen, ColumnVector vector, int rowId, DataType type)
throws IOException {
checkArgument(!vector.isNullAt(rowId), "value should not be null");
if (type instanceof BooleanType) {
gen.writeBoolean(vector.getBoolean(rowId));
} else if (type instanceof ByteType) {
gen.writeNumber(vector.getByte(rowId));
} else if (type instanceof ShortType) {
gen.writeNumber(vector.getShort(rowId));
} else if (type instanceof IntegerType) {
gen.writeNumber(vector.getInt(rowId));
} else if (type instanceof LongType) {
gen.writeNumber(vector.getLong(rowId));
} else if (type instanceof FloatType) {
gen.writeNumber(vector.getFloat(rowId));
} else if (type instanceof DoubleType) {
gen.writeNumber(vector.getDouble(rowId));
} else if (type instanceof StringType) {
gen.writeString(vector.getString(rowId));
} else if (type instanceof StructType) {
writeStruct(gen, vector, (StructType) type, rowId);
} else if (type instanceof ArrayType) {
writeArrayValue(gen, vector.getArray(rowId), (ArrayType) type);
} else if (type instanceof MapType) {
writeMapValue(gen, vector.getMap(rowId), (MapType) type);
} else {
throw new UnsupportedOperationException("unsupported data type: " + type);
}
}
}

private static void assertSupportedMapType(MapType keyType) {
checkArgument(
keyType.getKeyType() instanceof StringType,
"Only STRING type keys are supported in MAP type in JSON serialization");
}
}
Loading

0 comments on commit a1ddb8b

Please sign in to comment.