摘要Dataset是Spark SQL中最高级的数据抽象兼具RDD的类型安全和DataFrame的Catalyst优化能力。本文深入讲解Dataset的创建方式createDataset、toDS、DataFrame转换系统梳理RDD、DataFrame、Dataset三者之间的相互转换方法并通过Dataset实现WordCount的实战案例帮助读者建立完整的Spark SQL数据抽象体系认知。一、Dataset概述1.1 为什么需要DatasetDataFrame的出现让Spark可以更好地处理结构化数据的计算但存在一个关键问题编译时的类型安全。DataFrame的类型安全问题// DataFrame是无类型的编译时无法检查字段名和类型valdfspark.read.json(people.json)df.select(nmae)// 字段名拼写错误编译通过运行时才报错df(age)abc// 类型不匹配编译通过运行时才报错Dataset的解决方案// Dataset是强类型的编译时即可发现错误caseclassPerson(name:String,age:Long,sex:String)valdsspark.read.json(people.json).as[Person]ds.map(_.nmae)// 编译报错字段名拼写错误被提前发现ds.map(_.ageabc)// 编译报错类型不匹配被提前发现1.2 Dataset的核心特性特性RDDDataFrameDataset类型安全✅ 编译时❌ 运行时✅ 编译时Catalyst优化❌✅✅Tungsten优化❌✅✅API风格函数式SQL DSL函数式 SQL序列化Java/KryoTungsten二进制Tungsten二进制适用场景非结构化数据结构化数据查询复杂类型数据处理1.3 Spark 2.0中的关系在Spark 2.0中DataFrame和Dataset被合并为统一的Dataset APIDataset[T] // 泛型DatasetT可以是任意类型 ├── Dataset[Row] // DataFrame的本质Row是无类型的行记录 └── Dataset[Person] // 强类型的DatasetPerson是case class 结论DataFrame Dataset[Row]即DataFrame是Dataset的子集1.4 三种API的选择策略需求推荐API原因需要精确控制执行细节RDD直接操作分区、血统、缓存编译时类型安全Dataset强类型约束IDE自动补全统一简化APIDataFrame/Dataset一套API处理所有结构化数据非结构化数据处理RDD文本流、不规则数据SQL风格查询DataFrameDSL和SQL语法直观复杂对象处理Dataset支持嵌套case class二、Dataset的创建2.1 方式一使用createDataset()方法通过SparkSession.createDataset()方法从集合或RDD创建Dataset。完整代码importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod1{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method1).master(local[*]).getOrCreate()// 必须导入隐式转换否则无法创建Datasetimportspark.implicits._// 从Range创建Dataset[Int]valds1spark.createDataset(1to5)println( Dataset[Int] )ds1.show()// 从RDD[String]创建Dataset[String]valds2spark.createDataset(spark.sparkContext.textFile(data/sql/people.txt))println( Dataset[String] )ds2.show()spark.stop()}}运行结果 Dataset[Int] ----- |value| ----- | 1| | 2| | 3| | 4| | 5| ----- Dataset[String] -------- | value| -------- | Tom, 21| |Mike, 25| |Andy, 18| --------关键点import spark.implicits._必须导入提供基本类型的EncodercreateDataset支持Scala集合List、Array、Range等和RDD基本类型Int、String、Long等的Encoder由Spark自动提供2.2 方式二通过toDS()方法将Scala集合或RDD[CaseClass]通过toDS()隐式转换为Dataset。完整代码importorg.apache.spark.sql.{Dataset,SparkSession}objectCreateDatasetMethod2{// case class必须定义在main方法之外caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method2).master(local[*]).getOrCreate()// 导入SparkSession对象下的implicitsimportspark.implicits._// 从List[Person]创建Dataset[Person]valdataList(Person(Tom,21),Person(Andy,22))valds:Dataset[Person]data.toDS()ds.show()spark.stop()}}运行结果------- |name|age| ------- | Tom| 21| |Andy| 22| -------toDS() vs toDF() 对比方法输入输出类型toDS()List[Person]Dataset[Person]强类型toDF()List[Person]DataFrame即Dataset[Row]无类型2.3 方式三通过DataFrame转换将DataFrame通过as[CaseClass]方法转换为强类型的Dataset。完整代码importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod3{// 注意JSON中数值默认推断为Long类型case class字段类型需匹配caseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method3).master(local[*]).getOrCreate()importspark.implicits._// 从JSON文件读取DataFramevaldfspark.read.json(data/sql/people.json)// DataFrame转Dataset[Person]valdsdf.as[Person]ds.show()spark.stop()}}运行结果---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| | 19| Justin| 男| | 20|Bernadette| 女| | 23| Gretchen| 女| | 27| David| 男| | 33| Joseph| 女| | 27| Trish| 女| | 33| Alex| 女| | 25| Ben| 男| ----------------重要注意事项注意点说明错误示例字段名匹配case class字段名必须与DataFrame列名一致case class Person(nmae: String)→ 报错字段类型匹配case class字段类型必须与DataFrame推断类型一致JSON中age推断为Long用Int会报错字段顺序case class字段顺序不影响匹配按名匹配顺序不同不影响可空字段数据库字段可为null时case class用Optionage: Option[Long]JSON字段类型推断规则JSON值Spark推断类型case class建议类型TomStringTypeString30LongTypeLong30.5DoubleTypeDoubletrueBooleanTypeBoolean建议处理JSON数据时case class的数值字段优先使用Long和Double避免类型不匹配。三、RDD、DataFrame、Dataset相互转换3.1 RDD DataFrameRDD转DataFrame方法1反射推断模式推荐importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Row,SparkSession}objectRDDToDataFrame{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(RDD-To-DataFrame).master(local[*]).getOrCreate()importspark.implicits._// RDD[Person] - DataFramevalrdd:RDD[Person]spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrPerson(attr(0).trim,attr(1).trim.toInt))valdfrdd.toDF()df.show()spark.stop()}}方法2编程式定义模式importorg.apache.spark.sql.types._importorg.apache.spark.sql.Row// 定义SchemavalschemaStructType(Array(StructField(name,StringType,true),StructField(age,IntegerType,true)))// 创建Row RDDvalrowRDDspark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrRow(attr(0).trim,attr(1).trim.toInt))// 合并为DataFramevaldfspark.createDataFrame(rowRDD,schema)方法3元组RDD直接转DataFrame// RDD[(String, Int)] 可直接toDF因为元组类型已知valtupleRDDspark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attr(attr(0).trim,attr(1).trim.toInt))valdftupleRDD.toDF(name,age)DataFrame转RDD// DataFrame - RDD[Row]valrdd:RDD[Row]df.rdd// 注意转换后为RDD[Row]不再是原始的RDD[Person]rdd.foreach(println)// 输出: [Andy,18] [Tom,21] [Mike,25]类型变化图解RDD[Person] DataFrame (Dataset[Row]) Person(Tom,21) Row(Tom,21) Person(Mike,25) - Row(Mike,25) Person(Andy,18) Row(Andy,18) 转换特点 RDD - DataFrame: 类型信息丢失变为无类型的Row DataFrame - RDD: 只能得到RDD[Row]无法恢复原始类型3.2 RDD DatasetRDD转Datasetimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Dataset,SparkSession}objectRDDToDataset{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(RDD-To-Dataset).master(local[*]).getOrCreate()importspark.implicits._// 创建RDD[Person]valrdd:RDD[Person]spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrPerson(attr(0).trim,attr(1).trim.toInt))// RDD[Person] - Dataset[Person]valds:Dataset[Person]rdd.toDS()ds.show()// Dataset[Person] - RDD[Person]valresRDD:RDD[Person]ds.rdd resRDD.foreach(println)spark.stop()}}运行结果------- |name|age| ------- | Tom| 21| |Mike| 25| |Andy| 18| ------- Person(Andy,18) Person(Tom,21) Person(Mike,25)关键发现转换方向类型变化特点RDD[Person] - Dataset[Person]类型不变强类型保留安全Dataset[Person] - RDD[Person]类型不变强类型保留安全对比RDD-DataFrameRDD和Dataset互转过程中数据类型不会丢失而DataFrame转RDD时case class会被转为Row对象。3.3 DataFrame DatasetDataFrame转Datasetimportorg.apache.spark.sql.{DataFrame,Dataset,SparkSession}objectDataFrameToDataset{// JSON中数值默认推断为LongcaseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(DataFrame-To-Dataset).master(local[*]).getOrCreate()importspark.implicits._// 创建DataFramevaldf:DataFramespark.read.json(data/sql/people.json)df.show()// DataFrame - Dataset[Person]valds:Dataset[Person]df.as[Person]ds.show()// Dataset[Person] - DataFramevalresDF:DataFrameds.toDF()resDF.show()spark.stop()}}运行结果// DataFrame ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ... // Dataset[Person]显示效果相同但底层是强类型 ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ... // 转回DataFrame ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ...3.4 三种抽象互转总览图┌─────────────────┐ │ RDD[Person] │ │ (分布式集合) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │ toDF() │ │ toDS() │ │ 保持不变 │ └────┬─────┘ └────┬─────┘ └──────────────┘ │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ DataFrame │ │ Dataset[Person]│ │ (Dataset[Row]) │◄─┤ (强类型) │ │ 无类型 │ │ │ └────────┬────────┘ └────────┬────────┘ │ │ │ ┌─────────────────┘ │ │ ▼ ▼ ┌─────────────────┐ │ .rdd │ │ │ ▼ ▼ ┌──────────────┐ ┌──────────────┐ │ RDD[Row] │ │ RDD[Person] │ │ (类型丢失) │ │ (类型保留) │ └──────────────┘ └──────────────┘3.5 转换方法速查表转换方向方法类型变化说明RDD - DataFramerdd.toDF()RDD[T]→DataFrame需导入implicitsRDD - Datasetrdd.toDS()RDD[T]→Dataset[T]类型保留DataFrame - RDDdf.rddDataFrame→RDD[Row]类型丢失DataFrame - Datasetdf.as[T]DataFrame→Dataset[T]需case classDataset - RDDds.rddDataset[T]→RDD[T]类型保留Dataset - DataFrameds.toDF()Dataset[T]→DataFrame类型丢失四、Dataset实现WordCount利用Dataset的强类型特性和函数式API可以写出更简洁优雅的WordCount。4.1 完整代码importorg.apache.spark.sql.{Dataset,SparkSession}objectDatasetWordCount{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(Dataset-WordCount).master(local[*]).getOrCreate()importspark.implicits._// Dataset实现WordCountvalres:Dataset[(String,Long)]spark.read.text(data/word.txt)// 读取文本文件得到DataFrame.as[String]// DataFrame - Dataset[String].flatMap(_.split( ))// 拆分单词.groupByKey(_.toLowerCase)// 按单词分组转小写统一.count()// 统计每组数量res.show()spark.stop()}}4.2 代码解析步骤代码说明类型变化1spark.read.text(data/word.txt)读取文本文件DataFrame单列value2.as[String]转为Dataset[String]Dataset[String]3.flatMap(_.split( ))拆分单词Dataset[String]单词流4.groupByKey(_.toLowerCase)按小写单词分组KeyValueGroupedDataset5.count()统计每组数量Dataset[(String, Long)]4.3 运行结果-------------- | key|count(1)| -------------- | fast| 1| | is| 3| | spark| 2| |better| 1| | good| 1| |hadoop| 1| --------------4.4 Dataset WordCount vs RDD WordCountRDD版本valrddsc.textFile(data/word.txt)valresultrdd.flatMap(_.split( )).map((_,1)).reduceByKey(__)Dataset版本valdsspark.read.text(data/word.txt).as[String]valresultds.flatMap(_.split( )).groupByKey(_.toLowerCase).count()对比分析特性RDD版本Dataset版本代码量较少更少类型安全无有编译时检查性能优化手动Catalyst自动优化API风格函数式函数式 SQLShufflereduceByKeygroupByKey count序列化Java/KryoTungsten更高效五、Dataset高级操作5.1 强类型map操作caseclassPerson(name:String,age:Long)valdsspark.read.json(people.json).as[Person]// 编译时检查字段名和类型ds.map(_.name.toUpperCase).show()// ✅ 正确ds.map(_.nmae.toUpperCase).show()// ❌ 编译报错字段名错误ds.map(_.ageyears).show()// ❌ 编译报错类型不匹配5.2 类型安全的聚合caseclassScore(name:String,subject:String,score:Int)valdsspark.read.json(scores.json).as[Score]// 按学生分组计算平均分ds.groupByKey(_.name).mapGroups{case(name,scores)valscoreListscores.toList(name,scoreList.map(_.score).sum/scoreList.size.toDouble)}.show()5.3 Dataset与SQL混用caseclassPerson(name:String,age:Long,sex:String)valdsspark.read.json(people.json).as[Person]// 注册临时视图使用SQL查询ds.createOrReplaceTempView(people)spark.sql(SELECT * FROM people WHERE age 25).as[Person].show()六、总结本文系统讲解了Spark SQL中Dataset的核心知识核心知识点回顾Dataset的定位兼具RDD的类型安全和DataFrame的Catalyst优化DataFrame Dataset[Row]是Dataset的子集Spark 2.0中三者统一为Dataset APIDataset的三种创建方式spark.createDataset(集合/RDD)从数据源创建集合.toDS()隐式转换df.as[CaseClass]从DataFrame转换三种抽象的互转RDD - DataFrame类型会丢失转为RowRDD - Dataset类型保留安全DataFrame - Datasetas[T]和toDF()互转选择策略非结构化数据 → RDDSQL查询、简单ETL → DataFrame复杂类型处理、编译时安全 → Dataset
Spark SQL详解(三):Dataset深度解析与RDD、DataFrame、Dataset互转实战
摘要Dataset是Spark SQL中最高级的数据抽象兼具RDD的类型安全和DataFrame的Catalyst优化能力。本文深入讲解Dataset的创建方式createDataset、toDS、DataFrame转换系统梳理RDD、DataFrame、Dataset三者之间的相互转换方法并通过Dataset实现WordCount的实战案例帮助读者建立完整的Spark SQL数据抽象体系认知。一、Dataset概述1.1 为什么需要DatasetDataFrame的出现让Spark可以更好地处理结构化数据的计算但存在一个关键问题编译时的类型安全。DataFrame的类型安全问题// DataFrame是无类型的编译时无法检查字段名和类型valdfspark.read.json(people.json)df.select(nmae)// 字段名拼写错误编译通过运行时才报错df(age)abc// 类型不匹配编译通过运行时才报错Dataset的解决方案// Dataset是强类型的编译时即可发现错误caseclassPerson(name:String,age:Long,sex:String)valdsspark.read.json(people.json).as[Person]ds.map(_.nmae)// 编译报错字段名拼写错误被提前发现ds.map(_.ageabc)// 编译报错类型不匹配被提前发现1.2 Dataset的核心特性特性RDDDataFrameDataset类型安全✅ 编译时❌ 运行时✅ 编译时Catalyst优化❌✅✅Tungsten优化❌✅✅API风格函数式SQL DSL函数式 SQL序列化Java/KryoTungsten二进制Tungsten二进制适用场景非结构化数据结构化数据查询复杂类型数据处理1.3 Spark 2.0中的关系在Spark 2.0中DataFrame和Dataset被合并为统一的Dataset APIDataset[T] // 泛型DatasetT可以是任意类型 ├── Dataset[Row] // DataFrame的本质Row是无类型的行记录 └── Dataset[Person] // 强类型的DatasetPerson是case class 结论DataFrame Dataset[Row]即DataFrame是Dataset的子集1.4 三种API的选择策略需求推荐API原因需要精确控制执行细节RDD直接操作分区、血统、缓存编译时类型安全Dataset强类型约束IDE自动补全统一简化APIDataFrame/Dataset一套API处理所有结构化数据非结构化数据处理RDD文本流、不规则数据SQL风格查询DataFrameDSL和SQL语法直观复杂对象处理Dataset支持嵌套case class二、Dataset的创建2.1 方式一使用createDataset()方法通过SparkSession.createDataset()方法从集合或RDD创建Dataset。完整代码importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod1{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method1).master(local[*]).getOrCreate()// 必须导入隐式转换否则无法创建Datasetimportspark.implicits._// 从Range创建Dataset[Int]valds1spark.createDataset(1to5)println( Dataset[Int] )ds1.show()// 从RDD[String]创建Dataset[String]valds2spark.createDataset(spark.sparkContext.textFile(data/sql/people.txt))println( Dataset[String] )ds2.show()spark.stop()}}运行结果 Dataset[Int] ----- |value| ----- | 1| | 2| | 3| | 4| | 5| ----- Dataset[String] -------- | value| -------- | Tom, 21| |Mike, 25| |Andy, 18| --------关键点import spark.implicits._必须导入提供基本类型的EncodercreateDataset支持Scala集合List、Array、Range等和RDD基本类型Int、String、Long等的Encoder由Spark自动提供2.2 方式二通过toDS()方法将Scala集合或RDD[CaseClass]通过toDS()隐式转换为Dataset。完整代码importorg.apache.spark.sql.{Dataset,SparkSession}objectCreateDatasetMethod2{// case class必须定义在main方法之外caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method2).master(local[*]).getOrCreate()// 导入SparkSession对象下的implicitsimportspark.implicits._// 从List[Person]创建Dataset[Person]valdataList(Person(Tom,21),Person(Andy,22))valds:Dataset[Person]data.toDS()ds.show()spark.stop()}}运行结果------- |name|age| ------- | Tom| 21| |Andy| 22| -------toDS() vs toDF() 对比方法输入输出类型toDS()List[Person]Dataset[Person]强类型toDF()List[Person]DataFrame即Dataset[Row]无类型2.3 方式三通过DataFrame转换将DataFrame通过as[CaseClass]方法转换为强类型的Dataset。完整代码importorg.apache.spark.sql.SparkSessionobjectCreateDatasetMethod3{// 注意JSON中数值默认推断为Long类型case class字段类型需匹配caseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(CreateDataset-Method3).master(local[*]).getOrCreate()importspark.implicits._// 从JSON文件读取DataFramevaldfspark.read.json(data/sql/people.json)// DataFrame转Dataset[Person]valdsdf.as[Person]ds.show()spark.stop()}}运行结果---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| | 19| Justin| 男| | 20|Bernadette| 女| | 23| Gretchen| 女| | 27| David| 男| | 33| Joseph| 女| | 27| Trish| 女| | 33| Alex| 女| | 25| Ben| 男| ----------------重要注意事项注意点说明错误示例字段名匹配case class字段名必须与DataFrame列名一致case class Person(nmae: String)→ 报错字段类型匹配case class字段类型必须与DataFrame推断类型一致JSON中age推断为Long用Int会报错字段顺序case class字段顺序不影响匹配按名匹配顺序不同不影响可空字段数据库字段可为null时case class用Optionage: Option[Long]JSON字段类型推断规则JSON值Spark推断类型case class建议类型TomStringTypeString30LongTypeLong30.5DoubleTypeDoubletrueBooleanTypeBoolean建议处理JSON数据时case class的数值字段优先使用Long和Double避免类型不匹配。三、RDD、DataFrame、Dataset相互转换3.1 RDD DataFrameRDD转DataFrame方法1反射推断模式推荐importorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Row,SparkSession}objectRDDToDataFrame{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(RDD-To-DataFrame).master(local[*]).getOrCreate()importspark.implicits._// RDD[Person] - DataFramevalrdd:RDD[Person]spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrPerson(attr(0).trim,attr(1).trim.toInt))valdfrdd.toDF()df.show()spark.stop()}}方法2编程式定义模式importorg.apache.spark.sql.types._importorg.apache.spark.sql.Row// 定义SchemavalschemaStructType(Array(StructField(name,StringType,true),StructField(age,IntegerType,true)))// 创建Row RDDvalrowRDDspark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrRow(attr(0).trim,attr(1).trim.toInt))// 合并为DataFramevaldfspark.createDataFrame(rowRDD,schema)方法3元组RDD直接转DataFrame// RDD[(String, Int)] 可直接toDF因为元组类型已知valtupleRDDspark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attr(attr(0).trim,attr(1).trim.toInt))valdftupleRDD.toDF(name,age)DataFrame转RDD// DataFrame - RDD[Row]valrdd:RDD[Row]df.rdd// 注意转换后为RDD[Row]不再是原始的RDD[Person]rdd.foreach(println)// 输出: [Andy,18] [Tom,21] [Mike,25]类型变化图解RDD[Person] DataFrame (Dataset[Row]) Person(Tom,21) Row(Tom,21) Person(Mike,25) - Row(Mike,25) Person(Andy,18) Row(Andy,18) 转换特点 RDD - DataFrame: 类型信息丢失变为无类型的Row DataFrame - RDD: 只能得到RDD[Row]无法恢复原始类型3.2 RDD DatasetRDD转Datasetimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{Dataset,SparkSession}objectRDDToDataset{caseclassPerson(name:String,age:Int)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(RDD-To-Dataset).master(local[*]).getOrCreate()importspark.implicits._// 创建RDD[Person]valrdd:RDD[Person]spark.sparkContext.textFile(data/sql/people.txt).map(_.split(,)).map(attrPerson(attr(0).trim,attr(1).trim.toInt))// RDD[Person] - Dataset[Person]valds:Dataset[Person]rdd.toDS()ds.show()// Dataset[Person] - RDD[Person]valresRDD:RDD[Person]ds.rdd resRDD.foreach(println)spark.stop()}}运行结果------- |name|age| ------- | Tom| 21| |Mike| 25| |Andy| 18| ------- Person(Andy,18) Person(Tom,21) Person(Mike,25)关键发现转换方向类型变化特点RDD[Person] - Dataset[Person]类型不变强类型保留安全Dataset[Person] - RDD[Person]类型不变强类型保留安全对比RDD-DataFrameRDD和Dataset互转过程中数据类型不会丢失而DataFrame转RDD时case class会被转为Row对象。3.3 DataFrame DatasetDataFrame转Datasetimportorg.apache.spark.sql.{DataFrame,Dataset,SparkSession}objectDataFrameToDataset{// JSON中数值默认推断为LongcaseclassPerson(name:String,age:Long,sex:String)defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(DataFrame-To-Dataset).master(local[*]).getOrCreate()importspark.implicits._// 创建DataFramevaldf:DataFramespark.read.json(data/sql/people.json)df.show()// DataFrame - Dataset[Person]valds:Dataset[Person]df.as[Person]ds.show()// Dataset[Person] - DataFramevalresDF:DataFrameds.toDF()resDF.show()spark.stop()}}运行结果// DataFrame ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ... // Dataset[Person]显示效果相同但底层是强类型 ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ... // 转回DataFrame ---------------- |age| name|sex| ---------------- | 30| Michael| 男| | 19| Andy| 女| ...3.4 三种抽象互转总览图┌─────────────────┐ │ RDD[Person] │ │ (分布式集合) │ └────────┬────────┘ │ ┌──────────────────┼──────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │ toDF() │ │ toDS() │ │ 保持不变 │ └────┬─────┘ └────┬─────┘ └──────────────┘ │ │ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ │ DataFrame │ │ Dataset[Person]│ │ (Dataset[Row]) │◄─┤ (强类型) │ │ 无类型 │ │ │ └────────┬────────┘ └────────┬────────┘ │ │ │ ┌─────────────────┘ │ │ ▼ ▼ ┌─────────────────┐ │ .rdd │ │ │ ▼ ▼ ┌──────────────┐ ┌──────────────┐ │ RDD[Row] │ │ RDD[Person] │ │ (类型丢失) │ │ (类型保留) │ └──────────────┘ └──────────────┘3.5 转换方法速查表转换方向方法类型变化说明RDD - DataFramerdd.toDF()RDD[T]→DataFrame需导入implicitsRDD - Datasetrdd.toDS()RDD[T]→Dataset[T]类型保留DataFrame - RDDdf.rddDataFrame→RDD[Row]类型丢失DataFrame - Datasetdf.as[T]DataFrame→Dataset[T]需case classDataset - RDDds.rddDataset[T]→RDD[T]类型保留Dataset - DataFrameds.toDF()Dataset[T]→DataFrame类型丢失四、Dataset实现WordCount利用Dataset的强类型特性和函数式API可以写出更简洁优雅的WordCount。4.1 完整代码importorg.apache.spark.sql.{Dataset,SparkSession}objectDatasetWordCount{defmain(args:Array[String]):Unit{valsparkSparkSession.builder().appName(Dataset-WordCount).master(local[*]).getOrCreate()importspark.implicits._// Dataset实现WordCountvalres:Dataset[(String,Long)]spark.read.text(data/word.txt)// 读取文本文件得到DataFrame.as[String]// DataFrame - Dataset[String].flatMap(_.split( ))// 拆分单词.groupByKey(_.toLowerCase)// 按单词分组转小写统一.count()// 统计每组数量res.show()spark.stop()}}4.2 代码解析步骤代码说明类型变化1spark.read.text(data/word.txt)读取文本文件DataFrame单列value2.as[String]转为Dataset[String]Dataset[String]3.flatMap(_.split( ))拆分单词Dataset[String]单词流4.groupByKey(_.toLowerCase)按小写单词分组KeyValueGroupedDataset5.count()统计每组数量Dataset[(String, Long)]4.3 运行结果-------------- | key|count(1)| -------------- | fast| 1| | is| 3| | spark| 2| |better| 1| | good| 1| |hadoop| 1| --------------4.4 Dataset WordCount vs RDD WordCountRDD版本valrddsc.textFile(data/word.txt)valresultrdd.flatMap(_.split( )).map((_,1)).reduceByKey(__)Dataset版本valdsspark.read.text(data/word.txt).as[String]valresultds.flatMap(_.split( )).groupByKey(_.toLowerCase).count()对比分析特性RDD版本Dataset版本代码量较少更少类型安全无有编译时检查性能优化手动Catalyst自动优化API风格函数式函数式 SQLShufflereduceByKeygroupByKey count序列化Java/KryoTungsten更高效五、Dataset高级操作5.1 强类型map操作caseclassPerson(name:String,age:Long)valdsspark.read.json(people.json).as[Person]// 编译时检查字段名和类型ds.map(_.name.toUpperCase).show()// ✅ 正确ds.map(_.nmae.toUpperCase).show()// ❌ 编译报错字段名错误ds.map(_.ageyears).show()// ❌ 编译报错类型不匹配5.2 类型安全的聚合caseclassScore(name:String,subject:String,score:Int)valdsspark.read.json(scores.json).as[Score]// 按学生分组计算平均分ds.groupByKey(_.name).mapGroups{case(name,scores)valscoreListscores.toList(name,scoreList.map(_.score).sum/scoreList.size.toDouble)}.show()5.3 Dataset与SQL混用caseclassPerson(name:String,age:Long,sex:String)valdsspark.read.json(people.json).as[Person]// 注册临时视图使用SQL查询ds.createOrReplaceTempView(people)spark.sql(SELECT * FROM people WHERE age 25).as[Person].show()六、总结本文系统讲解了Spark SQL中Dataset的核心知识核心知识点回顾Dataset的定位兼具RDD的类型安全和DataFrame的Catalyst优化DataFrame Dataset[Row]是Dataset的子集Spark 2.0中三者统一为Dataset APIDataset的三种创建方式spark.createDataset(集合/RDD)从数据源创建集合.toDS()隐式转换df.as[CaseClass]从DataFrame转换三种抽象的互转RDD - DataFrame类型会丢失转为RowRDD - Dataset类型保留安全DataFrame - Datasetas[T]和toDF()互转选择策略非结构化数据 → RDDSQL查询、简单ETL → DataFrame复杂类型处理、编译时安全 → Dataset