SqlServer数据迁移至MySQL集群实践

in 博文 with 2 comment

需求背景

技术选型

数据流程

image.png

技术介绍

SqlServer CDC

变更数据捕获Change Data Capture(简称CDC),SqlServer自带的功能,粒度控制到具体表,开启CDC后源表的INSERT/UPDATE/DELETE操作会记录在特定的变更日志表中。

(官方介绍:https://docs.microsoft.com/zh-cn/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-ver15)

Kafka

Kafka是一个分布式的流处理平台,也是一种高吞吐量的分布式发布订阅消息系统.....(略)

(中文文档:https://kafka.apachecn.org/intro.html)

它有几个核心概念

Kafka Connect

Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。负责管理 Kafka 和另一个系统之间的数据复制。是一个基于 Kafka 单独的应用服务。

简言之,就是帮助Kafka连接其他系统。这里有两个概念:SourceSink

Source负责导入数据到 Kafka,Sink 负责从 Kafka 导出数据,它们都被称为 Connector。
image.png

我们这次数据迁移就主要用到了 Source 部分负责采集SqlServer CDC的变化数据。

核心概念

KafKa Connect工作模式

Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。

所有的工作都在一个独立的Worker进程中完成,比较简单,适合开发环境,但是由于只有一个进程,不具备容错性。

为Kafka Connect提供了可扩展性和自动容错能力。可以启动多个Workers,这些Worker采用相同的group.id,通过自动协调,在多个Workers之间调度执行connectors和tasks。

如果其中一个Worker发生故障,其他Workers将会检测到这种变化进行rebalance,然后重新分配connectors以及tasks。如下图:

image.png

在distributed模式下,每个Work的配置、工作状态、读写位置索引都有专门的三个Topic负责记录:

config.storage.topic:用以保存connector和task的配置信息。

status.storage.topic:用以保存connetor的状态信息。

offset.storage.topic:用以保存connector的位置信息。

创建Kafka Connect

自定义Connector

connect是Kafka提供的一个单独的程序接口定义,具体的实现可以自己根据业务来开发。

引入三方Connector

这里我们选择的就是Debezium,这是一个非常强大三方插件,是对kafka connect的实现,提供了各大数据仓库的SourceConnect,只需下载我们所需的SqlServerConnect

下载并解压后,进入kafka安装目录打开distributed配置文件:

vim config/connect-distributed.properties

再在文件中填写插件所在的解压位置:

plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins

运行Kafka Connect

环境准备

其中,Zookeeper和Kafka集群已在新申请的测试服务器上部署好(10.160.92.186、10.160.92.250、10.160.92.249)

创建并运行

1、启动connect-distributed

bin/connect-distributed.sh config/connect-distributed.properties

2、通过Rest API创建一个Debezium给我们提供的SqlServerConnector,相关的入参可看官方文档

-POST http://10.160.92.250:8083/connectors

-Body 

{

 "name": "sqlserverCdcRoomAdditions2",

 "config": {

    "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",

    "database.server.name" : "MSSQLSERVER",

    "database.hostname" : "hotelproduct-writer-test.elong.com",

    "database.port" : "1639",

    "database.user" : "bdg_product_write",

    "database.password" : "nopass.2",

    "database.dbname" : "Hotel_product",

    "table.whitelist": "dbo.Room_additions",

    "schemas.enable" : "false",

    "mode":"incrementing",

    "incrementing.column.name": "Id",

    "database.history.kafka.bootstrap.servers" : "10.160.92.186:9092,10.160.92.250:9092,10.160.92.249:9092",

    "database.history.kafka.topic": "hitstory-topic.room_additions",

    "value.converter.schemas.enable":"false",

    "value.converter":"org.apache.kafka.connect.json.JsonConverter",

    "decimal.handling.mode": "string",

    "snapshot.mode": "schema_only"

  }

}

创建好了后会自动帮我们运行这个Connector

3、查看创建好的connetor信息:

http://10.160.92.250:8083/connectors

4、查看创建的Topic(执行创建后会自动生成所需Topic)

image.png

这里可以再用大白话解释下Kafka Connect的几个核心概念:

5、消费测试

5.1、启动一个Consumer脚本

bin/kafka-console-consumer.sh --bootstrap-server 10.160.92.186:9092,10.160.92.250:9092,10.160.92.249:9092 --group default_group --topic MSSQLSERVER.dbo.Room_additions

5.2、在SqlServer端对测试表进行插入

insert into [dbo].[Room_additions] ( [Hotel_id], [Room_type_id], [Room_unique_id], [Addition_id], [Start_date], [End_date], [Addition_price], [Addition_price_currency], [addition_num], [addition_type], [Notes], [Op_date], [Operator], [op_ip]) values ( '40901041', '0004', '0004', '0', '1900-01-01 00:00:00.000', '2050-01-01 00:00:00.000', '0.00', 'RMB', '-1', '0', '', '2011-03-01 13:28:36.253', N'杨雯雯1', '192.168.30.103');

5.3、Consumer收到消息

image.png

至此,SqlServer CDC + Kafka + Kafka Connect进行数据迁移的技术预研至此结束,具备满足需求的可行性。

实战数据迁移

迁移流程图

image

迁移要点

技术预研可行性没问题,现在我们要把SqlServer中的Room_additions表迁移至Mysql分片库,在程序中如何进行平滑的数据迁移要考虑到以下几点:

  1. 接口收口:找到程序中所有对原SqlServer的Room_additions表有过读写操作的数据持久层,对其进行HotSwitch的开关控制新老表的切换(控制读写走SqlServer还是走Mysql)。
  2. Consumer实现:主要负责从Topic获取到数据后对数据进行Mysql的写入操作。
  3. Producer改造:Kafka connect其实就扮演整个发送消息的producer,不是我们自己开发的,但我们需要对其进行改造,将采集到的CDC数据以自定义的分片策略往Topic的不同Partition分片上发送数据。
  4. 如何平滑迁移:启动整套迁移流程后,消费端追增量,要保证幂等性。
  5. 数据验证:迁移完成后的数据验证工作,验证成功方可打开开关切换Mysql的读写。

接口收口

定义开关:RoomAdditionsTransferSwitch,对读写接口进行新老表的切换。

写接口

检索后发现只有在jproduct-api项目中只有1处insert的语句还在用,无delete,无update

对其数据层封装HotSwitch开关,控制写SqlServer还是写Mysql分片库。

读接口

同样,也是加上HotSwitch开关,控制其读SqlServer还是Mysql。

Consumer实现

Producer改造

背景

  1. 数据迁移需保证数据的前后顺序(在SqlServer先后执行了A、B、C语句,那么在Mysql中也应该依次执行A、B、C)。
  2. 数据迁移是从SqlServer的单库表迁移至Mysql的分布式库表,分布式Mysql有一套我们的分片逻辑(基于hotel_id)。
  3. 看迁移流程图,如果不自定义Partitioner,SqlServer中的数据变化同一个hotel_id可能会发送到不同的Partition分片,每个分片又对应一个Consumer,Consumer之间的顺序得不到保障。
  4. 综合1、2、3两点,必须要在Producer端发送消息前对数据进行分片策略。具体的策略是:基于hotel_id取余,这样就能保证相同的hotel_id数据一定落在相同的Partition分片上。

如何平滑迁移

  1. 增量同步。开启CDC + Kafka Connect,保证Topic中有实时SqlServer同步数据发送到Kafka的Topic。
  2. 开启Consumer增量消费,进行增量对比,确保消费进程正常后,暂停消费。
  3. 以2之后的时间点作为结束时间,让DBA从SqlServer全量导出整个Room_additions表的备份文件。
  4. 基于备份文件进行DDA分片逻辑的文件拆分,拆成16个子文件,并拆分验证。
  5. 文件拆分完后给到DBA16个子文件,让DBA将其分别导入16个Mysql分片库,并全量对比。
  6. 重新开启Consumer消费,开始追Topic中的增量,消费逻辑中需保证幂等性:判断主键id是否存在再进行写入(此时肯定会有部分重复数据)。
  7. 业务读接口HotSwitch开关切Mysql读取。
  8. 业务写接口HotSwitch开关切Mysql写入。
  9. 观察Consumer消费状态,等消费数量降低至0后,整个平滑迁移流程完毕。若消费一直未停,说明最初写接口收口没有收干净,仍然存在写入接口未设置开关,整个迁移流程失败(因为消费进来的SqlServer数据,Mysql数据自增ID最终会和消费数据重合)。所以,接口收口环境至关重要。

相关辅助工具

在上述要点中,下划线标注处都是需要开发对应的辅助工具。

  1. 增量对比:详见jproduct-tool-api项目接口:/tools/checkData
  2. 文件拆分:详见jproduct-tool-api项目接口:/tools/splitInvDataNew
  3. 拆分验证:这里没单独写工具,主要靠细心和人工验证分片的方式来处理
  4. 全量对比:和增量对比一样,只是body参数有所不同:/tools/checkData

如何接入迁移流程

抓取端

  1. 启动好kafka connect进程 bin/connect-distributed.sh config/connect-distributed.properties
  2. 让DBA帮忙把需要迁移的表打开CDC功能(CDC粒度控制的具体某张表)
  3. RestAPI创建并运行connector(配置好相关参数,主要是表名)

消费端

消费代码在jproduct-tool-api项目中

  1. 表映射的Entity实现SyncBaseTable.java接口,实现getId()和getHotelId(),以及用于对比的compareFileds()方法。
  2. 创建业务类继承AbstractSyncBusiness.java模板类,实现insert、update、delete、selectSqlServerBatchList、getMysqlOne等方法。
  3. 以上,则完成了消费端的新表迁移迭代开发,启动消费(/tools/syncTable)和增量/全量对比都可运行(/tools/checkData

遇到的坑

  1. SqlServer是money字段类型转化后为被改为其他编码格式。起初迫于无奈对其debezium插件源码进行改造强壮String,后发现官方文档有money转String的配置入口:decimal.handling.mode=string(吃了没文化的亏)
    https://debezium.io/documentation/reference/1.7/connectors/sqlserver.html#sql-server-temporal-values
  2. IDEA版本太低不支持加入JDK11(debezium开发要求至少jdk11,但第1点用配置文件解决掉了,所以没必要了)。
  3. 二次开发Debezium需要升级IDEA,升级后单元测试却又连不上数据库(JDK11的安全级别更高,支持TLS1.0、1.1、1.2)
    https://blog.csdn.net/weixin_53083143/article/details/118521981
  4. debug debezium的convert源码,发现(NULL)问题影响schema(当初表设计不规范导致 NOT NULL DEFAULT NULL)
  5. 改了debezium对decimal-special的源码打包,打包失败
    http://www.kailing.pub/article/index/arcid/267.html
  6. 本地环境出现过kafka connect服务挂过几次(factor不要设置为1,partition也设大,解决)
  7. connect连接不上,报错:io.debezium.DebeziumException: The db history topic or its content is fully or partially missing。(解决:因为新建了连接器配置需要更改database.history.kafka.topic的名称)
    https://stackoverflow.com/questions/63747101/io-debezium-debeziumexception-the-db-history-topic-or-its-content-is-fully-or-p
  8. 自定义partitioner不生效(创建connect的时候添加配置没用,强行在代码中添加配置也不行)原因:KafkaProducer是有很多种(有负责history的,也有负责监听cdc的,要在distribute配置文件里面添加配置才是工作producer)。不过最终解决方案单独开发Partitioner打包到libs目录下即可:/libs/sync_partitioner-1.0.0.jar
  9. producer没搞清楚,把自定义partitioner弄到batabaseHistory的producer中去了,后边又弄到offset/status/config中去了。实际在connector-producer-sqlserverCdcRoomAdditions-0中
  10. SqlServer创建序列库,需要提前开一个开关,但是最后发现并不需要走序列库。cdc采集的数据会包含SqlServer的主键id
  11. kafka集群模式下poll阻塞
    https://www.cnblogs.com/cxhfuujust/p/9066354.html

参考资料