[HUDI-5331] Add schema settings with stream api (#7384)
* Add schema set with stream api.

Co-authored-by: superche <superche@tencent.com>
hechao-ustc and superche authored Dec 6, 2022
1 parent ea48a85 commit 74f8d94
Showing 2 changed files with 142 additions and 0 deletions.
Changed file 1 of 2:
@@ -18,6 +18,8 @@

package org.apache.hudi.util;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;
@@ -125,6 +127,19 @@ public Builder partition(String... partitions) {
return this;
}

/**
* Add table schema.
*/
public Builder schema(Schema schema) {
for (Schema.UnresolvedColumn column : schema.getColumns()) {
column(column.toString());
}
if (schema.getPrimaryKey().isPresent()) {
pk(schema.getPrimaryKey().get().getColumnNames().stream().map(EncodingUtils::escapeIdentifier).collect(Collectors.joining(", ")));
}
return this;
}

/**
* Add a config option.
*/
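For readers skimming the hunk above, here is a minimal sketch (hypothetical, not part of the commit) of what a schema(Schema) call delegates to: one column(...) call per Schema.UnresolvedColumn, plus a pk(...) call with the escaped, comma-joined primary-key columns. The rendered column strings below assume the output of Flink's UnresolvedColumn.toString() and EncodingUtils.escapeIdentifier and may differ slightly in practice.

    // Hypothetical illustration only; "builder" is assumed to be a HoodiePipeline.Builder.
    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    builder.schema(schema);
    // ...is roughly equivalent to:
    builder.column("`uuid` STRING NOT NULL")  // one column(...) per Schema.UnresolvedColumn
        .column("`name` STRING")
        .pk("`uuid`");                        // escaped primary-key columns, comma-joined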
Changed file 2 of 2:
@@ -22,10 +22,13 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.table.catalog.HoodieCatalog;
import org.apache.hudi.table.catalog.TableOptionProperties;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
@@ -49,6 +52,10 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -70,6 +77,9 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;

/**
* Integration test for Flink Hoodie stream sink.
*/
@@ -427,4 +437,121 @@ public void testHoodiePipelineBuilderSink() throws Exception {
execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}

@Test
public void testHoodiePipelineBuilderSourceWithSchemaSet() throws Exception {
// create a StreamExecutionEnvironment instance.
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(1);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// create table dir
final String dbName = DEFAULT_DATABASE.defaultValue();
final String tableName = "t1";
File testTable = new File(tempFile, dbName + Path.SEPARATOR + tableName);
testTable.mkdir();

Configuration conf = TestConfigurations.getDefaultConf(testTable.toURI().toString());
conf.setString(FlinkOptions.TABLE_NAME, tableName);
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

// write 3 batches of data set
TestData.writeData(TestData.dataSetInsert(1, 2), conf);
TestData.writeData(TestData.dataSetInsert(3, 4), conf);
TestData.writeData(TestData.dataSetInsert(5, 6), conf);

String latestCommit = TestUtils.getLastCompleteInstant(testTable.toURI().toString());

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), testTable.toURI().toString());
options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

// create hoodie catalog, in order to get the table schema
Configuration catalogConf = new Configuration();
catalogConf.setString(CATALOG_PATH.key(), tempFile.toURI().toString());
catalogConf.setString(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
HoodieCatalog catalog = new HoodieCatalog("hudi", catalogConf);
catalog.open();
// get hoodieTable
ObjectPath tablePath = new ObjectPath(dbName, tableName);
TableOptionProperties.createProperties(testTable.toURI().toString(), HadoopConfigurations.getHadoopConf(catalogConf), options);
CatalogBaseTable hoodieTable = catalog.getTable(tablePath);

// read the hoodie table using the low-level source API.
HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
.schema(hoodieTable.getUnresolvedSchema())
.pk("uuid")
.partition("partition")
.options(options);
DataStream<RowData> rowDataDataStream = builder.source(execEnv);
List<RowData> result = new ArrayList<>();
rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
TimeUnit.SECONDS.sleep(2); // sleep 2 seconds to collect the data
TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
}

@Test
public void testHoodiePipelineBuilderSinkWithSchemaSet() throws Exception {
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> options = new HashMap<>();
execEnv.getConfig().disableObjectReuse();
execEnv.setParallelism(4);
// set up checkpoint interval
execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

options.put(FlinkOptions.PATH.key(), tempFile.toURI().toString());
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
Configuration conf = Configuration.fromMap(options);
// Read from file source
RowType rowType =
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
);
String sourcePath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_source.data")).toString();

TextInputFormat format = new TextInputFormat(new Path(sourcePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
format.setCharsetName("UTF-8");

DataStream dataStream = execEnv
// use continuous file source to trigger checkpoint
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
.name("continuous_file_source")
.setParallelism(1)
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4);

Schema schema =
Schema.newBuilder()
.column("uuid", DataTypes.STRING().notNull())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("ts", DataTypes.TIMESTAMP(3))
.column("partition", DataTypes.STRING())
.primaryKey("uuid")
.build();

// sink to the hoodie table using the low-level sink API.
HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
.schema(schema)
.partition("partition")
.options(options);

builder.sink(dataStream, false);

execute(execEnv, false, "Api_Sink_Test");
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
}
}
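Taken together, the new Builder#schema(Schema) method lets callers pass a Flink Schema object directly instead of declaring each column as a DDL string. A minimal end-to-end sketch mirroring the tests above (the table path is a placeholder and the surrounding job setup is omitted):

    // Hypothetical usage sketch; the path and environment setup are assumptions.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), "/tmp/hoodie/t1");  // placeholder table path

    Schema schema = Schema.newBuilder()
        .column("uuid", DataTypes.STRING().notNull())
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("ts", DataTypes.TIMESTAMP(3))
        .column("partition", DataTypes.STRING())
        .primaryKey("uuid")
        .build();

    HoodiePipeline.Builder builder = HoodiePipeline.builder("t1")
        .schema(schema)            // new in this commit
        .partition("partition")
        .options(options);

    DataStream<RowData> rows = builder.source(env);  // or builder.sink(inputStream, false) for writes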
