Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-admin] allow create functions with package URL #11666

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,18 @@ public IObjectFactory getObjectFactory() {
private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url";
private static final String PYTHON_FILE_NAME = "test-go-function-with-url";
private static final String URL ="file:" + JAR_NAME;
private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME;
private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME;
private static final String URL = "file:" + JAR_NAME;
private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME;
private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME;
private static final String FN_NAME = TEST_NAME + "-function";
private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic";
private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic";
private static final String TENANT = TEST_NAME + "-tenant";
private static final String NAMESPACE = TEST_NAME + "-namespace";
private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1";
private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1";
private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1";
private static final String PACKAGE_INVALID_URL = "functionsample.jar";

private PulsarAdmin admin;
private Functions functions;
Expand Down Expand Up @@ -361,6 +365,89 @@ public void testCreatePyFunctionWithFileUrl() throws Exception {
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", FN_NAME,
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--jar", PACKAGE_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();

assertEquals(FN_NAME, creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateGoFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", "test-go-function",
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--go", PACKAGE_GO_URL,
"--tenant", "sample",
"--namespace", "ns1",
});

CreateFunction creater = cmd.getCreater();

assertEquals("test-go-function", creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreatePyFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", "test-py-function",
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--py", PACKAGE_PY_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", "process_python_function",
});

CreateFunction creater = cmd.getCreater();

assertEquals("test-py-function", creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithInvalidPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", FN_NAME,
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--jar", PACKAGE_INVALID_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();

assertEquals(FN_NAME, creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithoutBasicArguments() throws Exception {
cmd.run(new String[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import java.util.Arrays;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.packages.management.core.common.PackageType;

/**
* Helper class to work with configuration.
Expand All @@ -34,7 +36,9 @@ public class Utils {

public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
|| functionPkgUrl.startsWith(FILE));
|| functionPkgUrl.startsWith(FILE)
|| Arrays.stream(PackageType.values()).anyMatch(type -> functionPkgUrl.startsWith(type.toString()))
freeznet marked this conversation as resolved.
Show resolved Hide resolved
&& functionPkgUrl.contains("://"));
}

public static void inferMissingFunctionName(FunctionConfig functionConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,8 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
}

private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())
&& destPkgUrl.contains("://"));
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport {
public final void setup() throws Exception {
incrementSetupNumber();
PulsarClusterSpec spec = PulsarClusterSpec.builder()
.clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
.brokerEnvs(getPackagesManagementServiceEnvs())
.build();
.clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
.brokerEnvs(getPackagesManagementServiceEnvs())
.build();
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}
Expand Down Expand Up @@ -88,13 +88,13 @@ public void testPackagesOperationsWithoutUploadingPackages() throws Exception {
public void testPackagesOperationsWithUploadingPackages() throws Exception {
String testPackageName = "function://public/default/test@v1";
ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package",
"--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
"--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
assertEquals(result.getExitCode(), 0);

BrokerContainer container = pulsarCluster.getBroker(0);
String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8);
String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download",
"--path", downloadFile, testPackageName};
"--path", downloadFile, testPackageName};
result = container.execCmd(downloadCmd);
assertEquals(result.getExitCode(), 0);

Expand All @@ -119,7 +119,7 @@ public void testPackagesOperationsWithUploadingPackages() throws Exception {

String contact = "test@apache.org";
result = runPackagesCommand("update-metadata", "--description", "a test package",
"--contact", contact, "-PpropertyA=A", testPackageName);
"--contact", contact, "-PpropertyA=A", testPackageName);
assertEquals(result.getExitCode(), 0);

result = runPackagesCommand("get-metadata", testPackageName);
Expand Down