Tasuke Hubのロゴ

ITを中心に困っている人を助けるメディア

分かりやすく解決策を提供することで、あなたの困ったをサポート。 全ての人々がスムーズに生活できる世界を目指します。

Pandasのrolling window最適化完全ガイド:パフォーマンスを260倍速くする方法

記事のサムネイル
TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者

Pandasのrolling windowとパフォーマンスの課題

Pandasのrolling window関数は時系列データ分析において非常に強力なツールですが、大規模データセットを扱う場合にパフォーマンス問題が発生することがあります。特にrolling().apply()のような操作は、データサイズが大きくなると処理時間が急激に増加します。

import pandas as pd
import numpy as np

# 大規模データセットの例
size = 100000
df = pd.DataFrame(
    {"A": np.random.uniform(size=size)}, 
    index=pd.date_range('2019-01-01', periods=size, freq='1min')
)

# 基本的なrolling windowの呼び出し
%time df.rolling("1H").mean()  # 高速
# 結果: 数ミリ秒で完了

# カスタム関数を使ったrolling.apply
%time df.rolling("1H").apply(lambda x: np.mean(x), raw=True)  # 低速
# 結果: 数秒〜数分かかる場合も

この問題の原因は主に以下の点にあります:

  1. デフォルトのPython実装では、rolling().apply()がwindowごとにPythonの関数を呼び出すループを使用している
  2. raw=Falseを指定すると、各窓ごとにPandasのSeriesオブジェクトが作成される(大きなオーバーヘッド)
  3. 時間ベースのwindowでは窓のサイズが可変で、最適化が難しい

最新のPandas 2.2.3でも、Issue #61417のようなパフォーマンス問題が報告されています。この記事では、これらの問題を解決するための最新の最適化テクニックを紹介します。

おすすめの書籍

rolling.applyを高速化するNumbaエンジンの活用法

Pandas 1.0以降、rolling.applyメソッドはNumbaエンジンをサポートしており、処理速度を劇的に向上させることが可能になりました。Numbaは、Pythonコードを最適化されたマシンコードにJITコンパイルするライブラリです。

基本的なNumbaエンジンの利用方法

import pandas as pd
import numpy as np
import numba

# サンプルデータ
data = pd.Series(range(1_000_000))
roll = data.rolling(10)

# 通常の関数定義
def sum_func(x):
    return np.sum(x) + 5

# Numbaエンジンを使用
%time result = roll.apply(sum_func, engine='numba', raw=True)
# 最初の実行は遅い (コンパイル時間)

# 2回目の実行(キャッシュ利用)
%time result = roll.apply(sum_func, engine='numba', raw=True)
# 通常の実装よりはるかに高速

Numbaを使用する際の重要なポイント

  1. rawパラメータ: raw=Trueを必ず指定してください。これにより、関数は各ウィンドウのPandasシリーズではなく、NumPy配列を受け取ります。

  2. JITコンパイル: 最初の実行時にコンパイルのオーバーヘッドがあるため、初回は遅くなりますが、それ以降は高速です。

  3. サポートされる関数: Numbaはすべてのpython操作をサポートしているわけではありません。純粋な数値計算を含む関数が最適です。

  4. engine_kwargsの利用: さらなる最適化のためにNumbaのオプションを指定できます。

# Numbaエンジンのパラメータ指定
result = roll.apply(
    sum_func, 
    engine='numba', 
    raw=True, 
    engine_kwargs={
        'nopython': True,  # nopythonモード(最も高速)
        'nogil': True,     # Pythonの排他ロックを解除
        'parallel': True   # 並列処理を有効化
    }
)

対応するPandasのメソッド

Numbaエンジンは以下のrollingメソッドで利用可能です:

  • rolling().mean()
  • rolling().sum()
  • rolling().min()
  • rolling().max()
  • rolling().std()
  • rolling().var()
  • rolling().apply()

これらを使うだけで、通常のPython実装と比較して10〜260倍の速度向上が実現できます。特に大規模データセットでは、この差は顕著になります。

おすすめの書籍

パラレル処理でrolling window計算を加速する方法

Numbaエンジンの強力な機能の一つにマルチコア処理を簡単に利用できる点があります。特に複数の列を持つDataFrameでは、並列処理による高速化が顕著です。

parallelオプションの活用

import pandas as pd
import numpy as np
import numba

# 複数列を持つDataFrame
df = pd.DataFrame(np.random.randn(10_000, 100))
roll = df.rolling(100)

# シングルスレッド処理
%time roll.mean(engine="numba")
# 例: 347 ms

# マルチスレッド処理(2スレッド)
numba.set_num_threads(2)
%time roll.mean(engine="numba", engine_kwargs={"parallel": True})
# 例: 201 ms(約1.7倍高速化)

