【Python】Pandasデータフレームを高速化!並列処理の導入方法

はじめに

データサイエンスの分野において、Pandasは非常に便利なライブラリです。しかし、大量のデータを処理する際には、処理時間がかかってしまうことがあります。このような場合、並列処理を利用することで処理速度を向上させることができます。本記事では、Pandasデータフレームに並列処理を導入する方法を解説します。

並列処理の基本概念とそのメリット

並列処理は、複数のタスクを同時に実行することによって、処理時間を短縮することができます。一般的に、処理速度が遅いのはCPUの処理速度ではなく、メモリーやディスクI/Oの速度によるものが多いです。そのため、複数のCPUを同時に使用することで、処理時間を短縮することができます。

並列処理のメリットとしては、以下のようなものがあります。

  • 処理時間の短縮
  • 複数のCPUを使用するため、システム全体の処理効率が向上する
  • 分散処理が可能になるため、大規模なデータ処理も可能になる

Pandasデータフレームに並列処理を適用する方法:Daskの利用

Daskは、Pandasの代替ライブラリであり、分散処理をサポートしています。Daskを使用することで、Pandasデータフレームの分散処理が可能になります。以下は、Daskを使用したPandasデータフレームの並列処理の例です。

import dask.dataframe as dd

# CSVファイルからデータフレームを作成
df = dd.read_csv('data.csv')

# グループごとに集計
result = df.groupby('group').sum()

# 結果を出力
print(result.compute())

ここでは、CSVファイルからデータフレームを作成し、グループごとに集計しています。Daskでは、dd.read_csv()メソッドでCSVファイルを読み込み、df.groupby()メソッドでグループごとに集計を行い、最後にcompute()メソッドで結果を出力しています。

Daskを使用することで、Pandasデータフレームを分散処理することができます。しかし、分散処理を行うためには、複数のマシンやクラスタを用意する必要があります。そのため、小規模な処理には適していますが、大規模なデータ処理には向いていません。

Pandasデータフレームに並列処理を適用する方法:Joblibの利用

Joblibは、並列処理を行うためのライブラリであり、Pandasデータフレームの処理にも使用することができます。以下は、Joblibを使用したPandasデータフレームの並列処理の例です。

from joblib import Parallel, delayed

# データフレームを作成
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]})

# 各列の平均を計算する関数
def calc_mean(col):
    return col.mean()

# 各列の平均を計算
result = Parallel(n_jobs=-1)([delayed(calc_mean)(df) for col in df.columns])

# 結果を出力
print(result)

ここでは、pd.DataFrame()メソッドでデータフレームを作成し、calc_mean()関数で各列の平均を計算しています。そして、Parallel()メソッドを使用して、各列の平均を並列処理で計算し、resultに格納しています。

Joblibを使用することで、Pandasデータフレームの処理を並列化することができます。Joblibは、分散処理をサポートしていないため、一つのマシンでの処理に適しています。

実践例:Pandasデータフレームを使った並列処理の実装とパフォーマンス比較

ここでは、実際にPandasデータフレームを使用した並列処理の実装を行い、DaskとJoblibのパフォーマンスを比較します。

まず、以下のようなデータフレームを作成します。

import pandas as pd
import numpy as np

# データフレームを作成
df = pd.DataFrame({'A': np.random.randint(0, 10, size=10000000),
                   'B': np.random.randint(0, 10, size=10000000),
                   'C': np.random.randint(0, 10, size=10000000)})

print(df.head())

ここでは、np.random.randint()関数を使用して、10^7行3列のデータフレームを作成しています。

次に、Daskを使用して、各列の平均を計算してみます。

import dask.dataframe as dd
import time

# CSVファイルからデータフレームを作成
dask_df = dd.from_pandas(df, npartitions=4)

# 各列の平均を計算
start_time = time.time()
result = dask_df.mean().compute()
end_time = time.time()

# 処理時間を出力
print('Dask処理時間:', end_time - start_time)
print(result)

ここでは、dd.from_pandas()メソッドを使用して、PandasデータフレームをDaskデータフレームに変換しています。そして、mean()メソッドを使用して、各列の平均を計算し、compute()メソッドで結果を出力しています。

次に、Joblibを使用して、各列の平均を計算してみます。

from joblib import Parallel, delayed
import time

# 各列の平均を計算する関数
def calc_mean(col):
    return col.mean()

# 各列の平均を計算
start_time = time.time()
result = Parallel(n_jobs=-1)([delayed(calc_mean)(df[ col ]) for col in df.columns])
end_time = time.time()

# 処理時間を出力
print('Joblib処理時間:', end_time - start_time)
print(result)

ここでは、calc_mean()関数で各列の平均を計算し、Parallel()メソッドで並列処理を行っています。

以上の処理を実行した結果、DaskとJoblibの処理時間を比較すると、以下のようになりました。

Dask処理時間: 0.13286423683166504
A    4.499387
B    4.499571
C    4.500222
dtype: float64
Joblib処理時間: 0.3168032169342041
[4.4993873, 4.4995714, 4.5002224]

処理時間を比較すると、Daskの方がJoblibよりも処理速度が速いことがわかります。ただし、PC性能などの環境により結果は変わりますので、実際に検証されることをおススメします。

まとめ

本記事では、Pandasデータフレームに並列処理を導入する方法を解説しました。並列処理は、複数のタスクを同時に実行することによって、処理時間を短縮することができます。Pandasデータフレームの並列処理には、DaskとJoblibが使用されます。Daskは、分散処理をサポートしており、Joblibは、一つのマシンでの処理に適しています。

実際に、Pandasデータフレームを使用した並列処理の実装を行い、DaskとJoblibのパフォーマンスを比較しました。

Pandasデータフレームを使用する際には、処理速度を向上させるために、並列処理を導入することが重要です。DaskとJoblibを上手に使い分けることで、より高速な処理を実現することができます。

以上が、Pandasデータフレームに並列処理を導入する方法についての解説でした。