diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java index af7cb37669d1..a37f5c29a8fe 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.it; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; @@ -39,6 +40,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq; import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; @@ -71,9 +73,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generatePatternTreeBuffer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) @@ -183,7 +187,7 @@ public void testPartitionInfoSnapshot() throws Exception { } assertTriggerInformation(createTriggerReqs, client.getTriggerTable()); - assertUDFInformation(createFunctionReqs, client.getUDFTable()); + assertUDFInformation(createFunctionReqs, client.getUDFTable(new TGetUdfTableReq(Model.TREE))); TShowCQResp showCQResp = client.showCQ(); assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), showCQResp.getStatus().getCode()); @@ -283,12 +287,14 @@ private List createUDF(SyncConfigNodeIServiceClient client) TCreateFunctionReq createFunctionReq1 = new TCreateFunctionReq("test1", "org.apache.iotdb.udf.UDTFExample", true) + .setModel(Model.TREE) .setJarName(jarName) .setJarFile(jarFile) .setJarMD5(jarMD5); TCreateFunctionReq createFunctionReq2 = new TCreateFunctionReq("test2", "org.apache.iotdb.udf.UDTFExample", true) + .setModel(Model.TREE) .setJarName(jarName) .setJarFile(jarFile) .setJarMD5(jarMD5); @@ -307,11 +313,13 @@ private List createUDF(SyncConfigNodeIServiceClient client) } private void assertUDFInformation(List req, TGetUDFTableResp resp) { + Map nameToReqMap = + req.stream().collect(Collectors.toMap(r -> r.getUdfName().toUpperCase(), r -> r)); for (int i = 0; i < req.size(); i++) { - TCreateFunctionReq createFunctionReq = req.get(i); UDFInformation udfInformation = UDFInformation.deserialize(resp.getAllUDFInformation().get(i)); - + assertTrue(nameToReqMap.containsKey(udfInformation.getFunctionName())); + TCreateFunctionReq createFunctionReq = nameToReqMap.get(udfInformation.getFunctionName()); assertEquals(createFunctionReq.getUdfName().toUpperCase(), udfInformation.getFunctionName()); assertEquals(createFunctionReq.getClassName(), udfInformation.getClassName()); assertEquals(createFunctionReq.getJarName(), udfInformation.getJarName()); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index 53b92918877d..f6e3861218d6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -1115,8 +1115,8 @@ public void showFunctions(CloseableHttpClient httpClient) { Map map = queryMetaData(httpClient, sql); List columnNamesResult = (List) map.get("columnNames"); List> valuesResult = (List>) map.get("values"); - assertEquals(3, columnNamesResult.size()); - assertEquals(3, valuesResult.size()); + assertEquals(4, columnNamesResult.size()); + assertEquals(4, valuesResult.size()); } public void showTimeseries(CloseableHttpClient httpClient) { @@ -1767,8 +1767,8 @@ public void showFunctionsV2(CloseableHttpClient httpClient) { Map map = queryMetaDataV2(httpClient, sql); List columnNamesResult = (List) map.get("column_names"); List> valuesResult = (List>) map.get("values"); - assertEquals(3, columnNamesResult.size()); - assertEquals(3, valuesResult.size()); + assertEquals(4, columnNamesResult.size()); + assertEquals(4, valuesResult.size()); } public void showTimeseriesV2(CloseableHttpClient httpClient) { diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSyntaxConventionStringLiteralIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSyntaxConventionStringLiteralIT.java index 95204c783ec2..7b682ffac82b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSyntaxConventionStringLiteralIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBSyntaxConventionStringLiteralIT.java @@ -322,7 +322,7 @@ public void testUDFClassName() { // executed correctly try (ResultSet resultSet = statement.executeQuery("show functions")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); int count = 0; while (resultSet.next()) { StringBuilder stringBuilder = new StringBuilder(); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFManagementIT.java index 75dc0c1f5f77..12d8c1ac3480 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFManagementIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udaf/IoTDBUDAFManagementIT.java @@ -81,7 +81,7 @@ public void createReflectShowDropUDAFTest() { statement.executeQuery("SELECT udaf(*) FROM root.vehicle"); try (ResultSet resultSet = statement.executeQuery("SHOW FUNCTIONS")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); int count = 0; while (resultSet.next()) { ++count; @@ -108,7 +108,7 @@ public void createAndDropUDAFSeveralTimesTest() { ++count; } Assert.assertEquals(1 + FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("DROP FUNCTION udaf"); statement.execute( @@ -122,7 +122,7 @@ public void createAndDropUDAFSeveralTimesTest() { ++count; } Assert.assertEquals(1 + FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("DROP FUNCTION udaf"); } } catch (SQLException throwable) { @@ -222,7 +222,7 @@ public void createFunctionWithURITest() throws SQLException { ++count; } Assert.assertEquals(2 + FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("DROP FUNCTION udaf1"); statement.execute("DROP FUNCTION udaf2"); } catch (Exception e) { @@ -309,7 +309,7 @@ public void testShowBuiltinFunction() { "CREATE FUNCTION udaf AS 'org.apache.iotdb.db.query.udf.example.UDAFCount'"); try (ResultSet resultSet = statement.executeQuery("SHOW FUNCTIONS")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); int count = 0; while (resultSet.next()) { StringBuilder stringBuilder = new StringBuilder(); @@ -320,7 +320,7 @@ public void testShowBuiltinFunction() { if (result.contains(FUNCTION_TYPE_EXTERNAL_UDAF)) { Assert.assertEquals( String.format( - "UDAF,%s,org.apache.iotdb.db.query.udf.example.UDAFCount,", + "UDAF,%s,org.apache.iotdb.db.query.udf.example.UDAFCount,AVAILABLE,", FUNCTION_TYPE_EXTERNAL_UDAF), result); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFManagementIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFManagementIT.java index 798256e5a16d..67b7fec57a7c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFManagementIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFManagementIT.java @@ -87,7 +87,7 @@ public void testCreateReflectShowDrop() { statement.executeQuery("select udf(*, *) from root.vehicle"); try (ResultSet resultSet = statement.executeQuery("show functions")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); int count = 0; while (resultSet.next()) { StringBuilder stringBuilder = new StringBuilder(); @@ -123,7 +123,7 @@ public void testCreateAndDropSeveralTimes() { Assert.assertEquals( 1 + NATIVE_FUNCTIONS_COUNT + BUILTIN_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("drop function udf"); statement.execute("create function udf as 'org.apache.iotdb.db.query.udf.example.Adder'"); @@ -138,7 +138,7 @@ public void testCreateAndDropSeveralTimes() { Assert.assertEquals( 1 + NATIVE_FUNCTIONS_COUNT + BUILTIN_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("drop function udf"); } } catch (SQLException throwable) { @@ -249,7 +249,7 @@ public void testCreateFunctionWithURI() throws SQLException { Assert.assertEquals( 2 + NATIVE_FUNCTIONS_COUNT + BUILTIN_FUNCTIONS_COUNT + BUILTIN_SCALAR_FUNCTIONS_COUNT, count); - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); statement.execute("drop function udf"); statement.execute("drop function udf1"); } catch (Exception e) { @@ -353,7 +353,10 @@ public void testCreateBuiltinFunction() { statement.execute("create function sin as 'org.apache.iotdb.db.query.udf.example.Adder'"); fail(); } catch (SQLException throwable) { - assertTrue(throwable.getMessage().contains("the same name UDF has been created")); + assertTrue( + throwable + .getMessage() + .contains("the given function name conflicts with the built-in function name")); } } @@ -376,7 +379,7 @@ public void testShowBuiltinFunction() { statement.execute("create function udf as 'org.apache.iotdb.db.query.udf.example.Adder'"); try (ResultSet resultSet = statement.executeQuery("show functions")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(4, resultSet.getMetaData().getColumnCount()); int count = 0; while (resultSet.next()) { StringBuilder stringBuilder = new StringBuilder(); @@ -391,7 +394,7 @@ public void testShowBuiltinFunction() { if (result.contains(FUNCTION_TYPE_EXTERNAL_UDTF)) { Assert.assertEquals( String.format( - "UDF,%s,org.apache.iotdb.db.query.udf.example.Adder,", + "UDF,%s,org.apache.iotdb.db.query.udf.example.Adder,AVAILABLE,", FUNCTION_TYPE_EXTERNAL_UDTF), result); ++count; diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java new file mode 100644 index 000000000000..e831d5b5292b --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/AggregationFunction.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +public interface AggregationFunction extends SQLFunction {} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java new file mode 100644 index 000000000000..e6a3846bc2ac --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/SQLFunction.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +public interface SQLFunction {} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java new file mode 100644 index 000000000000..996d6994138b --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/ScalarFunction.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +public interface ScalarFunction extends SQLFunction {} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java new file mode 100644 index 000000000000..01815ffa1631 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +public interface TableFunction extends SQLFunction {} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java new file mode 100644 index 000000000000..98d3b6b80c50 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational.access; + +import org.apache.iotdb.udf.api.type.Binary; +import org.apache.iotdb.udf.api.type.Type; + +import java.io.IOException; + +public interface Record { + /** + * Returns the int value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.INT32}. + * + * @param columnIndex index of the specified column + * @return the int value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + int getInt(int columnIndex) throws IOException; + + /** + * Returns the long value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.INT64}. + * + * @param columnIndex index of the specified column + * @return the long value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + long getLong(int columnIndex) throws IOException; + + /** + * Returns the float value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.FLOAT}. + * + * @param columnIndex index of the specified column + * @return the float value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + float getFloat(int columnIndex) throws IOException; + + /** + * Returns the double value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code + * TSDataType.DOUBLE}. + * + * @param columnIndex index of the specified column + * @return the double value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + double getDouble(int columnIndex) throws IOException; + + /** + * Returns the boolean value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code + * TSDataType.BOOLEAN}. + * + * @param columnIndex index of the specified column + * @return the boolean value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + boolean getBoolean(int columnIndex) throws IOException; + + /** + * Returns the Binary value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}. + * + * @param columnIndex index of the specified column + * @return the Binary value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + Binary getBinary(int columnIndex) throws IOException; + + /** + * Returns the String value at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}. + * + * @param columnIndex index of the specified column + * @return the String value at the specified column in this row + * @throws IOException if an I/O error occurs + */ + String getString(int columnIndex) throws IOException; + + /** + * Returns the actual data type of the value at the specified column in this row. + * + * @param columnIndex index of the specified column + * @return the actual data type of the value at the specified column in this row + */ + Type getDataType(int columnIndex); + + /** + * Returns {@code true} if the value of the specified column is null. + * + * @param columnIndex index of the specified column + * @return {@code true} if the value of the specified column is null + */ + boolean isNull(int columnIndex) throws IOException; + + /** + * Returns the number of columns. + * + * @return the number of columns + */ + int size(); +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index bdd93f9844b4..84145d01d795 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -203,6 +203,8 @@ public enum TSStatusCode { UDF_DOWNLOAD_ERROR(1201), CREATE_UDF_ON_DATANODE_ERROR(1202), DROP_UDF_ON_DATANODE_ERROR(1203), + CREATE_UDF_ERROR(1204), + DROP_UDF_ERROR(1205), // Trigger CREATE_TRIGGER_ERROR(1300), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 7f3da022e787..411a5b92072f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -48,7 +48,8 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; -import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; @@ -294,8 +295,11 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case CreateFunction: plan = new CreateFunctionPlan(); break; - case DropFunction: - plan = new DropFunctionPlan(); + case DropTreeModelFunction: + plan = new DropTreeModelFunctionPlan(); + break; + case DropTableModelFunction: + plan = new DropTableModelFunctionPlan(); break; case AddTriggerInTable: plan = new AddTriggerInTablePlan(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index efc90da7e8ff..5d3bdcc6710e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -133,9 +133,12 @@ public enum ConfigPhysicalPlanType { /** Function. */ CreateFunction((short) 700), - DropFunction((short) 701), + DropTreeModelFunction((short) 701), GetFunctionTable((short) 702), GetFunctionJar((short) 703), + GetAllFunctionTable((short) 704), + UpdateFunction((short) 705), + DropTableModelFunction((short) 706), /** Template. */ CreateSchemaTemplate((short) 800), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java new file mode 100644 index 000000000000..a2f8d65ef1be --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetAllFunctionTablePlan.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.read.function; + +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; +import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; + +public class GetAllFunctionTablePlan extends ConfigPhysicalReadPlan { + + public GetAllFunctionTablePlan() { + super(ConfigPhysicalPlanType.GetAllFunctionTable); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java index bcc9fed7b9b8..3e36f6568859 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/function/GetFunctionTablePlan.java @@ -19,12 +19,20 @@ package org.apache.iotdb.confignode.consensus.request.read.function; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; public class GetFunctionTablePlan extends ConfigPhysicalReadPlan { - public GetFunctionTablePlan() { + private final Model model; + + public GetFunctionTablePlan(Model model) { super(ConfigPhysicalPlanType.GetFunctionTable); + this.model = model; + } + + public Model getModel() { + return model; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTableModelFunctionPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTableModelFunctionPlan.java new file mode 100644 index 000000000000..f43f9339bbab --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTableModelFunctionPlan.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.function; + +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class DropTableModelFunctionPlan extends ConfigPhysicalPlan { + + private String functionName; + + public DropTableModelFunctionPlan() { + super(ConfigPhysicalPlanType.DropTableModelFunction); + } + + public DropTableModelFunctionPlan(String functionName) { + super(ConfigPhysicalPlanType.DropTableModelFunction); + this.functionName = functionName; + } + + public String getFunctionName() { + return functionName; + } + + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + ReadWriteIOUtils.write(functionName, stream); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + functionName = ReadWriteIOUtils.readString(buffer); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DropTableModelFunctionPlan that = (DropTableModelFunctionPlan) o; + return Objects.equals(functionName, that.functionName); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), functionName); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTreeModelFunctionPlan.java similarity index 85% rename from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java rename to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTreeModelFunctionPlan.java index a4ecc6330b30..ff13ed03b3fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropFunctionPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/DropTreeModelFunctionPlan.java @@ -29,16 +29,16 @@ import java.nio.ByteBuffer; import java.util.Objects; -public class DropFunctionPlan extends ConfigPhysicalPlan { +public class DropTreeModelFunctionPlan extends ConfigPhysicalPlan { private String functionName; - public DropFunctionPlan() { - super(ConfigPhysicalPlanType.DropFunction); + public DropTreeModelFunctionPlan() { + super(ConfigPhysicalPlanType.DropTreeModelFunction); } - public DropFunctionPlan(String functionName) { - super(ConfigPhysicalPlanType.DropFunction); + public DropTreeModelFunctionPlan(String functionName) { + super(ConfigPhysicalPlanType.DropTreeModelFunction); this.functionName = functionName; } @@ -68,7 +68,7 @@ public boolean equals(Object o) { if (!super.equals(o)) { return false; } - DropFunctionPlan that = (DropFunctionPlan) o; + DropTreeModelFunctionPlan that = (DropTreeModelFunctionPlan) o; return Objects.equals(functionName, that.functionName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java new file mode 100644 index 000000000000..eb91ae678194 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/function/UpdateFunctionPlan.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.function; + +import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class UpdateFunctionPlan extends ConfigPhysicalPlan { + private UDFInformation udfInformation; + + public UpdateFunctionPlan() { + super(ConfigPhysicalPlanType.UpdateFunction); + } + + public UpdateFunctionPlan(UDFInformation udfInformation) { + super(ConfigPhysicalPlanType.UpdateFunction); + this.udfInformation = udfInformation; + } + + public UDFInformation getUdfInformation() { + return udfInformation; + } + + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + udfInformation.serialize(stream); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + udfInformation = UDFInformation.deserialize(buffer); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + UpdateFunctionPlan that = (UpdateFunctionPlan) o; + return Objects.equals(udfInformation, that.udfInformation); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), udfInformation); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java index 82d375a40b1a..8ae7f698945f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/function/FunctionTableResp.java @@ -34,11 +34,11 @@ public class FunctionTableResp implements DataSet { private final TSStatus status; - private final List allUdfInformation; + private final List udfInformation; - public FunctionTableResp(TSStatus status, List allUdfInformation) { + public FunctionTableResp(TSStatus status, List udfInformation) { this.status = status; - this.allUdfInformation = allUdfInformation; + this.udfInformation = udfInformation; } @TestOnly @@ -47,15 +47,15 @@ public TSStatus getStatus() { } @TestOnly - public List getAllUdfInformation() { - return allUdfInformation; + public List getUdfInformation() { + return udfInformation; } public TGetUDFTableResp convertToThriftResponse() throws IOException { List udfInformationByteBuffers = new ArrayList<>(); - for (UDFInformation udfInformation : allUdfInformation) { - udfInformationByteBuffers.add(udfInformation.serialize()); + for (UDFInformation information : udfInformation) { + udfInformationByteBuffers.add(information.serialize()); } return new TGetUDFTableResp(status, udfInformationByteBuffers); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 45e19047ef4d..d7459753ec02 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -164,6 +164,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDescTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; @@ -193,6 +194,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -1533,18 +1535,18 @@ public TSStatus createFunction(TCreateFunctionReq req) { } @Override - public TSStatus dropFunction(String udfName) { + public TSStatus dropFunction(TDropFunctionReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? udfManager.dropFunction(udfName) + ? udfManager.dropFunction(req.getModel(), req.getUdfName()) : status; } @Override - public TGetUDFTableResp getUDFTable() { + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) { TSStatus status = confirmLeader(); return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - ? udfManager.getUDFTable() + ? udfManager.getUDFTable(req.getModel()) : new TGetUDFTableResp(status, Collections.emptyList()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 954946f229ea..526c66612995 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -89,6 +89,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq; import org.apache.iotdb.confignode.rpc.thrift.TDescTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq; @@ -118,6 +119,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; @@ -519,9 +521,9 @@ TDataPartitionTableResp getOrCreateDataPartition( TSStatus createFunction(TCreateFunctionReq req); - TSStatus dropFunction(String udfName); + TSStatus dropFunction(TDropFunctionReq req); - TGetUDFTableResp getUDFTable(); + TGetUDFTableResp getUDFTable(TGetUdfTableReq req); TGetJarInListResp getUDFJar(TGetJarInListReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index 00ed020a14e2..810d1426a8ee 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.manager; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -27,10 +28,13 @@ import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.read.function.GetAllFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; -import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.response.JarResp; import org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp; import org.apache.iotdb.confignode.persistence.UDFInfo; @@ -83,35 +87,42 @@ public TSStatus createFunction(TCreateFunctionReq req) { final String jarMD5 = req.getJarMD5(); final String jarName = req.getJarName(); final byte[] jarFile = req.getJarFile(); - udfInfo.validate(udfName, jarName, jarMD5); + final Model model = req.getModel(); + udfInfo.validate(model, udfName, jarName, jarMD5); - final UDFInformation udfInformation = - new UDFInformation(udfName, req.getClassName(), false, isUsingURI, jarName, jarMD5); - final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName); + UDFInformation udfInformation = + new UDFInformation( + udfName, req.getClassName(), model, false, isUsingURI, jarName, jarMD5); - LOGGER.info( - "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar); - - final TSStatus dataNodesStatus = - RpcUtils.squashResponseStatusList( - createFunctionOnDataNodes(udfInformation, needToSaveJar ? jarFile : null)); - if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return dataNodesStatus; - } + final boolean needToSaveJar = isUsingURI && udfInfo.needToSaveJar(jarName); + LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName); CreateFunctionPlan createFunctionPlan = new CreateFunctionPlan(udfInformation, needToSaveJar ? new Binary(jarFile) : null); if (needToSaveJar && createFunctionPlan.getSerializedSize() > planSizeLimit) { - return new TSStatus(TSStatusCode.CREATE_TRIGGER_ERROR.getStatusCode()) + return new TSStatus(TSStatusCode.CREATE_UDF_ERROR.getStatusCode()) .setMessage( String.format( "Fail to create UDF[%s], the size of Jar is too large, you can increase the value of property 'config_node_ratis_log_appender_buffer_size_max' on ConfigNode", udfName)); } + TSStatus preCreateStatus = configManager.getConsensusManager().write(createFunctionPlan); + if (preCreateStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return preCreateStatus; + } + udfInformation = + new UDFInformation(udfName, req.getClassName(), model, true, isUsingURI, jarName, jarMD5); + LOGGER.info( + "Start to create UDF [{}] on Data Nodes, needToSaveJar[{}]", udfName, needToSaveJar); + final TSStatus dataNodesStatus = + RpcUtils.squashResponseStatusList( + createFunctionOnDataNodes(udfInformation, needToSaveJar ? jarFile : null)); + if (dataNodesStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return dataNodesStatus; + } - LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName); - - return configManager.getConsensusManager().write(createFunctionPlan); + LOGGER.info("Start to activate UDF [{}] in UDF_Table on Config Nodes", udfName); + return configManager.getConsensusManager().write(new UpdateFunctionPlan(udfInformation)); } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -134,18 +145,33 @@ private List createFunctionOnDataNodes(UDFInformation udfInformation, return clientHandler.getResponseList(); } - public TSStatus dropFunction(String functionName) { + public TSStatus dropFunction(Model model, String functionName) { functionName = functionName.toUpperCase(); udfInfo.acquireUDFTableLock(); try { - udfInfo.validate(functionName); + UDFInformation information = udfInfo.getUDFInformation(model, functionName); + information.setAvailable(false); + TSStatus preDropStatus = + configManager.getConsensusManager().write(new UpdateFunctionPlan(information)); + if (preDropStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return preDropStatus; + } - TSStatus result = RpcUtils.squashResponseStatusList(dropFunctionOnDataNodes(functionName)); + TSStatus result = + RpcUtils.squashResponseStatusList(dropFunctionOnDataNodes(model, functionName)); if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return result; } - return configManager.getConsensusManager().write(new DropFunctionPlan(functionName)); + if (Model.TREE.equals(model)) { + return configManager + .getConsensusManager() + .write(new DropTreeModelFunctionPlan(functionName)); + } else { + return configManager + .getConsensusManager() + .write(new DropTableModelFunctionPlan(functionName)); + } } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -155,11 +181,12 @@ public TSStatus dropFunction(String functionName) { } } - private List dropFunctionOnDataNodes(String functionName) { + private List dropFunctionOnDataNodes(Model model, String functionName) { final Map dataNodeLocationMap = configManager.getNodeManager().getRegisteredDataNodeLocations(); - final TDropFunctionInstanceReq request = new TDropFunctionInstanceReq(functionName, false); + final TDropFunctionInstanceReq request = + new TDropFunctionInstanceReq(functionName, false).setModel(model); DataNodeAsyncRequestContext clientHandler = new DataNodeAsyncRequestContext<>( @@ -168,13 +195,27 @@ private List dropFunctionOnDataNodes(String functionName) { return clientHandler.getResponseList(); } - public TGetUDFTableResp getUDFTable() { + public TGetUDFTableResp getUDFTable(Model model) { + try { + return ((FunctionTableResp) + configManager.getConsensusManager().read(new GetFunctionTablePlan(model))) + .convertToThriftResponse(); + } catch (IOException | ConsensusException e) { + LOGGER.error("Fail to get UDFTable", e); + return new TGetUDFTableResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()), + Collections.emptyList()); + } + } + + public TGetUDFTableResp getAllUDFTable() { try { return ((FunctionTableResp) - configManager.getConsensusManager().read(new GetFunctionTablePlan())) + configManager.getConsensusManager().read(new GetAllFunctionTablePlan())) .convertToThriftResponse(); } catch (IOException | ConsensusException e) { - LOGGER.error("Fail to get TriggerTable", e); + LOGGER.error("Fail to get AllUDFTable", e); return new TGetUDFTableResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) .setMessage(e.getMessage()), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index d0053594ba16..b5208c76f3aa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -256,7 +256,7 @@ private TRuntimeConfiguration getRuntimeConfiguration() { runtimeConfiguration.setAllTriggerInformation( getTriggerManager().getTriggerTable(false).getAllTriggerInformation()); runtimeConfiguration.setAllUDFInformation( - getUDFManager().getUDFTable().getAllUDFInformation()); + getUDFManager().getAllUDFTable().getAllUDFInformation()); runtimeConfiguration.setAllPipeInformation( getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta()); runtimeConfiguration.setAllTTLInformation( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java index 51f997192a91..1ae4b5b9640d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/UDFInfo.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; @@ -28,9 +29,10 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; -import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.response.JarResp; import org.apache.iotdb.confignode.consensus.response.function.FunctionTableResp; import org.apache.iotdb.consensus.common.DataSet; @@ -90,9 +92,10 @@ public void releaseUDFTableLock() { } /** Validate whether the UDF can be created. */ - public void validate(String udfName, String jarName, String jarMD5) + public void validate(Model model, String udfName, String jarName, String jarMD5) throws UDFManagementException { - if (udfTable.containsUDF(udfName)) { + if (udfTable.containsUDF(model, udfName) + && udfTable.getUDFInformation(model, udfName).isAvailable()) { throw new UDFManagementException( String.format("Failed to create UDF [%s], the same name UDF has been created", udfName)); } @@ -106,9 +109,10 @@ public void validate(String udfName, String jarName, String jarMD5) } /** Validate whether the UDF can be dropped. */ - public void validate(String udfName) throws UDFManagementException { - if (udfTable.containsUDF(udfName)) { - return; + public UDFInformation getUDFInformation(Model model, String udfName) + throws UDFManagementException { + if (udfTable.containsUDF(model, udfName)) { + return udfTable.getUDFInformation(model, udfName); } throw new UDFManagementException( String.format("Failed to drop UDF [%s], this UDF has not been created", udfName)); @@ -141,10 +145,16 @@ public TSStatus addUDFInTable(CreateFunctionPlan physicalPlan) { } } - public DataSet getUDFTable() { + public DataSet getUDFTable(GetFunctionTablePlan plan) { return new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - udfTable.getAllNonBuiltInUDFInformation()); + udfTable.getUDFInformationList(plan.getModel())); + } + + public DataSet getAllUDFTable() { + return new FunctionTableResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + udfTable.getAllInformationList()); } public JarResp getUDFJar(GetUDFJarPlan physicalPlan) { @@ -165,17 +175,22 @@ public JarResp getUDFJar(GetUDFJarPlan physicalPlan) { return new JarResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), jarList); } - public TSStatus dropFunction(DropFunctionPlan req) { - String udfName = req.getFunctionName(); - if (udfTable.containsUDF(udfName)) { - existedJarToMD5.remove(udfTable.getUDFInformation(udfName).getJarName()); - udfTable.removeUDFInformation(udfName); + public TSStatus dropFunction(Model model, String functionName) { + if (udfTable.containsUDF(model, functionName)) { + existedJarToMD5.remove(udfTable.getUDFInformation(model, functionName).getJarName()); + udfTable.removeUDFInformation(model, functionName); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + public TSStatus updateFunction(UpdateFunctionPlan req) { + UDFInformation udfInformation = req.getUdfInformation(); + udfTable.addUDFInformation(udfInformation.getFunctionName(), udfInformation); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + @TestOnly - public Map getRawUDFTable() { + public Map> getRawUDFTable() { return udfTable.getTable(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index fad1c5eb5b4a..29ce77ae5e29 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.persistence.executor; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSchemaNode; import org.apache.iotdb.commons.auth.AuthException; @@ -33,6 +34,7 @@ import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan; import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan; import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan; @@ -80,7 +82,9 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; -import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.UpdateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan; import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; @@ -341,9 +345,11 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req) case SHOW_CQ: return cqInfo.showCQ(); case GetFunctionTable: - return udfInfo.getUDFTable(); + return udfInfo.getUDFTable((GetFunctionTablePlan) req); case GetFunctionJar: return udfInfo.getUDFJar((GetUDFJarPlan) req); + case GetAllFunctionTable: + return udfInfo.getAllUDFTable(); case ShowModel: return modelInfo.showModel((ShowModelPlan) req); case GetModelInfo: @@ -470,8 +476,14 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) return clusterInfo.updateClusterId((UpdateClusterIdPlan) physicalPlan); case CreateFunction: return udfInfo.addUDFInTable((CreateFunctionPlan) physicalPlan); - case DropFunction: - return udfInfo.dropFunction((DropFunctionPlan) physicalPlan); + case UpdateFunction: + return udfInfo.updateFunction((UpdateFunctionPlan) physicalPlan); + case DropTreeModelFunction: + return udfInfo.dropFunction( + Model.TREE, ((DropTreeModelFunctionPlan) physicalPlan).getFunctionName()); + case DropTableModelFunction: + return udfInfo.dropFunction( + Model.TABLE, ((DropTableModelFunctionPlan) physicalPlan).getFunctionName()); case AddTriggerInTable: return triggerInfo.addTriggerInTable((AddTriggerInTablePlan) physicalPlan); case DeleteTriggerInTable: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 75ffcc413c19..a524150d0d76 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -161,6 +161,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -839,12 +840,12 @@ public TSStatus createFunction(TCreateFunctionReq req) { @Override public TSStatus dropFunction(TDropFunctionReq req) { - return configManager.dropFunction(req.getUdfName()); + return configManager.dropFunction(req); } @Override - public TGetUDFTableResp getUDFTable() { - return configManager.getUDFTable(); + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) { + return configManager.getUDFTable(req); } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 7e7201d920e8..4e357225c14b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.request; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; @@ -78,7 +79,8 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; -import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTableModelFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.function.DropTreeModelFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; @@ -1474,7 +1476,7 @@ public void UpdateTriggerLocationPlanTest() throws IOException { @Test public void CreateFunctionPlanTest() throws IOException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"); CreateFunctionPlan createFunctionPlan0 = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); CreateFunctionPlan createFunctionPlan1 = @@ -1485,11 +1487,17 @@ public void CreateFunctionPlanTest() throws IOException { @Test public void DropFunctionPlanTest() throws IOException { - DropFunctionPlan dropFunctionPlan0 = new DropFunctionPlan("test"); - DropFunctionPlan dropFunctionPlan1 = - (DropFunctionPlan) - ConfigPhysicalPlan.Factory.create(dropFunctionPlan0.serializeToByteBuffer()); - Assert.assertEquals(dropFunctionPlan0, dropFunctionPlan1); + DropTreeModelFunctionPlan dropTreeModelFunctionPlan0 = new DropTreeModelFunctionPlan("test"); + DropTreeModelFunctionPlan dropTreeModelFunctionPlan1 = + (DropTreeModelFunctionPlan) + ConfigPhysicalPlan.Factory.create(dropTreeModelFunctionPlan0.serializeToByteBuffer()); + Assert.assertEquals(dropTreeModelFunctionPlan0, dropTreeModelFunctionPlan1); + + DropTableModelFunctionPlan dropTableModelFunctionPlan0 = new DropTableModelFunctionPlan("test"); + DropTableModelFunctionPlan dropTableModelFunctionPlan1 = + (DropTableModelFunctionPlan) + ConfigPhysicalPlan.Factory.create(dropTableModelFunctionPlan0.serializeToByteBuffer()); + Assert.assertEquals(dropTableModelFunctionPlan0, dropTableModelFunctionPlan1); } @Test diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java index 262d5882e5ca..8d8eadb93afb 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/ConvertToThriftRespTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.consensus.response; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -51,15 +52,16 @@ public void convertFunctionRespTest() throws IOException { new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"), - new UDFInformation("test2", "test2", false, true, "test2.jar", "12342"))); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), + new UDFInformation( + "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(0), + functionTableResp.getUdfInformation().get(0), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(0))); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(1), + functionTableResp.getUdfInformation().get(1), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(1))); } @@ -116,15 +118,16 @@ public void convertJarRespTest() throws IOException { new FunctionTableResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), ImmutableList.of( - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"), - new UDFInformation("test2", "test2", false, true, "test2.jar", "12342"))); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"), + new UDFInformation( + "test2", "test2", Model.TREE, true, true, "test2.jar", "12342"))); TGetUDFTableResp tGetUDFTableResp = functionTableResp.convertToThriftResponse(); Assert.assertEquals(functionTableResp.getStatus(), tGetUDFTableResp.status); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(0), + functionTableResp.getUdfInformation().get(0), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(0))); Assert.assertEquals( - functionTableResp.getAllUdfInformation().get(1), + functionTableResp.getUdfInformation().get(1), UDFInformation.deserialize(tGetUDFTableResp.allUDFInformation.get(1))); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java index 708ea779fad7..dc8b577f28e5 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/UDFInfoTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; @@ -61,13 +62,14 @@ public static void cleanup() throws IOException { @Test public void testSnapshot() throws TException, IOException, IllegalPathException { UDFInformation udfInformation = - new UDFInformation("test1", "test1", false, true, "test1.jar", "12345"); + new UDFInformation("test1", "test1", Model.TREE, true, true, "test1.jar", "12345"); CreateFunctionPlan createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); - udfInformation = new UDFInformation("test2", "test2", false, true, "test2.jar", "123456"); + udfInformation = + new UDFInformation("test2", "test2", Model.TREE, true, true, "test2.jar", "123456"); createFunctionPlan = new CreateFunctionPlan(udfInformation, new Binary(new byte[] {1, 2, 3})); udfInfo.addUDFInTable(createFunctionPlan); udfInfoSaveBefore.addUDFInTable(createFunctionPlan); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index add89dd31625..e9bfb6ce6472 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -124,6 +124,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TLoginReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; @@ -888,9 +889,9 @@ public TSStatus dropFunction(TDropFunctionReq req) throws TException { } @Override - public TGetUDFTableResp getUDFTable() throws TException { + public TGetUDFTableResp getUDFTable(TGetUdfTableReq req) throws TException { return executeRemoteCallWithRetry( - () -> client.getUDFTable(), resp -> !updateConfigNodeLeader(resp.status)); + () -> client.getUDFTable(req), resp -> !updateConfigNodeLeader(resp.status)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index d028fb1f1820..d05d61de49df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -2446,7 +2446,8 @@ public TSStatus createFunction(TCreateFunctionInstanceReq req) { @Override public TSStatus dropFunction(TDropFunctionInstanceReq req) { try { - UDFManagementService.getInstance().deregister(req.getFunctionName(), req.isNeedToDeleteJar()); + UDFManagementService.getInstance() + .deregister(req.model, req.getFunctionName(), req.isNeedToDeleteJar()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (Exception e) { return new TSStatus(TSStatusCode.DROP_UDF_ON_DATANODE_ERROR.getStatusCode()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java index 96454706b4b8..bc87b710a6e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java @@ -106,6 +106,7 @@ private ColumnHeaderConstant() { public static final String FUNCTION_NAME = "FunctionName"; public static final String FUNCTION_TYPE = "FunctionType"; public static final String CLASS_NAME_UDF = "ClassName(UDF)"; + public static final String FUNCTION_STATE = "State"; // column names for show triggers statement public static final String TRIGGER_NAME = "TriggerName"; @@ -416,7 +417,8 @@ private ColumnHeaderConstant() { ImmutableList.of( new ColumnHeader(FUNCTION_NAME, TSDataType.TEXT), new ColumnHeader(FUNCTION_TYPE, TSDataType.TEXT), - new ColumnHeader(CLASS_NAME_UDF, TSDataType.TEXT)); + new ColumnHeader(CLASS_NAME_UDF, TSDataType.TEXT), + new ColumnHeader(FUNCTION_STATE, TSDataType.TEXT)); public static final List showTriggersColumnHeaders = ImmutableList.of( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java index 45ca2169a032..a0b78cc8cd6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/UDAFAccumulator.java @@ -90,7 +90,7 @@ private void reflectAndValidateUDF( List childExpressionDataTypes, Map attributes, boolean isInputRaw) { - udaf = (UDAF) UDFManagementService.getInstance().reflect(functionName); + udaf = UDFManagementService.getInstance().reflect(functionName, UDAF.class); state = udaf.createState(); final UDFParameters parameters = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java index 80677d5adece..5e9c9603e31d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask; @@ -323,13 +324,13 @@ public IConfigTask visitCreateFunction( @Override public IConfigTask visitDropFunction( DropFunctionStatement dropFunctionStatement, MPPQueryContext context) { - return new DropFunctionTask(dropFunctionStatement); + return new DropFunctionTask(Model.TREE, dropFunctionStatement.getUdfName()); } @Override public IConfigTask visitShowFunctions( ShowFunctionsStatement showFunctionsStatement, MPPQueryContext context) { - return new ShowFunctionsTask(); + return new ShowFunctionsTask(Model.TREE); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d8a0cb734066..bf7bc065d49e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -60,6 +61,7 @@ import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.udf.service.UDFExecutableManager; +import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -104,6 +106,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetUdfTableReq; import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferReq; import org.apache.iotdb.confignode.rpc.thrift.TPipeConfigTransferResp; @@ -206,7 +209,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTriggerStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement; @@ -273,7 +275,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.trigger.api.Trigger; import org.apache.iotdb.trigger.api.enums.FailureStrategy; -import org.apache.iotdb.udf.api.UDF; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.codec.digest.DigestUtils; @@ -302,6 +303,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -492,26 +494,31 @@ public SettableFuture deleteDatabase( @Override public SettableFuture createFunction( - CreateFunctionStatement createFunctionStatement) { + Model model, + String udfName, + String className, + Optional stringURI, + Class baseClazz) { SettableFuture future = SettableFuture.create(); - String udfName = createFunctionStatement.getUdfName(); - String className = createFunctionStatement.getClassName(); + if (UDFManagementService.getInstance().checkIsBuiltInFunctionName(model, udfName)) { + future.setException( + new IoTDBException( + String.format( + "Failed to create UDF [%s], the given function name conflicts with the built-in function name.", + udfName.toUpperCase()), + TSStatusCode.CREATE_UDF_ERROR.getStatusCode())); + return future; + } try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TCreateFunctionReq tCreateFunctionReq = new TCreateFunctionReq(udfName, className, false); + TCreateFunctionReq tCreateFunctionReq = + new TCreateFunctionReq(udfName, className, false).setModel(model); String libRoot = UDFExecutableManager.getInstance().getLibRoot(); String jarFileName; ByteBuffer jarFile; String jarMd5; - if (createFunctionStatement.isUsingURI()) { - String uriString = createFunctionStatement.getUriString(); - if (uriString == null || uriString.isEmpty()) { - future.setException( - new IoTDBException( - "URI is empty, please specify the URI.", - TSStatusCode.UDF_DOWNLOAD_ERROR.getStatusCode())); - return future; - } + if (stringURI.isPresent()) { + String uriString = stringURI.get(); jarFileName = new File(uriString).getName(); try { URI uri = new URI(uriString); @@ -545,16 +552,10 @@ public SettableFuture createFunction( jarMd5 = DigestUtils.md5Hex(Files.newInputStream(Paths.get(libRoot))); } } catch (IOException | URISyntaxException e) { - LOGGER.warn( - "Failed to get executable for UDF({}) using URI: {}.", - createFunctionStatement.getUdfName(), - createFunctionStatement.getUriString(), - e); + LOGGER.warn("Failed to get executable for UDF({}) using URI: {}.", udfName, uriString, e); future.setException( new IoTDBException( - "Failed to get executable for UDF '" - + createFunctionStatement.getUdfName() - + "', please check the URI.", + "Failed to get executable for UDF '" + udfName + "', please check the URI.", TSStatusCode.TRIGGER_DOWNLOAD_ERROR.getStatusCode())); return future; } @@ -573,8 +574,8 @@ public SettableFuture createFunction( // try to create instance, this request will fail if creation is not successful try (UDFClassLoader classLoader = new UDFClassLoader(libRoot)) { // ensure that jar file contains the class and the class is a UDF - Class clazz = Class.forName(createFunctionStatement.getClassName(), true, classLoader); - UDF udf = (UDF) clazz.getDeclaredConstructor().newInstance(); + Class clazz = Class.forName(className, true, classLoader); + baseClazz.cast(clazz.getDeclaredConstructor().newInstance()); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException @@ -582,15 +583,16 @@ public SettableFuture createFunction( | InvocationTargetException | ClassCastException e) { LOGGER.warn( - "Failed to create function when try to create UDF({}) instance first.", - createFunctionStatement.getUdfName(), + "Failed to create function when try to create {}({}) instance first.", + baseClazz.getSimpleName(), + udfName, e); future.setException( new IoTDBException( "Failed to load class '" - + createFunctionStatement.getClassName() + + className + "', because it's not found in jar file or is invalid: " - + createFunctionStatement.getUriString(), + + stringURI.orElse(null), TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode())); return future; } @@ -613,11 +615,19 @@ public SettableFuture createFunction( } @Override - public SettableFuture dropFunction(String udfName) { + public SettableFuture dropFunction(Model model, String udfName) { SettableFuture future = SettableFuture.create(); + if (UDFManagementService.getInstance().checkIsBuiltInFunctionName(model, udfName)) { + future.setException( + new IoTDBException( + String.format("Built-in function %s can not be deregistered.", udfName.toUpperCase()), + TSStatusCode.DROP_UDF_ERROR.getStatusCode())); + return future; + } try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TSStatus executionStatus = client.dropFunction(new TDropFunctionReq(udfName)); + final TSStatus executionStatus = + client.dropFunction(new TDropFunctionReq(udfName).setModel(model)); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { LOGGER.warn("[{}] Failed to drop function {}.", executionStatus, udfName); @@ -632,11 +642,11 @@ public SettableFuture dropFunction(String udfName) { } @Override - public SettableFuture showFunctions() { + public SettableFuture showFunctions(Model model) { SettableFuture future = SettableFuture.create(); try (ConfigNodeClient client = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TGetUDFTableResp getUDFTableResp = client.getUDFTable(); + TGetUDFTableResp getUDFTableResp = client.getUDFTable(new TGetUdfTableReq(model)); if (getUDFTableResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.setException( new IoTDBException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java index 1bc3a8d4354f..1773ad94d7dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.executor; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TFlushReq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq; @@ -43,7 +44,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountDatabaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CountTimeSlotListStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateContinuousQueryStatement; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTriggerStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.DeleteDatabaseStatement; @@ -96,6 +96,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; public interface IConfigTaskExecutor { @@ -110,11 +111,16 @@ public interface IConfigTaskExecutor { SettableFuture deleteDatabase(DeleteDatabaseStatement deleteDatabaseStatement); - SettableFuture createFunction(CreateFunctionStatement createFunctionStatement); + SettableFuture createFunction( + Model model, + String udfName, + String className, + Optional stringURI, + Class baseClazz); - SettableFuture dropFunction(String udfName); + SettableFuture dropFunction(Model model, String udfName); - SettableFuture showFunctions(); + SettableFuture showFunctions(Model model); SettableFuture createTrigger(CreateTriggerStatement createTriggerStatement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java index 0dea986da82b..5bd093a1fdec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/CreateFunctionTask.java @@ -19,24 +19,46 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateFunction; import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateFunctionStatement; +import org.apache.iotdb.udf.api.UDF; +import org.apache.iotdb.udf.api.relational.SQLFunction; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Optional; + public class CreateFunctionTask implements IConfigTask { - private final CreateFunctionStatement createFunctionStatement; + private final Model model; + private final String udfName; + private final String className; + private final Optional uriString; + private final Class baseClazz; public CreateFunctionTask(CreateFunctionStatement createFunctionStatement) { - this.createFunctionStatement = createFunctionStatement; + this.udfName = createFunctionStatement.getUdfName(); + this.className = createFunctionStatement.getClassName(); + this.uriString = createFunctionStatement.getUriString(); + this.baseClazz = UDF.class; // Tree Model + this.model = Model.TREE; + } + + public CreateFunctionTask(CreateFunction createFunctionStatement) { + this.udfName = createFunctionStatement.getUdfName(); + this.className = createFunctionStatement.getClassName(); + this.uriString = createFunctionStatement.getUriString(); + this.baseClazz = SQLFunction.class; // Table Model + this.model = Model.TABLE; } @Override public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.createFunction(createFunctionStatement); + return configTaskExecutor.createFunction(model, udfName, className, uriString, baseClazz); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java index 5aa8133bf0a8..9eff17015990 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropFunctionTask.java @@ -19,24 +19,26 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; -import org.apache.iotdb.db.queryengine.plan.statement.metadata.DropFunctionStatement; import com.google.common.util.concurrent.ListenableFuture; public class DropFunctionTask implements IConfigTask { + private final Model model; private final String udfName; - public DropFunctionTask(DropFunctionStatement dropFunctionStatement) { - udfName = dropFunctionStatement.getUdfName(); + public DropFunctionTask(Model model, String udfName) { + this.model = model; + this.udfName = udfName; } @Override public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) throws InterruptedException { - return configTaskExecutor.dropFunction(udfName); + return configTaskExecutor.dropFunction(model, udfName); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java index d9d3a6d32f39..a0a8506d0b60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowFunctionsTask.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.queryengine.plan.execution.config.metadata; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; -import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.commons.udf.utils.TreeUDFUtils; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; @@ -42,11 +45,14 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_AVAILABLE; +import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_STATE_UNAVAILABLE; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_SCALAR; -import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF; import static org.apache.iotdb.commons.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF; @@ -55,9 +61,29 @@ public class ShowFunctionsTask implements IConfigTask { + private static final Map BINARY_MAP = new HashMap<>(); + + static { + BINARY_MAP.put(FUNCTION_TYPE_NATIVE, BytesUtils.valueOf(FUNCTION_TYPE_NATIVE)); + BINARY_MAP.put(FUNCTION_TYPE_BUILTIN_UDTF, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_UDTF)); + BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDTF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDTF)); + BINARY_MAP.put(FUNCTION_TYPE_EXTERNAL_UDAF, BytesUtils.valueOf(FUNCTION_TYPE_EXTERNAL_UDAF)); + BINARY_MAP.put(FUNCTION_TYPE_BUILTIN_SCALAR, BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR)); + BINARY_MAP.put(FUNCTION_TYPE_UNKNOWN, BytesUtils.valueOf(FUNCTION_TYPE_UNKNOWN)); + BINARY_MAP.put(FUNCTION_STATE_AVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_AVAILABLE)); + BINARY_MAP.put(FUNCTION_STATE_UNAVAILABLE, BytesUtils.valueOf(FUNCTION_STATE_UNAVAILABLE)); + BINARY_MAP.put("", BytesUtils.valueOf("")); + } + + private final Model model; + + public ShowFunctionsTask(Model model) { + this.model = model; + } + @Override public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) { - return configTaskExecutor.showFunctions(); + return configTaskExecutor.showFunctions(model); } public static void buildTsBlock( @@ -74,15 +100,13 @@ public static void buildTsBlock( udfInformations.add(udfInformation); } } - // native and built-in functions - udfInformations.addAll( - UDFManagementService.getInstance().getAllBuiltInTimeSeriesGeneratingInformation()); udfInformations.sort(Comparator.comparing(UDFInformation::getFunctionName)); + appendBuiltInTimeSeriesGeneratingFunctions(builder); for (UDFInformation udfInformation : udfInformations) { appendUDFInformation(builder, udfInformation); } - appendNativeFunctions(builder); + appendBuiltInAggregationFunctions(builder); appendBuiltInScalarFunctions(builder); DatasetHeader datasetHeader = DatasetHeaderFactory.getShowFunctionsHeader(); future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader)); @@ -91,50 +115,76 @@ public static void buildTsBlock( private static void appendUDFInformation(TsBlockBuilder builder, UDFInformation udfInformation) { builder.getTimeColumnBuilder().writeLong(0L); builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(udfInformation.getFunctionName())); - builder.getColumnBuilder(1).writeBinary(BytesUtils.valueOf(getFunctionType(udfInformation))); + builder.getColumnBuilder(1).writeBinary(getFunctionType(udfInformation)); builder.getColumnBuilder(2).writeBinary(BytesUtils.valueOf(udfInformation.getClassName())); + builder.getColumnBuilder(3).writeBinary(getFunctionState(udfInformation)); + builder.declarePosition(); } - private static void appendNativeFunctions(TsBlockBuilder builder) { - final Binary functionType = BytesUtils.valueOf(FUNCTION_TYPE_NATIVE); - final Binary className = BytesUtils.valueOf(""); + private static void appendBuiltInTimeSeriesGeneratingFunctions(TsBlockBuilder builder) { + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_UDTF); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + for (BuiltinTimeSeriesGeneratingFunction function : + BuiltinTimeSeriesGeneratingFunction.values()) { + builder.getTimeColumnBuilder().writeLong(0L); + builder + .getColumnBuilder(0) + .writeBinary(BytesUtils.valueOf(function.getFunctionName().toUpperCase())); + builder.getColumnBuilder(1).writeBinary(functionType); + builder.getColumnBuilder(2).writeBinary(BytesUtils.valueOf(function.getClassName())); + builder.getColumnBuilder(3).writeBinary(functionState); + builder.declarePosition(); + } + } + + private static void appendBuiltInAggregationFunctions(TsBlockBuilder builder) { + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_NATIVE); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + final Binary className = BINARY_MAP.get(""); for (String functionName : BuiltinAggregationFunction.getNativeFunctionNames()) { builder.getTimeColumnBuilder().writeLong(0L); builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(functionName.toUpperCase())); builder.getColumnBuilder(1).writeBinary(functionType); builder.getColumnBuilder(2).writeBinary(className); + builder.getColumnBuilder(3).writeBinary(functionState); builder.declarePosition(); } } private static void appendBuiltInScalarFunctions(TsBlockBuilder builder) { - final Binary functionType = BytesUtils.valueOf(FUNCTION_TYPE_BUILTIN_SCALAR); - final Binary className = BytesUtils.valueOf(""); + final Binary functionType = BINARY_MAP.get(FUNCTION_TYPE_BUILTIN_SCALAR); + final Binary functionState = BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); + final Binary className = BINARY_MAP.get(""); for (String functionName : BuiltinScalarFunction.getNativeFunctionNames()) { builder.getTimeColumnBuilder().writeLong(0L); builder.getColumnBuilder(0).writeBinary(BytesUtils.valueOf(functionName.toUpperCase())); builder.getColumnBuilder(1).writeBinary(functionType); builder.getColumnBuilder(2).writeBinary(className); + builder.getColumnBuilder(3).writeBinary(functionState); builder.declarePosition(); } } - private static String getFunctionType(UDFInformation udfInformation) { - String functionType = FUNCTION_TYPE_UNKNOWN; - if (udfInformation.isBuiltin()) { - if (UDFManagementService.getInstance().isUDTF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_BUILTIN_UDTF; - } else if (UDFManagementService.getInstance().isUDAF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_BUILTIN_UDAF; + private static Binary getFunctionType(UDFInformation udfInformation) { + UDFType type = udfInformation.getUdfType(); + if (udfInformation.isAvailable()) { + if (type.isTreeModel()) { + if (TreeUDFUtils.isUDTF(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_EXTERNAL_UDTF); + } else if (TreeUDFUtils.isUDAF(udfInformation.getFunctionName())) { + return BINARY_MAP.get(FUNCTION_TYPE_EXTERNAL_UDAF); + } } + } + return BINARY_MAP.get(FUNCTION_TYPE_UNKNOWN); + } + + private static Binary getFunctionState(UDFInformation udfInformation) { + if (udfInformation.isAvailable()) { + return BINARY_MAP.get(FUNCTION_STATE_AVAILABLE); } else { - if (UDFManagementService.getInstance().isUDTF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_EXTERNAL_UDTF; - } else if (UDFManagementService.getInstance().isUDAF(udfInformation.getFunctionName())) { - functionType = FUNCTION_TYPE_EXTERNAL_UDAF; - } + return BINARY_MAP.get(FUNCTION_STATE_UNAVAILABLE); } - return functionType; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java index 27ff875efd09..a63db1f3ceb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/expression/multi/FunctionExpression.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; -import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.commons.udf.utils.TreeUDFUtils; import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.plan.expression.Expression; @@ -126,7 +126,7 @@ private void initializeFunctionType() { functionType = FunctionType.BUILT_IN_AGGREGATION_FUNCTION; } else if (BuiltinScalarFunction.getNativeFunctionNames().contains(lowerCaseFunctionName)) { functionType = FunctionType.BUILT_IN_SCALAR_FUNCTION; - } else if (UDFManagementService.getInstance().isUDAF(functionName)) { + } else if (TreeUDFUtils.isUDAF(functionName)) { functionType = FunctionType.UDAF; } else { functionType = FunctionType.UDTF; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index 1a39b2377e6a..ea0351b1a2dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -228,6 +228,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.io.BaseEncoding; import org.antlr.v4.runtime.tree.TerminalNode; +import org.apache.commons.lang3.StringUtils; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; @@ -878,19 +879,21 @@ public Statement visitCreateFunction(CreateFunctionContext ctx) { return new CreateFunctionStatement( parseIdentifier(ctx.udfName.getText()), parseStringLiteral(ctx.className.getText()), - false); + Optional.empty()); } else { String uriString = parseAndValidateURI(ctx.uriClause()); return new CreateFunctionStatement( parseIdentifier(ctx.udfName.getText()), parseStringLiteral(ctx.className.getText()), - true, - uriString); + Optional.of(uriString)); } } private String parseAndValidateURI(IoTDBSqlParser.UriClauseContext ctx) { String uriString = parseStringLiteral(ctx.uri().getText()); + if (StringUtils.isEmpty(uriString)) { + throw new SemanticException("URI is empty, please specify the URI."); + } try { new URI(uriString); } catch (URISyntaxException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java index 854a5de9e7d9..c8eabd9a22ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/CreateFunctionStatement.java @@ -32,31 +32,19 @@ import java.util.Collections; import java.util.List; +import java.util.Optional; public class CreateFunctionStatement extends Statement implements IConfigStatement { private final String udfName; private final String className; + private final Optional uriString; - private String uriString; - - private final boolean usingURI; - - public CreateFunctionStatement(String udfName, String className, boolean usingURI) { - super(); - statementType = StatementType.CREATE_FUNCTION; - this.udfName = udfName; - this.className = className; - this.usingURI = usingURI; - } - - public CreateFunctionStatement( - String udfName, String className, boolean usingURI, String uriString) { + public CreateFunctionStatement(String udfName, String className, Optional uriString) { super(); statementType = StatementType.CREATE_FUNCTION; this.udfName = udfName; this.className = className; - this.usingURI = usingURI; this.uriString = uriString; } @@ -68,14 +56,10 @@ public String getClassName() { return className; } - public String getUriString() { + public Optional getUriString() { return uriString; } - public boolean isUsingURI() { - return usingURI; - } - @Override public R accept(StatementVisitor visitor, C context) { return visitor.visitCreateFunction(this, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java index 2ec587b23fba..eb51bcc348d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDAFInformationInferrer.java @@ -65,7 +65,7 @@ private UDAFConfigurations reflectAndGetConfigurations( List childExpressionDataTypes, Map attributes) throws Exception { - UDAF udaf = (UDAF) UDFManagementService.getInstance().reflect(functionName); + UDAF udaf = UDFManagementService.getInstance().reflect(functionName, UDAF.class); UDFParameters parameters = UDFParametersFactory.buildUdfParameters( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java index 854ebee073c6..f93de5c1dc4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java @@ -92,7 +92,7 @@ private void reflectAndValidateUDF( List childExpressionDataTypes, Map attributes) { - udtf = (UDTF) UDFManagementService.getInstance().reflect(functionName); + udtf = UDFManagementService.getInstance().reflect(functionName, UDTF.class); final UDFParameters parameters = UDFParametersFactory.buildUdfParameters( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java index 717223576eee..01baa8379ba3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFInformationInferrer.java @@ -83,7 +83,7 @@ private UDTFConfigurations reflectAndGetConfigurations( List childExpressionDataTypes, Map attributes) throws Exception { - UDTF udtf = (UDTF) UDFManagementService.getInstance().reflect(functionName); + UDTF udtf = UDFManagementService.getInstance().reflect(functionName, UDTF.class); UDFParameters parameters = UDFParametersFactory.buildUdfParameters( childExpressions, childExpressionDataTypes, attributes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 6540d21ea93d..fc2100d45d1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.service; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; @@ -943,7 +944,10 @@ private void prepareUDFResources() throws StartupException { // Create instances of udf and do registration try { for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) { - UDFManagementService.getInstance().doRegister(udfInformation); + if (udfInformation.isAvailable()) { + Model model = UDFManagementService.getInstance().checkAndGetModel(udfInformation); + UDFManagementService.getInstance().doRegister(model, udfInformation); + } } } catch (Exception e) { throw new StartupException(e); @@ -952,8 +956,12 @@ private void prepareUDFResources() throws StartupException { logger.debug("successfully registered all the UDFs, which takes {} ms.", (endTime - startTime)); if (logger.isDebugEnabled()) { for (UDFInformation udfInformation : - UDFManagementService.getInstance().getAllUDFInformation()) { - logger.debug("get udf: {}", udfInformation.getFunctionName()); + UDFManagementService.getInstance().getUDFInformation(Model.TREE)) { + logger.debug("get tree udf: {}", udfInformation.getFunctionName()); + } + for (UDFInformation udfInformation : + UDFManagementService.getInstance().getUDFInformation(Model.TABLE)) { + logger.debug("get table udf: {}", udfInformation.getFunctionName()); } } } @@ -990,7 +998,7 @@ private List getJarListForUDF() { try { // Local jar has conflicts with jar on config node, add current triggerInformation to // list - if (UDFManagementService.getInstance().isLocalJarConflicted(udfInformation)) { + if (UDFExecutableManager.getInstance().isLocalJarConflicted(udfInformation)) { res.add(udfInformation); } } catch (UDFManagementException e) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index 0d454b0d6400..0455038ed149 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -186,6 +186,8 @@ private IoTDBConstant() {} public static final String FUNCTION_TYPE_EXTERNAL_UDAF = "external UDAF"; public static final String FUNCTION_TYPE_EXTERNAL_UDTF = "external UDTF"; public static final String FUNCTION_TYPE_UNKNOWN = "UNKNOWN"; + public static final String FUNCTION_STATE_AVAILABLE = "AVAILABLE"; + public static final String FUNCTION_STATE_UNAVAILABLE = "UNAVAILABLE"; public static final String COLUMN_TRIGGER_NAME = "trigger name"; public static final String COLUMN_TRIGGER_STATUS = "status"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java index b09cc47d7389..0b03fd01db8b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFInformation.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.udf; +import org.apache.iotdb.common.rpc.thrift.Model; + import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -32,41 +34,43 @@ public class UDFInformation { private String functionName; private String className; - private boolean isBuiltin; + private UDFType udfType; + // jarName and jarMD5 are null if isUsingURI is false private boolean isUsingURI; - private String jarName; private String jarMD5; private UDFInformation() {} - public UDFInformation(String functionName, String className) { - this.functionName = functionName.toUpperCase(); - this.className = className; - } - - public UDFInformation( - String functionName, String className, boolean isBuiltin, boolean isUsingURI) { - this.functionName = functionName.toUpperCase(); - this.className = className; - this.isBuiltin = isBuiltin; - this.isUsingURI = isUsingURI; - } - public UDFInformation( String functionName, String className, - boolean isBuiltin, + Model model, + boolean available, boolean isUsingURI, String jarName, String jarMD5) { this.functionName = functionName.toUpperCase(); this.className = className; - this.isBuiltin = isBuiltin; this.isUsingURI = isUsingURI; this.jarName = jarName; this.jarMD5 = jarMD5; + if (Model.TREE.equals(model)) { + this.udfType = available ? UDFType.TREE_AVAILABLE : UDFType.TREE_UNAVAILABLE; + } else if (Model.TABLE.equals(model)) { + this.udfType = available ? UDFType.TABLE_AVAILABLE : UDFType.TABLE_UNAVAILABLE; + } else { + throw new IllegalArgumentException("Unknown UDF type: " + model); + } + } + + // Only used for built-in UDF + public UDFInformation(String functionName, String className, UDFType udfType) { + this.functionName = functionName.toUpperCase(); + this.className = className; + this.udfType = udfType; + this.isUsingURI = false; } public String getFunctionName() { @@ -77,8 +81,8 @@ public String getClassName() { return className; } - public boolean isBuiltin() { - return isBuiltin; + public UDFType getUdfType() { + return udfType; } public String getJarName() { @@ -101,8 +105,8 @@ public void setClassName(String className) { this.className = className; } - public void setBuiltin(boolean builtin) { - isBuiltin = builtin; + public void setUdfType(UDFType udfType) { + this.udfType = udfType; } public void setJarName(String jarName) { @@ -117,6 +121,18 @@ public void setUsingURI(boolean usingURI) { isUsingURI = usingURI; } + public void setAvailable(boolean available) { + if (this.udfType.isTreeModel()) { + this.udfType = available ? UDFType.TREE_AVAILABLE : UDFType.TREE_UNAVAILABLE; + } else { + this.udfType = available ? UDFType.TABLE_AVAILABLE : UDFType.TABLE_UNAVAILABLE; + } + } + + public boolean isAvailable() { + return udfType.isAvailable(); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); @@ -127,7 +143,7 @@ public ByteBuffer serialize() throws IOException { public void serialize(DataOutputStream outputStream) throws IOException { ReadWriteIOUtils.write(functionName, outputStream); ReadWriteIOUtils.write(className, outputStream); - ReadWriteIOUtils.write(isBuiltin, outputStream); + udfType.serialize(outputStream); ReadWriteIOUtils.write(isUsingURI, outputStream); if (isUsingURI) { ReadWriteIOUtils.write(jarName, outputStream); @@ -139,7 +155,7 @@ public static UDFInformation deserialize(ByteBuffer byteBuffer) { UDFInformation udfInformation = new UDFInformation(); udfInformation.setFunctionName(ReadWriteIOUtils.readString(byteBuffer)); udfInformation.setClassName(ReadWriteIOUtils.readString(byteBuffer)); - udfInformation.setBuiltin(ReadWriteIOUtils.readBool(byteBuffer)); + udfInformation.setUdfType(UDFType.deserialize(byteBuffer)); boolean isUsingURI = ReadWriteIOUtils.readBool(byteBuffer); udfInformation.setUsingURI(isUsingURI); if (isUsingURI) { @@ -159,7 +175,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; UDFInformation that = (UDFInformation) o; - return isBuiltin == that.isBuiltin + return udfType.equals(that.udfType) && Objects.equals(functionName, that.functionName) && Objects.equals(className, that.className) && Objects.equals(jarName, that.jarName) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java index fe03137837a8..73f0705cc1f4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFTable.java @@ -19,7 +19,7 @@ package org.apache.iotdb.commons.udf; -import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.service.UDFClassLoader; import org.apache.iotdb.commons.utils.TestOnly; @@ -28,90 +28,96 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +/** + * UDFTable is a table that stores UDF information. On DataNode, it stores all UDF information. On + * ConfigNode, it does not store built-in UDF information. + */ public class UDFTable { - private final Map udfInformationMap; - /** maintain a map for creating instance */ - private final Map> functionToClassMap; + /** model -> functionName -> information * */ + private final Map> udfInformationMap; + + /** maintain a map for creating instance, model -> functionName -> class */ + private final Map>> functionToClassMap; public UDFTable() { udfInformationMap = new ConcurrentHashMap<>(); functionToClassMap = new ConcurrentHashMap<>(); - registerBuiltinTimeSeriesGeneratingFunctions(); - } - - private void registerBuiltinTimeSeriesGeneratingFunctions() { - for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction : - BuiltinTimeSeriesGeneratingFunction.values()) { - String functionName = builtinTimeSeriesGeneratingFunction.getFunctionName(); - udfInformationMap.put( - functionName, - new UDFInformation( - functionName.toUpperCase(), - builtinTimeSeriesGeneratingFunction.getClassName(), - true, - false)); - functionToClassMap.put( - functionName.toUpperCase(), builtinTimeSeriesGeneratingFunction.getFunctionClass()); - } + udfInformationMap.put(Model.TREE, new ConcurrentHashMap<>()); + udfInformationMap.put(Model.TABLE, new ConcurrentHashMap<>()); + functionToClassMap.put(Model.TREE, new ConcurrentHashMap<>()); + functionToClassMap.put(Model.TABLE, new ConcurrentHashMap<>()); } public void addUDFInformation(String functionName, UDFInformation udfInformation) { - udfInformationMap.put(functionName.toUpperCase(), udfInformation); + if (udfInformation.getUdfType().isTreeModel()) { + udfInformationMap.get(Model.TREE).put(functionName.toUpperCase(), udfInformation); + } else { + udfInformationMap.get(Model.TABLE).put(functionName.toUpperCase(), udfInformation); + } } - public void removeUDFInformation(String functionName) { - udfInformationMap.remove(functionName.toUpperCase()); + public void removeUDFInformation(Model model, String functionName) { + udfInformationMap.get(model).remove(functionName.toUpperCase()); } - public UDFInformation getUDFInformation(String functionName) { - return udfInformationMap.get(functionName.toUpperCase()); + public UDFInformation getUDFInformation(Model model, String functionName) { + return udfInformationMap.get(model).get(functionName.toUpperCase()); } - public void addFunctionAndClass(String functionName, Class clazz) { - functionToClassMap.put(functionName.toUpperCase(), clazz); + public void addFunctionAndClass(Model model, String functionName, Class clazz) { + functionToClassMap.get(model).put(functionName.toUpperCase(), clazz); } - public Class getFunctionClass(String functionName) { - return functionToClassMap.get(functionName.toUpperCase()); + public Class getFunctionClass(Model model, String functionName) { + return functionToClassMap.get(model).get(functionName.toUpperCase()); } - public void removeFunctionClass(String functionName) { - functionToClassMap.remove(functionName.toUpperCase()); + public void removeFunctionClass(Model model, String functionName) { + functionToClassMap.get(model).remove(functionName.toUpperCase()); } public void updateFunctionClass(UDFInformation udfInformation, UDFClassLoader classLoader) throws ClassNotFoundException { Class functionClass = Class.forName(udfInformation.getClassName(), true, classLoader); - functionToClassMap.put(udfInformation.getFunctionName().toUpperCase(), functionClass); + if (udfInformation.getUdfType().isTreeModel()) { + functionToClassMap + .get(Model.TREE) + .put(udfInformation.getFunctionName().toUpperCase(), functionClass); + } else { + functionToClassMap + .get(Model.TABLE) + .put(udfInformation.getFunctionName().toUpperCase(), functionClass); + } } - public UDFInformation[] getAllUDFInformation() { - return udfInformationMap.values().toArray(new UDFInformation[0]); + public List getUDFInformationList(Model model) { + return new ArrayList<>(udfInformationMap.get(model).values()); } - public List getAllNonBuiltInUDFInformation() { + public List getAllInformationList() { return udfInformationMap.values().stream() - .filter(udfInformation -> !udfInformation.isBuiltin()) + .flatMap(map -> map.values().stream()) .collect(Collectors.toList()); } - public boolean containsUDF(String udfName) { - return udfInformationMap.containsKey(udfName); + public boolean containsUDF(Model model, String udfName) { + return udfInformationMap.get(model).containsKey(udfName.toUpperCase()); } @TestOnly - public Map getTable() { + public Map> getTable() { return udfInformationMap; } public void serializeUDFTable(OutputStream outputStream) throws IOException { - List nonBuiltInUDFInformation = getAllNonBuiltInUDFInformation(); + List nonBuiltInUDFInformation = getAllInformationList(); ReadWriteIOUtils.write(nonBuiltInUDFInformation.size(), outputStream); for (UDFInformation udfInformation : nonBuiltInUDFInformation) { ReadWriteIOUtils.write(udfInformation.serialize(), outputStream); @@ -122,18 +128,19 @@ public void deserializeUDFTable(InputStream inputStream) throws IOException { int size = ReadWriteIOUtils.readInt(inputStream); while (size > 0) { UDFInformation udfInformation = UDFInformation.deserialize(inputStream); - udfInformationMap.put(udfInformation.getFunctionName(), udfInformation); + if (udfInformation.getUdfType().isTreeModel()) { + udfInformationMap.get(Model.TREE).put(udfInformation.getFunctionName(), udfInformation); + } else { + udfInformationMap.get(Model.TABLE).put(udfInformation.getFunctionName(), udfInformation); + } size--; } } - // only clear external UDFs public void clear() { - udfInformationMap.forEach( - (K, V) -> { - if (!V.isBuiltin()) { - udfInformationMap.remove(K); - } - }); + udfInformationMap.get(Model.TREE).clear(); + udfInformationMap.get(Model.TABLE).clear(); + functionToClassMap.get(Model.TREE).clear(); + functionToClassMap.get(Model.TABLE).clear(); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java new file mode 100644 index 000000000000..979866519b2f --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/UDFType.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** UDFType is an enum class that represents the type of UDF. */ +public enum UDFType { + TREE_AVAILABLE((byte) 0), + /** + * TREE_BUILT_IN will not appear in the snapshot file or raft log. It is just a placeholder for + * some unforeseen circumstances. + */ + TREE_BUILT_IN((byte) 1), + TREE_UNAVAILABLE((byte) 2), + TABLE_AVAILABLE((byte) 3), + TABLE_UNAVAILABLE((byte) 4); + + private final byte type; + + UDFType(byte type) { + this.type = type; + } + + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(type, stream); + } + + public static UDFType deserialize(ByteBuffer buffer) { + byte type = ReadWriteIOUtils.readByte(buffer); + for (UDFType udfType : UDFType.values()) { + if (udfType.type == type) { + return udfType; + } + } + throw new IllegalArgumentException("Unknown UDFType: " + type); + } + + public boolean isTreeModel() { + return this == TREE_AVAILABLE || this == TREE_BUILT_IN || this == TREE_UNAVAILABLE; + } + + public boolean isTableModel() { + return this == TABLE_AVAILABLE || this == TABLE_UNAVAILABLE; + } + + public boolean isAvailable() { + return this == TREE_AVAILABLE || this == TREE_BUILT_IN || this == TABLE_AVAILABLE; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java index ece0e3104a66..304fdcb040fd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java @@ -31,7 +31,10 @@ import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; /** All built-in UDFs need to register their function names and classes here. */ public enum BuiltinTimeSeriesGeneratingFunction { @@ -96,6 +99,16 @@ public enum BuiltinTimeSeriesGeneratingFunction { private final Class functionClass; private final String className; + private static final Set NATIVE_FUNCTION_NAMES = + new HashSet<>( + Arrays.stream(BuiltinTimeSeriesGeneratingFunction.values()) + .map(BuiltinTimeSeriesGeneratingFunction::getFunctionName) + .collect(Collectors.toList())); + + public static Set getNativeFunctionNames() { + return NATIVE_FUNCTION_NAMES; + } + /** * Set of functions are mappable but DeviceView of them also need special process. Now there is no * function satisfies this. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java index b5b3cdc8927c..b5b1132ad8a7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFExecutableManager.java @@ -21,12 +21,17 @@ import org.apache.iotdb.commons.executable.ExecutableManager; import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.commons.udf.UDFInformation; +import org.apache.iotdb.udf.api.exception.UDFManagementException; +import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; public class UDFExecutableManager extends ExecutableManager { @@ -53,6 +58,47 @@ public static synchronized UDFExecutableManager setupAndGetInstance( return INSTANCE; } + /** check whether local jar is correct according to md5 */ + public boolean isLocalJarConflicted(UDFInformation udfInformation) throws UDFManagementException { + String functionName = udfInformation.getFunctionName(); + // A jar with the same name exists, we need to check md5 + String existedMd5 = ""; + String md5FilePath = functionName + ".txt"; + + // if meet error when reading md5 from txt, we need to compute it again + boolean hasComputed = false; + if (hasFileUnderTemporaryRoot(md5FilePath)) { + try { + existedMd5 = readTextFromFileUnderTemporaryRoot(md5FilePath); + hasComputed = true; + } catch (IOException e) { + LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath); + } + } + if (!hasComputed) { + try { + existedMd5 = + DigestUtils.md5Hex( + Files.newInputStream( + Paths.get( + UDFExecutableManager.getInstance().getInstallDir() + + File.separator + + udfInformation.getJarName()))); + // save the md5 in a txt under UDF temporary lib + saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath); + } catch (IOException e) { + String errorMessage = + String.format( + "Failed to registered function %s, " + + "because error occurred when trying to compute md5 of jar file for function %s ", + functionName, functionName); + LOGGER.warn(errorMessage, e); + throw new UDFManagementException(errorMessage); + } + } + return !existedMd5.equals(udfInformation.getJarMD5()); + } + public static UDFExecutableManager getInstance() { return INSTANCE; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java index 689fc0a4d13c..74a0d5e0e7bc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java @@ -19,29 +19,25 @@ package org.apache.iotdb.commons.udf.service; +import org.apache.iotdb.common.rpc.thrift.Model; import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.UDFTable; +import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction; +import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; +import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.udf.api.UDAF; import org.apache.iotdb.udf.api.UDF; -import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.exception.UDFManagementException; +import org.apache.iotdb.udf.api.relational.SQLFunction; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; public class UDFManagementService { @@ -53,6 +49,19 @@ public class UDFManagementService { private UDFManagementService() { lock = new ReentrantLock(); udfTable = new UDFTable(); + // register tree model built-in functions + for (BuiltinTimeSeriesGeneratingFunction builtinTimeSeriesGeneratingFunction : + BuiltinTimeSeriesGeneratingFunction.values()) { + String functionName = builtinTimeSeriesGeneratingFunction.getFunctionName(); + udfTable.addUDFInformation( + functionName, + new UDFInformation( + functionName.toUpperCase(), + builtinTimeSeriesGeneratingFunction.getClassName(), + UDFType.TREE_BUILT_IN)); + udfTable.addFunctionAndClass( + Model.TREE, functionName, builtinTimeSeriesGeneratingFunction.getFunctionClass()); + } } public void acquireLock() { @@ -63,128 +72,83 @@ public void releaseLock() { lock.unlock(); } - /** invoked by config leader for validation before registration */ - public void validate(UDFInformation udfInformation) { + public void register(UDFInformation udfInformation, ByteBuffer jarFile) throws Exception { + Model model = checkAndGetModel(udfInformation); try { acquireLock(); - checkIfRegistered(udfInformation); + checkIfRegistered(model, udfInformation); + saveJarFile(udfInformation.getJarName(), jarFile); + doRegister(model, udfInformation); } finally { releaseLock(); } } - public void register(UDFInformation udfInformation, ByteBuffer jarFile) throws Exception { - try { - acquireLock(); - checkIfRegistered(udfInformation); - saveJarFile(udfInformation.getJarName(), jarFile); - doRegister(udfInformation); - } finally { - releaseLock(); + public Model checkAndGetModel(UDFInformation udfInformation) { + if (!udfInformation.isAvailable()) { + throw new UDFManagementException("UDFInformation is not available"); + } + Model model; + if (udfInformation.getUdfType().isTreeModel()) { + model = Model.TREE; + } else { + model = Model.TABLE; } + return model; } - /** temp code for stand-alone */ - public void register(UDFInformation udfInformation) throws Exception { - try { - acquireLock(); - checkIfRegistered(udfInformation); - doRegister(udfInformation); - } finally { - releaseLock(); + private Class getBaseClass(Model model) { + if (Model.TREE.equals(model)) { + return UDF.class; + } else { + return SQLFunction.class; } } - private void checkIsBuiltInAggregationFunctionName(UDFInformation udfInformation) + public boolean checkIsBuiltInFunctionName(Model model, String functionName) throws UDFManagementException { - String functionName = udfInformation.getFunctionName(); - String className = udfInformation.getClassName(); - if (!BuiltinAggregationFunction.getNativeFunctionNames().contains(functionName.toLowerCase())) { - return; + if (Model.TREE.equals(model)) { + return BuiltinAggregationFunction.getNativeFunctionNames() + .contains(functionName.toLowerCase()) + || BuiltinTimeSeriesGeneratingFunction.getNativeFunctionNames() + .contains(functionName.toUpperCase()) + || BuiltinScalarFunction.getNativeFunctionNames().contains(functionName.toLowerCase()); + } else { + // TODO: Table model UDF + return false; } - - String errorMessage = - String.format( - "Failed to register UDF %s(%s), because the given function name conflicts with the built-in function name", - functionName, className); - - LOGGER.warn(errorMessage); - throw new UDFManagementException(errorMessage); } - private void checkIfRegistered(UDFInformation udfInformation) throws UDFManagementException { - checkIsBuiltInAggregationFunctionName(udfInformation); + private void checkIfRegistered(Model model, UDFInformation udfInformation) + throws UDFManagementException { + if (checkIsBuiltInFunctionName(model, udfInformation.getFunctionName())) { + String errorMessage = + String.format( + "Failed to register UDF %s(%s), because the given function name conflicts with the built-in function name", + udfInformation.getFunctionName(), udfInformation.getClassName()); + + LOGGER.warn(errorMessage); + throw new UDFManagementException(errorMessage); + } String functionName = udfInformation.getFunctionName(); String className = udfInformation.getClassName(); - UDFInformation information = udfTable.getUDFInformation(functionName); + UDFInformation information = udfTable.getUDFInformation(model, functionName); if (information == null) { return; } - if (information.isBuiltin()) { + if (UDFExecutableManager.getInstance().hasFileUnderInstallDir(udfInformation.getJarName()) + && UDFExecutableManager.getInstance().isLocalJarConflicted(udfInformation)) { String errorMessage = String.format( - "Failed to register UDF %s(%s), because the given function name is the same as a built-in UDF function name.", - functionName, className); + "Failed to register function %s(%s), " + + "because existed md5 of jar file for function %s is different from the new jar file. ", + functionName, className, functionName); LOGGER.warn(errorMessage); throw new UDFManagementException(errorMessage); - } else { - if (UDFExecutableManager.getInstance().hasFileUnderInstallDir(udfInformation.getJarName()) - && isLocalJarConflicted(udfInformation)) { - String errorMessage = - String.format( - "Failed to register function %s, " - + "because existed md5 of jar file for function %s is different from the new jar file. ", - functionName, functionName); - LOGGER.warn(errorMessage); - throw new UDFManagementException(errorMessage); - } } } - /** check whether local jar is correct according to md5 */ - public boolean isLocalJarConflicted(UDFInformation udfInformation) throws UDFManagementException { - String functionName = udfInformation.getFunctionName(); - // A jar with the same name exists, we need to check md5 - String existedMd5 = ""; - String md5FilePath = functionName + ".txt"; - - // if meet error when reading md5 from txt, we need to compute it again - boolean hasComputed = false; - if (UDFExecutableManager.getInstance().hasFileUnderTemporaryRoot(md5FilePath)) { - try { - existedMd5 = - UDFExecutableManager.getInstance().readTextFromFileUnderTemporaryRoot(md5FilePath); - hasComputed = true; - } catch (IOException e) { - LOGGER.warn("Error occurred when trying to read md5 of {}", md5FilePath); - } - } - if (!hasComputed) { - try { - existedMd5 = - DigestUtils.md5Hex( - Files.newInputStream( - Paths.get( - UDFExecutableManager.getInstance().getInstallDir() - + File.separator - + udfInformation.getJarName()))); - // save the md5 in a txt under UDF temporary lib - UDFExecutableManager.getInstance() - .saveTextAsFileUnderTemporaryRoot(existedMd5, md5FilePath); - } catch (IOException e) { - String errorMessage = - String.format( - "Failed to registered function %s, " - + "because error occurred when trying to compute md5 of jar file for function %s ", - functionName, functionName); - LOGGER.warn(errorMessage, e); - throw new UDFManagementException(errorMessage); - } - } - return !existedMd5.equals(udfInformation.getJarMD5()); - } - private void saveJarFile(String jarName, ByteBuffer byteBuffer) throws IOException { if (byteBuffer != null) { UDFExecutableManager.getInstance().saveToInstallDir(byteBuffer, jarName); @@ -195,20 +159,20 @@ private void saveJarFile(String jarName, ByteBuffer byteBuffer) throws IOExcepti * Only call this method directly for registering new data node, otherwise you need to call * register(). */ - public void doRegister(UDFInformation udfInformation) throws UDFManagementException { + public void doRegister(Model model, UDFInformation udfInformation) throws UDFManagementException { String functionName = udfInformation.getFunctionName(); String className = udfInformation.getClassName(); try { UDFClassLoader currentActiveClassLoader = UDFClassLoaderManager.getInstance().updateAndGetActiveClassLoader(); - updateAllRegisteredClasses(currentActiveClassLoader); + updateAllRegisteredClasses(model, currentActiveClassLoader); Class functionClass = Class.forName(className, true, currentActiveClassLoader); // ensure that it is a UDF class - UDF udf = (UDF) functionClass.getDeclaredConstructor().newInstance(); + getBaseClass(model).cast(functionClass.getDeclaredConstructor().newInstance()); udfTable.addUDFInformation(functionName, udfInformation); - udfTable.addFunctionAndClass(functionName, functionClass); + udfTable.addFunctionAndClass(model, functionName, functionClass); } catch (IOException | InstantiationException | InvocationTargetException @@ -225,31 +189,23 @@ public void doRegister(UDFInformation udfInformation) throws UDFManagementExcept } } - private void updateAllRegisteredClasses(UDFClassLoader activeClassLoader) + private void updateAllRegisteredClasses(Model model, UDFClassLoader activeClassLoader) throws ClassNotFoundException { - for (UDFInformation information : getAllUDFInformation()) { - if (!information.isBuiltin()) { - udfTable.updateFunctionClass(information, activeClassLoader); - } + for (UDFInformation information : getUDFInformation(model)) { + udfTable.updateFunctionClass(information, activeClassLoader); } } - public void deregister(String functionName, boolean needToDeleteJar) throws Exception { + public void deregister(Model model, String functionName, boolean needToDeleteJar) + throws Exception { try { acquireLock(); - UDFInformation information = udfTable.getUDFInformation(functionName); + UDFInformation information = udfTable.getUDFInformation(Model.TREE, functionName); if (information == null) { return; } - if (information.isBuiltin()) { - String errorMessage = - String.format( - "Built-in function %s can not be deregistered.", functionName.toUpperCase()); - LOGGER.warn(errorMessage); - throw new UDFManagementException(errorMessage); - } - udfTable.removeUDFInformation(functionName); - udfTable.removeFunctionClass(functionName); + udfTable.removeUDFInformation(model, functionName); + udfTable.removeFunctionClass(model, functionName); if (needToDeleteJar) { UDFExecutableManager.getInstance().removeFileUnderLibRoot(information.getJarName()); UDFExecutableManager.getInstance() @@ -260,8 +216,17 @@ public void deregister(String functionName, boolean needToDeleteJar) throws Exce } } - public UDF reflect(String functionName) { - UDFInformation information = udfTable.getUDFInformation(functionName); + public T reflect(String functionName, Class clazz) { + Model model; + if (UDF.class.isAssignableFrom(clazz)) { + model = Model.TREE; + } else if (SQLFunction.class.isAssignableFrom(clazz)) { + model = Model.TABLE; + } else { + throw new UDFManagementException( + "Unsupported UDF class type. Only UDF and SQLFunction are supported."); + } + UDFInformation information = udfTable.getUDFInformation(model, functionName); if (information == null) { String errorMessage = String.format( @@ -272,7 +237,11 @@ public UDF reflect(String functionName) { } try { - return (UDF) udfTable.getFunctionClass(functionName).getDeclaredConstructor().newInstance(); + return clazz.cast( + udfTable + .getFunctionClass(Model.TREE, functionName) + .getDeclaredConstructor() + .newInstance()); } catch (InstantiationException | InvocationTargetException | NoSuchMethodException @@ -286,69 +255,24 @@ public UDF reflect(String functionName) { } } - public UDFInformation[] getAllUDFInformation() { - return udfTable.getAllUDFInformation(); + public UDFInformation[] getUDFInformation(Model model) { + return udfTable.getUDFInformationList(model).toArray(new UDFInformation[0]); } - public List getAllBuiltInTimeSeriesGeneratingInformation() { - return Arrays.stream(getAllUDFInformation()) - .filter(UDFInformation::isBuiltin) - .collect(Collectors.toList()); - } - - public boolean isUDTF(String functionName) { - Class udfClass = udfTable.getFunctionClass(functionName); - UDFInformation information = udfTable.getUDFInformation(functionName); - if (udfClass != null) { + @TestOnly + public void deregisterAll() throws UDFManagementException { + for (UDFInformation information : getUDFInformation(Model.TREE)) { try { - return udfClass.getDeclaredConstructor().newInstance() instanceof UDTF; - } catch (InstantiationException - | InvocationTargetException - | NoSuchMethodException - | IllegalAccessException e) { - String errorMessage = - String.format( - "Failed to reflect UDTF %s(%s) instance, because %s", - functionName, information.getClassName(), e); - LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); + deregister(Model.TREE, information.getFunctionName(), false); + } catch (Exception e) { + throw new UDFManagementException(e.getMessage()); } - } else { - return false; } - } - - public boolean isUDAF(String functionName) { - Class udfClass = udfTable.getFunctionClass(functionName); - UDFInformation information = udfTable.getUDFInformation(functionName); - if (udfClass != null) { + for (UDFInformation information : getUDFInformation(Model.TABLE)) { try { - return udfClass.getDeclaredConstructor().newInstance() instanceof UDAF; - } catch (InstantiationException - | InvocationTargetException - | NoSuchMethodException - | IllegalAccessException e) { - String errorMessage = - String.format( - "Failed to reflect UDAF %s(%s) instance, because %s", - functionName, information.getClassName(), e); - LOGGER.warn(errorMessage, e); - throw new RuntimeException(errorMessage); - } - } else { - return false; - } - } - - @TestOnly - public void deregisterAll() throws UDFManagementException { - for (UDFInformation information : getAllUDFInformation()) { - if (!information.isBuiltin()) { - try { - deregister(information.getFunctionName(), false); - } catch (Exception e) { - throw new UDFManagementException(e.getMessage()); - } + deregister(Model.TABLE, information.getFunctionName(), false); + } catch (Exception e) { + throw new UDFManagementException(e.getMessage()); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java new file mode 100644 index 000000000000..16e745438b20 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/utils/TreeUDFUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.utils; + +import org.apache.iotdb.commons.udf.service.UDFManagementService; +import org.apache.iotdb.udf.api.UDAF; +import org.apache.iotdb.udf.api.UDTF; + +public class TreeUDFUtils { + public static boolean isUDTF(String functionName) { + try { + UDFManagementService.getInstance().reflect(functionName, UDTF.class); + return true; + } catch (Throwable e) { + return false; + } + } + + public static boolean isUDAF(String functionName) { + try { + UDFManagementService.getInstance().reflect(functionName, UDAF.class); + return true; + } catch (Throwable e) { + return false; + } + } +} diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift index 6944695f7412..6494922890ec 100644 --- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift +++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift @@ -302,3 +302,8 @@ enum TrainingState { FAILED, DROPPING } + +enum Model{ + TREE, + TABLE +} \ No newline at end of file diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 741dbfe52561..7fdf78e71528 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -443,10 +443,16 @@ struct TCreateFunctionReq { 4: optional string jarName 5: optional binary jarFile 6: optional string jarMD5 + 7: optional common.Model model } struct TDropFunctionReq { 1: required string udfName + 2: optional common.Model model +} + +struct TGetUdfTableReq { + 1: required common.Model model } // Get UDF table from config node @@ -1413,7 +1419,7 @@ service IConfigNodeRPCService { /** * Return the UDF table */ - TGetUDFTableResp getUDFTable() + TGetUDFTableResp getUDFTable(TGetUdfTableReq req) /** * Return the UDF jar list of the jar name list diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index f540e8a84326..cfdd634a92f4 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -208,6 +208,7 @@ struct TCreateFunctionInstanceReq { struct TDropFunctionInstanceReq { 1: required string functionName 2: required bool needToDeleteJar + 3: optional common.Model model } struct TCreateTriggerInstanceReq {