なんやブログ|データ分析格闘記

データ分析を生業に、分析ナレッジや読書感想などを書くブログ。ミナミのデータ分析格闘ブログ

【DataBricks】PySpark で Pandas を扱うときの注意まとめ

f:id:bee5boo5bee:20220321190824p:plain

DataBricks(PySpark)を勉強中ですが、自分の備忘録として記録に残したいと思います。

DataBricks では、Pandas のデータフレームを扱いながら、PySparkの仕組みを使って分散処理ができます。

PySpark での Pandas

PySpark で Pandas を使う際に、分散処理を有効にするには下記を使ったほうが良い。

分散処理を有効にしたPandasの使い方

  1. import pyspark.pandas as pd
    to_Pandas() でもSpark形式のPandasとして扱うことができる。

  2. df.to_pandas_on_spark()
    Spark形式のdfを変換するときに、.to_pandas_on_spark()を使うとSpark形式のpandasに返還できるので分散処理が有効になる。

なお、そのまま通常のPandasを使っても、PySparkの遅延評価(分散処理)は有効とならないため注意。

feature_store のライブラリでも.toPandas()があるが、.to_pandas_on_spark()を使っておいたほうが確実な気がする、

pandas_udf

udf = user difined function ユーザ定義関数を作ることができる。

基本の使い方

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    # `v` is a pandas Series
    return v.add(1)  # outputs a pandas Series

spark.range(10).select(pandas_plus_one("id")).show()

pandas_udf を使うときには、Iterator で渡すほうが分散処理が有効になって早い。

# Pandas Function API
from typing import Iterator
import pandas as pd


def pandas_plus_one(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    return map(lambda v: v + 1, iterator)


# pandas_plus_one is just a regular Python function, and mapInPandas is
# logically treated as _a separate SQL query plan_ instead of a SQL expression. 
# Therefore, direct interactions with other expressions are impossible.
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
参考サイト

How Python type hints simplify Pandas UDFs in Apache Spark 3.0 - The Databricks Blog

参考書籍

プライバシーポリシー | 趣味のページ