概览第一篇我们解决了怎么写——一条flink run跑起完整的 Multi-Statement SQL 脚本。这一篇解决怎么管让 Flink SQL 作业的研发流程具备和 Java 后端同样的工程能力——可检测、可追溯、可回滚、自动化。本文将深入Flink SQL Validate的底层原理Calcite 解析、验证这也是各个大厂内部实时研发流程中最基础、最重要的一环让你不仅知道怎么用更理解为什么它能在不连 Flink 集群的情况下精确校验语法。像上一篇 Flink 实时数仓开发实战像 Hive 那样用 Flink SQL 一样本文也提供了一个示例项目 Flink SQL Bootstrap Examples - CI/CD 帮助大家快速搭建本地环境并一步步演示 CI/CD 流水线。为什么需要 CI/CDFlink SQL 及 CI/CD 的引入可以从以下四个方面大大提升研发效率代码量技术栈门槛维护成本迭代效率这也是为什么大厂普遍使用 Flink SQL 作为实时研发的核心原因。并且 CI/CD 它还保障了四个核心能力能力说明可检测编译不过不能合、单测不过不能上线——机器过滤低级错误人专注逻辑可追溯谁改的、什么时候改的、为什么改——git log pipeline 记录一条链路可回滚出了事不至于手忙脚乱重新部署上一个版本几分钟恢复自动化从提交到上线不需要人做机器能做的事这些能力虽然在服务端已经是家常便饭但是对于数据开发这些能力只有大厂内部高度集成的研发平台才能提供。但这些能力往往是最基础、最核心的研发流程它的缺失就像一个雷不知道什么时候、在哪里就会炸一下。接下来我们将以flink-sql-bootstrap Gitlab为样板提供一种轻量、可靠的方案基于公司现有的服务端 CI/CD 流程可能是 Gitlab Runner 或 Jenkins搭建大数据自己的实时研发 CI/CD 流程。快速开始示例中将非常多繁琐的工作都帮读者做好了一个命令就能够完成所有环境的安装、配置甚至 Gitlab ssh 的创建和配置cd example-cicd/dockerbash setup.sh这个脚本自动完成 8 件事步骤说明1. 检查 Docker Docker Compose前置依赖校验2. 构建flink-ci-runner:1.20.4CI Runner 镜像Flink 1.20.4 Python3 git3. 启动 GitLab CE Runner两个容器桥接网络4. 预拉取 Runner Helper 镜像防止 CI job 因网络问题拉取超时5. 生成/上传 SSH 公钥到 GitLab免密推送代码6. 在 GitLab 上创建项目仓库API 自动创建不需要手动操作7. 创建并注册 Project RunnerPOST /api/v4/user/runners注册8. 配置本地 git remotegitlabssh://gitlocalhost:2224一键推送环境就绪后打开http://localhost:8929用户名root密码flink1234就能看到 GitLab 面板。随便改个 SQL 文件推送代码即可触发流水线git push gitlab main流水线设计架构总览示例中搭建的 CI/CD 流水线分为了以下几个阶段规范检查我们的数仓有一些必要的规范命名规范、SQL 语法规范等等这一步是对 SQL 代码是否满足数仓规范的检查SQL校验基于 Flink 内置的 Calcite 解析、验证能力对提交的 SQL 代码进行语法检查、语义检查语法检查检查 SQL 语法是否符合 Flink SQL 语法规范语义检查解析 SQL 查询的 Catalog解析表名、字段名、函数名这些 Catalog 中都有吗引用的对吗类型对吗动态编排SQL Script 的发布往往是需要编排的比如上游新增了一个字段需要先发 DWD 再发 ADS搞反了会导致发布失败因此需要有一定的编排策略当然可以根据自己的实际情况看下是否保留这一步权限审批未实现真实的发布是需要走审批流程的示例中为了简单没有实现这一环读者可以根据自己的实际情况接入公司内部的审批系统部署线上审批通过后将 SQL Script 部署到线上当然可能涉及到重启的方式示例中为了简单没有实现这一环示例中我们将 PR 作为触发 CI 的时机用户提交了 PR 且涉及到了.sql文件的变更则触发 CI 流程。我们将合并作为触发 CD 的流程用户合并了 PR 且合并到了main分支则触发 CD 流程。当然CD 流程只是为了演示整体的链路。实际 CD 流程可能涉及到权限、审批流、部署顺序、部署时间、部署方式等问题读者可以根据自己的实际情况进行调整。整条流水线由 4 个脚本和 1 个.gitlab-ci.yml驱动。流水线脚本脚本做了什么方式check-warehouse-naming.sh检查{层级}_{业务}_{后缀}.sql三段式命名合规性全量扫描warehouse/validate-sql.shFlink SQL 语法 语义校验表是否存在、字段是否在、类型是否匹配增量git diffCI 下自动获取变更文件generate-deploy-pipeline.sh检测两次 commit 间的 SQL 变更调用 Python 编排脚本增量build-deploy-order.py从文件名解析层级按dwd → dws → ads排序输出child-pipeline.yml被 shell 脚本调用核心校验命令只有一条不连 Flink 集群2 秒出结果$FLINK_HOME/bin/flink run --target local $BOOTSTRAP_JAR \--script-file file://sql文件 --validate部署则是两步串联generate-deploy-pipeline.sh检测变更 →build-deploy-order.py按层级排序、生成子流水线。子流水线通过 GitLab 的triggerartifact机制按 stage 顺序执行--catalog-file注入生产环境表结构。流水线配置stages:- validate- rule-check- deploywarehouse-naming-check: # 全量命名规范检查stage: rule-checkscript: bash scripts/check-warehouse-naming.shrules:- if: $CI_PIPELINE_SOURCE merge_request_event # MR 触发changes: [example-cicd/warehouse/**/*.sql]- if: $CI_COMMIT_BRANCH main # main 触发changes: [example-cicd/warehouse/**/*.sql]flink-sql-validate: # 增量语法校验stage: validatescript: bash scripts/validate-sql.shrules: # 同上MR 和 main 都触发generate-deploy-pipeline: # 生成部署子流水线仅 mainstage: deployscript: bash scripts/generate-deploy-pipeline.shartifacts: [example-cicd/child-pipeline.yml]rules:- if: $CI_COMMIT_BRANCH mainchanges: [example-cicd/warehouse/**/*.sql]deploy-jobs: # 触发子流水线仅 mainstage: deployneeds: [generate-deploy-pipeline]trigger:include:- artifact: example-cicd/child-pipeline.ymljob: generate-deploy-pipelinestrategy: dependrules: # 同 generate-deploy-pipeline几个关键设计changes:确保只有 SQL 变更才触发改脚本、文档不跑流水线MR 只跑 CI规范检查 语法校验CD 仅 main 分支触发strategy: depend保证子流水线挂了父流水线也标红本地调试所有脚本都脱离 CI 环境变量独立可跑# 校验单个文件bash scripts/validate-sql.sh --file warehouse/orders/dwd_orders_di.sql# 模拟增量校验CI_COMMIT_BEFORE_SHAHEAD~2 CI_COMMIT_SHAHEAD bash scripts/validate-sql.sh# 手动生成部署子流水线bash scripts/generate-deploy-pipeline.sh --from HEAD~2 --to HEAD底层原理拆解SQL 的多语句切分机制已在 上一篇阶段一智能切分中详细讨论——六种状
Flink 实时数仓开发实战:像后端那样 CI/CD
概览第一篇我们解决了怎么写——一条flink run跑起完整的 Multi-Statement SQL 脚本。这一篇解决怎么管让 Flink SQL 作业的研发流程具备和 Java 后端同样的工程能力——可检测、可追溯、可回滚、自动化。本文将深入Flink SQL Validate的底层原理Calcite 解析、验证这也是各个大厂内部实时研发流程中最基础、最重要的一环让你不仅知道怎么用更理解为什么它能在不连 Flink 集群的情况下精确校验语法。像上一篇 Flink 实时数仓开发实战像 Hive 那样用 Flink SQL 一样本文也提供了一个示例项目 Flink SQL Bootstrap Examples - CI/CD 帮助大家快速搭建本地环境并一步步演示 CI/CD 流水线。为什么需要 CI/CDFlink SQL 及 CI/CD 的引入可以从以下四个方面大大提升研发效率代码量技术栈门槛维护成本迭代效率这也是为什么大厂普遍使用 Flink SQL 作为实时研发的核心原因。并且 CI/CD 它还保障了四个核心能力能力说明可检测编译不过不能合、单测不过不能上线——机器过滤低级错误人专注逻辑可追溯谁改的、什么时候改的、为什么改——git log pipeline 记录一条链路可回滚出了事不至于手忙脚乱重新部署上一个版本几分钟恢复自动化从提交到上线不需要人做机器能做的事这些能力虽然在服务端已经是家常便饭但是对于数据开发这些能力只有大厂内部高度集成的研发平台才能提供。但这些能力往往是最基础、最核心的研发流程它的缺失就像一个雷不知道什么时候、在哪里就会炸一下。接下来我们将以flink-sql-bootstrap Gitlab为样板提供一种轻量、可靠的方案基于公司现有的服务端 CI/CD 流程可能是 Gitlab Runner 或 Jenkins搭建大数据自己的实时研发 CI/CD 流程。快速开始示例中将非常多繁琐的工作都帮读者做好了一个命令就能够完成所有环境的安装、配置甚至 Gitlab ssh 的创建和配置cd example-cicd/dockerbash setup.sh这个脚本自动完成 8 件事步骤说明1. 检查 Docker Docker Compose前置依赖校验2. 构建flink-ci-runner:1.20.4CI Runner 镜像Flink 1.20.4 Python3 git3. 启动 GitLab CE Runner两个容器桥接网络4. 预拉取 Runner Helper 镜像防止 CI job 因网络问题拉取超时5. 生成/上传 SSH 公钥到 GitLab免密推送代码6. 在 GitLab 上创建项目仓库API 自动创建不需要手动操作7. 创建并注册 Project RunnerPOST /api/v4/user/runners注册8. 配置本地 git remotegitlabssh://gitlocalhost:2224一键推送环境就绪后打开http://localhost:8929用户名root密码flink1234就能看到 GitLab 面板。随便改个 SQL 文件推送代码即可触发流水线git push gitlab main流水线设计架构总览示例中搭建的 CI/CD 流水线分为了以下几个阶段规范检查我们的数仓有一些必要的规范命名规范、SQL 语法规范等等这一步是对 SQL 代码是否满足数仓规范的检查SQL校验基于 Flink 内置的 Calcite 解析、验证能力对提交的 SQL 代码进行语法检查、语义检查语法检查检查 SQL 语法是否符合 Flink SQL 语法规范语义检查解析 SQL 查询的 Catalog解析表名、字段名、函数名这些 Catalog 中都有吗引用的对吗类型对吗动态编排SQL Script 的发布往往是需要编排的比如上游新增了一个字段需要先发 DWD 再发 ADS搞反了会导致发布失败因此需要有一定的编排策略当然可以根据自己的实际情况看下是否保留这一步权限审批未实现真实的发布是需要走审批流程的示例中为了简单没有实现这一环读者可以根据自己的实际情况接入公司内部的审批系统部署线上审批通过后将 SQL Script 部署到线上当然可能涉及到重启的方式示例中为了简单没有实现这一环示例中我们将 PR 作为触发 CI 的时机用户提交了 PR 且涉及到了.sql文件的变更则触发 CI 流程。我们将合并作为触发 CD 的流程用户合并了 PR 且合并到了main分支则触发 CD 流程。当然CD 流程只是为了演示整体的链路。实际 CD 流程可能涉及到权限、审批流、部署顺序、部署时间、部署方式等问题读者可以根据自己的实际情况进行调整。整条流水线由 4 个脚本和 1 个.gitlab-ci.yml驱动。流水线脚本脚本做了什么方式check-warehouse-naming.sh检查{层级}_{业务}_{后缀}.sql三段式命名合规性全量扫描warehouse/validate-sql.shFlink SQL 语法 语义校验表是否存在、字段是否在、类型是否匹配增量git diffCI 下自动获取变更文件generate-deploy-pipeline.sh检测两次 commit 间的 SQL 变更调用 Python 编排脚本增量build-deploy-order.py从文件名解析层级按dwd → dws → ads排序输出child-pipeline.yml被 shell 脚本调用核心校验命令只有一条不连 Flink 集群2 秒出结果$FLINK_HOME/bin/flink run --target local $BOOTSTRAP_JAR \--script-file file://sql文件 --validate部署则是两步串联generate-deploy-pipeline.sh检测变更 →build-deploy-order.py按层级排序、生成子流水线。子流水线通过 GitLab 的triggerartifact机制按 stage 顺序执行--catalog-file注入生产环境表结构。流水线配置stages:- validate- rule-check- deploywarehouse-naming-check: # 全量命名规范检查stage: rule-checkscript: bash scripts/check-warehouse-naming.shrules:- if: $CI_PIPELINE_SOURCE merge_request_event # MR 触发changes: [example-cicd/warehouse/**/*.sql]- if: $CI_COMMIT_BRANCH main # main 触发changes: [example-cicd/warehouse/**/*.sql]flink-sql-validate: # 增量语法校验stage: validatescript: bash scripts/validate-sql.shrules: # 同上MR 和 main 都触发generate-deploy-pipeline: # 生成部署子流水线仅 mainstage: deployscript: bash scripts/generate-deploy-pipeline.shartifacts: [example-cicd/child-pipeline.yml]rules:- if: $CI_COMMIT_BRANCH mainchanges: [example-cicd/warehouse/**/*.sql]deploy-jobs: # 触发子流水线仅 mainstage: deployneeds: [generate-deploy-pipeline]trigger:include:- artifact: example-cicd/child-pipeline.ymljob: generate-deploy-pipelinestrategy: dependrules: # 同 generate-deploy-pipeline几个关键设计changes:确保只有 SQL 变更才触发改脚本、文档不跑流水线MR 只跑 CI规范检查 语法校验CD 仅 main 分支触发strategy: depend保证子流水线挂了父流水线也标红本地调试所有脚本都脱离 CI 环境变量独立可跑# 校验单个文件bash scripts/validate-sql.sh --file warehouse/orders/dwd_orders_di.sql# 模拟增量校验CI_COMMIT_BEFORE_SHAHEAD~2 CI_COMMIT_SHAHEAD bash scripts/validate-sql.sh# 手动生成部署子流水线bash scripts/generate-deploy-pipeline.sh --from HEAD~2 --to HEAD底层原理拆解SQL 的多语句切分机制已在 上一篇阶段一智能切分中详细讨论——六种状