
[Bug] [Module Name] class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer #4023

Closed
chenwei182729 opened this issue Jan 31, 2023 · 6 comments
chenwei182729 commented Jan 31, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When using SeaTunnel's own engine and sinking to Kafka, the exception shown in the Error Exception section below is thrown.
The likely cause is a classloader mismatch: org.apache.kafka.common.serialization.ByteArraySerializer is loaded by SeatunnelChildFirstClassLoader, while org.apache.kafka.common.serialization.Serializer is loaded by AppClassLoader.
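To illustrate the suspected root cause (an editorial sketch, not the actual SeaTunnel code path): a class loaded through two unrelated classloaders produces two distinct Class objects, so an isInstance check against the interface from one loader fails for an implementation loaded by the other, even though the fully-qualified names match. The jar path below is hypothetical.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Minimal sketch of the classloader mismatch; the jar location is a hypothetical placeholder.
public class ClassLoaderMismatchDemo {
    public static void main(String[] args) throws Exception {
        URL kafkaJar = new URL("file:/path/to/kafka-clients.jar"); // hypothetical path

        // Two isolated loaders with no shared parent for the Kafka classes,
        // similar in spirit to a child-first loader living next to the app classloader.
        try (URLClassLoader loaderA = new URLClassLoader(new URL[]{kafkaJar}, null);
             URLClassLoader loaderB = new URLClassLoader(new URL[]{kafkaJar}, null)) {

            Class<?> serializerFromA =
                    loaderA.loadClass("org.apache.kafka.common.serialization.Serializer");
            Class<?> byteArraySerializerFromB =
                    loaderB.loadClass("org.apache.kafka.common.serialization.ByteArraySerializer");

            Object instance = byteArraySerializerFromB.getDeclaredConstructor().newInstance();

            // Prints "false": the Serializer interface seen by loader A is not the same Class
            // object as the one implemented by the instance created through loader B.
            System.out.println(serializerFromA.isInstance(instance));
        }
    }
}
```

Kafka's AbstractConfig performs exactly this kind of isInstance check when it instantiates the configured serializer, which is why the mismatch surfaces as the KafkaException above.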


SeaTunnel Version

2.3.1

SeaTunnel Config

```
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  execution.checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    parallelism = 3
  }
}

transform {
}

sink {
  kafka {
    topic = "quickstart-seatunnel"
    bootstrap.servers="127.0.0.1:9092"
    partition=1
    format=json
    kafka.request.timeout.ms=60000
    kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
    kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
  }
}
```

Running Command

Added the following test to the org.apache.seatunnel.engine.client.SeaTunnelClientTest class and ran it as a unit test:

```java
@Test
    public void testExecuteJob_kafka() {
        Common.setDeployMode(DeployMode.CLIENT);
        String filePath = TestUtils.getResource("/batch_fakesource_to_kafka.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("fake_to_kafka");

        JobExecutionEnvironment jobExecutionEnv = CLIENT.createExecutionContext(filePath, jobConfig);

        try {
            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            CompletableFuture<JobStatus> objectCompletableFuture = CompletableFuture.supplyAsync(() -> {
                return clientJobProxy.waitForJobComplete();
            });

            await().atMost(180000, TimeUnit.MILLISECONDS)
                    .untilAsserted(() -> Assertions.assertTrue(
                            objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));

        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
```


Error Exception

```log
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:42) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:99) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:82) ~[connector-kafka-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:196) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$14(SeaTunnelTask.java:323) ~[classes/:?]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_202]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_202]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[?:1.8.0_202]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[?:1.8.0_202]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_202]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:321) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$run$0(NotifyTaskRestoreOperation.java:87) ~[classes/:?]
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:47) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.run(NotifyTaskRestoreOperation.java:81) ~[classes/:?]
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241) ~[hazelcast-5.1.jar:5.1]
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61) ~[hazelcast-5.1.jar:5.1]
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:40) ~[classes/:?]
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:230) ~[classes/:?]
	... 10 more
```
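For reference, a common workaround for this class of mismatch (a hedged sketch of the general technique, not necessarily the fix that was applied in SeaTunnel) is to make sure Kafka resolves serializer class names against the classloader that actually loaded kafka-clients, either by switching the thread context classloader around producer creation or by passing Serializer instances so no reflective lookup is needed. The helper class below is hypothetical, not SeaTunnel API.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical helper illustrating the workaround; not part of SeaTunnel.
public final class ProducerClassLoaderWorkaround {

    private ProducerClassLoaderWorkaround() {}

    /**
     * Option 1: temporarily switch the thread context classloader to the loader that owns
     * the Kafka classes (SeatunnelChildFirstClassLoader in this report), since Kafka
     * resolves class-name configs such as key.serializer via the context classloader.
     */
    public static KafkaProducer<byte[], byte[]> createProducer(Properties props) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread()
                  .setContextClassLoader(ByteArraySerializer.class.getClassLoader());
            return new KafkaProducer<>(props);
        } finally {
            Thread.currentThread().setContextClassLoader(original);
        }
    }

    /**
     * Option 2: pass serializer instances to the constructor so Kafka never has to
     * reflectively instantiate them from a class name.
     */
    public static KafkaProducer<byte[], byte[]> createProducerWithInstances(Properties props) {
        return new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
    }
}
```

Either approach keeps the Serializer interface and its implementation inside the same classloader, which is exactly what the failing isInstance check requires.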


Flink or Spark Version

_No response_

Java or Scala Version

1.8.0

Screenshots

_No response_

Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
hailin0 (Member) commented Jan 31, 2023

@Hisoka-X #3817

Hisoka-X (Member) commented Feb 1, 2023

Can you test again with the dev branch?

chenwei182729 (Author) commented

The bug is fixed.

hailin0 (Member) commented Apr 26, 2024

link #6355

LLyKy commented Oct 18, 2024

I got the same error with Flink:

```log
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
... 18 more
```

```log
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
```

I need help.

mcagriaktas commented Jan 25, 2025

Hello, I was getting this error with Flink 1.20.0 and Kafka 3.8.0, using flink-connector-kafka 3.4.0:

```scala
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % "1.20.0" % "provided",
  "org.apache.flink" % "flink-connector-kafka" % "3.4.0-1.20",
  "org.apache.flink" %% "flink-scala" % "1.20.0" % "provided",
  "com.github.luben" % "zstd-jni" % "1.5.5-5",
  "org.apache.kafka" % "kafka-clients" % "3.4.0"
)
```

I fixed the error in Flink's config.yml:

```yaml
# default is child-first
classloader.resolve-order: parent-first
```
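If changing the global resolve order is too broad, a narrower alternative is Flink's classloader.parent-first-patterns.additional option (assuming kafka-clients is also present on the parent classpath, e.g. in Flink's lib/ directory), which forces only the Kafka packages to load parent-first:

```yaml
# Load only org.apache.kafka.* parent-first instead of changing the global resolve order.
classloader.parent-first-patterns.additional: org.apache.kafka
```

This keeps child-first behavior for everything else in the user jar.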
