Skip to content

Commit

Permalink
[Feature][Transform-V2] add FieldMapper Transform (#3781)
Browse files Browse the repository at this point in the history
* add FieldMapper Transform
  • Loading branch information
EricJoy2048 authored Jan 3, 2023
1 parent b2b97ad commit 1118c83
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# EnvConf
# JobEnvConfig

This document describes the env configuration; env unifies the environment variables of all engines.

Expand Down
64 changes: 64 additions & 0 deletions docs/en/transform-v2/field-mapper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# FieldMapper

> FieldMapper transform plugin

## Description

Add input schema and output schema mapping.

## Options

| name | type | required | default value |
|--------------------|--------| -------- |---------------|
| field_mapper | Object | yes | |

### field_mapper [config]

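Specify the field mapping relationship between input and output. Each key is the name of an input field and each value is the name that field takes in the output. Input fields that are not listed in `field_mapper` are dropped from the output.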

### common options [config]

Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details.

## Example

The data read from source is a table like this:

| id | name | age | card |
|-----|----------|-----|------|
| 1 | Joy Ding | 20 | 123 |
| 2 | May Ding | 20 | 123 |
| 3 | Kin Dom | 20 | 123 |
| 4 | Joy Dom | 20 | 123 |

We want to delete the `age` field, change the field order to `id`, `card`, `name`, and rename `name` to `new_name`. We can add a `FieldMapper` transform like this:

```
transform {
FieldMapper {
source_table_name = "fake"
result_table_name = "fake1"
field_mapper = {
id = id
card = card
name = new_name
}
}
}
```

Then the data in result table `fake1` will look like this:

| id | card | new_name |
|-----|------|----------|
| 1 | 123 | Joy Ding |
| 2 | 123 | May Ding |
| 3 | 123 | Kin Dom |
| 4 | 123 | Joy Dom |


## Changelog

### new version

- Add FieldMapper Transform Connector
2 changes: 1 addition & 1 deletion docs/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ const sidebars = {
}
]
},
"connector-v2/JobEnvConfig",
"connector-v2/Error-Quick-Reference-Manual",
"connector-v2/EnvConf"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.transform;

import org.apache.seatunnel.e2e.common.container.TestContainer;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.io.IOException;

/**
 * End-to-end test that runs the FieldMapper transform job config inside a test container.
 */
public class TestFieldMapperIT extends TestSuiteBase {

    /**
     * Executes {@code /field_mapper_transform.conf} on the supplied container and expects the job
     * to finish with exit code 0.
     *
     * @param container the container the job is executed in
     * @throws IOException declared by {@code executeJob}
     * @throws InterruptedException declared by {@code executeJob}
     */
    @TestTemplate
    public void testFieldMapper(TestContainer container) throws IOException, InterruptedException {
        final Container.ExecResult jobResult = container.executeJob("/field_mapper_transform.conf");
        final int exitCode = jobResult.getExitCode();
        Assertions.assertEquals(0, exitCode);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

# End-to-end job for the FieldMapper transform: FakeSource -> FieldMapper -> Console + Assert.
env {
job.mode = "BATCH"
}

source {
FakeSource {
# Registered as table "fake"; the transform below reads from this name.
result_table_name = "fake"
# Same number as the MIN_ROW rule_value checked by the Assert sink.
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
# string1 and c_bigint are intentionally not listed in field_mapper below.
string1 = "string"
int1 = "int"
c_bigint = "bigint"
}
}
}
}

transform {
FieldMapper {
source_table_name = "fake"
result_table_name = "fake1"
# Left side: input field name. Right side: output field name.
# id and name keep their names; age and int1 get an "_as" suffix.
field_mapper = {
id = id
age = age_as
int1 = int1_as
name = name
}
}
}

sink {
# Both sinks read the transform output table "fake1".
Console {
source_table_name = "fake1"
}
Assert {
source_table_name = "fake1"
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
],
# One NOT_NULL rule per field listed on the right side of field_mapper,
# using the output (renamed) field names and the source-declared types.
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = age_as
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = int1_as
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform;
import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
import org.apache.seatunnel.transform.exception.FieldMapperTransformException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Transform that selects, reorders and renames the fields of each input row according to the
 * {@code field_mapper} option.
 *
 * <p>Each key of the mapping is an input field name and each value is the name of that field in
 * the output. Input fields that are not listed are not part of the output row.
 */
@Slf4j
@AutoService(SeaTunnelTransform.class)
public class FieldMapperTransform extends AbstractSeaTunnelTransform {
    public static final Option<Map<String, String>> FIELD_MAPPER = Options.key("field_mapper")
        .mapType()
        .noDefaultValue()
        .withDescription("Specify the field mapping relationship between input and output");

    /** Input field name -> output field name, in the order read from the config. */
    private LinkedHashMap<String, String> fieldMapper = new LinkedHashMap<>();

    /**
     * For every output position, the index of the input field to read. Populated by
     * {@link #transformRowType(SeaTunnelRowType)}.
     */
    private List<Integer> needReaderColIndex;

    @Override
    public String getPluginName() {
        return "FieldMapper";
    }

    /**
     * Reads the {@code field_mapper} option from the plugin config.
     *
     * @param pluginConfig the config of this transform
     * @throws FieldMapperTransformException if {@code field_mapper} is missing or one of its
     *     values is not a text value
     */
    @Override
    protected void setConfig(Config pluginConfig) {
        if (!pluginConfig.hasPath(FIELD_MAPPER.key())) {
            // Report the option key itself rather than the string form of the Option object.
            throw new FieldMapperTransformException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                "The configuration missing key: " + FIELD_MAPPER.key());
        }
        this.fieldMapper = convertConfigToSortedMap(pluginConfig.getConfig(FIELD_MAPPER.key()));
    }

    /**
     * Converts the {@code field_mapper} config object into an insertion-ordered map.
     *
     * @param config the config object found under the {@code field_mapper} key
     * @return the mapping in the order the entries are read from the rendered JSON
     * @throws FieldMapperTransformException if a value is not textual
     */
    private static LinkedHashMap<String, String> convertConfigToSortedMap(Config config) {
        // Because the entrySet in typesafe config couldn't keep key-value order
        // So use jackson parsing schema information into a map to keep key-value order
        ConfigRenderOptions options = ConfigRenderOptions.concise();
        String json = config.root().render(options);
        ObjectNode jsonNodes = JsonUtils.parseObject(json);
        LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
        jsonNodes.fields().forEachRemaining(field -> {
            String key = field.getKey();
            JsonNode value = field.getValue();

            if (!value.isTextual()) {
                String errorMsg = String.format("The value [%s] of key [%s] that in config is not text", value, key);
                throw new FieldMapperTransformException(CommonErrorCode.ILLEGAL_ARGUMENT, errorMsg);
            }
            fieldsMap.put(key, value.textValue());
        });
        return fieldsMap;
    }

    /**
     * Builds the output row type and records which input column feeds each output column.
     *
     * @param inputRowType the row type of the incoming rows
     * @return a row type whose field names are the mapping values and whose field types are the
     *     types of the mapped input fields
     * @throws FieldMapperTransformException if a mapping key is not a field of
     *     {@code inputRowType}
     */
    @Override
    protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
        needReaderColIndex = new ArrayList<>(fieldMapper.size());
        List<String> outputFieldNames = new ArrayList<>(fieldMapper.size());
        List<SeaTunnelDataType<?>> outputFieldTypes = new ArrayList<>(fieldMapper.size());
        List<String> inputFieldNames = Lists.newArrayList(inputRowType.getFieldNames());
        SeaTunnelDataType<?>[] inputFieldTypes = inputRowType.getFieldTypes();
        fieldMapper.forEach((inputName, outputName) -> {
            int fieldIndex = inputFieldNames.indexOf(inputName);
            if (fieldIndex < 0) {
                throw new FieldMapperTransformException(FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
                    "Can not found field " + inputName + " from inputRowType");
            }
            needReaderColIndex.add(fieldIndex);
            outputFieldNames.add(outputName);
            outputFieldTypes.add(inputFieldTypes[fieldIndex]);
        });

        return new SeaTunnelRowType(outputFieldNames.toArray(new String[0]),
            outputFieldTypes.toArray(new SeaTunnelDataType[0]));
    }

    /**
     * Projects one input row onto the output layout computed by
     * {@link #transformRowType(SeaTunnelRowType)}.
     *
     * @param inputRow the row to transform
     * @return a new row holding the mapped fields, with the row kind and table id of
     *     {@code inputRow}
     */
    @Override
    protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
        Object[] outputDataArray = new Object[fieldMapper.size()];
        for (int i = 0; i < outputDataArray.length; i++) {
            outputDataArray[i] = inputRow.getField(needReaderColIndex.get(i));
        }
        SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
        // The constructor only receives the field values, so carry the row metadata over
        // explicitly instead of dropping it.
        outputRow.setRowKind(inputRow.getRowKind());
        outputRow.setTableId(inputRow.getTableId());
        return outputRow;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;

import com.google.auto.service.AutoService;

/**
 * Factory for the FieldMapper transform, registered as a {@link Factory} service through
 * {@link AutoService}.
 */
@AutoService(Factory.class)
public class FieldMapperTransformFactory implements TableTransformFactory {

    /**
     * @return the identifier {@code "FieldMapper"}, the same string that
     *     {@link FieldMapperTransform#getPluginName()} returns
     */
    @Override
    public String factoryIdentifier() {
        return "FieldMapper";
    }

    /**
     * @return an option rule in which {@link FieldMapperTransform#FIELD_MAPPER} is required
     */
    @Override
    public OptionRule optionRule() {
        return OptionRule.builder()
            .required(FieldMapperTransform.FIELD_MAPPER)
            .build();
    }
}
Loading

0 comments on commit 1118c83

Please sign in to comment.