Skip to content

Commit

Permalink
[pulsar-admin] allow create functions with package URL (#11666)
Browse files Browse the repository at this point in the history
Fix #11665

### Motivation

Allow user to create function with package URL with pulsar-admin.

### Modifications

- allow passing valid package URL from pulsar-admin functions
- added tests
  • Loading branch information
freeznet authored Aug 17, 2021
1 parent fbfbd0e commit de86f4f
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,18 @@ public IObjectFactory getObjectFactory() {
private static final String JAR_NAME = CmdFunctionsTest.class.getClassLoader().getResource("dummyexamples.jar").getFile();
private static final String GO_EXEC_FILE_NAME = "test-go-function-with-url";
private static final String PYTHON_FILE_NAME = "test-go-function-with-url";
private static final String URL ="file:" + JAR_NAME;
private static final String URL_WITH_GO ="file:" + GO_EXEC_FILE_NAME;
private static final String URL_WITH_PY ="file:" + PYTHON_FILE_NAME;
private static final String URL = "file:" + JAR_NAME;
private static final String URL_WITH_GO = "file:" + GO_EXEC_FILE_NAME;
private static final String URL_WITH_PY = "file:" + PYTHON_FILE_NAME;
private static final String FN_NAME = TEST_NAME + "-function";
private static final String INPUT_TOPIC_NAME = TEST_NAME + "-input-topic";
private static final String OUTPUT_TOPIC_NAME = TEST_NAME + "-output-topic";
private static final String TENANT = TEST_NAME + "-tenant";
private static final String NAMESPACE = TEST_NAME + "-namespace";
private static final String PACKAGE_URL = "function://sample/ns1/jardummyexamples@1";
private static final String PACKAGE_GO_URL = "function://sample/ns1/godummyexamples@1";
private static final String PACKAGE_PY_URL = "function://sample/ns1/pydummyexamples@1";
private static final String PACKAGE_INVALID_URL = "functionsample.jar";

private PulsarAdmin admin;
private Functions functions;
Expand Down Expand Up @@ -361,6 +365,89 @@ public void testCreatePyFunctionWithFileUrl() throws Exception {
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", FN_NAME,
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--jar", PACKAGE_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();

assertEquals(FN_NAME, creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateGoFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", "test-go-function",
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--go", PACKAGE_GO_URL,
"--tenant", "sample",
"--namespace", "ns1",
});

CreateFunction creater = cmd.getCreater();

assertEquals("test-go-function", creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreatePyFunctionWithPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", "test-py-function",
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--py", PACKAGE_PY_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", "process_python_function",
});

CreateFunction creater = cmd.getCreater();

assertEquals("test-py-function", creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(1)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithInvalidPackageUrl() throws Exception {
cmd.run(new String[] {
"create",
"--name", FN_NAME,
"--inputs", INPUT_TOPIC_NAME,
"--output", OUTPUT_TOPIC_NAME,
"--jar", PACKAGE_INVALID_URL,
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();

assertEquals(FN_NAME, creater.getFunctionName());
assertEquals(INPUT_TOPIC_NAME, creater.getInputs());
assertEquals(OUTPUT_TOPIC_NAME, creater.getOutput());
verify(functions, times(0)).createFunctionWithUrl(any(FunctionConfig.class), anyString());
}

@Test
public void testCreateFunctionWithoutBasicArguments() throws Exception {
cmd.run(new String[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import java.util.Arrays;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.packages.management.core.common.PackageType;

/**
* Helper class to work with configuration.
Expand All @@ -34,7 +36,13 @@ public class Utils {

public static boolean isFunctionPackageUrlSupported(String functionPkgUrl) {
return isNotBlank(functionPkgUrl) && (functionPkgUrl.startsWith(HTTP)
|| functionPkgUrl.startsWith(FILE));
|| functionPkgUrl.startsWith(FILE)
|| hasPackageTypePrefix(functionPkgUrl));
}

public static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString())
&& destPkgUrl.contains("://"));
}

public static void inferMissingFunctionName(FunctionConfig functionConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void registerFunction(final String tenant,
// validate parameters
try {
if (isPkgUrlProvided) {
if (hasPackageTypePrefix(functionPkgUrl)) {
if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
componentPackageFile = downloadPackageFile(functionPkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
Expand Down Expand Up @@ -323,7 +323,7 @@ public void updateFunction(final String tenant,
// validate parameters
try {
if (isNotBlank(functionPkgUrl)) {
if (hasPackageTypePrefix(functionPkgUrl)) {
if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
componentPackageFile = downloadPackageFile(functionName);
} else {
try {
Expand Down Expand Up @@ -759,10 +759,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant

}

private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return downloadPackageFile(worker(), packageName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void registerSink(final String tenant,
// validate parameters
try {
if (isPkgUrlProvided) {
if (hasPackageTypePrefix(sinkPkgUrl)) {
if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
Expand Down Expand Up @@ -323,7 +323,7 @@ public void updateSink(final String tenant,
// validate parameters
try {
if (isNotBlank(sinkPkgUrl)) {
if (hasPackageTypePrefix(sinkPkgUrl)) {
if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
componentPackageFile = downloadPackageFile(sinkPkgUrl);
} else {
try {
Expand Down Expand Up @@ -749,10 +749,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}

private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void registerSource(final String tenant,
// validate parameters
try {
if (isPkgUrlProvided) {
if (hasPackageTypePrefix(sourcePkgUrl)) {
if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
componentPackageFile = downloadPackageFile(sourcePkgUrl);
} else {
if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
Expand Down Expand Up @@ -321,7 +321,7 @@ public void updateSource(final String tenant,
// validate parameters
try {
if (isNotBlank(sourcePkgUrl)) {
if (hasPackageTypePrefix(sourcePkgUrl)) {
if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
componentPackageFile = downloadPackageFile(sourcePkgUrl);
} else {
try {
Expand Down Expand Up @@ -746,10 +746,6 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
return SourceConfigUtils.convert(sourceConfig, sourceDetails);
}

private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
}

private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class PackagesCliTest extends TestRetrySupport {
public final void setup() throws Exception {
incrementSetupNumber();
PulsarClusterSpec spec = PulsarClusterSpec.builder()
.clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
.brokerEnvs(getPackagesManagementServiceEnvs())
.build();
.clusterName(String.format("%s-%s", clusterNamePrefix, RandomStringUtils.randomAlphabetic(6)))
.brokerEnvs(getPackagesManagementServiceEnvs())
.build();
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}
Expand Down Expand Up @@ -88,13 +88,13 @@ public void testPackagesOperationsWithoutUploadingPackages() throws Exception {
public void testPackagesOperationsWithUploadingPackages() throws Exception {
String testPackageName = "function://public/default/test@v1";
ContainerExecResult result = runPackagesCommand("upload", "--description", "a test package",
"--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
"--path", PulsarCluster.ADMIN_SCRIPT, testPackageName);
assertEquals(result.getExitCode(), 0);

BrokerContainer container = pulsarCluster.getBroker(0);
String downloadFile = "tmp-file-" + RandomStringUtils.randomAlphabetic(8);
String[] downloadCmd = new String[]{PulsarCluster.ADMIN_SCRIPT, "packages", "download",
"--path", downloadFile, testPackageName};
"--path", downloadFile, testPackageName};
result = container.execCmd(downloadCmd);
assertEquals(result.getExitCode(), 0);

Expand All @@ -119,7 +119,7 @@ public void testPackagesOperationsWithUploadingPackages() throws Exception {

String contact = "test@apache.org";
result = runPackagesCommand("update-metadata", "--description", "a test package",
"--contact", contact, "-PpropertyA=A", testPackageName);
"--contact", contact, "-PpropertyA=A", testPackageName);
assertEquals(result.getExitCode(), 0);

result = runPackagesCommand("get-metadata", testPackageName);
Expand Down

0 comments on commit de86f4f

Please sign in to comment.