diff --git a/docs/en/connector-v2/source/OpenMldb.md b/docs/en/connector-v2/source/OpenMldb.md new file mode 100644 index 00000000000..28934af8b40 --- /dev/null +++ b/docs/en/connector-v2/source/OpenMldb.md @@ -0,0 +1,85 @@ +# OpenMldb + +> OpenMldb source connector + +## Description + +Used to read data from OpenMldb. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------------|---------|----------|---------------| +| cluster_mode | boolean | yes | - | +| sql | string | yes | - | +| database | string | yes | - | +| host | string | no | - | +| port | int | no | - | +| zk_path | string | no | - | +| zk_host | string | no | - | +| session_timeout | int | no | 10000 | +| request_timeout | int | no | 60000 | +| common-options | | no | - | + +### cluster_mode [string] + +OpenMldb is or not cluster mode + +### sql [string] + +Sql statement + +### database [string] + +Database name + +### host [string] + +OpenMldb host, only supported on OpenMldb single mode + +### port [int] + +OpenMldb port, only supported on OpenMldb single mode + +### zk_host [string] + +Zookeeper host, only supported on OpenMldb cluster mode + +### zk_path [string] + +Zookeeper path, only supported on OpenMldb cluster mode + +### session_timeout [int] + +OpenMldb session timeout(ms), default 60000 + +### request_timeout [int] + +OpenMldb request timeout(ms), default 10000 + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details + +## Example + +```hocon + + OpenMldb { + host = "172.17.0.2" + port = 6527 + sql = "select * from demo_table1" + database = "demo_db" + cluster_mode = false + } + +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 2962c75b51d..e73f8c283d2 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -155,4 +155,5 @@ seatunnel.source.OneSignal = connector-http-onesignal seatunnel.source.Jira = connector-http-jira seatunnel.source.Gitlab = connector-http-gitlab seatunnel.sink.RabbitMQ = connector-rabbitmq -seatunnel.source.RabbitMQ = connector-rabbitmq \ No newline at end of file +seatunnel.source.RabbitMQ = connector-rabbitmq +seatunnel.source.OpenMldb = connector-openmldb \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-openmldb/pom.xml b/seatunnel-connectors-v2/connector-openmldb/pom.xml new file mode 100644 index 00000000000..e5158df0067 --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/pom.xml @@ -0,0 +1,54 @@ + + + + + seatunnel-connectors-v2 + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-openmldb + + + 0.6.3 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + com.4paradigm.openmldb + openmldb-jdbc + ${openmldb.version} + + + com.4paradigm.openmldb + openmldb-native + ${openmldb.version} + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java new file mode 100644 index 00000000000..cd7df71fef0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbConfig.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class OpenMldbConfig { + private static final int DEFAULT_SESSION_TIMEOUT = 10000; + private static final int DEFAULT_REQUEST_TIMEOUT = 60000; + public static final Option ZK_HOST = Options.key("zk_host") + .stringType() + .noDefaultValue() + .withDescription("Zookeeper server host"); + public static final Option ZK_PATH = Options.key("zk_path") + .stringType() + .noDefaultValue() + .withDescription("Zookeeper server path of OpenMldb cluster"); + public static final Option HOST = Options.key("host") + .stringType() + .noDefaultValue() + .withDescription("OpenMldb host"); + public static final Option PORT = Options.key("port") + .intType() + .noDefaultValue() + .withDescription("OpenMldb port"); + public static final Option SESSION_TIMEOUT = Options.key("session_timeout") + .intType() + .defaultValue(DEFAULT_SESSION_TIMEOUT) + .withDescription("OpenMldb session timeout"); + public static final Option REQUEST_TIMEOUT = Options.key("request_timeout") + .intType() + .defaultValue(DEFAULT_REQUEST_TIMEOUT) + .withDescription("OpenMldb request timeout"); + public static final Option CLUSTER_MODE = Options.key("cluster_mode") + .booleanType() + .noDefaultValue() + .withDescription("Whether cluster mode is enabled"); + public static final Option SQL = Options.key("sql") + .stringType() + .noDefaultValue() + .withDescription("Sql statement"); + public static final Option DATABASE = Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("The database you want to access"); +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java new file mode 100644 index 00000000000..92b875577de --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbParameters.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; + +import java.io.Serializable; + +@Getter +public class OpenMldbParameters implements Serializable { + private String zkHost; + private String zkPath; + private String host; + private int port; + private int sessionTimeout = OpenMldbConfig.SESSION_TIMEOUT.defaultValue(); + private int requestTimeout = OpenMldbConfig.REQUEST_TIMEOUT.defaultValue(); + private Boolean clusterMode; + private String database; + private String sql; + + private OpenMldbParameters() { + // do nothing + } + + public static OpenMldbParameters buildWithConfig(Config pluginConfig) { + OpenMldbParameters openMldbParameters = new OpenMldbParameters(); + openMldbParameters.clusterMode = pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key()); + openMldbParameters.database = pluginConfig.getString(OpenMldbConfig.DATABASE.key()); + openMldbParameters.sql = pluginConfig.getString(OpenMldbConfig.SQL.key()); + // set zkHost + if (pluginConfig.hasPath(OpenMldbConfig.ZK_HOST.key())) { + openMldbParameters.zkHost = pluginConfig.getString(OpenMldbConfig.ZK_HOST.key()); + } + // set zkPath + if (pluginConfig.hasPath(OpenMldbConfig.ZK_PATH.key())) { + openMldbParameters.zkPath = pluginConfig.getString(OpenMldbConfig.ZK_PATH.key()); + } + // set host + if (pluginConfig.hasPath(OpenMldbConfig.HOST.key())) { + openMldbParameters.host = pluginConfig.getString(OpenMldbConfig.HOST.key()); + } + // set port + if (pluginConfig.hasPath(OpenMldbConfig.PORT.key())) { + openMldbParameters.port = pluginConfig.getInt(OpenMldbConfig.PORT.key()); + } + // set session timeout + if (pluginConfig.hasPath(OpenMldbConfig.SESSION_TIMEOUT.key())) { + openMldbParameters.sessionTimeout = pluginConfig.getInt(OpenMldbConfig.SESSION_TIMEOUT.key()); + } + // set request timeout + if (pluginConfig.hasPath(OpenMldbConfig.REQUEST_TIMEOUT.key())) { + openMldbParameters.requestTimeout = pluginConfig.getInt(OpenMldbConfig.REQUEST_TIMEOUT.key()); + } + return openMldbParameters; + } +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSqlExecutor.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSqlExecutor.java new file mode 100644 index 00000000000..97d05e36ec1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/config/OpenMldbSqlExecutor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.config; + +import com._4paradigm.openmldb.sdk.SdkOption; +import com._4paradigm.openmldb.sdk.SqlException; +import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor; + +public class OpenMldbSqlExecutor { + private static final SdkOption SDK_OPTION = new SdkOption(); + private static volatile SqlClusterExecutor SQL_EXECUTOR; + + private OpenMldbSqlExecutor() { + + } + + public static void initSdkOption(OpenMldbParameters openMldbParameters) { + if (openMldbParameters.getClusterMode()) { + SDK_OPTION.setZkCluster(openMldbParameters.getZkHost()); + SDK_OPTION.setZkPath(openMldbParameters.getZkPath()); + } else { + SDK_OPTION.setHost(openMldbParameters.getHost()); + SDK_OPTION.setPort(openMldbParameters.getPort()); + SDK_OPTION.setClusterMode(false); + } + SDK_OPTION.setSessionTimeout(openMldbParameters.getSessionTimeout()); + SDK_OPTION.setRequestTimeout(openMldbParameters.getRequestTimeout()); + } + + public static SqlClusterExecutor getSqlExecutor() throws SqlException { + if (SQL_EXECUTOR == null) { + synchronized (OpenMldbSqlExecutor.class) { + if (SQL_EXECUTOR == null) { + SQL_EXECUTOR = new SqlClusterExecutor(SDK_OPTION); + } + } + } + return SQL_EXECUTOR; + } + + public static void close() { + if (SQL_EXECUTOR != null) { + synchronized (OpenMldbParameters.class) { + if (SQL_EXECUTOR != null) { + SQL_EXECUTOR.close(); + SQL_EXECUTOR = null; + } + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/exception/OpenMldbConnectorException.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/exception/OpenMldbConnectorException.java new file mode 100644 index 00000000000..20b950eaaec --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/exception/OpenMldbConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class OpenMldbConnectorException extends SeaTunnelRuntimeException { + public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public OpenMldbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java new file mode 100644 index 00000000000..8364d18c45b --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSource.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.source; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor; +import org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com._4paradigm.openmldb.sdk.Column; +import com._4paradigm.openmldb.sdk.Schema; +import com._4paradigm.openmldb.sdk.SqlException; +import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor; +import com.google.auto.service.AutoService; + +import java.sql.SQLException; +import java.sql.Types; +import java.util.List; + +@AutoService(SeaTunnelSource.class) +public class OpenMldbSource extends AbstractSingleSplitSource { + private OpenMldbParameters openMldbParameters; + private JobContext jobContext; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public String getPluginName() { + return "OpenMldb"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + OpenMldbConfig.CLUSTER_MODE.key(), + OpenMldbConfig.SQL.key(), + OpenMldbConfig.DATABASE.key()); + if (!result.isSuccess()) { + throw new OpenMldbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); + } + if (pluginConfig.getBoolean(OpenMldbConfig.CLUSTER_MODE.key())) { + // cluster mode + result = CheckConfigUtil.checkAllExists(pluginConfig, + OpenMldbConfig.ZK_HOST.key(), OpenMldbConfig.ZK_PATH.key()); + } else { + // single mode + result = CheckConfigUtil.checkAllExists(pluginConfig, + OpenMldbConfig.HOST.key(), OpenMldbConfig.PORT.key()); + } + if (!result.isSuccess()) { + throw new OpenMldbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); + } + this.openMldbParameters = OpenMldbParameters.buildWithConfig(pluginConfig); + OpenMldbSqlExecutor.initSdkOption(openMldbParameters); + try { + SqlClusterExecutor sqlExecutor = OpenMldbSqlExecutor.getSqlExecutor(); + Schema inputSchema = sqlExecutor.getInputSchema(openMldbParameters.getDatabase(), openMldbParameters.getSql()); + List columnList = inputSchema.getColumnList(); + this.seaTunnelRowType = convert(columnList); + } catch (SQLException | SqlException e) { + throw new OpenMldbConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, + "Failed to initialize data schema"); + } + } + + @Override + public Boundedness getBoundedness() { + return JobMode.BATCH.equals(jobContext.getJobMode()) ? Boundedness.BOUNDED : Boundedness.UNBOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new OpenMldbSourceReader(openMldbParameters, seaTunnelRowType, readerContext); + } + + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } + + private SeaTunnelDataType convertSeaTunnelDataType(int type) { + switch (type) { + case Types.BOOLEAN: + return BasicType.BOOLEAN_TYPE; + case Types.INTEGER: + return BasicType.INT_TYPE; + case Types.SMALLINT: + return BasicType.SHORT_TYPE; + case Types.BIGINT: + return BasicType.LONG_TYPE; + case Types.FLOAT: + return BasicType.FLOAT_TYPE; + case Types.DOUBLE: + return BasicType.DOUBLE_TYPE; + case Types.VARCHAR: + return BasicType.STRING_TYPE; + case Types.DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case Types.TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + default: + throw new OpenMldbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "SeaTunnel does not support this data type"); + } + } + + private SeaTunnelRowType convert(List columnList) { + String[] fieldsName = new String[columnList.size()]; + SeaTunnelDataType[] fieldsType = new SeaTunnelDataType[columnList.size()]; + for (int i = 0; i < columnList.size(); i++) { + Column column = columnList.get(i); + fieldsName[i] = column.getColumnName(); + fieldsType[i] = convertSeaTunnelDataType(column.getSqlType()); + } + return new SeaTunnelRowType(fieldsName, fieldsType); + } +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java new file mode 100644 index 00000000000..749024ef11f --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.source; + +import org.apache.seatunnel.api.configuration.util.Condition; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class OpenMldbSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "OpenMldb"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(OpenMldbConfig.CLUSTER_MODE) + .required(OpenMldbConfig.SQL) + .required(OpenMldbConfig.DATABASE) + .optional(OpenMldbConfig.SESSION_TIMEOUT) + .optional(OpenMldbConfig.REQUEST_TIMEOUT) + .conditional(Condition.of(OpenMldbConfig.CLUSTER_MODE, false), + OpenMldbConfig.HOST, OpenMldbConfig.PORT) + .conditional(Condition.of(OpenMldbConfig.CLUSTER_MODE, true), + OpenMldbConfig.ZK_HOST, OpenMldbConfig.ZK_PATH) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceReader.java b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceReader.java new file mode 100644 index 00000000000..0884c23ced6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-openmldb/src/main/java/org/apache/seatunnel/connectors/seatunnel/openmldb/source/OpenMldbSourceReader.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.openmldb.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbParameters; +import org.apache.seatunnel.connectors.seatunnel.openmldb.config.OpenMldbSqlExecutor; +import org.apache.seatunnel.connectors.seatunnel.openmldb.exception.OpenMldbConnectorException; + +import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +@Slf4j +public class OpenMldbSourceReader extends AbstractSingleSplitReader { + private final OpenMldbParameters openMldbParameters; + private final SeaTunnelRowType seaTunnelRowType; + private final SingleSplitReaderContext readerContext; + + public OpenMldbSourceReader(OpenMldbParameters openMldbParameters, + SeaTunnelRowType seaTunnelRowType, + SingleSplitReaderContext readerContext) { + this.openMldbParameters = openMldbParameters; + this.seaTunnelRowType = seaTunnelRowType; + this.readerContext = readerContext; + } + + @Override + public void open() throws Exception { + OpenMldbSqlExecutor.initSdkOption(openMldbParameters); + } + + @Override + public void close() throws IOException { + OpenMldbSqlExecutor.close(); + } + + @Override + public void pollNext(Collector output) throws Exception { + int totalFields = seaTunnelRowType.getTotalFields(); + Object[] objects = new Object[totalFields]; + SqlClusterExecutor sqlExecutor = OpenMldbSqlExecutor.getSqlExecutor(); + try (ResultSet resultSet = sqlExecutor.executeSQL(openMldbParameters.getDatabase(), + openMldbParameters.getSql())) { + while (resultSet.next()) { + for (int i = 0; i < totalFields; i++) { + objects[i] = getObject(resultSet, i, seaTunnelRowType.getFieldType(i)); + } + output.collect(new SeaTunnelRow(objects)); + } + } finally { + if (Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded openmldb source"); + readerContext.signalNoMoreElement(); + } + } + } + + private Object getObject(ResultSet resultSet, int index, SeaTunnelDataType dataType) throws SQLException { + index = index + 1; + switch (dataType.getSqlType()) { + case BOOLEAN: + return resultSet.getBoolean(index); + case INT: + return resultSet.getInt(index); + case SMALLINT: + return resultSet.getShort(index); + case BIGINT: + return resultSet.getLong(index); + case FLOAT: + return resultSet.getFloat(index); + case DOUBLE: + return resultSet.getDouble(index); + case STRING: + return resultSet.getString(index); + case DATE: + Date date = resultSet.getDate(index); + return date.toLocalDate(); + case TIMESTAMP: + Timestamp timestamp = resultSet.getTimestamp(index); + return timestamp.toLocalDateTime(); + default: + throw new OpenMldbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported this data type"); + } + } +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index 6eab714d8e7..7a7a7d3af32 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -64,6 +64,7 @@ connector-google-sheets connector-slack connector-rabbitmq + connector-openmldb diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index b37a99356f0..bbbd55f299e 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -375,6 +375,11 @@ ${project.version} provided + + org.apache.seatunnel + connector-openmldb + ${project.version} +