Kettle9.4(Pentaho Data Integration)调度PostgreSQL18存储过程或函数,在传入指定日期时优先指定日期,未传入指定日期默认T-1昨天

Kettle9.4(Pentaho Data Integration)调度PostgreSQL18存储过程或函数,在传入指定日期时优先指定日期,未传入指定日期默认T-1昨天 目录环境说明一、过程参数/函数参数1.1 参数说明1.2 参数示例 IN/OUT1.3 存储过程与函数对比二、调度结构2.1 转换结构如下2.2 作业结构如下三、存储过程与函数结构3.1 建表[3.1.1] d_run_log日志表[3.1.2] d_bsz_proc_batchdate函数和过程统一的目标表3.2 建存储过程[3.2.1] sp_d_run_log日志存储过程[3.2.2] sp_d_bsz_proc_batchdate调度存储过程3.3 建函数功能与过程一致[3.3.1] sp_d_bsz_func_batchdate调度函数四、Kettle(Pentaho Data Integration)配置4.1 转换4.1.1 获取变量4.1.2 JS脚本4.1.3 输出日志4.1.4 执行SQL-存储过程4.1.5 执行SQL-函数4.1.6 测试并保存转换4.2 作业4.2.1 设置变量4.2.2 设置转换4.2.3 测试执行环境说明Ubuntu24.04Kettle9.4pdi-ce-9.4.0.0-343.zipPostgreSQL18这里会分别按存储过程与函数来演示调用过程可以按需选择一、过程参数/函数参数在真实项目中为了日常跑批和管理通常过程和函数入参都是固定的昨天为日期参数为字符串格式例20260101月末跑批也是每月1号跑昨日上月末数据这种周期跑批可以由作业控制参数中仅为日期字符串即可场景一每天批量跑昨天数据场景二指定日期重跑数据逻辑口径紧急调整重跑场景三指定日期区间重跑数据历史逻辑口径有误重跑历史区间数据1.1 参数说明存储过程参数有两个日期入参和整型出参存储过程也可以改写为函数函数只有日期参数并对应一个整型返回值两者在执行语句上有细微差别call存储过程名select函数名1.2 参数示例 IN/OUT存储过程参数示例 入参p_i_date出参p_o_rtn CREATE OR REPLACE PROCEDURE sp_d_bsz_proc_batchdate(IN p_i_date character varying, OUT p_o_rtn integer)函数参数示例入参p_i_date CREATE OR REPLACE FUNCTION sp_d_bsz_proc_batchdate(IN p_i_date character varying)1.3 存储过程与函数对比特性函数FUNCTION存储过程PROCEDURE引入版本一直存在PostgreSQL 11返回值必须返回一个值单值、记录或集合可以不返回值通过 OUT 参数模拟返回调用方式SELECT function(...)或作为表达式的一部分CALL procedure(...)事务控制不允许包含COMMIT/ROLLBACK允许包含事务控制语句在 SQL 中使用可以在SELECT、WHERE、INSERT等语句中直接使用不能在 SQL 语句中直接使用输出参数支持OUT参数但通常用返回值支持INOUT/OUT参数常见用途计算并返回结果封装复杂查询逻辑执行数据修改、批量操作、需要事务控制的流程⚠️注意Kettle 的“调用 DB 存储过程”步骤默认使用函数调用语法SELECT procedure(...)PostgreSQL 引入的存储过程PROCEDURE必须使用 CALL procedure(...)来执行Kettle 的该步骤尚未适配需要用“执行SQL”步骤替代“调用 DB 存储过程”步骤。二、调度结构由作业设置变量传给转换 -- 转换获取变量 -- 转换处理变量传入参数 -- 转换执行函数和存储过程 -- 作业成功主要的执行过程在转换中完成2.1 转换结构如下2.2 作业结构如下三、存储过程与函数结构⚠️注意postgresql名称有大小写敏感请统一这里会创建一个统一的日志存储过程用来记录存储和函数的执行日志sp_d_run_log同时也便于排查报错问题如果存储或函数报错了用来看执行步骤到了第几步什么时候执行的存储过程和函数功能都一样改写一下的区别3.1 建表[3.1.1] d_run_log日志表-- DROP TABLE public.d_run_log; CREATE TABLE public.d_run_log ( run_time timestamp DEFAULT now() NULL, -- 运行时间记录日志插入的时间戳默认当前时间 prc_nm varchar(60) NOT NULL, -- 存储过程名 batch_date date NOT NULL, -- 批量日期业务日期用于区分不同批次的日志 step_no int4 NOT NULL, -- 执行步骤编号,1234,便于排序和定位 step_desc varchar(100) NULL, -- 执行步骤说明描述该步骤的具体操作或状态 step_type varchar(20) NOT NULL -- 步骤类型使用 START、END、INFO、ERROR 等约定值便于过滤分析 ); CREATE INDEX idx_run_log_batch_date ON public.d_run_log USING btree (batch_date); CREATE INDEX idx_run_log_run_time ON public.d_run_log USING btree (run_time); COMMENT ON TABLE public.d_run_log IS 运行日志表记录业务批处理过程中的步骤信息; -- Column comments COMMENT ON COLUMN public.d_run_log.run_time IS 运行时间记录日志插入的时间戳默认当前时间; COMMENT ON COLUMN public.d_run_log.prc_nm IS 存储过程名; COMMENT ON COLUMN public.d_run_log.batch_date IS 批量日期业务日期用于区分不同批次的日志; COMMENT ON COLUMN public.d_run_log.step_no IS 执行步骤编号,1234,便于排序和定位; COMMENT ON COLUMN public.d_run_log.step_desc IS 执行步骤说明描述该步骤的具体操作或状态; COMMENT ON COLUMN public.d_run_log.step_type IS 步骤类型使用 START、END、INFO、ERROR 等约定值便于过滤分析;[3.1.2] d_bsz_proc_batchdate函数和过程统一的目标表-- public.d_bsz_proc_batchdate definition -- Drop table -- DROP TABLE public.d_bsz_proc_batchdate; CREATE TABLE public.d_bsz_proc_batchdate ( prc_nm varchar(100) NOT NULL, -- 执行的存储过程名称 run_time timestamp DEFAULT now() NOT NULL, -- 执行时间插入时自动记录 batch_date date NOT NULL -- 业务日期 ); COMMENT ON TABLE public.d_bsz_proc_batchdate IS 存储过程执行记录表; -- Column comments COMMENT ON COLUMN public.d_bsz_proc_batchdate.prc_nm IS 执行的存储过程名称; COMMENT ON COLUMN public.d_bsz_proc_batchdate.run_time IS 执行时间插入时自动记录; COMMENT ON COLUMN public.d_bsz_proc_batchdate.batch_date IS 业务日期;3.2 建存储过程[3.2.1] sp_d_run_log日志存储过程被日常存储过程和函数调用记录执行日志-- DROP PROCEDURE public.sp_d_run_log(varchar, date, int4, varchar, varchar); CREATE OR REPLACE PROCEDURE public.sp_d_run_log(IN p_prc_nm character varying, IN p_batch_date date, IN p_step_no integer, IN p_step_desc character varying, IN p_step_type character varying) LANGUAGE plpgsql AS $procedure$ BEGIN INSERT INTO public.d_run_log (run_time,prc_nm, batch_date, step_no, step_desc, step_type) VALUES (now(),p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 可选提交事务如果在调用者的事务中已包含则无需此句 -- COMMIT; EXCEPTION WHEN OTHERS THEN -- 记录错误日志到标准输出或应用日志避免存储过程抛出异常影响主流程 RAISE WARNING Failed to insert run log: %, SQLERRM; END; $procedure$ ;[3.2.2] sp_d_bsz_proc_batchdate调度存储过程用来将本过程执行的执行时间和对应业务日期插入目标表功能与函数一致-- DROP PROCEDURE public.sp_d_bsz_proc_batchdate(in varchar, out int4); CREATE OR REPLACE PROCEDURE public.sp_d_bsz_proc_batchdate(IN p_i_date character varying, OUT p_o_rtn integer) LANGUAGE plpgsql AS $procedure$ DECLARE /*日志变量区域*/ V_DATE DATE; p_prc_nm VARCHAR(60); p_batch_date date; p_step_no int; p_step_desc VARCHAR(100); p_step_type VARCHAR(20); /**********************************************************************/ BEGIN -- 将字符串转换为日期格式 YYYYMMDD V_DATE : to_date(P_I_DATE, YYYYMMDD); p_prc_nm : sp_d_bsz_proc_batchdate; p_batch_date : V_DATE; p_step_no : 1; p_step_desc : 插入目标表; p_step_type : START; call sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 插入记录 INSERT INTO public.d_bsz_proc_batchdate (prc_nm, batch_date, run_time) VALUES (sp_d_bsz_proc_batchdate, V_DATE, now()); -- 成功返回 0 P_O_RTN : 0; p_prc_nm : sp_d_bsz_proc_batchdate; p_batch_date : V_DATE; p_step_no : 99; p_step_desc : 运行成功; p_step_type : END; call sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); EXCEPTION WHEN OTHERS THEN -- 发生任何异常返回 1 P_O_RTN : 1; -- 可选的错误记录输出到控制台或日志 RAISE WARNING Failed to insert log for procedure %: %, p_prc_nm, SQLERRM; END; $procedure$ ;3.3 建函数功能与过程一致[3.3.1] sp_d_bsz_func_batchdate调度函数-- DROP FUNCTION public.sp_d_bsz_func_batchdate(varchar); CREATE OR REPLACE FUNCTION public.sp_d_bsz_func_batchdate(p_i_date character varying) RETURNS integer LANGUAGE plpgsql AS $function$ DECLARE /*日志变量区域*/ V_DATE DATE; p_prc_nm VARCHAR(60); p_batch_date date; p_step_no int; p_step_desc VARCHAR(100); p_step_type VARCHAR(20); /**********************************************************************/ BEGIN -- 将字符串转换为日期格式 YYYYMMDD V_DATE : to_date(P_I_DATE, YYYYMMDD); p_prc_nm : sp_d_bsz_func_batchdate; p_batch_date : V_DATE; p_step_no : 1; p_step_desc : 插入目标表; p_step_type : START; CALL sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 插入记录 INSERT INTO public.d_bsz_proc_batchdate (prc_nm, batch_date, run_time) VALUES (sp_d_bsz_func_batchdate, V_DATE, now()); -- 成功返回 0 -- P_O_RTN : 0; -- 移除OUT参数 p_prc_nm : sp_d_bsz_func_batchdate; p_batch_date : V_DATE; p_step_no : 99; p_step_desc : 运行成功; p_step_type : END; CALL sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); RETURN 0; -- 显式返回成功值 EXCEPTION WHEN OTHERS THEN -- 发生任何异常返回 1 -- P_O_RTN : 1; RAISE WARNING Failed to insert log for func %: %, p_prc_nm, SQLERRM; RETURN 1; -- 显式返回失败值 END; $function$ ;四、Kettle(Pentaho Data Integration)配置4.1 转换4.1.1 获取变量由作业传入run_date参数变量res是针对存储过程用来获取存储过程的返回值函数不需要在界面空白处右键-转换设置命名参数-新增 run_date为了能在测试执行转换的时候指定日期4.1.2 JS脚本用来处理传入的run_date变量并输出p_i_date作为变量和参数//Script here //Script here // 获取传入的日期字符串参数可能为空 var dateStr run_date; // 来自“获取变量”步骤的字段 var res; var targetDate; if (dateStr null || dateStr.trim() ) { // 无参数默认使用昨天日期 targetDate new Date(); targetDate.setDate(targetDate.getDate() - 1); } else { // 解析传入的 yyyymmdd 格式字符串 var year parseInt(dateStr.substring(0, 4), 10); var month parseInt(dateStr.substring(4, 6), 10) - 1; // 月份从0开始 var day parseInt(dateStr.substring(6, 8), 10); targetDate new Date(year, month, day); } // 格式化为 yyyymmdd不使用 padStart var year targetDate.getFullYear(); var month targetDate.getMonth() 1; var day targetDate.getDate(); // 补零 var monthStr month 10 ? 0 month : month; var dayStr day 10 ? 0 day : day; var yesterdayStr year monthStr dayStr; setVariable(p_i_date, yesterdayStr, r);4.1.3 输出日志4.1.4 执行SQL-存储过程执行call语句call sp_d_bsz_proc_batchdate(?,?);表示参数需要在参数列表Bind绑定参数会按编号顺序绑定顺序不能乱下面的Bind parameters?一定要勾上要不然加不了参数参数p_i_date作为日期字符串的输入res作为存储过程的输出4.1.5 执行SQL-函数执行select语句select sp_d_bsz_func_batchdate(?) AS result;函数只有一个参数p_i_date作为日期字符串的输入4.1.6 测试并保存转换点击执行可以自定义日志级别可以选调试级别日志更加详细命名参数中可以在值位置指定业务日期也可以留空默认昨天日期点击启动日志中会打印各项参数信息确认是否符合自己的预期我这边run_date留空默认为昨天20260219无误转换完成查看目标表函数与存储过程都已入表转换完成4.2 作业4.2.1 设置变量拖入设置变量 run_date值 ${run_date}空白处右键-作业设置新增命名参数 run_date在测试执行作业时可以自定义指定日期4.2.2 设置转换拖入转换选择保存的转换文件4.2.3 测试执行选择起始位置 run_date可以按需选择是否要指定日期点击执行我这里指定日期 20251231 进行测试指定日期、传入参数无误查询目标表函数与存储过程插入正常最后在收尾拖入Start与成功步骤用来配置调度定时和结束标记Start步骤作业调度配置完成。