[Bug] [connector-rocketmq] When checkpoint.interval is set too small, such as 2000, it can trigger an NPE #6624

Closed
3 tasks done
YalikWang opened this issue Apr 1, 2024 · 0 comments
Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When I consume messages from RocketMQ and print them to the console, the task fails with an NPE.

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  Rocketmq {
    name.srv.addr = "192.168.15.221:9876"
    topics = "TopicTest"
    acl.enabled = true
    access.key = "***"
    secret.key = "***"
    consumer.group = "**"
  }
}

sink {
  Console {
  }
}

Running Command

bin/seatunnel.sh --config config/rocketmq-console.config -e local

Error Exception

2024-04-01 13:05:20,382 ERROR org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation - [localhost]:5801 [seatunnel-866372] [5.1] java.lang.NullPointerException
    at org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMqSourceReader.notifyCheckpointComplete(RocketMqSourceReader.java:247)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:328)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$8(SeaTunnelTask.java:384)
    at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:133)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$13(SeaTunnelTask.java:404)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:404)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:384)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
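
The report does not include the source at RocketMqSourceReader.java:247, so the following is only a sketch of one way a notifyCheckpointComplete callback can hit an NPE: the callback looks up per-checkpoint state that was never recorded (or was already removed) and dereferences the result. Every class, field, and method name below is hypothetical and is not taken from SeaTunnel; the guarded variant shows a possible null check, not the fix that was actually applied.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CheckpointOffsetSketch {
    // Hypothetical state: checkpoint id -> (queue name -> offset) captured at snapshot time.
    private final Map<Long, Map<String, Long>> pendingOffsets = new HashMap<>();
    // Hypothetical stand-in for offsets that have been committed to the broker.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Record the offsets that belong to a given checkpoint.
    public void snapshotState(long checkpointId, Map<String, Long> offsets) {
        pendingOffsets.put(checkpointId, new HashMap<>(offsets));
    }

    // Unguarded variant: throws NullPointerException when no snapshot exists for the id,
    // because the lookup returns null and the loop dereferences it.
    public void notifyCheckpointCompleteUnguarded(long checkpointId) {
        Map<String, Long> offsets = pendingOffsets.get(checkpointId);
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            committedOffsets.put(entry.getKey(), entry.getValue());
        }
    }

    // Guarded variant: skips the commit and returns false when nothing was recorded.
    public boolean notifyCheckpointCompleteGuarded(long checkpointId) {
        Map<String, Long> offsets = pendingOffsets.remove(checkpointId);
        if (offsets == null) {
            return false;
        }
        committedOffsets.putAll(offsets);
        return true;
    }

    public Long committedOffset(String queue) {
        return committedOffsets.get(queue);
    }

    public static void main(String[] args) {
        CheckpointOffsetSketch reader = new CheckpointOffsetSketch();
        reader.snapshotState(1L, Collections.singletonMap("TopicTest-0", 42L));

        System.out.println(reader.notifyCheckpointCompleteGuarded(1L)); // snapshot exists
        System.out.println(reader.notifyCheckpointCompleteGuarded(2L)); // no snapshot recorded
        try {
            reader.notifyCheckpointCompleteUnguarded(3L); // no snapshot recorded
        } catch (NullPointerException e) {
            System.out.println("NPE for checkpoint 3");
        }
    }
}
```

Whether the real reader stores its state this way, and why a 2000 ms interval would leave an entry missing, is not established by the stack trace alone.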

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


@YalikWang YalikWang added the bug label Apr 1, 2024
YalikWang added a commit to YalikWang/seatunnel that referenced this issue Apr 1, 2024
chaorongzhi pushed a commit to chaorongzhi/seatunnel that referenced this issue Aug 21, 2024