Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
When synchronizing from MySQL CDC to Kafka with the built-in (Zeta) engine, the job fails with a serialization error. I have already tried swapping the Kafka client jar versions, but that did not help. Please advise.
2025-12-05 10:18:36,344 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:281)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:277)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer.<init>(KafkaInternalProducer.java:46)
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.getTransactionProducer(KafkaTransactionSender.java:128)
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.beginTransaction(KafkaTransactionSender.java:62)
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:103)
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:56)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:293)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365)
... 27 more
... 12 more
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
... 2 more
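For context, "ByteArraySerializer is not an instance of Serializer" usually means the serializer class and the Serializer interface were resolved from two different kafka-clients jars or classloaders (for example, one copy under the engine's lib directory and another bundled inside a connector/plugin jar), so Kafka's isInstance check fails even though the class name matches. Below is a minimal, hypothetical diagnostic sketch of that check; it assumes kafka-clients is on the classpath and is not SeaTunnel or Kafka code:

import org.apache.kafka.common.serialization.Serializer;

public class SerializerCheck {
    public static void main(String[] args) throws Exception {
        // Resolve the serializer class by name against the context classloader,
        // the same general pattern Kafka follows when building a producer.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Class<?> clazz = Class.forName(
                "org.apache.kafka.common.serialization.ByteArraySerializer", true, cl);
        Object instance = clazz.getDeclaredConstructor().newInstance();

        // Print where each side was loaded from. If the two locations differ,
        // the isInstance check below returns false and Kafka raises
        // "... ByteArraySerializer is not an instance of ... Serializer".
        System.out.println("ByteArraySerializer loaded from: "
                + clazz.getProtectionDomain().getCodeSource().getLocation());
        System.out.println("Serializer interface loaded from: "
                + Serializer.class.getProtectionDomain().getCodeSource().getLocation());
        System.out.println("isInstance = " + Serializer.class.isInstance(instance));
    }
}

If the two printed locations differ, a duplicate or mismatched kafka-clients jar is the likely cause.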
SeaTunnel Version
2.3.8 (SeaTunnel Web 1.0.2)
SeaTunnel Config
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://10.1.2.25:33**/***"
    username = "***"
    password = "***"
    table-names = ["c.eda_handle"]
    startup.mode = "initial"
    catalog {
      factory = MySQL
    }
    format = compatible_debezium_json
    debezium = {
      # include schema into kafka message
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # include ddl
      include.schema.changes = true
      # topic prefix
      database.server.name = "mysql_cdc_"
    }
    schema = {
      fields = {
        topic = string
        key = string
        value = string
      }
    }
  }
}

sink {
  kafka {
    topic = "test-topics"
    bootstrap.servers = "10.1.2.22:2161"
    format = compatible_debezium_json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
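Note that with semantics = EXACTLY_ONCE the sink goes through KafkaTransactionSender and builds a transactional KafkaInternalProducer when the writer is restored, which is exactly where the trace above fails. As a hypothetical way to confirm whether duplicate kafka-clients jars are visible to the job (the class name below is illustrative, not part of SeaTunnel):

import java.net.URL;
import java.util.Enumeration;

public class DuplicateKafkaClientsCheck {
    public static void main(String[] args) throws Exception {
        // List every classpath location that provides the Serializer interface.
        // More than one URL usually means duplicate/conflicting kafka-clients jars,
        // e.g. one under the engine's lib directory and one inside a connector jar.
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Enumeration<URL> locations = cl.getResources(
                "org/apache/kafka/common/serialization/Serializer.class");
        while (locations.hasMoreElements()) {
            System.out.println(locations.nextElement());
        }
    }
}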
Running Command
./bin/seatunnel.sh --config ./config/mysqlcdc_hdfs_.config.template -e local
Error Exception
org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
Zeta or Flink or Spark Version
2.3.8
Java or Scala Version
1.8
Screenshots
Need help!
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct