目录第一关 Transformation - map第二关 Transformation - mapPartitions第三关 Transformation - filter第四关 Transformation - flatMap第五关 Transformation - distinct第六关 Transformation - sortBy第七关 Transformation - sortByKey第八关 Transformation - mapValues第九关 Transformations - reduceByKey第十关 Actions - 常用算子第一关 Transformation - mapfrom pyspark import SparkContext # -*- coding: UTF-8 -*- if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个1到5的列表List data [1, 2, 3, 4, 5] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 map 算子将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作规则如下: 需求 偶数转换成该数的平方 奇数转换成该数的立方 # 5.使用 map 算子完成以上需求 rdd_map rdd.map(lambda x: x * x if x % 2 0 else x * x * x) # 6.使用rdd.collect() 收集完成 map 转换的元素 print(rdd_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第二关 Transformation - mapPartitions# -*- coding: UTF-8 -*- from pyspark import SparkContext #********** Begin **********# def f(iterator): list [] for x in iterator: list.append((x, len(x))) return list #********** End **********# if __name__ __main__: # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2. 一个内容为dog, salmon, salmon, rat, elephant的列表List data [dog, salmon, salmon, rat, elephant] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 mapPartitions 算子将 rdd 的数据 (dog, salmon, salmon, rat, elephant) 按照下面的规则进行转换操作规则如下: 需求 将字符串与该字符串的长度组合成一个元组例如 dog -- (dog,3) salmon -- (salmon,6) # 5.使用 mapPartitions 算子完成以上需求 partitions rdd.mapPartitions(f) # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素 print(partitions.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第三关 Transformation - filter# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个1到8的列表List data [1, 2, 3, 4, 5, 6, 7, 8] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 filter 算子将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作规则如下: 需求 过滤掉rdd中的奇数 # 5.使用 filter 算子完成以上需求 rdd_filter rdd.filter(lambda x: x % 2 0) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(rdd_filter.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第四关 Transformation - flatMap# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List data [[1, 2, 3], [4, 5, 6], [7, 8, 9]] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 flatMap 算子将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作规则如下: 需求 合并RDD的元素例如 ([1,2,3],[4,5,6]) -- (1,2,3,4,5,6) ([2,3],[4,5],[6]) -- (1,2,3,4,5,6) # 5.使用 filter 算子完成以上需求 flat_map rdd.flatMap(lambda x: x) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(flat_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第五关 Transformation - distinct# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1的列表List data [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 distinct 算子将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作规则如下: 需求 元素去重例如 1,2,3,3,2,1 -- 1,2,3 1,1,1,1, -- 1 # 5.使用 distinct 算子完成以上需求 distinctResult rdd.distinct() # 6.使用rdd.collect() 收集完成 distinct 转换的元素 print(distinctResult.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第六关 Transformation - sortBy# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为1, 3, 5, 7, 9, 8, 6, 4, 2的列表List data [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 sortBy 算子将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作规则如下: 需求 元素排序例如 5,4,3,1,2 -- 1,2,3,4,5 # 5.使用 sortBy 算子完成以上需求 sort_result rdd.sortBy(lambda x: x) # 6.使用rdd.collect() 收集完成 sortBy 转换的元素 print(sort_result.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第七关 Transformation - sortByKey# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(B,1),(A,2),(C,3)]的列表List data [(B, 1), (A, 2), (C, 3)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 sortByKey 算子将 rdd 的数据 (B, 1), (A, 2), (C, 3) 按照下面的规则进行转换操作规则如下: 需求 元素排序例如 [(3,3),(2,2),(1,1)] -- [(1,1),(2,2),(3,3)] # 5.使用 sortByKey 算子完成以上需求 sort_by_key rdd.sortByKey() # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素 print(sort_by_key.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第八关 Transformation - mapValues# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]的列表List data [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 mapValues 算子将 rdd 的数据 (1, 1), (2, 2), (3, 3), (4, 4), (5, 5) 按照下面的规则进行转换操作规则如下: 需求 元素key,value的value进行以下操作 偶数转换成该数的平方 奇数转换成该数的立方 # 5.使用 mapValues 算子完成以上需求 values rdd.mapValues(lambda x: x * x if x % 2 0 else x * x * x) # 6.使用rdd.collect() 收集完成 mapValues 转换的元素 print(values.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第九关 Transformations - reduceByKey# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)]的列表List data [(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 reduceByKey 算子将 rdd 的数据[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)] 按照下面的规则进行转换操作规则如下: 需求 元素key-value的value累加操作例如 (1,1),(1,1),(1,2) -- (1,4) (1,1),(1,1),(2,2),(2,2) -- (1,2),(2,4) # 5.使用 reduceByKey 算子完成以上需求 result rdd.reduceByKey(lambda x, y: x y) # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素 print(result.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第十关 Actions - 常用算子# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List data [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.收集rdd的所有元素并print输出 print(rdd.collect()) # 5.统计rdd的元素个数并print输出 print(rdd.count()) # 6.获取rdd的第一个元素并print输出 print(rdd.first()) # 7.获取rdd的前3个元素并print输出 print(rdd.take(3)) # 8.聚合rdd的所有元素并print输出 print(rdd.reduce(lambda x, y: x y)) # 9.停止 SparkContext sc.stop() # ********** End **********#
Spark算子 - Python
目录第一关 Transformation - map第二关 Transformation - mapPartitions第三关 Transformation - filter第四关 Transformation - flatMap第五关 Transformation - distinct第六关 Transformation - sortBy第七关 Transformation - sortByKey第八关 Transformation - mapValues第九关 Transformations - reduceByKey第十关 Actions - 常用算子第一关 Transformation - mapfrom pyspark import SparkContext # -*- coding: UTF-8 -*- if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个1到5的列表List data [1, 2, 3, 4, 5] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 map 算子将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作规则如下: 需求 偶数转换成该数的平方 奇数转换成该数的立方 # 5.使用 map 算子完成以上需求 rdd_map rdd.map(lambda x: x * x if x % 2 0 else x * x * x) # 6.使用rdd.collect() 收集完成 map 转换的元素 print(rdd_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第二关 Transformation - mapPartitions# -*- coding: UTF-8 -*- from pyspark import SparkContext #********** Begin **********# def f(iterator): list [] for x in iterator: list.append((x, len(x))) return list #********** End **********# if __name__ __main__: # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2. 一个内容为dog, salmon, salmon, rat, elephant的列表List data [dog, salmon, salmon, rat, elephant] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 mapPartitions 算子将 rdd 的数据 (dog, salmon, salmon, rat, elephant) 按照下面的规则进行转换操作规则如下: 需求 将字符串与该字符串的长度组合成一个元组例如 dog -- (dog,3) salmon -- (salmon,6) # 5.使用 mapPartitions 算子完成以上需求 partitions rdd.mapPartitions(f) # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素 print(partitions.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第三关 Transformation - filter# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个1到8的列表List data [1, 2, 3, 4, 5, 6, 7, 8] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 filter 算子将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作规则如下: 需求 过滤掉rdd中的奇数 # 5.使用 filter 算子完成以上需求 rdd_filter rdd.filter(lambda x: x % 2 0) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(rdd_filter.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第四关 Transformation - flatMap# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List data [[1, 2, 3], [4, 5, 6], [7, 8, 9]] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素。 print(rdd.collect()) 使用 flatMap 算子将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作规则如下: 需求 合并RDD的元素例如 ([1,2,3],[4,5,6]) -- (1,2,3,4,5,6) ([2,3],[4,5],[6]) -- (1,2,3,4,5,6) # 5.使用 filter 算子完成以上需求 flat_map rdd.flatMap(lambda x: x) # 6.使用rdd.collect() 收集完成 filter 转换的元素 print(flat_map.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第五关 Transformation - distinct# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: #********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1的列表List data [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 distinct 算子将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作规则如下: 需求 元素去重例如 1,2,3,3,2,1 -- 1,2,3 1,1,1,1, -- 1 # 5.使用 distinct 算子完成以上需求 distinctResult rdd.distinct() # 6.使用rdd.collect() 收集完成 distinct 转换的元素 print(distinctResult.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第六关 Transformation - sortBy# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为1, 3, 5, 7, 9, 8, 6, 4, 2的列表List data [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 sortBy 算子将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作规则如下: 需求 元素排序例如 5,4,3,1,2 -- 1,2,3,4,5 # 5.使用 sortBy 算子完成以上需求 sort_result rdd.sortBy(lambda x: x) # 6.使用rdd.collect() 收集完成 sortBy 转换的元素 print(sort_result.collect()) # 7.停止 SparkContext sc.stop() #********** End **********#第七关 Transformation - sortByKey# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(B,1),(A,2),(C,3)]的列表List data [(B, 1), (A, 2), (C, 3)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 sortByKey 算子将 rdd 的数据 (B, 1), (A, 2), (C, 3) 按照下面的规则进行转换操作规则如下: 需求 元素排序例如 [(3,3),(2,2),(1,1)] -- [(1,1),(2,2),(3,3)] # 5.使用 sortByKey 算子完成以上需求 sort_by_key rdd.sortByKey() # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素 print(sort_by_key.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第八关 Transformation - mapValues# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]的列表List data [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 mapValues 算子将 rdd 的数据 (1, 1), (2, 2), (3, 3), (4, 4), (5, 5) 按照下面的规则进行转换操作规则如下: 需求 元素key,value的value进行以下操作 偶数转换成该数的平方 奇数转换成该数的立方 # 5.使用 mapValues 算子完成以上需求 values rdd.mapValues(lambda x: x * x if x % 2 0 else x * x * x) # 6.使用rdd.collect() 收集完成 mapValues 转换的元素 print(values.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第九关 Transformations - reduceByKey# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)]的列表List data [(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.使用rdd.collect() 收集 rdd 的元素 print(rdd.collect()) 使用 reduceByKey 算子将 rdd 的数据[(python, 1), (scala, 2), (python, 3), (python, 4), (java, 5)] 按照下面的规则进行转换操作规则如下: 需求 元素key-value的value累加操作例如 (1,1),(1,1),(1,2) -- (1,4) (1,1),(1,1),(2,2),(2,2) -- (1,2),(2,4) # 5.使用 reduceByKey 算子完成以上需求 result rdd.reduceByKey(lambda x, y: x y) # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素 print(result.collect()) # 7.停止 SparkContext sc.stop() # ********** End **********#第十关 Actions - 常用算子# -*- coding: UTF-8 -*- from pyspark import SparkContext if __name__ __main__: # ********** Begin **********# # 1.初始化 SparkContext该对象是 Spark 程序的入口 sc SparkContext(local, Simple App) # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List data [1, 3, 5, 7, 9, 8, 6, 4, 2] # 3.通过 SparkContext 并行化创建 rdd rdd sc.parallelize(data) # 4.收集rdd的所有元素并print输出 print(rdd.collect()) # 5.统计rdd的元素个数并print输出 print(rdd.count()) # 6.获取rdd的第一个元素并print输出 print(rdd.first()) # 7.获取rdd的前3个元素并print输出 print(rdd.take(3)) # 8.聚合rdd的所有元素并print输出 print(rdd.reduce(lambda x, y: x y)) # 9.停止 SparkContext sc.stop() # ********** End **********#