DataBricks(PySpark)を勉強中ですが、自分の備忘録として記録に残したいと思います。
DataBricks では、Pandas のデータフレームを扱いながら、PySparkの仕組みを使って分散処理ができます。
PySpark での Pandas
PySpark で Pandas を使う際に、分散処理を有効にするには下記を使ったほうが良い。
分散処理を有効にしたPandasの使い方
import pyspark.pandas as pd
to_Pandas() でもSpark形式のPandasとして扱うことができる。df.to_pandas_on_spark()
Spark形式のdfを変換するときに、.to_pandas_on_spark()を使うとSpark形式のpandasに返還できるので分散処理が有効になる。
なお、そのまま通常のPandasを使っても、PySparkの遅延評価(分散処理)は有効とならないため注意。
feature_store のライブラリでも.toPandas()があるが、.to_pandas_on_spark()を使っておいたほうが確実な気がする、
pandas_udf
udf = user difined function ユーザ定義関数を作ることができる。
基本の使い方
from pyspark.sql.functions import pandas_udf, PandasUDFType @pandas_udf('double', PandasUDFType.SCALAR) def pandas_plus_one(v): # `v` is a pandas Series return v.add(1) # outputs a pandas Series spark.range(10).select(pandas_plus_one("id")).show()
pandas_udf を使うときには、Iterator で渡すほうが分散処理が有効になって早い。
# Pandas Function API from typing import Iterator import pandas as pd def pandas_plus_one(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: return map(lambda v: v + 1, iterator) # pandas_plus_one is just a regular Python function, and mapInPandas is # logically treated as _a separate SQL query plan_ instead of a SQL expression. # Therefore, direct interactions with other expressions are impossible. spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
参考サイト
How Python type hints simplify Pandas UDFs in Apache Spark 3.0 - The Databricks Blog