一文读懂DolphinScheduler插件机制:如何轻松扩展任务类型与数据源

一文读懂DolphinScheduler插件机制:如何轻松扩展任务类型与数据源 一、整体架构概览DolphinScheduler 的插件体系基于 Java SPIService Provider Interface 机制配合 Google AutoService 自动生成注册文件实现了零侵入的插件化扩展dolphinscheduler-spi ← 核心接口层SPI基础设施 dolphinscheduler-datasource-plugin ← 数据源插件层 dolphinscheduler-task-plugin ← Task插件层 dolphinscheduler-worker ← 插件消费层Worker执行任务二、SPI 基础设施PrioritySPI (接口) └── getIdentify(): SPIIdentify ← 插件唯一标识 优先级 └── compareTo(Integer) ← 优先级比较PrioritySPIFactory 是插件发现的核心引擎通过 Java 标准 ServiceLoader 扫描 classpath for (T t : ServiceLoader.load(spiClass)) { if (map.containsKey(t.getIdentify().getName())) { resolveConflict(t); // 同名插件按优先级决策优先级相同则抛异常 } else { map.put(t.getIdentify().getName(), t); } }插件注册方式每个插件模块使用 AutoService 注解编译时自动在 META-INF/services/ 下生成 SPI 配置文件无需手动维护三、数据源插件详细流程3.1 接口层次结构DataSourceChannelFactory (SPI入口) └── getName() ← 插件唯一名称如 MYSQL └── create() ← 创建 DataSourceChannel DataSourceChannel (通道) └── createAdHocDataSourceClient() ← 创建临时连接客户端 └── createPooledDataSourceClient() ← 创建连接池客户端 DataSourceClient (基础接口) └── getConnection(): Connection PooledDataSourceClient extends DataSourceClient └── createDataSourcePool() ← 创建 HikariCP 连接池3.2 以 MySQL 为例的完整实现链MySQLDataSourceChannelFactory ← AutoService 注册 └── create() → MySQLDataSourceChannel └── createPooledDataSourceClient() → MySQLPooledDataSourceClient └── extends BasePooledDataSourceClient └── createDataSourcePool() → HikariDataSource ├── setDriverClassName() ├── setJdbcUrl() ├── setUsername() / setPassword() ├── setMinimumIdle() / setMaximumPoolSize() └── setConnectionTestQuery()四、Task 插件详细流程4.1 接口层次结构TaskChannelFactory (SPI入口) extends UiChannelFactory, PrioritySPI └── getName() ← 插件类型名如 SHELL └── create() ← 创建 TaskChannel └── getParams() ← 返回 UI 配置参数前端渲染用 TaskChannel (通道) └── createTask(TaskExecutionContext) → AbstractTask └── parseParameters(ParametersNode) → AbstractParameters └── getResources(parameters) → ResourceParametersHelper └── cancelApplication(boolean) AbstractTask (任务执行基类) └── init() ← 初始化 └── handle(callback) ← 执行抽象 └── cancel() ← 取消抽象 └── getExitStatus() ← 根据 exitCode 返回状态4.2 以 Shell 为例的完整实现链ShellTaskChannelFactory ← AutoService 注册 └── getName() → SHELL └── getParams() → [nodeName, runFlag, ...] ← 前端UI参数 └── create() → ShellTaskChannel └── createTask(ctx) → ShellTask └── handle(callback) └── ShellCommandExecutor.run(shellActuatorBuilder) └── 执行 Shell 脚本进程五、两种插件的关键对