より効率的な並列処理のためのコツ

  1. 適切なスレッド数の設定: CPUコア数に基づいて最適なスレッド数を設定します。
import multiprocessing
# 利用可能なCPUコア数を取得
cores = multiprocessing.cpu_count()
# コア数に基づいて適切なスレッド数を設定
numba.set_num_threads(min(cores, 4))  # 例: 最大4スレッドまで使用
  1. 処理単位の最適化: 並列処理のオーバーヘッドを考慮し、タスクの粒度を調整します。
# カスタム関数の例(vectorizeデコレータを使用)
@numba.vectorize(["float64(float64)"], nopython=True, target='parallel')
def custom_func(x):
    # 計算量の多い処理
    result = 0.0
    for i in range(1000):
        result += np.sin(x) * np.cos(x)
    return result

# 並列処理を活用したrolling計算
df["result"] = df["A"].rolling(window=20).apply(
    lambda x: custom_func(x).mean(), 
    raw=True, 
    engine="numba", 
    engine_kwargs={"parallel": True}
)
  1. 列ごとのチャンク処理: 大規模なDataFrameを扱う場合、列を分割してチャンク単位で処理します。
def process_columns_in_chunks(df, chunk_size=10):
    results = []
    
    for i in range(0, df.shape[1], chunk_size):
        # 列をチャンクに分割
        chunk = df.iloc[:, i:i+chunk_size]
        
        # チャンク単位でrolling計算
        chunk_result = chunk.rolling(20).apply(
            lambda x: np.mean(x), 
            raw=True, 
            engine="numba", 
            engine_kwargs={"parallel": True}
        )
        
        results.append(chunk_result)
    
    # 結果を結合
    return pd.concat(results, axis=1)

この並列処理アプローチは、特に列数の多いDataFrameで効果的です。Pandas Issue #61417のような、複数の列にわたるrolling window計算のパフォーマンス問題に対する効果的な解決策となります。

おすすめの書籍

カスタムIndexerを使った効率的なwindow計算テクニック

Pandasはカスタムの窓境界計算をサポートしており、これにより特定のユースケースに合わせて窓の範囲を最適化できます。特にIssue #32865で報告されているような、標準的なrolling windowでは効率的に扱えない特殊なウィンドウに対して効果的です。

BaseIndexerクラスの実装

カスタムIndexerを作成するには、pandas.api.indexers.BaseIndexerクラスを継承します。

import pandas as pd
import numpy as np
from pandas.api.indexers import BaseIndexer

class EfficientForwardIndexer(BaseIndexer):
    """効率的な順方向ウィンドウ計算のためのカスタムIndexer"""
    
    def get_window_bounds(self, num_values, min_periods, center, closed):
        # 開始インデックスと終了インデックスを計算
        start = np.empty(num_values, dtype=np.int64)
        end = np.empty(num_values, dtype=np.int64)
        
        for i in range(num_values):
            # 先読みウィンドウの場合、現在位置から未来の値を使用
            start[i] = i
            end[i] = min(i + self.window_size, num_values)
        
        return start, end

カスタムIndexerの活用例

# サンプルデータの作成
df = pd.DataFrame({"a": range(1, 10)})

# カスタムIndexerの初期化とrolling呼び出し
indexer = EfficientForwardIndexer(window_size=3)
result = df["a"].rolling(indexer, min_periods=1).mean()

print(result)
"""
0    1.000000
1    2.000000
2    3.000000
3    4.000000
4    5.000000
5    6.000000
6    7.000000
7    8.500000
8    9.000000
Name: a, dtype: float64
"""

高度なカスタムIndexerの実装

より実践的なシナリオでは、時間ベースの窓に対して効率的なインデックス計算を行いたい場合があります。以下は変動するオフセットに対応するカスタムIndexerの例です。

from pandas.api.indexers import VariableOffsetWindowIndexer

# 時系列データを作成
df = pd.DataFrame(range(10), index=pd.date_range("2025-01-01", periods=10))

# 営業日ベースのウィンドウを作成
offset = pd.offsets.BDay(1)  # 1営業日
indexer = VariableOffsetWindowIndexer(index=df.index, offset=offset)

# カスタムインデックスを使用したrolling計算
result = df.rolling(indexer).sum()

print(result)
"""
2025-01-01     0.0
2025-01-02     1.0
2025-01-03     2.0
2025-01-04     3.0
2025-01-05     7.0
2025-01-06    12.0
2025-01-07     6.0
2025-01-08     7.0
2025-01-09     8.0
2025-01-10     9.0
dtype: float64
"""

このアプローチの利点は、窓の境界計算が1回のみ行われることです。標準のPandas実装では、時間ベースのウィンドウは頻繁に窓の境界を再計算する必要があり、パフォーマンスが低下します。

カスタムIndexerと標準機能を組み合わせる

