Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Allow unknown fields in connectors config #20116

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,12 @@ validateConnectorConfig: false
# If it is set to true, you must ensure that it has been initialized by "bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false

# Whether to ignore unknown properties when deserializing the connector configuration.
# After upgrading a connector to a new version with a new configuration, the new configuration may not be compatible with the old connector.
# In case of a rollback, the connector configuration must also be rolled back.
# Ignoring unknown fields makes it possible to keep the new configuration and only roll back the connector.
ignoreUnknownConfigFields: false

###########################
# Arbitrary Configuration
###########################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class InstanceConfig {
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;
private List<String> additionalJavaRuntimeArguments = Collections.emptyList();
private boolean ignoreUnknownConfigFields;

/**
* Get the string representation of {@link #getInstanceId()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
Expand All @@ -59,6 +62,7 @@
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
Expand Down Expand Up @@ -94,6 +98,7 @@
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
Expand Down Expand Up @@ -855,10 +860,7 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
this.source.open(
ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>() {
}).readValue(sourceSpec.getConfigs())
, contextImpl);
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource) this.source).getInputConsumers());
Expand All @@ -870,6 +872,56 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
/**
 * Parses the JSON connector configuration of this instance.
 *
 * <p>Convenience overload that forwards to the static
 * {@code parseComponentConfig(String, InstanceConfig, ClassLoader, ComponentType)}
 * using this runnable's {@code instanceConfig}, {@code componentClassLoader} and
 * {@code componentType}.
 *
 * @param connectorConfigs the connector configuration as a JSON string
 * @return the configuration as a key/value map, as produced by the static overload
 * @throws IOException if the static overload fails to read the JSON
 */
private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
    final Map<String, Object> parsedConfig = parseComponentConfig(
            connectorConfigs,
            instanceConfig,
            componentClassLoader,
            componentType);
    return parsedConfig;
}

static Map<String, Object> parseComponentConfig(String connectorConfigs,
InstanceConfig instanceConfig,
ClassLoader componentClassLoader,
org.apache.pulsar.functions.proto.Function
.FunctionDetails.ComponentType componentType)
throws IOException {
final Map<String, Object> config = ObjectMapperFactory
.getMapper()
.reader()
.forType(new TypeReference<Map<String, Object>>() {})
.readValue(connectorConfigs);
if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
final String configClass;
if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
configClass = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
} else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
configClass = ConnectorUtils
.getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
} else {
return config;
}
if (configClass != null) {
final Object configInstance = Reflections.createInstance(configClass,
Thread.currentThread().getContextClassLoader());
final List<String> allFields =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that this is correct.

ObjectMapper should follow Java Beans conventions and use getter/setters together with public fields.
We should use some ObjectMapper utilities here in order to ensure that we are doing the right thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found a way to use the same deserializer used by jackson mapper, so this should cover all the cases when jackson mapper is used by the sink/source

Reflections
.getAllFields(configInstance.getClass())
.stream()
.map(Field::getName)
.collect(Collectors.toList());

// Iterate over a snapshot of the keys: the loop body removes unknown entries from
// 'config', and removing from a map while iterating its live keySet() view throws
// ConcurrentModificationException as soon as the removed key is not the last one.
for (String s : config.keySet().toArray(new String[0])) {
if (!allFields.contains(s)) {
log.warn("Field '{}' not defined in the {} configuration {}, the field will be ignored",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be logged as "ERROR", WARNINGs tend to be ignored by alert systems

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

s,
componentType,
configClass);
config.remove(s);
}
}
}
}
nicoloboschi marked this conversation as resolved.
Show resolved Hide resolved
return config;
}


