Pandas API on Spark 快速入门像写 Pandas 一样使用 Spark

Pandas API on Spark 快速入门像写 Pandas 一样使用 Spark 1. 什么是 pandas API on Sparkpandas API on Spark 是一套面向 pandas 用户的接口。它的目标不是让你重新学习一套全新的数据处理方式而是尽量沿用 pandas 的操作习惯同时把底层执行交给 Spark。官方 Quickstart 主要演示了它和 pandas 的相似点以及与 Spark DataFrame 的互转方式。2. 基础导入importpandasaspdimportnumpyasnpimportpyspark.pandasaspsfrompyspark.sqlimportSparkSession这是官方示例中的标准导入方式。3. 对象创建3.1 创建 Seriessps.Series([1,3,5,np.nan,6,8])print(s)3.2 创建 DataFramepsdfps.DataFrame({a:[1,2,3,4,5,6],b:[100,200,300,400,500,600],c:[one,two,three,four,five,six],},index[10,20,30,40,50,60],)print(psdf)3.3 从 pandas DataFrame 转换datespd.date_range(20130101,periods6)pdfpd.DataFrame(np.random.randn(6,4),indexdates,columnslist(ABCD))psdfps.from_pandas(pdf)print(type(psdf))print(psdf)3.4 从 Spark DataFrame 转换sparkSparkSession.builder.getOrCreate()sdfspark.createDataFrame(pdf)sdf.show()psdfsdf.pandas_api()print(psdf)官方示例展示了 pandas DataFrame 可以转换成 pandas API on Spark DataFrameSpark DataFrame 也可以通过pandas_api()转成 pandas API on Spark 对象。4. 查看数据4.1 查看前几行print(psdf.head())4.2 查看索引、列名和 numpy 数据print(psdf.index)print(psdf.columns)print(psdf.to_numpy())4.3 查看数据摘要print(psdf.describe())4.4 转置、按索引排序、按值排序print(psdf.T)print(psdf.sort_index(ascendingFalse))print(psdf.sort_values(byB))需要注意的是Spark DataFrame 默认不保证天然顺序如果通过相关配置保留顺序会带来额外排序开销。5. 缺失值处理pandas API on Spark 主要使用np.nan表示缺失值默认情况下它不会被纳入计算。pdf1pdf.reindex(indexdates[0:4],columnslist(pdf.columns)[E])pdf1.loc[dates[0]:dates[1],E]1psdf1ps.from_pandas(pdf1)print(psdf1)5.1 删除缺失值print(psdf1.dropna(howany))5.2 填充缺失值print(psdf1.fillna(value5))6. 基础统计操作print(psdf.mean())官方示例中mean()、describe()等操作都可以直接使用整体风格与 pandas 很接近。7. Arrow 优化pandas API on Spark 可以使用 Spark 配置进行优化。官方示例里开启 Arrow 后ps.range(300000).to_pandas()的耗时大约从 3.08 秒下降到 900 毫秒说明 Arrow 对 pandas 转换性能有明显提升。prevspark.conf.get(spark.sql.execution.arrow.pyspark.enabled)ps.set_option(compute.default_index_type,distributed)importwarnings warnings.filterwarnings(ignore)spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,True)# %timeit ps.range(300000).to_pandas()spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,False)# %timeit ps.range(300000).to_pandas()ps.reset_option(compute.default_index_type)spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,prev)8. 分组聚合psdfps.DataFrame({A:[foo,bar,foo,bar,foo,bar,foo,foo],B:[one,one,two,three,two,two,one,three],C:np.random.randn(8),D:np.random.randn(8),})print(psdf.groupby(A).sum())print(psdf.groupby([A,B]).sum())官方示例中groupby支持单列分组和多列分组多列分组后会形成层级索引。9. 绘图pserpd.Series(np.random.randn(1000),indexpd.date_range(1/1/2000,periods1000))psserps.Series(pser)psserpsser.cummax()psser.plot()pdfpd.DataFrame(np.random.randn(1000,4),indexpser.index,columns[A,B,C,D])psdfps.from_pandas(pdf)psdfpsdf.cummax()psdf.plot()官方示例展示了 Series 和 DataFrame 都可以直接调用plot()进行绘图。10. 数据读写10.1 CSVpsdf.to_csv(foo.csv)print(ps.read_csv(foo.csv).head(10))10.2 Parquetpsdf.to_parquet(bar.parquet)print(ps.read_parquet(bar.parquet).head(10))10.3 Spark IOpsdf.spark.to_spark_io(zoo.orc,formatorc)print(ps.read_spark_io(zoo.orc,formatorc).head(10))官方 Quickstart 展示了 pandas API on Spark 支持 CSV、Parquet以及通过 Spark IO 写入 ORC 等 Spark 数据源。11. 总结pandas API on Spark 最适合这样一类场景你已经熟悉 pandas希望继续用接近 pandas 的写法处理数据但数据规模开始变大或者希望接入 Spark 的分布式能力。这套接口的价值不在于“发明一种新语法”而在于让 pandas 风格代码更自然地过渡到 Spark 世界。它支持对象创建、DataFrame 互转、常见查看操作、缺失值处理、统计分析、Arrow 优化、分组聚合、绘图和多种数据读写方式足够作为入门和迁移的起点。