1. 单表追加同步把一张 MySQL 表同步到一张 PostgreSQL 表本节先从最简单的 MySQL 到 PostgreSQL 单表同步开始。这个场景非常基础从源库test1中读取一张表t_test_1w然后写入目标端 PostgreSQL 库test2中的test_1w表。本示例有几个特点不使用query不做字段转换不自动创建目标表使用追加写入模式适合先把最基础的 JDBC Source 和 JDBC Sink 跑通编写单表同步 HOCON下面是一个最简单的 MySQL 到 PostgreSQL 单表同步配置env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个任务的含义很简单从 test1.t_test_1w 读取数据 追加写入到 PostgreSQL 的 public.test_1w这里没有配置query而是直接使用table_path指定源表。理解配置和注意事项这个配置主要分为四部分env、source、transform 和 sink。env 表示任务运行方式。env { job { mode BATCH } parallelism 1 }mode BATCH表示这是一个批处理任务。任务会读取当前源表中的存量数据写入目标表后结束。parallelism 1表示并行度为 1。对于入门示例来说这样更简单也更容易观察任务执行结果。source 表示数据从哪里来。source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } }这里使用的是Jdbc连接器连接源端 MySQL 数据库test1。其中最重要的是table_path test1.t_test_1w它表示读取test1数据库中的t_test_1w表。transform 暂时为空。transform { }这里不做字段映射、不做字段改名、不做 SQL 处理数据会直接从 source 流向 sink。sink 表示数据写到哪里去。sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这里连接的是目标端 PostgreSQL 数据库test2写入目标表test_1w。需要注意的是本示例不自动建表。所以在运行任务之前需要先在目标端 PostgreSQL 中创建好表-- PostgreSQL 目标端建表示例字段请根据源表结构调整CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);如果目标表不存在任务写入时会失败。另外这里使用的是追加写入模式data_save_mode APPEND_DATA它表示每次运行任务时都会把源表数据追加写入目标表。如果重复执行任务目标表中可能会出现重复数据。这里关闭了 upsertenable_upsert false因为这是最简单的追加同步场景不需要根据主键判断插入还是更新。如果后面要做主键更新再单独讲enable_upsert true和主键配置。适用场景这个配置适合下面几类场景第一次学习 MySQL 到 PostgreSQL 同步源端 MySQL 表和目标端 PostgreSQL 表结构一致目标表已经提前创建好只需要做一次全量数据搬迁允许数据追加写入用来验证 JDBC Source 和 JDBC Sink 是否正常比如下面这种情况就非常适合源表test1.t_test_1w目标表public.test_1w同步方式全量读取 追加写入建表方式目标表提前创建运行前可以先检查源表数据量 text SELECT COUNT(*) FROM test1.t_test_1w;再检查目标表是否存在SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema public AND table_name test_1w ORDER BY ordinal_position;如果目标表已经存在并且字段结构和源表一致就可以运行这个同步任务。2. 单表条件同步只同步符合条件的数据这一节我们在上一节基础上增加条件过滤只同步源表中符合条件的部分数据。比如源表test1.t_test_1w中有一个status字段我们只想同步status ACTIVE的数据到目标表public.test_1w。编写带条件的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w query SELECT * FROM t_test_1w WHERE status ACTIVE } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }核心变化是 source 里增加了query参数query SELECT * FROM t_test_1w WHERE status ACTIVE这样SeaTunnel 只会读取符合条件的数据。理解配置和注意事项**query 参数**用于 SQL 级别过滤数据可以写任意合法 SQL不使用 query 时会直接读取整个表对应上节示例例如SELECT*FROMt_test_1wWHEREstatusACTIVEANDcreate_time2026-06-01sink 配置保持和上一节一致追加写入APPEND_DATA每批写入batch_size条数据不做 upsert注意事项table_pathquery组合使用时会按照 query 获取数据query 中字段必须和源表字段一致否则写入目标表可能失败目标表依然需要提前创建适用场景这个配置适合只同步部分数据例如只同步status ACTIVE只同步最近一周新增的数据只同步某个部门的数据源表数据量大但目标表只需要一部分目标表已经创建结构和源表一致不希望一次同步整个表提高效率示例源表test1.t_test_1w目标表public.test_1w条件status ‘ACTIVE’同步方式全量读取符合条件 追加写入3. 单表字段映射源表字段和目标表字段不一致在实际项目中源端 MySQL 表和目标端 PostgreSQL 表字段不一定完全一致。比如源表t_test_1w有字段id, name, email目标表可能是user_id, full_name, email_address。本节演示如何使用FieldMapper在 transform 阶段完成字段映射。编写带字段映射的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { FieldMapper { mappings [ { sourceField id, targetField user_id, targetType bigint } { sourceField name, targetField full_name, targetType varchar(128) } { sourceField email, targetField email_address, targetType varchar(256) } ] } } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }核心变化在transform部分我们通过FieldMapper映射源表字段到目标表字段id → user_idname → full_nameemail → email_address理解配置和注意事项**transform.FieldMapper** 配置sourceField源表字段名targetField目标表字段名targetType目标字段类型可做类型转换注意事项映射的字段必须存在于源表否则会报错目标表必须提前创建并且字段类型尽量与targetType匹配不需要修改 sink 配置FieldMapper 会在写入前自动处理字段对应关系仍然使用追加模式APPEND_DATA不会覆盖目标表数据适用场景字段映射适合以下场景源表字段和目标表字段名称不一致目标表已经建好但不能修改字段名想在同步时做字段类型转换如int → bigint或varchar → varchar(256))保持数据一致性同时对接已有业务系统示例源表id, name, email 目标表user_id, full_name, email_address 同步方式全量读取 字段映射 追加写入4. 单表自动建表目标表不存在时自动创建在实际项目中目标表可能尚未创建。这时可以使用自动建表功能SeaTunnel 会根据源表结构自动在目标库创建表然后再写入数据。编写自动建表 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST } }核心变化在 sink 配置schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST这个参数表示如果目标表不存在SeaTunnel 会自动根据源表结构创建已存在的表不会覆盖理解配置和注意事项**sink.schema_save_mode** 说明CREATE_SCHEMA_WHEN_NOT_EXIST目标表不存在时自动建表自动建表时字段类型和源表一致不会覆盖已有表安全追加数据配合generate_sink_sql true使用系统会生成完整 insert SQL注意事项自动建表功能依赖数据库账号有建表权限如果目标端 PostgreSQL 没有权限会报错enable_upsert仍然可选如果要做主键更新需要确保建表时设置主键适用场景自动建表适合以下场景目标表尚未创建想快速部署数据同步任务源表字段结构固定可以直接建表适合测试环境或开发环境快速搭建示例源表test1.t_test_1w 目标表public.test_1w不存在 同步方式全量读取 自动建表 追加写入5. 单表主键更新使用 upsert 避免重复数据在实际业务中源表可能会有更新或重复数据。如果直接追加写入目标表可能会产生重复记录。这时可以使用upsert功能根据主键判断插入或更新。编写带 upsert 的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { FieldMapper { mappings [ { sourceField id, targetField user_id, targetType bigint } { sourceField name, targetField full_name, targetType varchar(128) } { sourceField email, targetField email_address, targetType varchar(256) } ] } } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert true primary_keys [user_id] } }核心变化在 sink 配置enable_upsert true primary_keys [user_id]enable_upsert true启用增量更新primary_keys指定目标表判断插入/更新的字段理解配置和注意事项**enable_upsert 注意事项**必须指定primary_keys否则任务会报错primary_keys最好选择目标表中的主键字段或有索引的字段否则 upsert 写入时需要频繁判断数据是否存在性能会比较差upsert 模式会在写入前根据主键判断记录是否存在存在 → 更新不存在 → 插入data_save_mode仍可使用APPEND_DATAupsert 会覆盖匹配主键的数据与generate_sink_sql配合使用系统自动生成带主键判断的 SQL其他注意事项确保目标表主键和primary_keys对应字段一致目标表必须存在或配合自动建表功能使用对大表开启 upsert 时建议设置合适的batch_size提升写入性能适用场景upsert 适合以下业务场景源表可能更新已有数据避免目标表产生重复记录保证目标表数据一致性配合字段映射使用实现源表字段和目标表字段不一致也能正确更新示例源表id, name, email 目标表user_id, full_name, email_address 同步方式增量更新 追加写入 字段映射 主键user_id6. 单表覆盖同步每次同步前清空目标表前面几个示例主要使用的是追加写入模式也就是每次同步都会把数据继续写入目标表。但在一些场景中目标表不需要保留历史数据只需要和源表当前数据保持一致。比如维表、字典表、配置表等每次同步都以源表最新数据为准。这时可以使用覆盖同步模式data_save_mode DROP_DATA它表示在写入目标表之前先清空目标表中的已有数据然后再重新写入本次同步的数据。编写覆盖同步 HOCONenv { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode DROP_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个任务的含义是从 test1.t_test_1w 读取数据 写入 public.test_1w 写入前先清空目标表已有数据也就是说任务执行完成后目标表中只保留本次从源表同步过来的数据。理解配置和注意事项覆盖同步的核心配置是data_save_mode DROP_DATA它和追加同步最大的区别是APPEND_DATA保留目标表原有数据继续追加写入DROP_DATA先清空目标表原有数据再重新写入在覆盖同步场景中一般不需要开启 upsertenable_upsert false因为目标表已经在写入前被清空后续数据都是重新插入不需要再根据主键判断更新或插入。需要注意的是DROP_DATA会清空目标表数据所以使用前要确认目标表是否允许被清空。另外目标端 PostgreSQL 表字段需要和写入数据匹配。可以提前创建目标表-- PostgreSQL 目标端建表示例字段请根据源表结构调整CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);如果想让目标表不存在时自动创建也可以配合自动建表配置schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST适用场景覆盖同步适合下面几类场景维表同步例如用户维表、部门维表、区域维表字典表同步例如状态码、类型码、枚举配置配置表同步例如业务参数、规则配置测试环境刷新数据每次以源表为准目标表不需要保留历史数据只需要最新快照比如下面这种情况就比较适合源表test1.t_test_1w 目标表public.test_1w 同步方式全量读取源表 写入方式清空目标表后重新写入 目标效果目标表和源表当前数据保持一致如果目标表需要保留历史数据建议使用APPEND_DATA。如果目标表需要根据主键更新已有数据建议使用enable_upsert true。7. 单表字段裁剪只同步需要的字段在很多场景中源表字段较多但目标表只需要其中部分字段。直接同步所有字段会增加网络和数据库压力也可能写入冗余数据。这时可以使用字段裁剪方式通过 SQL 查询或 FieldMapper 只同步需要的字段。编写字段裁剪 HOCONsink { Jdbc { database test2 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true } }这个配置的含义是只读取源表的id、name和create_time字段并写入目标表。理解配置和注意事项通过query选择需要的字段减少网络传输和目标表写入压力可以配合FieldMapper对字段重命名或类型转换目标表字段需和查询字段匹配否则写入可能失败适用场景源表字段多但目标表只关心部分字段开发或测试环境只需要部分数据验证需要减少冗余数据传输和存储压力示例目标表只需要 id, name, create_time 同步方式全量读取指定字段 追加写入8. 单表时间范围同步只同步某一天或某一周的数据在实际同步任务中并不是每次都需要同步整张表。如果源表数据量比较大或者任务是每天定时执行通常只需要同步某个时间范围内的数据比如昨天的数据、最近一周的数据或者某个固定日期的数据。这时可以通过query配置时间条件只读取指定时间范围的数据。编写时间范围同步 HOCON} source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 query SELECT * FROM t_test_1w WHERE create_time 2026-06-01 00:00:00 AND create_time 2026-06-02 00:00:00 } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个配置表示只同步2026-06-01当天的数据。理解配置和注意事项时间范围同步的核心是query中的时间条件AND create_time 2026-06-02 00:00:00这里建议使用“左闭右开”的时间范围这样可以避免跨天同步时出现重复或遗漏。如果是每天定时同步也可以把时间写成变量例如AND create_time ${end_date}需要注意的是时间字段最好建立索引否则源表数据量较大时查询速度可能会比较慢。适用场景每天同步昨天新增或变更的数据每周同步最近一周的数据源表数据量较大不适合每次全量同步需要按照创建时间、更新时间、业务时间进行范围过滤示例目标表public.test_1w 同步范围2026-06-01 当天数据 同步方式按时间范围过滤 追加写入9. 单表性能优化batch_size 和 parallelism 怎么设置当单表数据量比较小时默认配置通常就可以满足需求。但如果同步的数据量达到 10 万、100 万甚至更多就需要关注两个比较常用的性能参数batch_size和parallelism。简单来说parallelism控制任务并行度配置 batch_size 和 parallelism} source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_10w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_10w data_save_mode APPEND_DATA batch_size 5000 generate_sink_sql true enable_upsert false } }这个配置中parallelism 4表示任务并行度为 4batch_size 5000表示每批写入 5000 条数据理解配置和注意事项batch_size不是越大越好。设置太小写入次数会变多整体效率可能较低。设置太大单次写入压力会变大可能导致目标库连接超时、锁等待或内存压力增加。一般可以先从下面的值开始测试中等数据量3000 ~ 5000 大表5000 ~ 10000parallelism也需要根据数据库能力来调整。如果数据库配置较低并行度过高反而可能把数据库压垮。建议先从1或2开始再逐步调整到4、8观察同步耗时和数据库负载。适用场景单表数据量较大希望缩短同步时间目标端 PostgreSQL 写入能力较好可以适当提高批量写入大小需要通过测试找到更合适的同步参数示例10 万数据batch_size 3000 ~ 5000parallelism 2 ~ 4 100 万数据batch_size 5000 ~ 10000parallelism 4 ~ 8这些值不是固定标准最终还是要结合数据库性能、网络情况和任务执行结果来调整。10. 单表数据校验同步完成后如何确认数据正确同步任务执行成功并不代表数据一定完全符合预期。在实际项目中任务运行完成后通常还需要做一些简单的数据校验比如数据量是否一致、关键字段是否为空、部分样例数据是否正确。对于单表同步来说最常见的校验方式就是先从数量开始。编写数据校验 SQL1. 校验源表数据量2. 校验目标表数据量如果是全量同步源表和目标表数量通常应该一致。3. 校验关键字段空值如果id是关键字段那么这里的结果应该为0。4. 抽样检查部分数据SELECT * FROM public.test_1w WHERE id IN (1, 100, 1000);通过抽样对比可以快速确认关键字段内容是否一致。理解配置和注意事项单表校验可以从三个层面进行数量校验源表和目标表总数是否一致字段校验关键字段是否为空字段内容是否符合预期样例校验抽取几条数据对比源表和目标表内容如果使用的是条件同步比如只同步某一天的数据那么校验 SQL 也需要带上相同的过滤条件。例如FROM test1.t_test_1w WHERE create_time 2026-06-01 00:00:00 AND create_time 2026-06-02 00:00:00;目标表也要使用同样的时间范围进行校验。适用场景单表同步任务执行完成后确认数据是否正确测试环境验证同步配置是否可用生产环境同步后做基础核对排查目标表数据缺失、重复或字段异常问题示例目标表数量10000 关键字段空值0 抽样数据源表和目标表一致 校验结论本次同步结果符合预期11. 单表失败排查常见错误和解决办法单表同步任务失败时不一定是 SeaTunnel 本身的问题。更多时候问题出在数据库连接、驱动、表结构、字段类型、权限或数据内容上。所以排查时可以先从几个常见方向入手。常见错误类型常见问题可以分为下面几类2. JDBC 驱动不存在 3. 目标表不存在 4. 字段名不匹配 5. 字段类型不兼容 6. 字段长度超长 7. 主键冲突 8. 数据库账号权限不足这些问题在单表同步中都比较常见。理解排查思路1. 数据库连接失败优先检查连接地址、端口、账号、密码是否正确。username root password 123456如果地址、端口或账号密码错误任务会在连接数据库时失败。2. 驱动不存在检查driver配置是否正确以及运行环境中是否已经放入对应 JDBC 驱动。3. 目标表不存在如果没有开启自动建表目标表需要提前创建。CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);4. 字段不匹配如果源表字段和目标表字段不一致可能会出现字段找不到、写入失败等问题。这种情况可以检查源表和目标表结构SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema public AND table_name test_1w ORDER BY ordinal_position;5. 字段长度超长如果目标表字段长度比源表短可能会出现写入失败。例如源表字段长度是varchar(255)目标表字段长度是varchar(36)当数据超过 36 个字符时就可能失败。6. 权限不足如果账号没有查询、写入、建表或清空表的权限也会导致任务失败。特别是使用下面这些能力时要注意账号权限读取源表需要 SELECT 权限写入目标表需要 INSERT 权限自动建表需要 CREATE 权限覆盖同步可能需要 DELETE 或 TRUNCATE 权限适用场景单表失败排查适合下面这些情况任务启动后直接失败数据库连接测试不通过目标表没有写入数据写入过程中报字段类型或长度错误开启 upsert 后出现主键相关错误可以按照下面顺序排查先看连接是否正常 再看驱动是否存在 再看表是否存在 再看字段是否匹配 再看数据是否超长 最后看账号权限是否足够大多数单表同步问题都可以通过这个顺序快速定位。写在最后SeaTunnel Web 是我正在持续完善的一个 SeaTunnel 可视化项目希望把数据源管理、Zeta Engine 连接、任务配置、运行日志和指标查看这些常用能力做得更直观、更容易上手。项目地址https://github.com/weifuwan/seatunnel-web里面有体验地址、部署文档、社区交流群如果你也在学习 SeaTunnel或者正在做数据同步、数据集成相关的事情也欢迎一起交流。这里有一群上进的小伙伴也有一群爱分享的大佬。大家可以一起讨论问题、分享实践、完善文档也一起把 SeaTunnel 这件事讲得更清楚、做得更好用。
【新版SeaTunnel Web 最佳实践8】:MySQL 到 PostgreSQL 单表同步 11 个常见场景
1. 单表追加同步把一张 MySQL 表同步到一张 PostgreSQL 表本节先从最简单的 MySQL 到 PostgreSQL 单表同步开始。这个场景非常基础从源库test1中读取一张表t_test_1w然后写入目标端 PostgreSQL 库test2中的test_1w表。本示例有几个特点不使用query不做字段转换不自动创建目标表使用追加写入模式适合先把最基础的 JDBC Source 和 JDBC Sink 跑通编写单表同步 HOCON下面是一个最简单的 MySQL 到 PostgreSQL 单表同步配置env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个任务的含义很简单从 test1.t_test_1w 读取数据 追加写入到 PostgreSQL 的 public.test_1w这里没有配置query而是直接使用table_path指定源表。理解配置和注意事项这个配置主要分为四部分env、source、transform 和 sink。env 表示任务运行方式。env { job { mode BATCH } parallelism 1 }mode BATCH表示这是一个批处理任务。任务会读取当前源表中的存量数据写入目标表后结束。parallelism 1表示并行度为 1。对于入门示例来说这样更简单也更容易观察任务执行结果。source 表示数据从哪里来。source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } }这里使用的是Jdbc连接器连接源端 MySQL 数据库test1。其中最重要的是table_path test1.t_test_1w它表示读取test1数据库中的t_test_1w表。transform 暂时为空。transform { }这里不做字段映射、不做字段改名、不做 SQL 处理数据会直接从 source 流向 sink。sink 表示数据写到哪里去。sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这里连接的是目标端 PostgreSQL 数据库test2写入目标表test_1w。需要注意的是本示例不自动建表。所以在运行任务之前需要先在目标端 PostgreSQL 中创建好表-- PostgreSQL 目标端建表示例字段请根据源表结构调整CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);如果目标表不存在任务写入时会失败。另外这里使用的是追加写入模式data_save_mode APPEND_DATA它表示每次运行任务时都会把源表数据追加写入目标表。如果重复执行任务目标表中可能会出现重复数据。这里关闭了 upsertenable_upsert false因为这是最简单的追加同步场景不需要根据主键判断插入还是更新。如果后面要做主键更新再单独讲enable_upsert true和主键配置。适用场景这个配置适合下面几类场景第一次学习 MySQL 到 PostgreSQL 同步源端 MySQL 表和目标端 PostgreSQL 表结构一致目标表已经提前创建好只需要做一次全量数据搬迁允许数据追加写入用来验证 JDBC Source 和 JDBC Sink 是否正常比如下面这种情况就非常适合源表test1.t_test_1w目标表public.test_1w同步方式全量读取 追加写入建表方式目标表提前创建运行前可以先检查源表数据量 text SELECT COUNT(*) FROM test1.t_test_1w;再检查目标表是否存在SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema public AND table_name test_1w ORDER BY ordinal_position;如果目标表已经存在并且字段结构和源表一致就可以运行这个同步任务。2. 单表条件同步只同步符合条件的数据这一节我们在上一节基础上增加条件过滤只同步源表中符合条件的部分数据。比如源表test1.t_test_1w中有一个status字段我们只想同步status ACTIVE的数据到目标表public.test_1w。编写带条件的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w query SELECT * FROM t_test_1w WHERE status ACTIVE } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }核心变化是 source 里增加了query参数query SELECT * FROM t_test_1w WHERE status ACTIVE这样SeaTunnel 只会读取符合条件的数据。理解配置和注意事项**query 参数**用于 SQL 级别过滤数据可以写任意合法 SQL不使用 query 时会直接读取整个表对应上节示例例如SELECT*FROMt_test_1wWHEREstatusACTIVEANDcreate_time2026-06-01sink 配置保持和上一节一致追加写入APPEND_DATA每批写入batch_size条数据不做 upsert注意事项table_pathquery组合使用时会按照 query 获取数据query 中字段必须和源表字段一致否则写入目标表可能失败目标表依然需要提前创建适用场景这个配置适合只同步部分数据例如只同步status ACTIVE只同步最近一周新增的数据只同步某个部门的数据源表数据量大但目标表只需要一部分目标表已经创建结构和源表一致不希望一次同步整个表提高效率示例源表test1.t_test_1w目标表public.test_1w条件status ‘ACTIVE’同步方式全量读取符合条件 追加写入3. 单表字段映射源表字段和目标表字段不一致在实际项目中源端 MySQL 表和目标端 PostgreSQL 表字段不一定完全一致。比如源表t_test_1w有字段id, name, email目标表可能是user_id, full_name, email_address。本节演示如何使用FieldMapper在 transform 阶段完成字段映射。编写带字段映射的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { FieldMapper { mappings [ { sourceField id, targetField user_id, targetType bigint } { sourceField name, targetField full_name, targetType varchar(128) } { sourceField email, targetField email_address, targetType varchar(256) } ] } } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }核心变化在transform部分我们通过FieldMapper映射源表字段到目标表字段id → user_idname → full_nameemail → email_address理解配置和注意事项**transform.FieldMapper** 配置sourceField源表字段名targetField目标表字段名targetType目标字段类型可做类型转换注意事项映射的字段必须存在于源表否则会报错目标表必须提前创建并且字段类型尽量与targetType匹配不需要修改 sink 配置FieldMapper 会在写入前自动处理字段对应关系仍然使用追加模式APPEND_DATA不会覆盖目标表数据适用场景字段映射适合以下场景源表字段和目标表字段名称不一致目标表已经建好但不能修改字段名想在同步时做字段类型转换如int → bigint或varchar → varchar(256))保持数据一致性同时对接已有业务系统示例源表id, name, email 目标表user_id, full_name, email_address 同步方式全量读取 字段映射 追加写入4. 单表自动建表目标表不存在时自动创建在实际项目中目标表可能尚未创建。这时可以使用自动建表功能SeaTunnel 会根据源表结构自动在目标库创建表然后再写入数据。编写自动建表 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST } }核心变化在 sink 配置schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST这个参数表示如果目标表不存在SeaTunnel 会自动根据源表结构创建已存在的表不会覆盖理解配置和注意事项**sink.schema_save_mode** 说明CREATE_SCHEMA_WHEN_NOT_EXIST目标表不存在时自动建表自动建表时字段类型和源表一致不会覆盖已有表安全追加数据配合generate_sink_sql true使用系统会生成完整 insert SQL注意事项自动建表功能依赖数据库账号有建表权限如果目标端 PostgreSQL 没有权限会报错enable_upsert仍然可选如果要做主键更新需要确保建表时设置主键适用场景自动建表适合以下场景目标表尚未创建想快速部署数据同步任务源表字段结构固定可以直接建表适合测试环境或开发环境快速搭建示例源表test1.t_test_1w 目标表public.test_1w不存在 同步方式全量读取 自动建表 追加写入5. 单表主键更新使用 upsert 避免重复数据在实际业务中源表可能会有更新或重复数据。如果直接追加写入目标表可能会产生重复记录。这时可以使用upsert功能根据主键判断插入或更新。编写带 upsert 的 HOCONHOCON 配置如下env { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { FieldMapper { mappings [ { sourceField id, targetField user_id, targetType bigint } { sourceField name, targetField full_name, targetType varchar(128) } { sourceField email, targetField email_address, targetType varchar(256) } ] } } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert true primary_keys [user_id] } }核心变化在 sink 配置enable_upsert true primary_keys [user_id]enable_upsert true启用增量更新primary_keys指定目标表判断插入/更新的字段理解配置和注意事项**enable_upsert 注意事项**必须指定primary_keys否则任务会报错primary_keys最好选择目标表中的主键字段或有索引的字段否则 upsert 写入时需要频繁判断数据是否存在性能会比较差upsert 模式会在写入前根据主键判断记录是否存在存在 → 更新不存在 → 插入data_save_mode仍可使用APPEND_DATAupsert 会覆盖匹配主键的数据与generate_sink_sql配合使用系统自动生成带主键判断的 SQL其他注意事项确保目标表主键和primary_keys对应字段一致目标表必须存在或配合自动建表功能使用对大表开启 upsert 时建议设置合适的batch_size提升写入性能适用场景upsert 适合以下业务场景源表可能更新已有数据避免目标表产生重复记录保证目标表数据一致性配合字段映射使用实现源表字段和目标表字段不一致也能正确更新示例源表id, name, email 目标表user_id, full_name, email_address 同步方式增量更新 追加写入 字段映射 主键user_id6. 单表覆盖同步每次同步前清空目标表前面几个示例主要使用的是追加写入模式也就是每次同步都会把数据继续写入目标表。但在一些场景中目标表不需要保留历史数据只需要和源表当前数据保持一致。比如维表、字典表、配置表等每次同步都以源表最新数据为准。这时可以使用覆盖同步模式data_save_mode DROP_DATA它表示在写入目标表之前先清空目标表中的已有数据然后再重新写入本次同步的数据。编写覆盖同步 HOCONenv { job { mode BATCH } parallelism 1 } source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_1w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode DROP_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个任务的含义是从 test1.t_test_1w 读取数据 写入 public.test_1w 写入前先清空目标表已有数据也就是说任务执行完成后目标表中只保留本次从源表同步过来的数据。理解配置和注意事项覆盖同步的核心配置是data_save_mode DROP_DATA它和追加同步最大的区别是APPEND_DATA保留目标表原有数据继续追加写入DROP_DATA先清空目标表原有数据再重新写入在覆盖同步场景中一般不需要开启 upsertenable_upsert false因为目标表已经在写入前被清空后续数据都是重新插入不需要再根据主键判断更新或插入。需要注意的是DROP_DATA会清空目标表数据所以使用前要确认目标表是否允许被清空。另外目标端 PostgreSQL 表字段需要和写入数据匹配。可以提前创建目标表-- PostgreSQL 目标端建表示例字段请根据源表结构调整CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);如果想让目标表不存在时自动创建也可以配合自动建表配置schema_save_mode CREATE_SCHEMA_WHEN_NOT_EXIST适用场景覆盖同步适合下面几类场景维表同步例如用户维表、部门维表、区域维表字典表同步例如状态码、类型码、枚举配置配置表同步例如业务参数、规则配置测试环境刷新数据每次以源表为准目标表不需要保留历史数据只需要最新快照比如下面这种情况就比较适合源表test1.t_test_1w 目标表public.test_1w 同步方式全量读取源表 写入方式清空目标表后重新写入 目标效果目标表和源表当前数据保持一致如果目标表需要保留历史数据建议使用APPEND_DATA。如果目标表需要根据主键更新已有数据建议使用enable_upsert true。7. 单表字段裁剪只同步需要的字段在很多场景中源表字段较多但目标表只需要其中部分字段。直接同步所有字段会增加网络和数据库压力也可能写入冗余数据。这时可以使用字段裁剪方式通过 SQL 查询或 FieldMapper 只同步需要的字段。编写字段裁剪 HOCONsink { Jdbc { database test2 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true } }这个配置的含义是只读取源表的id、name和create_time字段并写入目标表。理解配置和注意事项通过query选择需要的字段减少网络传输和目标表写入压力可以配合FieldMapper对字段重命名或类型转换目标表字段需和查询字段匹配否则写入可能失败适用场景源表字段多但目标表只关心部分字段开发或测试环境只需要部分数据验证需要减少冗余数据传输和存储压力示例目标表只需要 id, name, create_time 同步方式全量读取指定字段 追加写入8. 单表时间范围同步只同步某一天或某一周的数据在实际同步任务中并不是每次都需要同步整张表。如果源表数据量比较大或者任务是每天定时执行通常只需要同步某个时间范围内的数据比如昨天的数据、最近一周的数据或者某个固定日期的数据。这时可以通过query配置时间条件只读取指定时间范围的数据。编写时间范围同步 HOCON} source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 query SELECT * FROM t_test_1w WHERE create_time 2026-06-01 00:00:00 AND create_time 2026-06-02 00:00:00 } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_1w data_save_mode APPEND_DATA batch_size 1000 generate_sink_sql true enable_upsert false } }这个配置表示只同步2026-06-01当天的数据。理解配置和注意事项时间范围同步的核心是query中的时间条件AND create_time 2026-06-02 00:00:00这里建议使用“左闭右开”的时间范围这样可以避免跨天同步时出现重复或遗漏。如果是每天定时同步也可以把时间写成变量例如AND create_time ${end_date}需要注意的是时间字段最好建立索引否则源表数据量较大时查询速度可能会比较慢。适用场景每天同步昨天新增或变更的数据每周同步最近一周的数据源表数据量较大不适合每次全量同步需要按照创建时间、更新时间、业务时间进行范围过滤示例目标表public.test_1w 同步范围2026-06-01 当天数据 同步方式按时间范围过滤 追加写入9. 单表性能优化batch_size 和 parallelism 怎么设置当单表数据量比较小时默认配置通常就可以满足需求。但如果同步的数据量达到 10 万、100 万甚至更多就需要关注两个比较常用的性能参数batch_size和parallelism。简单来说parallelism控制任务并行度配置 batch_size 和 parallelism} source { Jdbc { database test1 driver com.mysql.cj.jdbc.Driver url jdbc:mysql://127.0.0.1:3306/test1?allowPublicKeyRetrievaltrueuseSSLfalse username root password 123456 table_path test1.t_test_10w } } transform { } sink { Jdbc { database test2 driver org.postgresql.Driver url jdbc:postgresql://127.0.0.1:5432/test2 username root password 123456 table public.test_10w data_save_mode APPEND_DATA batch_size 5000 generate_sink_sql true enable_upsert false } }这个配置中parallelism 4表示任务并行度为 4batch_size 5000表示每批写入 5000 条数据理解配置和注意事项batch_size不是越大越好。设置太小写入次数会变多整体效率可能较低。设置太大单次写入压力会变大可能导致目标库连接超时、锁等待或内存压力增加。一般可以先从下面的值开始测试中等数据量3000 ~ 5000 大表5000 ~ 10000parallelism也需要根据数据库能力来调整。如果数据库配置较低并行度过高反而可能把数据库压垮。建议先从1或2开始再逐步调整到4、8观察同步耗时和数据库负载。适用场景单表数据量较大希望缩短同步时间目标端 PostgreSQL 写入能力较好可以适当提高批量写入大小需要通过测试找到更合适的同步参数示例10 万数据batch_size 3000 ~ 5000parallelism 2 ~ 4 100 万数据batch_size 5000 ~ 10000parallelism 4 ~ 8这些值不是固定标准最终还是要结合数据库性能、网络情况和任务执行结果来调整。10. 单表数据校验同步完成后如何确认数据正确同步任务执行成功并不代表数据一定完全符合预期。在实际项目中任务运行完成后通常还需要做一些简单的数据校验比如数据量是否一致、关键字段是否为空、部分样例数据是否正确。对于单表同步来说最常见的校验方式就是先从数量开始。编写数据校验 SQL1. 校验源表数据量2. 校验目标表数据量如果是全量同步源表和目标表数量通常应该一致。3. 校验关键字段空值如果id是关键字段那么这里的结果应该为0。4. 抽样检查部分数据SELECT * FROM public.test_1w WHERE id IN (1, 100, 1000);通过抽样对比可以快速确认关键字段内容是否一致。理解配置和注意事项单表校验可以从三个层面进行数量校验源表和目标表总数是否一致字段校验关键字段是否为空字段内容是否符合预期样例校验抽取几条数据对比源表和目标表内容如果使用的是条件同步比如只同步某一天的数据那么校验 SQL 也需要带上相同的过滤条件。例如FROM test1.t_test_1w WHERE create_time 2026-06-01 00:00:00 AND create_time 2026-06-02 00:00:00;目标表也要使用同样的时间范围进行校验。适用场景单表同步任务执行完成后确认数据是否正确测试环境验证同步配置是否可用生产环境同步后做基础核对排查目标表数据缺失、重复或字段异常问题示例目标表数量10000 关键字段空值0 抽样数据源表和目标表一致 校验结论本次同步结果符合预期11. 单表失败排查常见错误和解决办法单表同步任务失败时不一定是 SeaTunnel 本身的问题。更多时候问题出在数据库连接、驱动、表结构、字段类型、权限或数据内容上。所以排查时可以先从几个常见方向入手。常见错误类型常见问题可以分为下面几类2. JDBC 驱动不存在 3. 目标表不存在 4. 字段名不匹配 5. 字段类型不兼容 6. 字段长度超长 7. 主键冲突 8. 数据库账号权限不足这些问题在单表同步中都比较常见。理解排查思路1. 数据库连接失败优先检查连接地址、端口、账号、密码是否正确。username root password 123456如果地址、端口或账号密码错误任务会在连接数据库时失败。2. 驱动不存在检查driver配置是否正确以及运行环境中是否已经放入对应 JDBC 驱动。3. 目标表不存在如果没有开启自动建表目标表需要提前创建。CREATETABLEpublic.test_1w(idBIGINT,nameVARCHAR(128),emailVARCHAR(256),statusVARCHAR(32),create_timeTIMESTAMP);4. 字段不匹配如果源表字段和目标表字段不一致可能会出现字段找不到、写入失败等问题。这种情况可以检查源表和目标表结构SELECT column_name, data_type, character_maximum_length FROM information_schema.columns WHERE table_schema public AND table_name test_1w ORDER BY ordinal_position;5. 字段长度超长如果目标表字段长度比源表短可能会出现写入失败。例如源表字段长度是varchar(255)目标表字段长度是varchar(36)当数据超过 36 个字符时就可能失败。6. 权限不足如果账号没有查询、写入、建表或清空表的权限也会导致任务失败。特别是使用下面这些能力时要注意账号权限读取源表需要 SELECT 权限写入目标表需要 INSERT 权限自动建表需要 CREATE 权限覆盖同步可能需要 DELETE 或 TRUNCATE 权限适用场景单表失败排查适合下面这些情况任务启动后直接失败数据库连接测试不通过目标表没有写入数据写入过程中报字段类型或长度错误开启 upsert 后出现主键相关错误可以按照下面顺序排查先看连接是否正常 再看驱动是否存在 再看表是否存在 再看字段是否匹配 再看数据是否超长 最后看账号权限是否足够大多数单表同步问题都可以通过这个顺序快速定位。写在最后SeaTunnel Web 是我正在持续完善的一个 SeaTunnel 可视化项目希望把数据源管理、Zeta Engine 连接、任务配置、运行日志和指标查看这些常用能力做得更直观、更容易上手。项目地址https://github.com/weifuwan/seatunnel-web里面有体验地址、部署文档、社区交流群如果你也在学习 SeaTunnel或者正在做数据同步、数据集成相关的事情也欢迎一起交流。这里有一群上进的小伙伴也有一群爱分享的大佬。大家可以一起讨论问题、分享实践、完善文档也一起把 SeaTunnel 这件事讲得更清楚、做得更好用。