複雑なケースでは、カスタムIndexerとNumbaエンジンを組み合わせることで、最大限のパフォーマンスを引き出せます。

# カスタムIndexerを初期化
custom_indexer = EfficientForwardIndexer(window_size=20)

# Numbaエンジンと組み合わせた高速計算
result = df.rolling(
    custom_indexer, 
    min_periods=1
).apply(
    lambda x: np.sum(x) / len(x),  # カスタム集計関数
    raw=True,
    engine="numba",
    engine_kwargs={"parallel": True}
)

このような組み合わせは、Issue #61417のようなパフォーマンス問題を解決するための効果的なアプローチです。特に時間ベースのインデックスを持つ大規模データセットでは、カスタムIndexerの柔軟性とNumbaエンジンの計算効率の両方を活用できます。

おすすめの書籍

大規模データセット向けNumba+JITコンパイルの活用戦略

大規模データセットでは、前述のNumbaエンジンをさらに効果的に活用するための戦略が必要です。ここでは、JIT(Just-In-Time)コンパイルの最適化戦略を紹介します。

カスタムJIT関数の作成

Numbaの@jitデコレータを使って、カスタム関数を最適化できます。

import pandas as pd
import numpy as np
import numba

# nopythonモードでJIT関数を定義
@numba.jit(nopython=True)
def fast_rolling_mean(values, window_size):
    result = np.empty_like(values)
    result[:window_size-1] = np.nan  # 最初のwindow_size-1要素はNaN
    
    # 累積和を計算
    cumsum = 0.0
    for i in range(window_size-1):
        cumsum += values[i]
    
    # 移動平均を計算
    for i in range(window_size-1, len(values)):
        cumsum += values[i]
        result[i] = cumsum / window_size
        cumsum -= values[i - (window_size-1)]
    
    return result

# 使用例
df = pd.DataFrame({'A': np.random.randn(1_000_000)})
%time result = fast_rolling_mean(df['A'].values, 100)
# 通常のPandasメソッドより数倍〜数十倍高速

テーブルメソッドの活用

Pandas 1.3以降では、テーブル全体に対して操作を実行するmethod='table'オプションが追加されました。これはNumbaエンジンと組み合わせると特に効果的です。

import pandas as pd
import numpy as np

# テーブル全体に対するNumba最適化関数
def weighted_mean(x):
    # 最後の列をウェイトとして使用
    weights = x[:, -1]
    values = x[:, :-1]
    # ウェイト付き平均を計算
    return (values * weights.reshape(-1, 1)).sum(axis=0) / weights.sum()

# サンプルデータ
df = pd.DataFrame([
    [1, 2, 0.6],  # 最後の列をウェイトとして使用
    [2, 3, 0.4],
    [3, 4, 0.2],
    [4, 5, 0.7]
])

# テーブルメソッドを使用
result = df.rolling(
    2, 
    method="table", 
    min_periods=1
).apply(
    weighted_mean, 
    raw=True, 
    engine="numba"
)

print(result)

ufuncとvectorizeデコレータの活用

より高度な最適化のために、NumbaのUFuncを活用できます。

import numba
import pandas as pd
import numpy as np

# vectorizeデコレータを使用したUFunc
@numba.vectorize(["float64(float64, float64)"], nopython=True)
def fast_rolling_calc(value, prev_value):
    """カスタムマイクロ最適化関数"""
    if np.isnan(prev_value):
        return value
    return (value + prev_value) / 2

# 基本のrolling処理
def optimized_rolling_calc(series, window):
    values = series.values
    result = np.empty_like(values)
    result[0] = values[0]
    
    for i in range(1, len(values)):
        start_idx = max(0, i - window + 1)
        # UFuncを活用した高速計算
        window_values = values[start_idx:i+1]
        result[i] = fast_rolling_calc.reduce(window_values)
    
    return pd.Series(result, index=series.index)

# 使用例
series = pd.Series(np.random.randn(100_000))
%time result = optimized_rolling_calc(series, 20)

メモリ配置の最適化

Numbaの性能を最大限に引き出すには、メモリレイアウトを最適化することが重要です。

import pandas as pd
import numpy as np
import numba

# メモリレイアウトを最適化したJIT関数
@numba.jit(nopython=True, fastmath=True)
def optimized_memory_rolling(values):
    n = len(values)
    window = 10
    result = np.empty(n, dtype=np.float64)
    
    # 初期値設定
    for i in range(window - 1):
        result[i] = np.nan
    
    # メモリキャッシュに優しいアクセスパターン
    for i in range(window - 1, n):
        # 連続したメモリへのアクセス
        window_sum = 0.0
        for j in range(window):
            window_sum += values[i - j]
        result[i] = window_sum / window
    
    return result

# 使用例
df = pd.DataFrame({'A': np.random.randn(1_000_000)})
%time result = optimized_memory_rolling(df['A'].values)

