Demo 演示 | 基于 Apache SeaTunnel 实现从 MySQL CDC 到 PostgreSQL 全量数据同步

Demo 演示 | 基于 Apache SeaTunnel 实现从 MySQL CDC 到 PostgreSQL 全量数据同步 本文详细演示了如何通过Apache SeaTunnel 2.3.9实现MySQL CDC ​到PostgreSQL的全量数据同步。话不多说我们开始学习 MySQL 同步到 PostgreSQL 场景版本要求MySQL -- MySQL-8.3PostgreSQL -- PostgreSQL-13.2Apache SeaTunnel -- Apache-SeaTunnel-2.3.9本文涉及到所有的配置文件可关注公众号回复关键词“Demo 01”获取。前置准备确认版本信息-- 查看版本信息 select version();开启主从复制-- 查看配置信息 show variables where variable_name in (log_bin, binlog_format, binlog_row_image, gtid_mode, enforce_gtid_consistency);MySQL CDC 数据同步需要读取 MySQL 的binlog日志。并且把 SeaTunnel 的集群节点作为 MySQL 集群的一个从节点。所以这里需要先确认 MySQL 开启了binlog以及开启了主从复制模式。注MySQL 的 8.0 版本以上binlog是默认开启的但主从复制模式需要手动开启。-- 开启主从复制,需要按顺序执行 -- SET GLOBAL gtid_modeOFF; -- SET GLOBAL enforce_gtid_consistencyOFF; SET GLOBAL gtid_modeOFF_PERMISSIVE; SET GLOBAL gtid_modeON_PERMISSIVE; SET GLOBAL enforce_gtid_consistencyON; SET GLOBAL gtid_modeON;开启用户权限用户权限需要具有复制权限所以这里有两个核心的权限REPLICATION SLAVE和REPLICATION CLIENT授权完成以后完成一下权限的刷新。-- 用户授权 CREATE USER test% IDENTIFIED BY password; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO test ; flush privileges;SeaTunnel 集群集群日志每个作业独立日志文件log4j2.properties首先是我们集群里面默认的配置是把日志输出到一个统一的文件里面去但是在生产上通常情况下对任务的管理都是基于每个独立的任务这里就需要把日志的输出模式改成每个作业输出独立的日志文件这样会比较方便生产过程中去对日志做一个监控以及问题的排查方便我们按照作业做任务的管理。############################ log output to file ############################# # rootLogger.appenderRef.file.ref fileAppender # 把日志的输出模式改成每个作业输出独立的日志文件 rootLogger.appenderRef.file.ref routingAppender ############################ log output to file #############################客户端配置生产上的集群一般情况下都会把安装到根目录的 opt 文件夹下客户端也建议把SEATUNNEL_HOME指向到/opt/seatunnel目录下。注意如果实际安装有多个版本或者并不在这个目录下建议去创建一个软链接尽量与服务器部署目录一致避免出现有一些类找不到。当前版本使用 SeaTunnel 的脚本去提交命令会引用到客户端的一些环境变量包括比如说 class 类的路径这里会使用绝对路径。如果服务端和客户端不一致那么在提交到集群的时候可能会报错。# 创建软链接 ln -s /opt/apache-seatunnel-2.3.9 /opt/seatunnel # 设置环境变量 export SEATUNNEL_HOME/opt/seatunnel环境变量配置如果是 Linus 服务器环境变量客户端推荐按照官方的示例把环境变量文件的配置项写到/etc/profile.d下。echo export SEATUNNEL_HOME/opt/seatunnel /etc/profile.d/seatunnel.sh echo export PATH$SEATUNNEL_HOME/bin:$PATH /etc/profile.d/seatunnel.sh source /etc/profile.d/seatunnel.sh任务配置注这里配置并不是包含全部的配置项只是把生产上常用到的一些配置选项做了一个说明。env { job.mode STREAMING job.name DEMO parallelism 3 checkpoint.interval 30000 # 30s checkpoint.timeout 30000 # 30s job.retry.times 3 job.retry.interval.seconds 3 # 3s }首先是 env 模块因为它是一个流式的所以需要指定它的配置模式是STREAMING。然后是配置任务名实际生产环境上可以按照库或者是按照表名字做一个匿名这样看到任务名就可以知道它所对应的作业便于对任务做管理。另外一个是并行度并行度这里使用的是 3这个并行度的设置可以依据实际的集群的大小以及数据库的这个配置情况做一些调整。其次是检查点的频率这里配置的是 30 秒。如果说有更高的要求可以调整为 10 秒或者更短一点配置检查点的超时时间。假如 Checkpoint 的超过了某一个时间要判定为作业失败这里也配置为 30 秒配置作业的自动重试次数这里一般采用的都是配置为 3 次每次从事的时间间隔配置为 3 秒当然这里也可以时间更长一点。hocon source { MySQL-CDC { base-url jdbc:mysql://192.168.8.101:3306/test?serverTimezoneAsia/Shanghai username test password 123456 database-names [test] # table-names [test.test_001,test.test_002] table-pattern test\\.test_.*# 第一个点为普通字符串需做转义 第二个点表示匹配1个任意字符 table-names-config [ {table:test.test_002,primaryKeys:[id]} ] startup.mode initial# 先全量同步历史再增量 snapshot.split.size 8096 snapshot.fetch.size 1024 server-id 6500-8500 connect.timeout.ms 30000 connect.max-retries 3 connection.pool.size 20 exactly_once false # 数据分析场景关闭精确一致性允许一定重复和缺失 提升性能 schema-changes.enabled true# 模式演变 开启 避免反复修改表字段,但可能影响下游任务。add rename drop } }另外一个核心重点就是 MySQL CDC 本身的配置项。这里需要说明一下最好指定一下 MySQL 连接所使用的时区避免出现datetime或者timestamp数据抽取的时候出现一些时区不对等的情况。其次是username和password这里需要配置具有主从复制从节点权限复制读取binlog日志权限的一个账号能够复制所有库下的所有表的查询权限能够读取到所有表的日志。通常情况下做这个数据库配置的时候都是每个 database 会放到一个任务里面所以在这里指定一下只去读取 database 为 test 对应的表。根据自己的需要做两个配置一个是我们的表名一个是表的正则表达式匹配区如果是首次同步要同步的表比较多或者要同步整个库或者整个库下面的部分表它有一些统一的规则比如说某个前缀或者某个后缀那么推荐使用正则表达式去做这个数据同步。这里说明一下配置正则表达式匹配的时候我们要包含 Database 名字和表名字。而 database 名和表名的中间要有一个点而这个点在正则表达式里面它要表示一个任意字符所以在这里我们要对它进行转义。​转义的方式是指使用两个反斜杠​比如说这里要匹配以 test 为开头的表那么在 test 的下划线的后面有一个点代表的含义是我要匹配任意数量的任意字符。第二个点代表匹配一个字符然后新代表匹配任意数量的这个字符是零到 N 这样的话我们就可以实现 test 库下的 test_为前缀表名。其次是我们对这个表还可以指定一些额外的配置。比如说我们假如说我们这里有张 test_002test_002 它是没有主键列的。但是我们做数据同步的时候可以指定逻辑上的主键方便我们做数据同步。启动模式​其次是我们的启动模式​。启动模式这里也是一个默认值就是一个初始化的操作可以实现先全量再增量。这也是我们生产常见的操作。其次是分片大小以及每批次获取的数量这里使用默认值即可。如果服务器性能比较好或者集群数量比较大也可以做一些调整。比较关键的是这里有一个server-idMySQL 在进行同步的时候SeaTunnel 的集群是要作为他要把自己伪装成 Mysql 的一个重复节点。那么在 MySQL 做主从复制的时候要求每个重节点就集群内的这些节点都要有自己唯一的server-id。在这里如果你不配置它会使用一个默认值而官方是推荐我们去指定的大家可以在官方文档里面找到。这里有一个要求server-id的范围一定要大于并行的数量否则的话会报错的。超时时间​接下来是连接的超时时间​。如果说我们的数据量比较大那么这个超时时间可以设置的久一点。还有自动重试之间连接词的大小如果表数据量比较多连接词也可以适当调大一点。精确一致性像我们可能更多的时候是做这个 CDC 的数据同步它是基于一个分析场景的不会涉及到一些非常关键的业务就是不会有特别大的这种精确一致性的高要求。所以建议是关闭精确一致性。如果确实是有这种强一致性的要求的话是可以开启的。因为如果说你开启的话这个同步的性能就会有些下降的。Schema evolution另外一个是模式演变Schema evolution。模式演变我是建议大家开启这样可以避免我们去比如说源端做一些新增字段或者说删除字段的时候我们可以不用去修改我们这个任务它可以自动跟随我们源端的修改做一些改变。但这里也会带来一个影响就是我们下游任务假如说我依赖的某张表名字发自动发生了变更那么我们下游的这个任务的设备语句可能就会报错了。所以大家根据自己的一个情况做一个平衡。现在这个模式演变Schema evolution根据官方的说明支持 add column、drop column、rename column 和 modify column. 不是所有的 ddl 语句全部支持像创建表、删除表这些是无法识别到的。这确实是一个非常好的功能建议大家开启。sink { jdbc { url jdbc:postgresql://192.168.8.101:5432/test driver org.postgresql.Driver user postgres password 123456 generate_sink_sql true database test table ${database_name}.${table_name} schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST data_save_mode APPEND_DATA # enable_upsert false } }另外是 Sink 端Sink 端我们是要把这个数据插入到同步到 PostgreSQL 数据库在这里要配置 PostgreSQL 的点击信息这里除了配置驱动用户名和密码以外我们这里会推荐大家使用生成式 SQL 的功能。开启以后它可以去自动基于源端的表结构去生成,我们同步到 PostgreSQL 里面的这些间表语句插入一句删除语句还有这个基于主键的 update 语句都可以自己生成避免我们一些很复杂的逻辑。概念区分另外我们再把这个数据同步到 PostgreSQL 的时候要注意一下。因为 MySQL 只有 Database 和 Table 这两层的而没有 Schema 的概念。而在 PostgreSQL 里面它是有三层它分别有 Database、Schema 和 Table。那这就要求假如说我想把这个表和数据都同步到 PostgreSQL 的 test database 下那么这里就要指定。另外因为我们在做数据同步的时候会想要借用它的数据自动建表功能。所以说你去连接这个 PostgreSQL 的时候连接用户最好有建表的权限。占位符另外建表的时候这里可以去使用 Sink 的一个占位符功能。这个功能非常好用可以基于因为我们源端可以有很多表那么很多表它在进行同步的时候它每张表都会有不同的名。是这里我们就可以使用占位符让它自动去生成我们要创建什么表就不用挨个去指定了。这里也是支持一些拼接符号的具体的话可以参考我们文官方的文档。保存模式schema_save_mode也就是我们模式的保存模式我们在做整库同步的时候是非常好用的,可以帮我们节省线表步骤可以完成自动建表。另外一个是 APPEND_DATA。假如目标端已经有数据已经同步过一批任务了这个时候可以避免删除一些数据相对来讲比较安全。如果有特别需求的话也可以去指定其他的模式按照官方文档做修改检测。另外需要说明的是enable_upsert假如说我们能够保证数据源端它的数据是不会重复的,可以把upsert关闭。这样的话可以极大的提升数据同步的性能。但是如果说你不能保证你的数据源是没有重复的那么我们建议还是把这个选项设置为 false。它可以基于主键做一些 upsert。具体的参数还是可以参考我们官方的文档任务提交与监控在配置文件编写完成后我们可以通过 SeaTunnel 的命令行工具提交任务。以下是任务提交的示例命令./bin/start-seatunnel.sh --config /path/to/config.yaml --async关键参数说明config指定配置文件的路径。async表示以异步方式提交任务任务提交后命令行可以立即退出任务会在后台继续执行。任务提交后我们可以通过 SeaTunnel 的集群 UI 界面监控任务的执行状态。在 2.3.9 版本中SeaTunnel 提供了一个直观的 UI 界面可以查看任务的日志、执行状态以及数据量等信息。数据同步演示在本次演示中可以回到文章开头看具体的演示或者 B 站搜索相关视频我们创建了两张表test_001和test_002并在 MySQL 中插入了一些数据。通过 SeaTunnel 的同步任务这些数据被成功同步到了 PostgreSQL 中。我们还演示了数据的插入、删除、更新以及表结构的变更操作SeaTunnel 都能够实时地将这些变更同步到 PostgreSQL。关键点表结构同步SeaTunnel 支持表结构的自动同步。当源端 MySQL 表结构发生变化时目标端 PostgreSQL 的表结构也会自动更新。数据一致性SeaTunnel 保证了数据的一致性所有的插入、删除和更新操作都能够准确地同步到目标数据库。