Canal + RabbitMQ 数据同步实战:从填坑到打通全链路 Canal RabbitMQ 数据同步实战从填坑到打通全链路一次完整的 MySQL 实时数据同步实践记录从零搭建、版本踩坑到成功打通 Canal RabbitMQ 全链路的完整过程一、背景与目标在微服务架构中经常需要将 MySQL 的数据变更实时同步到其他系统如缓存、搜索引擎、数据仓库等。Canal 作为阿里巴巴开源的 MySQL Binlog 解析组件是实现这一目标的常用工具。本文目标是在macOS本地完成 Canal 的安装与配置将 Canal 捕获的 MySQL 变更数据投递到RabbitMQ为下游消费者提供标准化的 JSON 数据环境信息操作系统macOS (Apple Silicon)MySQL8.0.45官方.dmg安装Canal1.1.7TCP 模式RabbitMQ4.2.5JavaJDK 17Spring Boot3.3.4二、MySQL 准备开启 Binlog2.1 创建配置文件Canal 基于 MySQL 的主从复制协议工作因此必须开启 binlog。编辑/etc/my.cnf[mysqld] server-id 1 log-bin mysql-bin binlog-format ROW2.2 重启 MySQL由于是官方.dmg安装通过系统设置中的 MySQL 面板重启即可。2.3 验证 Binlog 是否开启SHOWVARIABLESLIKElog_bin;SHOWVARIABLESLIKEbinlog_format;SHOWVARIABLESLIKEserver_id;2.4 创建 Canal 专用账号CREATEUSERcanal%IDENTIFIEDBYcanal;GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TOcanal%;FLUSHPRIVILEGES;三、Canal 安装与版本踩坑3.1 安装 Canal从 GitHub Releases 下载canal.deployer-1.1.7.tar.gztar-zxvfcanal.deployer-1.1.7.tar.gz-C~/canal-1.1.7cd~/canal-1.1.73.2 配置 Canalconf/canal.properties# 工作模式先使用 TCP 模式验证核心功能 canal.serverMode tcpconf/example/instance.properties# MySQL 连接信息 canal.instance.master.address 127.0.0.1:3306 canal.instance.dbUsername canal canal.instance.dbPassword canal # 监听所有表 canal.instance.filter.regex .*\\..*3.3 启动 Canalshbin/startup.shtail-flogs/example/example.log3.4 ⚠️ 版本踩坑RabbitMQ 直连模式 Bug在尝试使用 Canal 的 RabbitMQ 直连模式时遇到了反复出现的错误PRECONDITION_FAILED - unknown exchange type 1这个问题在Canal 1.1.7和1.1.8版本中都存在。无论如何在canal.properties中配置rabbitmq.exchange.type directCanal 内部都固执地使用了无效的整数值1或2导致连接失败。结论Canal 1.1.x 系列的 RabbitMQ 直连模式存在无法绕过的缺陷。四、解决方案TCP 模式 Java 客户端中转4.1 架构调整放弃 Canal 的 RabbitMQ 直连模式改用TCP 模式 Spring Boot 客户端方案MySQL 变更 → Canal (TCP 模式) → Spring Boot 客户端 → RabbitMQ这种方案的优势Canal 的 TCP 模式是其最成熟、最稳定的工作方式客户端可以对数据进行精细化处理完全绕开 Canal 的 MQ 连接 Bug获得对数据流的绝对控制权4.2 客户端项目结构Canal-Java-Client/ ├── pom.xml └── src/main/java/com/dp/canaljavaclient/ ├── CanalJavaClientApplication.java ├── config/ │ ├── CanalConfig.java │ └── RabbitMQConfig.java ├── service/ │ └── CanalConsumerService.java └── resources/ └── application.yml4.3 关键依赖parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.3.4/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.7/version/dependencydependencygroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactIdversion2.0.53/version/dependency/dependencies4.4 核心服务实现Canal 配置类DataComponentConfigurationProperties(prefixcanal)publicclassCanalConfig{privateStringhost127.0.0.1;privateintport11111;privateStringdestinationexample;privateStringusername;privateStringpassword;privateintbatchSize100;privateStringfilterRegex.*\\..*;privatelongidleSleepMs1000;}Canal 消费者服务核心Slf4jServiceRequiredArgsConstructorpublicclassCanalConsumerService{privatefinalCanalConfigcanalConfig;privatefinalRabbitTemplaterabbitTemplate;PostConstructpublicvoidinit(){startConsumer();}privatevoidstartConsumer(){ThreadconsumerThreadnewThread(()-{try{CanalConnectorconnectorCanalConnectors.newSingleConnector(newInetSocketAddress(canalConfig.getHost(),canalConfig.getPort()),canalConfig.getDestination(),canalConfig.getUsername(),canalConfig.getPassword());connector.connect();connector.subscribe(canalConfig.getFilterRegex());connector.rollback();while(running){Messagemessageconnector.getWithoutAck(canalConfig.getBatchSize());if(message.getId()-1){Thread.sleep(canalConfig.getIdleSleepMs());continue;}handleMessage(message);connector.ack(message.getId());}}catch(Exceptione){log.error(Canal 消费者异常,e);}});consumerThread.setDaemon(true);consumerThread.start();}privatevoidhandleMessage(Messagemessage){for(CanalEntry.Entryentry:message.getEntries()){if(entry.getEntryType()!CanalEntry.EntryType.ROWDATA)continue;CanalEntry.RowChangerowChangeCanalEntry.RowChange.parseFrom(entry.getStoreValue());if(rowChange.getIsDdl())continue;StringtableNameentry.getHeader().getTableName();StringschemaNameentry.getHeader().getSchemaName();StringeventTyperowChange.getEventType().toString();// 提取列数据避免序列化整个 RowChange 对象ListMapString,ObjectrowDataListnewArrayList();for(CanalEntry.RowDatarowData:rowChange.getRowDatasList()){MapString,ObjectrowMapnewLinkedHashMap();for(CanalEntry.Columncolumn:rowData.getAfterColumnsList()){rowMap.put(column.getName(),column.getValue());}rowDataList.add(rowMap);}// 构造简洁的消息体MapString,ObjectsimpleMessagenewLinkedHashMap();simpleMessage.put(schema,schemaName);simpleMessage.put(table,tableName);simpleMessage.put(event,eventType);simpleMessage.put(timestamp,System.currentTimeMillis());simpleMessage.put(data,rowDataList);StringjsonMessageJSON.toJSONString(simpleMessage);rabbitTemplate.convertAndSend(canal.exchange,canal.routingkey,jsonMessage);log.info(✅ 已转发: {}.{} - {}, 数据行数: {},schemaName,tableName,eventType,rowDataList.size());}}}RabbitMQ 配置ConfigurationpublicclassRabbitMQConfig{publicstaticfinalStringEXCHANGE_NAMEcanal.exchange;publicstaticfinalStringQUEUE_NAMEcanal.queue;publicstaticfinalStringROUTING_KEYcanal.routingkey;BeanpublicTopicExchangecanalExchange(){returnnewTopicExchange(EXCHANGE_NAME,true,false);}BeanpublicQueuecanalQueue(){returnnewQueue(QUEUE_NAME,true);}BeanpublicBindingcanalBinding(){returnBindingBuilder.bind(canalQueue()).to(canalExchange()).with(ROUTING_KEY);}}4.5 ⚠️ 踩坑Fastjson2 序列化错误在处理数据时遇到了level too large: 2048的序列化错误原因是CanalEntry.RowChange对象内部存在深层嵌套结构。解决方案不直接序列化整个RowChange对象而是手动提取需要的afterColumns字段构造简洁的 JSON 消息。4.6 配置文件spring:application:name:Canal-Java-Clientrabbitmq:host:127.0.0.1port:5672username:guestpassword:guestvirtual-host:/Canal:host:127.0.0.1port:11111destination:examplebatchSize:100filterRegex:.*\\..*idleSleepMs:1000logging:level:com.dp.canaljavaclient:DEBUG五、验证与测试5.1 启动流程# 1. 启动 CanalTCP 模式cd~/canal-1.1.7shbin/startup.sh# 2. 启动 Spring Boot 客户端java-jartarget/canal-java-client-0.0.1-SNAPSHOT.jar# 3. 在 MySQL 中执行变更INSERT INTO feng.user(name, age, email)VALUES(test,25,testexample.com);5.2 查看结果客户端日志✅ 已转发: feng.user - INSERT, 数据行数: 1RabbitMQ 管理界面http://localhost:15672队列: canal.queue Ready: 1消息内容{schema:feng,table:user,event:INSERT,timestamp:1782295977691,data:[{id:4,name:test,age:25,email:testexample.com}]}六、成功经验总结6.1 核心方法论区分“捷径”与“正道”方案结果教训Canal RabbitMQ 直连模式❌ 失败存在无法绕过的 Bug果断放弃Canal TCP 模式 客户端✅ 成功官方最稳定的工作方式启示当组件的外围功能存在长期 Bug 时果断使用其核心功能 自己编写适配层往往能获得更强的稳定性和控制权。6.2 关键决策先验证核心链路第一时间用 TCP 模式验证 Canal 的 binlog 解析功能是否正常自主构建客户端编写轻量级 Spring Boot 客户端作为 Canal 和 RabbitMQ 之间的桥梁精细化数据处理手动提取字段避免序列化深层嵌套对象使用固定的 Routing Key简化绑定关系确保消息正确路由6.3 最终架构┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ ┌─────────────┐ │ MySQL │────▶│ Canal │────▶│ Spring Boot 3 │────▶│ RabbitMQ │ │ Binlog │ │ TCP 模式 │ │ JDK 17 客户端 │ │ Queue │ │ feng.user │ │ 11111端口 │ │ 拉取 → 转换 → 转发 │ │ Ready: 1 │ └─────────────┘ └─────────────┘ └─────────────────────┘ └─────────────┘ │ ▼ ┌─────────────┐ │ 下游消费者 │ │ (待扩展) │ └─────────────┘七、扩展建议7.1 多表路由// 按表名路由到不同队列if(user.equals(tableName)){rabbitTemplate.convertAndSend(canal.exchange,user.queue,jsonMessage);}elseif(order.equals(tableName)){rabbitTemplate.convertAndSend(canal.exchange,order.queue,jsonMessage);}7.2 下游消费者ComponentSlf4jpublicclassCanalMessageConsumer{RabbitListener(queuescanal.queue)publicvoidhandle(Stringmessage){log.info(收到消息: {},message);// 更新缓存、同步 ES、触发通知...}}测试http://localhost:15672/#/queues/%2F/canal.queue八、结语本文完整记录了从零搭建 Canal RabbitMQ 数据同步管道的过程重点分享了版本踩坑经验Canal 1.1.x 的 RabbitMQ 直连模式存在 bug稳健的替代方案TCP 模式 Spring Boot 客户端实际问题解决序列化异常、消息路由、配置优化这套方案已在本地环境完全验证通过可作为微服务架构中数据同步的基础设施。项目源码地址https://github.com/lvan-jone/canal-java-client.git欢迎交流讨论