diff --git a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java similarity index 100% rename from pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java rename to pulsar-client-admin-api/src/main/java/org/apache/pulsar/packages/management/core/common/PackageType.java diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index d7f68a4b790f0..3e32961d8040f 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -82,14 +82,18 @@ public IObjectFactory getObjectFactory() { private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile(); private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url"; private static final String PYTHON_FILE_NAME = "test-go-function-with-url"; - private static final String URL ="file:" + JAR_NAME; - private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME; - private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME; + private static final String URL = "file:" + JAR_NAME; + private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME; + private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME; private static final String FN_NAME = TEST_NAME + "-function"; private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic"; private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic"; private static final String TENANT = TEST_NAME + "-tenant"; private static final String NAMESPACE = TEST_NAME + "-namespace"; + private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1"; + private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1"; + private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1"; + private static final String PACKAGE_INVALID_URL = "functionsample.jar"; private PulsarAdmin admin; private Functions functions; @@ -361,6 +365,89 @@ public void testCreatePyFunctionWithFileUrl() throws Exception { verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString()); } + @Test + public void testCreateFunctionWithPackageUrl() throws Exception { + cmd.run(new String[] { + "create", + "--name", FN_NAME, + "--inputs", INPUT_TOPIC_NAME, + "--output", OUTPUT_TOPIC_NAME, + "--jar", PACKAGE_URL, + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }); + + CreateFunction creater = cmd.getCreater(); + + assertEquals(FN_NAME, creater.getFunctionName()); + assertEquals(INPUT_TOPIC_NAME, creater.getInputs()); + assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput()); + verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString()); + } + + @Test + public void testCreateGoFunctionWithPackageUrl() throws Exception { + cmd.run(new String[] { + "create", + "--name", "test-go-function", + "--inputs", INPUT_TOPIC_NAME, + "--output", OUTPUT_TOPIC_NAME, + "--go", PACKAGE_GO_URL, + "--tenant", "sample", + "--namespace", "ns1", + }); + + CreateFunction creater = cmd.getCreater(); + + assertEquals("test-go-function", creater.getFunctionName()); + assertEquals(INPUT_TOPIC_NAME, creater.getInputs()); + assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput()); + verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString()); + } + + @Test + public void testCreatePyFunctionWithPackageUrl() throws Exception { + cmd.run(new String[] { + "create", + "--name", "test-py-function", + "--inputs", INPUT_TOPIC_NAME, + "--output", OUTPUT_TOPIC_NAME, + "--py", PACKAGE_PY_URL, + "--tenant", "sample", + "--namespace", "ns1", + "--className", "process_python_function", + }); + + CreateFunction creater = cmd.getCreater(); + + assertEquals("test-py-function", creater.getFunctionName()); + assertEquals(INPUT_TOPIC_NAME, creater.getInputs()); + assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput()); + verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString()); + } + + @Test + public void testCreateFunctionWithInvalidPackageUrl() throws Exception { + cmd.run(new String[] { + "create", + "--name", FN_NAME, + "--inputs", INPUT_TOPIC_NAME, + "--output", OUTPUT_TOPIC_NAME, + "--jar", PACKAGE_INVALID_URL, + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }); + + CreateFunction creater = cmd.getCreater(); + + assertEquals(FN_NAME, creater.getFunctionName()); + assertEquals(INPUT_TOPIC_NAME, creater.getInputs()); + assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput()); + verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString()); + } + @Test public void testCreateFunctionWithoutBasicArguments() throws Exception { cmd.run(new String[] { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java index 6f6a6840aba02..2de38bbe02fbc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Utils.java @@ -21,8 +21,10 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE; import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import java.util.Arrays; import org.apache.pulsar.common.io.SinkConfig; import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.packages.management.core.common.PackageType; /** * Helper class to work with configuration. @@ -34,7 +36,13 @@ public class Utils { public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) { return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP) - || functionPkgUrl.startsWith(FILE)); + || functionPkgUrl.startsWith(FILE) + || hasPackageTypePrefix(functionPkgUrl)); + } + + public static boolean hasPackageTypePrefix(String destPkgUrl) { + return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()) + && destPkgUrl.contains("://")); } public static void inferMissingFunctionName(FunctionConfig functionConfig) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 5aa438922db8b..8c3585100cb95 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -154,7 +154,7 @@ public void registerFunction(final String tenant, // validate parameters try { if (isPkgUrlProvided) { - if (hasPackageTypePrefix(functionPkgUrl)) { + if (Utils.hasPackageTypePrefix(functionPkgUrl)) { componentPackageFile = downloadPackageFile(functionPkgUrl); } else { if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { @@ -323,7 +323,7 @@ public void updateFunction(final String tenant, // validate parameters try { if (isNotBlank(functionPkgUrl)) { - if (hasPackageTypePrefix(functionPkgUrl)) { + if (Utils.hasPackageTypePrefix(functionPkgUrl)) { componentPackageFile = downloadPackageFile(functionName); } else { try { @@ -759,10 +759,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant } - private static boolean hasPackageTypePrefix(String destPkgUrl) { - return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())); - } - private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { return downloadPackageFile(worker(), packageName); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 6bd80d5cc92bc..31a72347de04c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -153,7 +153,7 @@ public void registerSink(final String tenant, // validate parameters try { if (isPkgUrlProvided) { - if (hasPackageTypePrefix(sinkPkgUrl)) { + if (Utils.hasPackageTypePrefix(sinkPkgUrl)) { componentPackageFile = downloadPackageFile(sinkPkgUrl); } else { if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) { @@ -323,7 +323,7 @@ public void updateSink(final String tenant, // validate parameters try { if (isNotBlank(sinkPkgUrl)) { - if (hasPackageTypePrefix(sinkPkgUrl)) { + if (Utils.hasPackageTypePrefix(sinkPkgUrl)) { componentPackageFile = downloadPackageFile(sinkPkgUrl); } else { try { @@ -749,10 +749,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant return SinkConfigUtils.convert(sinkConfig, sinkDetails); } - private static boolean hasPackageTypePrefix(String destPkgUrl) { - return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())); - } - private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { return FunctionsImpl.downloadPackageFile(worker(), packageName); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 2a0edb187e989..1e9148bd658ae 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -153,7 +153,7 @@ public void registerSource(final String tenant, // validate parameters try { if (isPkgUrlProvided) { - if (hasPackageTypePrefix(sourcePkgUrl)) { + if (Utils.hasPackageTypePrefix(sourcePkgUrl)) { componentPackageFile = downloadPackageFile(sourcePkgUrl); } else { if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) { @@ -321,7 +321,7 @@ public void updateSource(final String tenant, // validate parameters try { if (isNotBlank(sourcePkgUrl)) { - if (hasPackageTypePrefix(sourcePkgUrl)) { + if (Utils.hasPackageTypePrefix(sourcePkgUrl)) { componentPackageFile = downloadPackageFile(sourcePkgUrl); } else { try { @@ -746,10 +746,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant return SourceConfigUtils.convert(sourceConfig, sourceDetails); } - private static boolean hasPackageTypePrefix(String destPkgUrl) { - return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())); - } - private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { return FunctionsImpl.downloadPackageFile(worker(), packageName); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java index 2f213a0b50d44..f6e5db7a6c101 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PackagesCliTest.java @@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport { public final void setup() throws Exception { incrementSetupNumber(); PulsarClusterSpec spec = PulsarClusterSpec.builder() - .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6))) - .brokerEnvs(getPackagesManagementServiceEnvs()) - .build(); + .clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6))) + .brokerEnvs(getPackagesManagementServiceEnvs()) + .build(); pulsarCluster = PulsarCluster.forSpec(spec); pulsarCluster.start(); } @@ -88,13 +88,13 @@ public void testPackagesOperationsWithoutUploadingPackages() throws Exception { public void testPackagesOperationsWithUploadingPackages() throws Exception { String testPackageName = "function://public/default/test@v1"; ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package", - "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName); + "--path", PulsarCluster.ADMIN_SCRIPT, testPackageName); assertEquals(result.getExitCode(), 0); BrokerContainer container = pulsarCluster.getBroker(0); String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8); String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download", - "--path", downloadFile, testPackageName}; + "--path", downloadFile, testPackageName}; result = container.execCmd(downloadCmd); assertEquals(result.getExitCode(), 0); @@ -119,7 +119,7 @@ public void testPackagesOperationsWithUploadingPackages() throws Exception { String contact = "test@apache.org"; result = runPackagesCommand("update-metadata", "--description", "a test package", - "--contact", contact, "-PpropertyA=A", testPackageName); + "--contact", contact, "-PpropertyA=A", testPackageName); assertEquals(result.getExitCode(), 0); result = runPackagesCommand("get-metadata", testPackageName);