Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre committed Jun 7, 2022
1 parent f9b5036 commit f5faeb3
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,99 +23,27 @@
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

public class JsonConvertFrom {

private JsonConvertFrom() {
}

// Drill UDF: convert_fromJSON(VARBINARY) -> complex (map/list) output.
// Parses the raw JSON bytes in `in` with the EVF JsonLoader and feeds the
// parsed rows through the injected ResultSetLoader.
// NOTE(review): Drill's run-time code generator copies UDF source verbatim,
// which is why types referenced inside method bodies are fully qualified.
// NOTE(review): isRandom = true presumably blocks constant folding of this
// function at plan time — confirm that is the intent.
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJson implements DrillSimpleFunc {

// Raw JSON input: bytes [start, end) of `buffer`.
@Param VarBinaryHolder in;
// Framework-injected loader that receives the parsed rows.
@Inject
ResultSetLoader loader;
// Builder kept across calls; eval() attaches each row's input stream to it.
@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;

// Session/system options used for the standard JSON reader settings.
@Inject
OptionManager options;

// NOTE(review): `writer` is declared as the function output but never written
// to in this class — confirm the ResultSetLoader path is what produces output.
@Output ComplexWriter writer;

@Override
public void setup() {
// Configure the loader once; the per-row input is supplied in eval().
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
.resultSetLoader(loader)
.standardOptions(options);
}

@Override
public void eval() {
try {
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
loader.startBatch();
jsonLoader.readBatch();
// NOTE(review): closing the ResultSetLoader inside a per-row eval() looks
// wrong — any subsequent row would hit a closed loader. Confirm lifecycle.
loader.close();

} catch (Exception e) {
// Wrap any parse/allocation failure in Drill's runtime exception type,
// preserving the cause.
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}

// Drill UDF: convert_fromJSON(VARCHAR) -> complex (map/list) output.
// Identical to ConvertFromJson above except the input holder type: the JSON
// text arrives as VARCHAR bytes and is parsed with the same EVF JsonLoader.
// NOTE(review): Drill's code generator inlines UDF source, hence the fully
// qualified type names inside method bodies.
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJsonVarchar implements DrillSimpleFunc {

// JSON text input: bytes [start, end) of `buffer`.
@Param VarCharHolder in;
// Builder kept across calls; eval() attaches each row's input stream to it.
@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;

// Session/system options used for the standard JSON reader settings.
@Inject
OptionManager options;

// Framework-injected loader that receives the parsed rows.
@Inject
ResultSetLoader loader;

// NOTE(review): declared output writer is never referenced in this class —
// confirm output is produced via the ResultSetLoader instead.
@Output ComplexWriter writer;

@Override
public void setup() {
// Configure the loader once; the per-row input is supplied in eval().
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
.resultSetLoader(loader)
.standardOptions(options);
}

@Override
public void eval() {
try {
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
loader.startBatch();
jsonLoader.readBatch();
// NOTE(review): closing the ResultSetLoader inside a per-row eval() looks
// wrong — any subsequent row would hit a closed loader. Confirm lifecycle.
loader.close();

} catch (Exception e) {
// Wrap any parse/allocation failure in Drill's runtime exception type,
// preserving the cause.
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}
private JsonConvertFrom() {}

@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
@FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"},
scope = FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL)
public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {

@Param NullableVarBinaryHolder in;
@Param
NullableVarBinaryHolder in;

@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
Expand All @@ -126,7 +54,8 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {
@Inject
ResultSetLoader loader;

@Output ComplexWriter writer;
@Output
BaseWriter.ComplexWriter writer;

@Override
public void setup() {
Expand All @@ -137,7 +66,7 @@ public void setup() {

@Override
public void eval() {
if (in.isSet == 0) {
if (in.end == 0) {
// Return empty map
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
mapWriter.start();
Expand All @@ -157,10 +86,15 @@ public void eval() {
}
}

@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFunc {
@FunctionTemplate(names = {"convert_fromJSON", "convertFromJson", "convert_from_json"},
scope = FunctionScope.SIMPLE)
public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {

@Param NullableVarCharHolder in;
@Param
VarCharHolder in;

@Output
ComplexWriter writer;

@Workspace
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
Expand All @@ -171,27 +105,27 @@ public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFu
@Inject
ResultSetLoader loader;

@Output ComplexWriter writer;

@Override
public void setup() {
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
.resultSetLoader(loader)
.standardOptions(options);
}

@Override
public void eval() {
if (in.isSet == 0) {
// Return empty map
String jsonString = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(in.start, in.end, in.buffer);

// If the input is null or empty, return an empty map
if (jsonString.length() == 0) {
org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
mapWriter.start();
mapWriter.end();
return;
}

try {
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
jsonLoaderBuilder.fromString(jsonString);
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
loader.startBatch();
jsonLoader.readBatch();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.json;


import ch.qos.logback.classic.Level;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.LogFixture;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;


/**
 * Tests for the {@code convert_fromJSON} conversion UDF family.
 *
 * <p>Fixes in this revision: the {@link LogFixture} built in {@code setup()} is
 * now closed in an {@code @AfterClass} hook (it re-configures global logging and
 * must be restored); the string-input test now asserts on its result instead of
 * only printing it; row sets are cleared so their vectors are released under
 * Drill's allocator checks; a large block of commented-out dead code was removed
 * (it remains in version control history).
 */
public class TestJsonConversionUDF extends ClusterTest {

  protected static LogFixture logFixture;
  private static final Level CURRENT_LOG_LEVEL = Level.DEBUG;

  @BeforeClass
  public static void setup() throws Exception {
    // Route detailed reader/operator logging to the console while debugging.
    logFixture = LogFixture.builder()
        .toConsole()
        .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
        .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
        .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
        .build();

    startCluster(ClusterFixture.builder(dirTestWatcher));
  }

  @AfterClass
  public static void tearDown() {
    // LogFixture alters global logging configuration; restore it once the
    // class finishes so later test classes are unaffected.
    if (logFixture != null) {
      logFixture.close();
    }
  }

  @Test
  public void testConvertFromJsonFunctionWithBinaryInput() throws Exception {
    client.alterSession(ExecConstants.JSON_READER_NAN_INF_NUMBERS, true);
    String sql = "SELECT string_binary(convert_toJSON(convert_fromJSON(columns[1]))) as col FROM cp.`jsoninput/nan_test.csv`";
    RowSet results = client.queryBuilder().sql(sql).rowSet();
    try {
      assertEquals("Query result must contain 1 row", 1, results.rowCount());
      results.print();
    } finally {
      // Release the row set's vectors so allocator leak checks pass.
      results.clear();
    }
  }

  @Test
  public void testConvertFromJSONWithStringInput() throws Exception {
    String sql = "SELECT convert_FromJSON('{\"foo\":\"bar\"}') FROM (VALUES(1))";
    RowSet results = client.queryBuilder().sql(sql).rowSet();
    try {
      // Previously this test only printed the result; assert on the row count
      // so a regression actually fails the test.
      assertEquals("Query result must contain 1 row", 1, results.rowCount());
      results.print();
    } finally {
      results.clear();
    }
  }
}
Loading

0 comments on commit f5faeb3

Please sign in to comment.