课程结构总览入门阶段 │ 第 1 章Spark 是什么 → 第 2 章环境搭建 → 第 3 章核心概念 RDD │ 基础阶段 │ 第 4 章整体架构 → 第 5 章Spark SQL → 第 6 章DAG 调度 → 第 7 章Shuffle │ 进阶阶段 │ 第 8 章Structured Streaming → 第 9 章性能调优 → 第 10 章MLlib │ 精通阶段 │ 第 11 章Delta Lake → 第 12 章生产部署 → 第 13 章Real-Time Mode 入门阶段第 1 章Spark 是什么为什么需要它1.1 大数据处理的历史困境在 Spark 出现之前Hadoop MapReduce 是大数据处理的主流方案但它有一个致命弱点每次计算都要把中间结果写到 HDFS 磁盘导致迭代计算如机器学习极其缓慢。Spark 的核心突破是把中间数据放在内存里避免反复的磁盘 I/O。1.2 Spark 是什么Apache Spark 是一个统一的、分布式的大数据计算引擎支持批处理Batch Processing交互式查询Spark SQL流处理Structured Streaming机器学习MLlib图计算GraphX1.3 Spark 生态全景图1.4 Spark vs Hadoop MapReduce 对比对比项Hadoop MapReduceApache Spark计算模型磁盘迭代内存计算速度慢大量磁盘 I/O快 10~100 倍编程模型Map Reduce固定丰富的算子 API流处理不支持Structured Streaming机器学习Mahout弱MLlib强容错方式数据复制血统Lineage第 2 章环境搭建2.1 安装方式选择对于初学者推荐按以下顺序尝试本地模式Local Mode— 最简单单机运行适合学习和调试Docker— 一键启动环境隔离Databricks Community Edition— 免费云端环境无需安装2.2 本地模式安装步骤# 1. 安装 Java 11java-version# 2. 下载 Spark以 3.5.x 为例wgethttps://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgztar-xzfspark-3.5.0-bin-hadoop3.tgz# 3. 配置环境变量exportSPARK_HOME/path/to/spark-3.5.0-bin-hadoop3exportPATH$PATH:$SPARK_HOME/bin# 4. 启动 PySpark 交互式 Shellpyspark--masterlocal[*]2.3 第一个 Spark 程序frompyspark.sqlimportSparkSession# 创建 SparkSession入口sparkSparkSession.builder \.appName(HelloSpark)\.master(local[*])\.getOrCreate()# 读取数据dfspark.read.csv(data.csv,headerTrue,inferSchemaTrue)# 转换与查询resultdf.filter(df[age]18).groupBy(city).count()# 展示结果result.show()# 关闭spark.stop()2.4 Spark 模式对比第 3 章核心概念 — RDD、DataFrame、Dataset3.1 三个核心数据抽象Spark 有三种数据表示方式理解它们的关系是入门的关键3.2 RDD 的两类算子RDD 操作分为两类这是理解 Spark 懒执行的关键Transformation转换map、filter、groupByKey、join等返回新 RDD不立即执行只记录操作Action动作collect、count、save、show等触发实际计算# Transformation懒执行不触发计算rdd2rdd.filter(lambdax:x0)# 记录操作rdd3rdd2.map(lambdax:x*2)# 继续记录# Action触发计算DAG 开始执行resultrdd3.collect()# 此刻才真正运行3.3 RDD 的两种依赖关系 基础阶段第 4 章Spark 整体架构4.1 核心角色Spark 集群由以下角色组成Driver运行用户代码的主程序负责创建 SparkContext、构建 DAG、提交 JobCluster Manager负责整个集群的资源分配YARN/K8s/StandaloneWorker集群中的工作节点上面运行 Executor 进程ExecutorWorker 节点上的 JVM 进程负责执行 Task存储 RDD 缓存TaskExecutor 内的线程处理一个 Partition 的数据4.2 Spark 集群架构图第 5 章Spark SQL 与 DataFrame 编程5.1 DataFrame 常用操作frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportcol,avg,count,when sparkSparkSession.builder.appName(SQL Demo).getOrCreate()# 读取数据dfspark.read.parquet(s3://bucket/sales/)# 选择列df.select(name,amount,city)# 过滤df.filter(col(amount)1000)# 分组聚合df.groupBy(city).agg(avg(amount).alias(avg_amount),count(*).alias(total_orders))# 排序df.orderBy(col(avg_amount).desc())# 使用 SQLdf.createOrReplaceTempView(sales)spark.sql( SELECT city, AVG(amount) as avg_amount FROM sales WHERE amount 1000 GROUP BY city ORDER BY avg_amount DESC ).show()5.2 Catalyst 查询优化器Spark SQL 的核心优化器 Catalyst 将 SQL / DataFrame 操作转换为高效的物理执行计划第 6 章DAG 与 Job / Stage / Task 调度6.1 执行单元层级在 Spark 中一次程序运行包含三个层级的执行单元Job一次 Action 触发一个 Job如df.count()StageJob 按宽依赖Shuffle边界切分为多个 StageStage 内部是流水线执行Task一个 Stage 内每个 Partition 对应一个 TaskTask 是最小执行单元6.2 DAG 调度流程第 7 章Shuffle 机制详解7.1 什么是 ShuffleShuffle 是 Spark 中最昂贵的操作。当数据需要跨节点重新分区如groupBy、join就必须通过网络传输数据这个过程称为 Shuffle。7.2 Shuffle 写出与读取流程7.3 减少 Shuffle 的常用技巧使用reduceByKey代替groupByKey前者在 Map 端做预聚合开启 AQEAdaptive Query Execution自动合并小 Partition使用 Broadcast Join 避免大表 Shuffle 进阶阶段第 8 章Structured Streaming 流处理8.1 核心思想将流视为无界表Structured Streaming 的革命性思想把实时数据流看做一张不断追加行的无界表用和批处理完全相同的 SQL / DataFrame API 来写流处理代码。8.2 执行模型全览8.3 一个完整的流处理示例frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportwindow,count sparkSparkSession.builder.appName(StreamDemo).getOrCreate()# 读取 Kafka 流dfspark.readStream \.format(kafka)\.option(kafka.bootstrap.servers,localhost:9092)\.option(subscribe,orders)\.load()# 解析 JSON 数据frompyspark.sql.functionsimportfrom_json,colfrompyspark.sql.typesimportStructType,StringType,DoubleType schemaStructType().add(city,StringType()).add(amount,DoubleType())ordersdf.select(from_json(col(value).cast(string),schema).alias(data)).select(data.*)# 每 1 分钟统计各城市订单金额resultorders \.withWatermark(timestamp,10 minutes)\.groupBy(window(timestamp,1 minute),city)\.agg(count(*).alias(order_count))# 写出到 KafkaUpdate 模式queryresult.writeStream \.format(kafka)\.option(kafka.bootstrap.servers,localhost:9092)\.option(topic,order_stats)\.outputMode(update)\.start()query.awaitTermination()第 9 章性能调优9.1 性能调优全景图9.2 关键配置参数sparkSparkSession.builder \.config(spark.executor.memory,8g)\.config(spark.executor.cores,4)\.config(spark.sql.shuffle.partitions,200)# 默认 200按数据量调整.config(spark.sql.adaptive.enabled,true)# 开启 AQE.config(spark.sql.adaptive.skewJoin.enabled,true)# AQE 自动处理倾斜.config(spark.serializer,org.apache.spark.serializer.KryoSerializer).getOrCreate()# Broadcast Join小表 10MB 自动广播spark.conf.set(spark.sql.autoBroadcastJoinThreshold,10*1024*1024)# 手动指定 Broadcastfrompyspark.sql.functionsimportbroadcast resultlarge_df.join(broadcast(small_df),id)第 10 章MLlib 机器学习10.1 MLlib PipelineSpark MLlib 使用 Pipeline 将数据预处理和模型训练串联frompyspark.mlimportPipelinefrompyspark.ml.featureimportVectorAssembler,StandardScaler,StringIndexerfrompyspark.ml.classificationimportRandomForestClassifierfrompyspark.ml.evaluationimportBinaryClassificationEvaluator# 1. 特征工程indexerStringIndexer(inputColcity,outputColcity_idx)assemblerVectorAssembler(inputCols[age,amount,city_idx],outputColfeatures)scalerStandardScaler(inputColfeatures,outputColscaled_features)# 2. 模型rfRandomForestClassifier(featuresColscaled_features,labelCollabel,numTrees100)# 3. Pipeline 串联pipelinePipeline(stages[indexer,assembler,scaler,rf])# 4. 训练train_df,test_dfdf.randomSplit([0.8,0.2])modelpipeline.fit(train_df)# 5. 预测 评估predictionsmodel.transform(test_df)evaluatorBinaryClassificationEvaluator(labelCollabel)print(fAUC:{evaluator.evaluate(predictions):.4f}) 精通阶段第 11 章Delta Lake 与 Lakehouse 架构11.1 Delta Lake 解决什么问题传统数据湖S3 / HDFS 上的 Parquet 文件存在三大痛点数据不一致写了一半宕机、无法更新删除Upsert、读取慢大量小文件。Delta Lake 在 Parquet 上加了一个事务日志Transaction Log解决了以上所有问题。11.2 Delta Lake 架构图11.3 Delta Lake 核心能力fromdelta.tablesimportDeltaTable# ACID 写入df.write.format(delta).mode(overwrite).save(/delta/orders)# UpsertMerge Intodelta_tableDeltaTable.forPath(spark,/delta/orders)delta_table.alias(target).merge(updates_df.alias(source),target.order_id source.order_id).whenMatchedUpdateAll()\.whenNotMatchedInsertAll()\.execute()# Time Travel数据回溯df_v0spark.read.format(delta).option(versionAsOf,0).load(/delta/orders)df_yesterdayspark.read.format(delta).option(timestampAsOf,2024-01-01).load(/delta/orders)# Schema Evolution自动演化df.write.format(delta).option(mergeSchema,true).mode(append).save(/delta/orders)第 12 章生产部署与运维12.1 提交作业到集群# 提交到 YARN 集群spark-submit\--masteryarn\--deploy-mode cluster\--executor-memory 8g\--executor-cores4\--num-executors20\--confspark.sql.adaptive.enabledtrue\--confspark.serializerorg.apache.spark.serializer.KryoSerializer\my_spark_job.py# 提交到 Kubernetesspark-submit\--masterk8s://https://k8s-master:6443\--deploy-mode cluster\--confspark.kubernetes.container.imagespark:3.5.0\my_spark_job.py12.2 监控与排障Spark UI默认http://driver-host:4040查看 Jobs / Stages / Tasks / SQL 执行计划History Server查看已完成作业的历史日志常见问题排查OOM → 增加 Executor 内存或减少 Partition 大小数据倾斜 → 开启 AQE 或加盐GC 频繁 → 开启堆外内存12.3 生产架构推荐第 13 章Spark Real-Time Mode — 毫秒级实时计算13.1 为什么需要 RTMSpark 4.1 引入 Real-Time Mode通过三大技术创新实现毫秒级延迟长周期 Epoch 连续数据流Checkpoint 开销被摊薄Stage 并发执行Reducer 无需等待 Mapper 全部完成非阻塞算子数据流过即处理无需等待13.2 如何开启 RTM# 方式一通过 Trigger 配置querydf.writeStream \.trigger(processingTime0 seconds)# 或使用 Trigger.Continuous.format(delta)\.start()# 方式二Databricks 平台开启 RTMspark.conf.set(spark.databricks.streaming.realtime.enabled,true)13.3 RTM 适用场景场景延迟要求推荐方案实时特征工程100ms 级Spark RTM交易欺诈检测100ms 级Spark RTM旅行实时定价100ms 级Spark RTM高频交易风控 10msApache Flink批量 ETL 管道秒级Spark 微批次附录学习资源推荐资源说明Apache Spark 官方文档最权威的参考资料Databricks Learning免费在线课程《Learning Spark》第二版O’Reilly 出版官方推荐书籍《Spark: The Definitive Guide》深度原理讲解Databricks Community Edition免费云端练习环境本指南基于 Apache Spark 3.5 / 4.1持续更新中。
Apache Spark 从入门到精通:完整学习指南
课程结构总览入门阶段 │ 第 1 章Spark 是什么 → 第 2 章环境搭建 → 第 3 章核心概念 RDD │ 基础阶段 │ 第 4 章整体架构 → 第 5 章Spark SQL → 第 6 章DAG 调度 → 第 7 章Shuffle │ 进阶阶段 │ 第 8 章Structured Streaming → 第 9 章性能调优 → 第 10 章MLlib │ 精通阶段 │ 第 11 章Delta Lake → 第 12 章生产部署 → 第 13 章Real-Time Mode 入门阶段第 1 章Spark 是什么为什么需要它1.1 大数据处理的历史困境在 Spark 出现之前Hadoop MapReduce 是大数据处理的主流方案但它有一个致命弱点每次计算都要把中间结果写到 HDFS 磁盘导致迭代计算如机器学习极其缓慢。Spark 的核心突破是把中间数据放在内存里避免反复的磁盘 I/O。1.2 Spark 是什么Apache Spark 是一个统一的、分布式的大数据计算引擎支持批处理Batch Processing交互式查询Spark SQL流处理Structured Streaming机器学习MLlib图计算GraphX1.3 Spark 生态全景图1.4 Spark vs Hadoop MapReduce 对比对比项Hadoop MapReduceApache Spark计算模型磁盘迭代内存计算速度慢大量磁盘 I/O快 10~100 倍编程模型Map Reduce固定丰富的算子 API流处理不支持Structured Streaming机器学习Mahout弱MLlib强容错方式数据复制血统Lineage第 2 章环境搭建2.1 安装方式选择对于初学者推荐按以下顺序尝试本地模式Local Mode— 最简单单机运行适合学习和调试Docker— 一键启动环境隔离Databricks Community Edition— 免费云端环境无需安装2.2 本地模式安装步骤# 1. 安装 Java 11java-version# 2. 下载 Spark以 3.5.x 为例wgethttps://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgztar-xzfspark-3.5.0-bin-hadoop3.tgz# 3. 配置环境变量exportSPARK_HOME/path/to/spark-3.5.0-bin-hadoop3exportPATH$PATH:$SPARK_HOME/bin# 4. 启动 PySpark 交互式 Shellpyspark--masterlocal[*]2.3 第一个 Spark 程序frompyspark.sqlimportSparkSession# 创建 SparkSession入口sparkSparkSession.builder \.appName(HelloSpark)\.master(local[*])\.getOrCreate()# 读取数据dfspark.read.csv(data.csv,headerTrue,inferSchemaTrue)# 转换与查询resultdf.filter(df[age]18).groupBy(city).count()# 展示结果result.show()# 关闭spark.stop()2.4 Spark 模式对比第 3 章核心概念 — RDD、DataFrame、Dataset3.1 三个核心数据抽象Spark 有三种数据表示方式理解它们的关系是入门的关键3.2 RDD 的两类算子RDD 操作分为两类这是理解 Spark 懒执行的关键Transformation转换map、filter、groupByKey、join等返回新 RDD不立即执行只记录操作Action动作collect、count、save、show等触发实际计算# Transformation懒执行不触发计算rdd2rdd.filter(lambdax:x0)# 记录操作rdd3rdd2.map(lambdax:x*2)# 继续记录# Action触发计算DAG 开始执行resultrdd3.collect()# 此刻才真正运行3.3 RDD 的两种依赖关系 基础阶段第 4 章Spark 整体架构4.1 核心角色Spark 集群由以下角色组成Driver运行用户代码的主程序负责创建 SparkContext、构建 DAG、提交 JobCluster Manager负责整个集群的资源分配YARN/K8s/StandaloneWorker集群中的工作节点上面运行 Executor 进程ExecutorWorker 节点上的 JVM 进程负责执行 Task存储 RDD 缓存TaskExecutor 内的线程处理一个 Partition 的数据4.2 Spark 集群架构图第 5 章Spark SQL 与 DataFrame 编程5.1 DataFrame 常用操作frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportcol,avg,count,when sparkSparkSession.builder.appName(SQL Demo).getOrCreate()# 读取数据dfspark.read.parquet(s3://bucket/sales/)# 选择列df.select(name,amount,city)# 过滤df.filter(col(amount)1000)# 分组聚合df.groupBy(city).agg(avg(amount).alias(avg_amount),count(*).alias(total_orders))# 排序df.orderBy(col(avg_amount).desc())# 使用 SQLdf.createOrReplaceTempView(sales)spark.sql( SELECT city, AVG(amount) as avg_amount FROM sales WHERE amount 1000 GROUP BY city ORDER BY avg_amount DESC ).show()5.2 Catalyst 查询优化器Spark SQL 的核心优化器 Catalyst 将 SQL / DataFrame 操作转换为高效的物理执行计划第 6 章DAG 与 Job / Stage / Task 调度6.1 执行单元层级在 Spark 中一次程序运行包含三个层级的执行单元Job一次 Action 触发一个 Job如df.count()StageJob 按宽依赖Shuffle边界切分为多个 StageStage 内部是流水线执行Task一个 Stage 内每个 Partition 对应一个 TaskTask 是最小执行单元6.2 DAG 调度流程第 7 章Shuffle 机制详解7.1 什么是 ShuffleShuffle 是 Spark 中最昂贵的操作。当数据需要跨节点重新分区如groupBy、join就必须通过网络传输数据这个过程称为 Shuffle。7.2 Shuffle 写出与读取流程7.3 减少 Shuffle 的常用技巧使用reduceByKey代替groupByKey前者在 Map 端做预聚合开启 AQEAdaptive Query Execution自动合并小 Partition使用 Broadcast Join 避免大表 Shuffle 进阶阶段第 8 章Structured Streaming 流处理8.1 核心思想将流视为无界表Structured Streaming 的革命性思想把实时数据流看做一张不断追加行的无界表用和批处理完全相同的 SQL / DataFrame API 来写流处理代码。8.2 执行模型全览8.3 一个完整的流处理示例frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportwindow,count sparkSparkSession.builder.appName(StreamDemo).getOrCreate()# 读取 Kafka 流dfspark.readStream \.format(kafka)\.option(kafka.bootstrap.servers,localhost:9092)\.option(subscribe,orders)\.load()# 解析 JSON 数据frompyspark.sql.functionsimportfrom_json,colfrompyspark.sql.typesimportStructType,StringType,DoubleType schemaStructType().add(city,StringType()).add(amount,DoubleType())ordersdf.select(from_json(col(value).cast(string),schema).alias(data)).select(data.*)# 每 1 分钟统计各城市订单金额resultorders \.withWatermark(timestamp,10 minutes)\.groupBy(window(timestamp,1 minute),city)\.agg(count(*).alias(order_count))# 写出到 KafkaUpdate 模式queryresult.writeStream \.format(kafka)\.option(kafka.bootstrap.servers,localhost:9092)\.option(topic,order_stats)\.outputMode(update)\.start()query.awaitTermination()第 9 章性能调优9.1 性能调优全景图9.2 关键配置参数sparkSparkSession.builder \.config(spark.executor.memory,8g)\.config(spark.executor.cores,4)\.config(spark.sql.shuffle.partitions,200)# 默认 200按数据量调整.config(spark.sql.adaptive.enabled,true)# 开启 AQE.config(spark.sql.adaptive.skewJoin.enabled,true)# AQE 自动处理倾斜.config(spark.serializer,org.apache.spark.serializer.KryoSerializer).getOrCreate()# Broadcast Join小表 10MB 自动广播spark.conf.set(spark.sql.autoBroadcastJoinThreshold,10*1024*1024)# 手动指定 Broadcastfrompyspark.sql.functionsimportbroadcast resultlarge_df.join(broadcast(small_df),id)第 10 章MLlib 机器学习10.1 MLlib PipelineSpark MLlib 使用 Pipeline 将数据预处理和模型训练串联frompyspark.mlimportPipelinefrompyspark.ml.featureimportVectorAssembler,StandardScaler,StringIndexerfrompyspark.ml.classificationimportRandomForestClassifierfrompyspark.ml.evaluationimportBinaryClassificationEvaluator# 1. 特征工程indexerStringIndexer(inputColcity,outputColcity_idx)assemblerVectorAssembler(inputCols[age,amount,city_idx],outputColfeatures)scalerStandardScaler(inputColfeatures,outputColscaled_features)# 2. 模型rfRandomForestClassifier(featuresColscaled_features,labelCollabel,numTrees100)# 3. Pipeline 串联pipelinePipeline(stages[indexer,assembler,scaler,rf])# 4. 训练train_df,test_dfdf.randomSplit([0.8,0.2])modelpipeline.fit(train_df)# 5. 预测 评估predictionsmodel.transform(test_df)evaluatorBinaryClassificationEvaluator(labelCollabel)print(fAUC:{evaluator.evaluate(predictions):.4f}) 精通阶段第 11 章Delta Lake 与 Lakehouse 架构11.1 Delta Lake 解决什么问题传统数据湖S3 / HDFS 上的 Parquet 文件存在三大痛点数据不一致写了一半宕机、无法更新删除Upsert、读取慢大量小文件。Delta Lake 在 Parquet 上加了一个事务日志Transaction Log解决了以上所有问题。11.2 Delta Lake 架构图11.3 Delta Lake 核心能力fromdelta.tablesimportDeltaTable# ACID 写入df.write.format(delta).mode(overwrite).save(/delta/orders)# UpsertMerge Intodelta_tableDeltaTable.forPath(spark,/delta/orders)delta_table.alias(target).merge(updates_df.alias(source),target.order_id source.order_id).whenMatchedUpdateAll()\.whenNotMatchedInsertAll()\.execute()# Time Travel数据回溯df_v0spark.read.format(delta).option(versionAsOf,0).load(/delta/orders)df_yesterdayspark.read.format(delta).option(timestampAsOf,2024-01-01).load(/delta/orders)# Schema Evolution自动演化df.write.format(delta).option(mergeSchema,true).mode(append).save(/delta/orders)第 12 章生产部署与运维12.1 提交作业到集群# 提交到 YARN 集群spark-submit\--masteryarn\--deploy-mode cluster\--executor-memory 8g\--executor-cores4\--num-executors20\--confspark.sql.adaptive.enabledtrue\--confspark.serializerorg.apache.spark.serializer.KryoSerializer\my_spark_job.py# 提交到 Kubernetesspark-submit\--masterk8s://https://k8s-master:6443\--deploy-mode cluster\--confspark.kubernetes.container.imagespark:3.5.0\my_spark_job.py12.2 监控与排障Spark UI默认http://driver-host:4040查看 Jobs / Stages / Tasks / SQL 执行计划History Server查看已完成作业的历史日志常见问题排查OOM → 增加 Executor 内存或减少 Partition 大小数据倾斜 → 开启 AQE 或加盐GC 频繁 → 开启堆外内存12.3 生产架构推荐第 13 章Spark Real-Time Mode — 毫秒级实时计算13.1 为什么需要 RTMSpark 4.1 引入 Real-Time Mode通过三大技术创新实现毫秒级延迟长周期 Epoch 连续数据流Checkpoint 开销被摊薄Stage 并发执行Reducer 无需等待 Mapper 全部完成非阻塞算子数据流过即处理无需等待13.2 如何开启 RTM# 方式一通过 Trigger 配置querydf.writeStream \.trigger(processingTime0 seconds)# 或使用 Trigger.Continuous.format(delta)\.start()# 方式二Databricks 平台开启 RTMspark.conf.set(spark.databricks.streaming.realtime.enabled,true)13.3 RTM 适用场景场景延迟要求推荐方案实时特征工程100ms 级Spark RTM交易欺诈检测100ms 级Spark RTM旅行实时定价100ms 级Spark RTM高频交易风控 10msApache Flink批量 ETL 管道秒级Spark 微批次附录学习资源推荐资源说明Apache Spark 官方文档最权威的参考资料Databricks Learning免费在线课程《Learning Spark》第二版O’Reilly 出版官方推荐书籍《Spark: The Definitive Guide》深度原理讲解Databricks Community Edition免费云端练习环境本指南基于 Apache Spark 3.5 / 4.1持续更新中。