需求背景
- Room_additions在SqlServer中大概单表7亿数据,计划在尽量不影响业务的情况下平滑迁移至Mysql分布式库。
技术选型
- SqlServer CDC + Kafka + Kafka Connect
数据流程
技术介绍
SqlServer CDC
变更数据捕获Change Data Capture(简称CDC),SqlServer自带的功能,粒度控制到具体表,开启CDC后源表的INSERT/UPDATE/DELETE操作会记录在特定的变更日志表中。
Kafka
Kafka是一个分布式的流处理平台,也是一种高吞吐量的分布式发布订阅消息系统.....(略)
(中文文档:https://kafka.apachecn.org/intro.html)
它有几个核心概念:
- Topic:消息的类别,从物理上划分为一个或多个分片(Partition),可简单理解为消息的存放地址。
- Broker:代理,用来存储消息内容,集群中的每个服务器就是一个Broker。
- Producer:向Kafa发送消息的进程,生产者会根据Topic发布消息。
- Consumer:负责订阅和消费消息的进程,Topic中的一个Partition只能由一个Consumer消费,一个Consumer可订阅Topic中的多个Partition。
- ConsumerGroup:每个group都是独立消费Topic,可理解成每个group都收到相同的消息。
Kafka Connect
Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。负责管理 Kafka 和另一个系统之间的数据复制。是一个基于 Kafka 单独的应用服务。
简言之,就是帮助Kafka连接其他系统。这里有两个概念:Source 和 Sink。
Source负责导入数据到 Kafka,Sink 负责从 Kafka 导出数据,它们都被称为 Connector。
我们这次数据迁移就主要用到了 Source 部分负责采集SqlServer CDC的变化数据。
核心概念:
-
Connector:决定数据应该从哪里复制过来(SourceConnetor),以及数据应该写入到那里去(SinkConnector),Connector不参与实际工作,它负责管理Task,一个Connector实例是一个逻辑Job,这个逻辑Job就是负责管理Kafka和其他系统之间数据的拷贝。
用户可以通过Rest API来对Connector进行维护,见下图:
-
Task:数据写入kafka和数据从kafka读出的实现。
-
Worker:运行connectors和tasks的进程(一个Worker里可以运行多个Connector实例,每个Connector实例里会把一个Job分成多个Task)。
-
Converter:kafka connect和其他存储系统直接发送或者接受数据之间转换数据。
KafKa Connect工作模式:
Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。
- standalone模式:
所有的工作都在一个独立的Worker进程中完成,比较简单,适合开发环境,但是由于只有一个进程,不具备容错性。
- distributed模式:
为Kafka Connect提供了可扩展性和自动容错能力。可以启动多个Workers,这些Worker采用相同的group.id,通过自动协调,在多个Workers之间调度执行connectors和tasks。
如果其中一个Worker发生故障,其他Workers将会检测到这种变化进行rebalance,然后重新分配connectors以及tasks。如下图:
在distributed模式下,每个Work的配置、工作状态、读写位置索引都有专门的三个Topic负责记录:
config.storage.topic:用以保存connector和task的配置信息。
status.storage.topic:用以保存connetor的状态信息。
offset.storage.topic:用以保存connector的位置信息。
创建Kafka Connect
自定义Connector
connect是Kafka提供的一个单独的程序接口定义,具体的实现可以自己根据业务来开发。
-
SourceConnector
-
SinkConnector
引入三方Connector
这里我们选择的就是Debezium,这是一个非常强大三方插件,是对kafka connect的实现,提供了各大数据仓库的SourceConnect,只需下载我们所需的SqlServerConnect。
下载并解压后,进入kafka安装目录打开distributed配置文件:
vim config/connect-distributed.properties
再在文件中填写插件所在的解压位置:
plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins
运行Kafka Connect
环境准备
- Zookeeper集群(apache-zookeeper-3.6.3)
- Kafka集群(kafka_2.13-2.7.0)
- JDK8(Debezium插件要求至少JDK8+,所以单独申请机器)
- SqlServer开启CDC(测试Hotel_product库Room_additions表已开启)
其中,Zookeeper和Kafka集群已在新申请的测试服务器上部署好(10.160.92.186、10.160.92.250、10.160.92.249)
创建并运行
1、启动connect-distributed
bin/connect-distributed.sh config/connect-distributed.properties
2、通过Rest API创建一个Debezium给我们提供的SqlServerConnector,相关的入参可看官方文档:
-POST http://10.160.92.250:8083/connectors
-Body
{
"name": "sqlserverCdcRoomAdditions2",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"database.server.name" : "MSSQLSERVER",
"database.hostname" : "hotelproduct-writer-test.elong.com",
"database.port" : "1639",
"database.user" : "bdg_product_write",
"database.password" : "nopass.2",
"database.dbname" : "Hotel_product",
"table.whitelist": "dbo.Room_additions",
"schemas.enable" : "false",
"mode":"incrementing",
"incrementing.column.name": "Id",
"database.history.kafka.bootstrap.servers" : "10.160.92.186:9092,10.160.92.250:9092,10.160.92.249:9092",
"database.history.kafka.topic": "hitstory-topic.room_additions",
"value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"decimal.handling.mode": "string",
"snapshot.mode": "schema_only"
}
}
创建好了后会自动帮我们运行这个Connector
3、查看创建好的connetor信息:
http://10.160.92.250:8083/connectors
4、查看创建的Topic(执行创建后会自动生成所需Topic)
- connect-configs、connect-offsets、connect-status之前说过是给Work用的Topic,Kafka Connect会自己维护这三个Topic。
- hitstory-topic.room_additions是记录room_additions表历史的Topic,这也是Connect维护。
- MESSQLSERVER.dbo.Room_additions则是采集CDC的Topic,这个就是我们消费端需要监听的Topic
这里可以再用大白话解释下Kafka Connect的几个核心概念:
- Connector:想采集一张表就需要抓取该表的CDC数据,则需要创建一个对应的Connector,这里Connector的实现是Debezium提供的SqlServerConnector,定义其巴拉巴拉的采集规则。当然也可以创建一个MysqlConnector等。
- Task:Connector定义好了需要进程去真正的执行采集,这个时候就是Task在工作,一个Connector可以在集群不同机器上创建多个Task同时工作分担压力。
- Worker:一台实实在在的机器,就是一个Worker,这台Worker上可能负责记录着不同Connector信息,也可能运行了多个Task。
- Converter:不用在意,具体的实现人家已经帮我们做过了,比如创建Connector的Body参数中的"value.converter":"org.apache.kafka.connect.json.JsonConverter"帮我们把采集的数据转成json格式,绝大情况我们也都是用的json格式。
5、消费测试
5.1、启动一个Consumer脚本
bin/kafka-console-consumer.sh --bootstrap-server 10.160.92.186:9092,10.160.92.250:9092,10.160.92.249:9092 --group default_group --topic MSSQLSERVER.dbo.Room_additions
5.2、在SqlServer端对测试表进行插入
insert into [dbo].[Room_additions] ( [Hotel_id], [Room_type_id], [Room_unique_id], [Addition_id], [Start_date], [End_date], [Addition_price], [Addition_price_currency], [addition_num], [addition_type], [Notes], [Op_date], [Operator], [op_ip]) values ( '40901041', '0004', '0004', '0', '1900-01-01 00:00:00.000', '2050-01-01 00:00:00.000', '0.00', 'RMB', '-1', '0', '', '2011-03-01 13:28:36.253', N'杨雯雯1', '192.168.30.103');
5.3、Consumer收到消息
至此,SqlServer CDC + Kafka + Kafka Connect进行数据迁移的技术预研至此结束,具备满足需求的可行性。
实战数据迁移
迁移流程图
迁移要点
技术预研可行性没问题,现在我们要把SqlServer中的Room_additions表迁移至Mysql分片库,在程序中如何进行平滑的数据迁移要考虑到以下几点:
- 接口收口:找到程序中所有对原SqlServer的Room_additions表有过读写操作的数据持久层,对其进行HotSwitch的开关控制新老表的切换(控制读写走SqlServer还是走Mysql)。
- Consumer实现:主要负责从Topic获取到数据后对数据进行Mysql的写入操作。
- Producer改造:Kafka connect其实就扮演整个发送消息的producer,不是我们自己开发的,但我们需要对其进行改造,将采集到的CDC数据以自定义的分片策略往Topic的不同Partition分片上发送数据。
- 如何平滑迁移:启动整套迁移流程后,消费端追增量,要保证幂等性。
- 数据验证:迁移完成后的数据验证工作,验证成功方可打开开关切换Mysql的读写。
接口收口
定义开关:RoomAdditionsTransferSwitch,对读写接口进行新老表的切换。
写接口:
检索后发现只有在jproduct-api项目中只有1处insert的语句还在用,无delete,无update
public int insertSroomAdditions(List<SroomAdditionsModel> sroomAdditions)
对其数据层封装HotSwitch开关,控制写SqlServer还是写Mysql分片库。
读接口:
public List<SroomAdditionsModel> findSroomAdditionsModelList(String shotelId, String roomTypeId)
private List<RoomAdditionDefine> findRoomAdditionDefine(Map<String, Object> param
同样,也是加上HotSwitch开关,控制其读SqlServer还是Mysql。
Consumer实现
- 详见jproduct-tool-api项目:SyncRoomAdditionBusiness.java
Producer改造
背景:
- 数据迁移需保证数据的前后顺序(在SqlServer先后执行了A、B、C语句,那么在Mysql中也应该依次执行A、B、C)。
- 数据迁移是从SqlServer的单库表迁移至Mysql的分布式库表,分布式Mysql有一套我们的分片逻辑(基于hotel_id)。
- 看迁移流程图,如果不自定义Partitioner,SqlServer中的数据变化同一个hotel_id可能会发送到不同的Partition分片,每个分片又对应一个Consumer,Consumer之间的顺序得不到保障。
- 综合1、2、3两点,必须要在Producer端发送消息前对数据进行分片策略。具体的策略是:基于hotel_id取余,这样就能保证相同的hotel_id数据一定落在相同的Partition分片上。
如何平滑迁移
- 增量同步。开启CDC + Kafka Connect,保证Topic中有实时SqlServer同步数据发送到Kafka的Topic。
- 开启Consumer增量消费,进行增量对比,确保消费进程正常后,暂停消费。
- 以2之后的时间点作为结束时间,让DBA从SqlServer全量导出整个Room_additions表的备份文件。
- 基于备份文件进行DDA分片逻辑的文件拆分,拆成16个子文件,并拆分验证。
- 文件拆分完后给到DBA16个子文件,让DBA将其分别导入16个Mysql分片库,并全量对比。
- 重新开启Consumer消费,开始追Topic中的增量,消费逻辑中需保证幂等性:判断主键id是否存在再进行写入(此时肯定会有部分重复数据)。
- 业务读接口HotSwitch开关切Mysql读取。
- 业务写接口HotSwitch开关切Mysql写入。
- 观察Consumer消费状态,等消费数量降低至0后,整个平滑迁移流程完毕。若消费一直未停,说明最初写接口收口没有收干净,仍然存在写入接口未设置开关,整个迁移流程失败(因为消费进来的SqlServer数据,Mysql数据自增ID最终会和消费数据重合)。所以,接口收口环境至关重要。
相关辅助工具
在上述要点中,下划线标注处都是需要开发对应的辅助工具。
- 增量对比:详见jproduct-tool-api项目接口:
/tools/checkData
- 文件拆分:详见jproduct-tool-api项目接口:
/tools/splitInvDataNew
- 拆分验证:这里没单独写工具,主要靠细心和人工验证分片的方式来处理
- 全量对比:和增量对比一样,只是body参数有所不同:
/tools/checkData
如何接入迁移流程
抓取端
- 启动好kafka connect进程
bin/connect-distributed.sh config/connect-distributed.properties
- 让DBA帮忙把需要迁移的表打开CDC功能(CDC粒度控制的具体某张表)
- RestAPI创建并运行connector(配置好相关参数,主要是表名)
消费端
消费代码在jproduct-tool-api项目中
- 表映射的Entity实现SyncBaseTable.java接口,实现getId()和getHotelId(),以及用于对比的compareFileds()方法。
- 创建业务类继承AbstractSyncBusiness.java模板类,实现insert、update、delete、selectSqlServerBatchList、getMysqlOne等方法。
- 以上,则完成了消费端的新表迁移迭代开发,启动消费(
/tools/syncTable
)和增量/全量对比都可运行(/tools/checkData
)
遇到的坑
- SqlServer是money字段类型转化后为被改为其他编码格式。起初迫于无奈对其debezium插件源码进行改造强壮String,后发现官方文档有money转String的配置入口:decimal.handling.mode=string(吃了没文化的亏)
https://debezium.io/documentation/reference/1.7/connectors/sqlserver.html#sql-server-temporal-values - IDEA版本太低不支持加入JDK11(debezium开发要求至少jdk11,但第1点用配置文件解决掉了,所以没必要了)。
- 二次开发Debezium需要升级IDEA,升级后单元测试却又连不上数据库(JDK11的安全级别更高,支持TLS1.0、1.1、1.2)
https://blog.csdn.net/weixin_53083143/article/details/118521981 - debug debezium的convert源码,发现(NULL)问题影响schema(当初表设计不规范导致 NOT NULL DEFAULT NULL)
- 改了debezium对decimal-special的源码打包,打包失败
http://www.kailing.pub/article/index/arcid/267.html - 本地环境出现过kafka connect服务挂过几次(factor不要设置为1,partition也设大,解决)
- connect连接不上,报错:io.debezium.DebeziumException: The db history topic or its content is fully or partially missing。(解决:因为新建了连接器配置需要更改database.history.kafka.topic的名称)
https://stackoverflow.com/questions/63747101/io-debezium-debeziumexception-the-db-history-topic-or-its-content-is-fully-or-p - 自定义partitioner不生效(创建connect的时候添加配置没用,强行在代码中添加配置也不行)原因:KafkaProducer是有很多种(有负责history的,也有负责监听cdc的,要在distribute配置文件里面添加配置才是工作producer)。不过最终解决方案单独开发Partitioner打包到libs目录下即可:/libs/sync_partitioner-1.0.0.jar
- producer没搞清楚,把自定义partitioner弄到batabaseHistory的producer中去了,后边又弄到offset/status/config中去了。实际在connector-producer-sqlserverCdcRoomAdditions-0中
- SqlServer创建序列库,需要提前开一个开关,但是最后发现并不需要走序列库。cdc采集的数据会包含SqlServer的主键id
- kafka集群模式下poll阻塞
https://www.cnblogs.com/cxhfuujust/p/9066354.html
参考资料
- https://debezium.io/documentation/reference/1.7/connectors/sqlserver.html
- https://kafka.apachecn.org/documentation.html#connect
- https://docs.microsoft.com/zh-cn/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver15
- https://www.cnblogs.com/EminemJK/p/14688907.html
- https://www.modb.pro/db/33683
- https://kafka.apachecn.org/
- https://blog.51cto.com/zero01/2498682
- https://www.lushuiwan.com/1989.html
- https://www.shangmayuan.com/a/f6fc744659434d8b85413815.html
- https://www.cnblogs.com/gnivor/p/5318319.html
- https://github.com/debezium/debezium
- https://www.cnblogs.com/xiongmozhou/p/14991623.html
- https://xiaomingtongxie.gitbooks.io/kafka-tutorial-cn/content/
- http://www.kailing.pub/article/index/arcid/267.html
- https://www.orchome.com/66
- https://docs.confluent.io/platform/current/connect/concepts.html#distributed-workers
- https://blog.csdn.net/sinat_39809957/article/details/121017220
- https://www.h3399.cn/201912/744746.html
- https://blog.csdn.net/hopdog/article/details/102622809
- https://zhuanlan.zhihu.com/p/144870495?from_voters_page=true
- https://blog.csdn.net/u013709332/article/details/76106331
- https://www.modb.pro/db/33683
- https://www.orchome.com/344