この高度な最適化アプローチは、Issue #61417や他の多くのrolling windowパフォーマンス問題に対して非常に効果的です。特に、数百万行以上のデータや、複雑なカスタム計算が必要な場合に威力を発揮します。

おすすめの書籍

メモリ使用量を削減するrolling windowの最適化パターン

大規模なデータセットでは、パフォーマンスだけでなく、メモリ使用量も重要な課題です。Pandasのrolling window操作はメモリを大量に消費することがあるため、メモリ使用量を削減するためのテクニックを紹介します。

チャンク処理によるメモリフットプリントの削減

大きなDataFrameを一度に処理するのではなく、小さなチャンクに分割して処理することでメモリ使用量を削減できます。

import pandas as pd
import numpy as np

def memory_efficient_rolling(df, window, func, chunk_size=10000):
    """メモリ効率の良いrolling計算を行う関数"""
    n_rows = len(df)
    result_chunks = []
    
    # データをチャンクに分割して処理
    for start in range(0, n_rows, chunk_size):
        # オーバーラップを考慮したチャンク境界を計算
        end = min(start + chunk_size + window, n_rows)
        overlap_start = max(0, start - window + 1)
        
        # チャンクを処理(オーバーラップ部分を含む)
        chunk = df.iloc[overlap_start:end]
        chunk_result = chunk.rolling(window=window).apply(func, raw=True)
        
        # 有効な範囲のみを保持
        valid_start = start - overlap_start
        valid_end = valid_start + min(chunk_size, n_rows - start)
        valid_result = chunk_result.iloc[valid_start:valid_end]
        
        result_chunks.append(valid_result)
    
    # チャンク結果を結合
    return pd.concat(result_chunks)

# 使用例
df = pd.DataFrame({'A': np.random.randn(1_000_000)})
result = memory_efficient_rolling(df, window=100, func=np.mean, chunk_size=50000)

ジェネレータパターンの活用

大規模データセットを効率的に処理するためのもう一つのアプローチは、ジェネレータパターンを使ってデータを逐次的に処理することです。

import pandas as pd
import numpy as np
from itertools import islice

def rolling_window_generator(series, window):
    """rolling windowのジェネレータ関数"""
    # データの前処理
    values = series.values
    length = len(values)
    
    # 最初のウィンドウサイズ-1要素はNaN
    for i in range(window - 1):
        yield np.nan
    
    # 残りの要素に対して移動平均を計算
    for i in range(window - 1, length):
        yield np.mean(values[i - (window - 1):i + 1])

# 使用例
series = pd.Series(np.random.randn(1_000_000))
rolling_means = pd.Series(rolling_window_generator(series, 100))

累積計算手法の活用

可能な計算では、累積的なアプローチを使用することで計算を効率化できます。

import pandas as pd
import numpy as np

def efficient_rolling_sum(series, window):
    """累積和を使った効率的なrolling sum計算"""
    values = series.values
    n = len(values)
    result = np.empty(n)
    result[:window-1] = np.nan
    
    # 累積和を計算
    cumsum = np.cumsum(values)
    
    # ウィンドウの計算
    result[window-1:] = cumsum[window-1:] - np.hstack(([0], cumsum[:-window]))
    
    return pd.Series(result, index=series.index)

# 使用例
series = pd.Series(np.random.randn(1_000_000))
rolling_sums = efficient_rolling_sum(series, 100)

メモリ使用量のモニタリング

パフォーマンスの最適化を行う際に、メモリ使用量をモニタリングすることが重要です。Pythonのmemory_profilerを使用して、コードのメモリ使用量を追跡できます。

# !pip install memory_profiler
from memory_profiler import profile

@profile
def test_rolling_methods():
    # 標準のPandas実装
    df = pd.DataFrame({'A': np.random.randn(1_000_000)})
    result1 = df['A'].rolling(100).mean()
    
    # メモリ効率の良い実装
    result2 = efficient_rolling_sum(df['A'], 100) / 100
    
    # 結果の検証
    np.testing.assert_allclose(result1[99:], result2[99:], rtol=1e-10)
    
    return result1, result2

# メモリプロファイリングの実行
results = test_rolling_methods()

これらのメモリ最適化テクニックを組み合わせることで、Pandas Issue #61417のようなパフォーマンス問題に対してより効率的な解決策を提供できます。特に、数百万行を超えるデータセットを扱う場合には、これらのアプローチが非常に有効です。

以上の最適化テクニックを活用することで、Pandasのrolling window操作のパフォーマンスを大幅に向上させることができます。これらの手法は実際のデータ分析プロジェクトで役立つだけでなく、Issue #61417のような具体的な問題を解決するための実践的なアプローチを提供します。

おすすめの書籍

おすすめコンテンツ