private void setupOutput(ContextImpl contextImpl) throws Exception {

Expand Down Expand Up @@ -940,9 +992,8 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", sinkSpec,
contextImpl.toString());
}
this.sink.open(ObjectMapperFactory.getMapper().reader().forType(
new TypeReference<Map<String, Object>>() {
}).readValue(sinkSpec.getConfigs()), contextImpl);
final Map<String, Object> config = parseComponentConfig(sinkSpec.getConfigs());
this.sink.open(config, contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,34 @@
*/
package org.apache.pulsar.functions.instance;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder;
import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class JavaInstanceRunnableTest {
Expand Down Expand Up @@ -159,7 +162,7 @@ public void testFunctionResultNull() throws Exception {

@NotNull
private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) throws Exception {
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setAutoAck(autoAck)
.setProcessingGuarantees(processingGuarantees).build();
Expand All @@ -184,23 +187,71 @@ public void testStatsManagerNull() throws Exception {

@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
when(sinkSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sinkSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SINK
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}

@Test
public void testSourceConfigParsingPreservesOriginalType() throws Exception {
SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
when(sourceSpec.getConfigs()).thenReturn("{\"ttl\": 9223372036854775807}");
Map<String, Object> parsedConfig =
new ObjectMapper().readValue(sourceSpec.getConfigs(), new TypeReference<Map<String, Object>>() {
});
final Map<String, Object> parsedConfig = JavaInstanceRunnable.parseComponentConfig(
"{\"ttl\": 9223372036854775807}",
new InstanceConfig(),
null,
FunctionDetails.ComponentType.SOURCE
);
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}


/**
 * Minimal connector configuration class used as a test fixture.
 *
 * <p>It declares a single property, {@code field1}; the unknown-field test below
 * registers it as the sink/source config class and expects any other key
 * (e.g. {@code field2}) to be dropped when unknown fields are ignored.
 */
public static class ConnectorTestConfig1 {
public String field1;
}

/**
 * Supplies every combination of the {@code ignoreUnknownConfigFields} flag and the
 * connector component type (sink, then source) exercised by the unknown-field test.
 *
 * @return rows of {@code {ignoreUnknownConfigFields, componentType}}
 */
@DataProvider(name = "configIgnoreUnknownFields")
public static Object[][] configIgnoreUnknownFields() {
    final FunctionDetails.ComponentType sink = FunctionDetails.ComponentType.SINK;
    final FunctionDetails.ComponentType source = FunctionDetails.ComponentType.SOURCE;
    return new Object[][]{
            {false, sink},
            {true, sink},
            {false, source},
            {true, source}
    };
}

/**
 * Checks that {@code parseComponentConfig} drops keys that are not declared by the
 * connector's config class only when {@code ignoreUnknownConfigFields} is enabled,
 * and keeps every key otherwise. Runs for both sink and source component types.
 */
@Test(dataProvider = "configIgnoreUnknownFields")
public void testSinkConfigIgnoreUnknownFields(boolean ignoreUnknownConfigFields,
                                              FunctionDetails.ComponentType type) throws Exception {
    // Connector definition whose config class (for the tested component type) only knows "field1".
    final String configClassName = ConnectorTestConfig1.class.getName();
    final ConnectorDefinition definition = new ConnectorDefinition();
    if (type == FunctionDetails.ComponentType.SINK) {
        definition.setSinkConfigClass(configClassName);
    } else {
        definition.setSourceConfigClass(configClassName);
    }
    final String serializedDefinition =
            ObjectMapperFactory.getMapper().writer().writeValueAsString(definition);

    // The mocked NAR class loader hands back the serialized definition above.
    final NarClassLoader classLoader = mock(NarClassLoader.class);
    when(classLoader.getServiceDefinition(any())).thenReturn(serializedDefinition);

    final InstanceConfig instanceConfig = new InstanceConfig();
    instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);

    final String json = "{\"field1\": \"value\", \"field2\": \"value2\"}";
    final Map<String, Object> parsed =
            JavaInstanceRunnable.parseComponentConfig(json, instanceConfig, classLoader, type);

    // "field2" is unknown to ConnectorTestConfig1, so it survives only when the flag is off.
    final int expectedSize = ignoreUnknownConfigFields ? 1 : 2;
    Assert.assertEquals(parsed.size(), expectedSize);
    Assert.assertEquals(parsed.get("field1"), "value");
    if (!ignoreUnknownConfigFields) {
        Assert.assertEquals(parsed.get("field2"), "value2");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable {
+ "exposed to function context, default is disabled.", required = false)
public Boolean exposePulsarAdminClientEnabled = false;

@Parameter(names = "--ignore_unknown_config_fields",
description = "Whether to ignore unknown properties when deserializing the connector configuration.",
required = false)
public Boolean ignoreUnknownConfigFields = false;


private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
Expand Down Expand Up @@ -177,6 +183,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
instanceConfig.setClusterName(clusterName);
instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,14 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));

// only the Java instance supports --pending_async_requests right now.
// params supported only by the Java instance runtime.
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("--pending_async_requests");
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));

if (instanceConfig.isIgnoreUnknownConfigFields()) {
args.add("--ignore_unknown_config_fields");
}
}

// state storage configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,17 @@ public String getFunctionAuthProviderClassName() {
)
private List<String> additionalJavaRuntimeArguments = new ArrayList<>();

@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "Whether to ignore unknown properties when deserializing the connector configuration. "
+ "After upgrading a connector to a new version with a new configuration, "
+ "the new configuration may not be compatible with the old connector. "
+ "In case of rollback, it's required to also rollback the connector configuration. "
+ "Ignoring unknown fields makes possible to keep the new configuration and "
+ "only rollback the connector."
)
private boolean ignoreUnknownConfigFields = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration applies to the instances of the functions/connectors, and not to the function worker (that already ignores unknown fields)

what about 'functionsIgnoreUnknownConfigFields' ? (maybe we can do better, but connectors are actually functions)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that and I agree. But there are other properties that are meant to be used by the java runner like
maxPendingAsyncRequests, exposeAdminClientEnabled, additionalJavaRuntimeArguments so I'd prefer to stick with this convention


public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.Fu
if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
}
instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields());
return instanceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#
name: alluxio
description: Writes data into Alluxio
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
name: dynamodb
description: DynamoDB connectors
sourceClass: org.apache.pulsar.io.dynamodb.DynamoDBSource
sourceConfigClass: org.apache.pulsar.io.dynamodb.DynamoDBSourceConfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
name: jdbc-clickhouse
description: JDBC sink for ClickHouse
sinkClass: org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink
sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
name: jdbc-mariadb
description: JDBC sink for MariaDB
sinkClass: org.apache.pulsar.io.jdbc.MariadbJdbcAutoSchemaSink
sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
name: jdbc-openmldb
description: JDBC sink for OpenMLDB
sinkClass: org.apache.pulsar.io.jdbc.OpenMLDBJdbcAutoSchemaSink
sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
name: jdbc-postgres
description: JDBC sink for PostgreSQL
sinkClass: org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink
sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig