Tasuke Hubのロゴ

ITを中心に困っている人を助けるメディア

分かりやすく解決策を提供することで、あなたの困ったをサポート。 全ての人々がスムーズに生活できる世界を目指します。

Pythonプログラムが遅い?プロファイリングでパフォーマンスボトルネックを特定する方法

記事のサムネイル
TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者

Pythonプログラムのパフォーマンス問題を特定する重要性

Pythonは読みやすく書きやすい言語として人気がありますが、大規模なデータ処理や複雑な計算を行うと、実行速度が課題になることがあります。「遅いのはPythonだから仕方ない」と諦めていませんか?実は、多くの場合、プログラムのボトルネックを特定して最適化することで、驚くほどパフォーマンスを向上させることができます。

プロファイリングとは、プログラムの実行時間やメモリ使用量などを測定・分析することで、パフォーマンスのボトルネックを特定するプロセスです。これにより、最適化の労力を正しい場所に集中させることができます。

「測定できないものは改善できない」- ピーター・ドラッカー

この記事では、Pythonプログラムのパフォーマンス問題を特定し、効率的に解決するための実践的な方法を紹介します。長い処理時間に悩まされることなく、スムーズに動作するPythonプログラムを書けるようになりましょう。

なぜPythonプログラムは遅くなるのか?よくある原因と基本的な考え方

Pythonプログラムのパフォーマンスが低下する理由はさまざまですが、最も一般的な原因をいくつか見ていきましょう。

1. 非効率なループと反復処理

Pythonはインタープリタ言語のため、ループ内の処理は特に遅くなりがちです。特に入れ子になったループや大量のデータを処理するループは要注意です。

# 非効率なループの例
result = []
for i in range(10000):
    for j in range(10000):
        if i * j > 1000:
            result.append((i, j))

2. 不適切なデータ構造の選択

適切なデータ構造を選ばないと、検索や挿入などの操作が遅くなります。例えば、リストを使った要素の検索は、要素数に比例して遅くなります。

# リストでの検索(遅い)
my_list = list(range(100000))
if 99999 in my_list:  # O(n)の時間複雑度
    print("Found!")

# 辞書やセットでの検索(速い)
my_set = set(range(100000))
if 99999 in my_set:  # O(1)の時間複雑度
    print("Found!")

3. 余分なメモリ割り当て

大量のデータをメモリに保持すると、ガベージコレクションの負荷が高まり、パフォーマンスが低下します。特に一時的な大きなリストや辞書を作成する処理には注意が必要です。

4. 外部ライブラリの非効率な使用

NumPy、Pandas、SciPyなどの高速な外部ライブラリがあるにもかかわらず、純粋なPythonで実装することでパフォーマンスが低下することがあります。

5. I/O操作のブロッキング

ファイル操作やネットワーク通信などのI/O操作は、処理をブロックしてプログラム全体を遅くすることがあります。

これらの問題を特定するには、「どこを改善すべきか」を正確に把握する必要があります。これがプロファイリングの出番です。プロファイリングにより、どの関数や行が最も時間やメモリを消費しているかを特定し、効率的に最適化することができます。

プロファイリングの基本的な考え方は、「推測ではなく測定に基づいて最適化する」ということです。自分が遅いと思っている部分が、実際には問題ではないことがよくあります。データに基づいて最適化することで、最大の効果を得ることができます。

cProfileを使った基本的なプロファイリング方法

Pythonの標準ライブラリには、cProfileというプロファイリングツールが含まれています。これは、関数ごとの呼び出し回数と実行時間を測定するのに最適です。まずは基本的な使い方を見ていきましょう。

基本的な使い方

import cProfile

def slow_function():
    result = []
    for i in range(1000):
        for j in range(1000):
            result.append(i * j)
    return result

# 関数のプロファイリング
cProfile.run('slow_function()')

実行すると、以下のような結果が表示されます:

         4000004 function calls in 1.326 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.191    0.191    1.326    1.326 <string>:1(<module>)
        1    1.135    1.135    1.135    1.135 example.py:3(slow_function)
        1    0.000    0.000    1.326    1.326 {built-in method builtins.exec}
  4000000    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}

この結果は以下のように解釈できます:

  • ncalls: 関数が呼び出された回数
  • tottime: 関数自身の実行に費やした合計時間(サブ関数の呼び出しを除く)
  • percall: 1回の呼び出しあたりの平均時間(tottime / ncalls)
  • cumtime: 関数とそのサブ関数の実行に費やした累積時間
  • percall: 1回の呼び出しあたりの平均累積時間(cumtime / ncalls)

より詳細なプロファイリング

より詳細な結果を得るには、Statsオブジェクトを使って結果をソートしたり、フィルタリングしたりできます:

import cProfile
import pstats
from pstats import SortKey

# プロファイリング結果をファイルに保存
cProfile.run('slow_function()', 'stats.prof')

# 結果を読み込み、分析
p = pstats.Stats('stats.prof')
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(10)  # 累積時間でソートし、上位10項目を表示

実践的なケース:大きなファイルの処理

実際のプロジェクトでの例を見てみましょう。大きなCSVファイルを処理する関数をプロファイリングします:

import cProfile
import pstats
import pandas as pd

def process_csv(filename):
    # データの読み込み
    df = pd.read_csv(filename)
    
    # データの処理
    result = []
    for idx, row in df.iterrows():
        # 何らかの処理
        processed = row['value'] * 2
        result.append(processed)
    
    # 結果を返す
    return sum(result)

# プロファイリングの実行
cProfile.run('process_csv("large_file.csv")', 'csv_stats.prof')

# 結果の分析
p = pstats.Stats('csv_stats.prof')
p.sort_stats(pstats.SortKey.TIME).print_stats(5)  # 実行時間でソートし、上位5項目を表示

このプロファイリング結果から、次のようなことがわかるかもしれません:

  1. pd.read_csvが全体の処理時間の大部分を占めている
  2. df.iterrows()が遅い
  3. result.append()の呼び出しが多すぎる

このような情報に基づいて、次のような最適化が考えられます:

def optimized_process_csv(filename):
    # データの読み込み(チャンクで読み込む)
    result = 0
    for chunk in pd.read_csv(filename, chunksize=10000):
        # ベクトル化操作を使用(forループを避ける)
        result += (chunk['value'] * 2).sum()
    
    return result

cProfileは基本的なプロファイリングに最適ですが、より詳細な行レベルの情報や、メモリ使用量の分析には、別のツールが必要です。次のセクションでは、より詳細なプロファイリングツールを紹介します。

line_profilerで行レベルのパフォーマンス測定をマスターする

cProfileは関数レベルのプロファイリングには最適ですが、関数内のどの行が最も時間を消費しているかを知りたい場合は、line_profilerが役立ちます。このツールを使えば、コードの行ごとの実行時間を測定することができます。

line_profilerのインストール

まず、line_profilerをインストールします:

pip install line_profiler

基本的な使い方

line_profilerを使うには、プロファイリングしたい関数に@profileデコレータを追加します:

# sample_line_profile.py
@profile
def slow_function():
    result = []
    for i in range(1000):
        for j in range(1000):
            if i * j > 10000:
                result.append(i * j)
    return result

if __name__ == '__main__':
    slow_function()

次に、kernprof(line_profilerに含まれるコマンドラインツール)を使ってプロファイリングを実行します:

kernprof -l sample_line_profile.py

実行が完了すると、.lprof拡張子のファイルが作成されます。このファイルの内容を表示するには:

python -m line_profiler sample_line_profile.py.lprof

以下のような出力が表示されます:

Timer unit: 1e-06 s

Total time: 1.40256 s
File: sample_line_profile.py
Function: slow_function at line 2

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     2                                           @profile
     3                                           def slow_function():
     4         1          2.0      2.0      0.0      result = []
     5      1001       1023.0      1.0      0.1      for i in range(1000):
     6   1001000     980511.0      1.0     69.9          for j in range(1000):
     7   1000000     420000.0      0.4     29.9              if i * j > 10000:
     8     10000       1024.0      0.1      0.1                  result.append(i * j)
     9         1          0.0      0.0      0.0      return result

この出力から、以下のことがわかります:

  • Line #: ソースコードの行番号
  • Hits: その行が実行された回数
  • Time: その行の実行にかかった合計時間(マイクロ秒)
  • Per Hit: 1回の実行あたりの平均時間(マイクロ秒)
  • % Time: その行が関数の合計実行時間に占める割合
  • Line Contents: 実際のコード行

Jupyterノートブックでの使用

Jupyterノートブックでline_profilerを使用するには、以下のようにします:

%load_ext line_profiler

def slow_function():
    result = []
    for i in range(1000):
        for j in range(1000):
            if i * j > 10000:
                result.append(i * j)
    return result

%lprun -f slow_function slow_function()

実践例:データ処理パイプラインのプロファイリング

例として、画像処理パイプラインの各ステップをプロファイリングしてみましょう:

# image_processing.py
import numpy as np
from PIL import Image
import time

@profile
def process_image(image_path):
    # 画像を読み込む
    img = Image.open(image_path)
    img_array = np.array(img)
    
    # グレースケールに変換
    gray = np.mean(img_array, axis=2).astype(np.uint8)
    
    # エッジ検出(簡易版)
    edges = np.zeros_like(gray)
    for i in range(1, gray.shape[0]-1):
        for j in range(1, gray.shape[1]-1):
            gx = int(gray[i+1, j]) - int(gray[i-1, j])
            gy = int(gray[i, j+1]) - int(gray[i, j-1])
            edges[i, j] = min(255, int(np.sqrt(gx*gx + gy*gy)))
    
    # 閾値処理
    threshold = 100
    binary = (edges > threshold).astype(np.uint8) * 255
    
    return binary

if __name__ == '__main__':
    process_image('sample_image.jpg')

このコードをline_profilerで分析すると、エッジ検出の二重forループが最も時間を消費していることがわかるでしょう。この情報に基づいて、以下のように最適化できます:

@profile
def optimized_process_image(image_path):
    # 画像を読み込む
    img = Image.open(image_path)
    img_array = np.array(img)
    
    # グレースケールに変換
    gray = np.mean(img_array, axis=2).astype(np.uint8)
    
    # エッジ検出(NumPyのベクトル化操作を使用)
    gx = np.zeros_like(gray)
    gy = np.zeros_like(gray)
    gx[1:-1, :] = gray[2:, :] - gray[:-2, :]
    gy[:, 1:-1] = gray[:, 2:] - gray[:, :-2]
    edges = np.sqrt(gx**2 + gy**2).clip(0, 255).astype(np.uint8)
    
    # 閾値処理
    threshold = 100
    binary = (edges > threshold).astype(np.uint8) * 255
    
    return binary

line_profilerは、コードのどの部分が最も時間を消費しているかを特定し、最適化の努力を正しい方向に導くための強力なツールです。

memory_profilerでメモリ使用量のボトルネックを発見する

実行時間だけでなく、メモリ使用量もPythonプログラムのパフォーマンスに大きく影響します。特に大規模なデータセットを扱う場合、メモリ使用量のボトルネックを特定することが重要です。memory_profilerを使うと、各行のメモリ使用量を測定することができます。

memory_profilerのインストール

まず、memory_profilerをインストールします:

pip install memory_profiler

基本的な使い方

memory_profilerを使うには、line_profilerと同様に、プロファイリングしたい関数に@profileデコレータを追加します:

# sample_memory_profile.py
from memory_profiler import profile

@profile
def memory_hungry_function():
    # 大きなリストを作成
    big_list = [i for i in range(1000000)]
    
    # さらに大きなリストを作成
    bigger_list = [i * 2 for i in big_list]
    
    # 辞書を作成
    big_dict = {i: i * 3 for i in range(100000)}
    
    return len(bigger_list) + len(big_dict)

if __name__ == '__main__':
    memory_hungry_function()

次に、以下のコマンドでスクリプトを実行します:

python -m memory_profiler sample_memory_profile.py

以下のような出力が表示されます:

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     4     16.7 MiB     16.7 MiB           1   @profile
     5                                         def memory_hungry_function():
     6                                             # 大きなリストを作成
     7     24.5 MiB      7.8 MiB           1       big_list = [i for i in range(1000000)]
     8                                             
     9                                             # さらに大きなリストを作成
    10     40.1 MiB     15.6 MiB           1       bigger_list = [i * 2 for i in big_list]
    11                                             
    12                                             # 辞書を作成
    13     47.3 MiB      7.2 MiB           1       big_dict = {i: i * 3 for i in range(100000)}
    14                                             
    15     47.3 MiB      0.0 MiB           1       return len(bigger_list) + len(big_dict)

この出力から、以下のことがわかります:

  • Mem usage: その行の実行後のメモリ使用量
  • Increment: その行によって増加したメモリ使用量
  • Line Contents: 実際のコード行

Jupyterノートブックでの使用

Jupyterノートブックでmemory_profilerを使用するには、以下のようにします:

%load_ext memory_profiler

@profile
def memory_hungry_function():
    # 大きなリストを作成
    big_list = [i for i in range(1000000)]
    
    # さらに大きなリストを作成
    bigger_list = [i * 2 for i in big_list]
    
    # 辞書を作成
    big_dict = {i: i * 3 for i in range(100000)}
    
    return len(bigger_list) + len(big_dict)

%memit memory_hungry_function()  # 関数全体のメモリ使用量を測定
%mprun -f memory_hungry_function memory_hungry_function()  # 行ごとのメモリ使用量を測定

実践例:大きなデータフレームの処理

例として、大きなCSVファイルを読み込み、処理するコードのメモリ使用量をプロファイリングしてみましょう:

# memory_pandas.py
import pandas as pd
from memory_profiler import profile

@profile
def process_dataframe(filename):
    # CSVファイルを読み込む
    df = pd.read_csv(filename)
    
    # 複数の新しいカラムを作成
    df['squared'] = df['value'] ** 2
    df['cubed'] = df['value'] ** 3
    
    # グループ化と集計
    result = df.groupby('category').agg({
        'value': ['mean', 'sum', 'count'],
        'squared': ['mean', 'sum'],
        'cubed': ['mean', 'sum']
    })
    
    return result

if __name__ == '__main__':
    process_dataframe('large_data.csv')

このプロファイリング結果から、以下のようなことがわかるかもしれません:

  1. CSVファイルの読み込みで大量のメモリを使用している
  2. 新しいカラムの作成でメモリ使用量が増加している
  3. グループ化と集計の操作でもメモリを消費している

このような情報に基づいて、以下のように最適化することができます:

@profile
def optimized_process_dataframe(filename):
    # チャンクでCSVファイルを読み込む
    result_dfs = []
    for chunk in pd.read_csv(filename, chunksize=10000):
        # 必要なカラムだけを選択
        chunk = chunk[['category', 'value']]
        
        # 計算とグループ化を各チャンクで行う
        chunk['squared'] = chunk['value'] ** 2
        chunk['cubed'] = chunk['value'] ** 3
        
        # 各チャンクの結果を集計
        chunk_result = chunk.groupby('category').agg({
            'value': ['mean', 'sum', 'count'],
            'squared': ['mean', 'sum'],
            'cubed': ['mean', 'sum']
        })
        
        result_dfs.append(chunk_result)
    
    # 全チャンクの結果を結合
    final_result = pd.concat(result_dfs)
    
    # カテゴリごとに最終集計
    final_result = final_result.groupby(level=0).agg({
        ('value', 'mean'): 'mean',
        ('value', 'sum'): 'sum',
        ('value', 'count'): 'sum',
        ('squared', 'mean'): 'mean',
        ('squared', 'sum'): 'sum',
        ('cubed', 'mean'): 'mean',
        ('cubed', 'sum'): 'sum'
    })
    
    return final_result

memory_profilerを使うことで、メモリを大量に消費しているコードの部分を特定し、メモリ使用量を最適化することができます。大規模なデータを処理する場合は特に重要なツールです。

実践例:プロファイリング結果に基づいたコード最適化テクニック

ここまで、各種プロファイリングツールの使い方を見てきましたが、実際のコード最適化はどのように行えばよいのでしょうか?プロファイリング結果に基づいた最適化の実例を紹介します。

実践例1:データ処理パイプラインの最適化

大量のテキストファイルから単語の出現頻度を集計するプログラムを考えてみましょう:

# 最適化前のコード
def count_word_frequencies(file_paths):
    all_words = []
    
    # すべてのファイルから単語を収集
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read().lower()
            words = text.split()
            all_words.extend(words)
    
    # 単語の頻度を数える
    word_freq = {}
    for word in all_words:
        if word in word_freq:
            word_freq[word] += 1
        else:
            word_freq[word] = 1
    
    # 結果をソートして返す
    sorted_freq = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
    return sorted_freq

このコードをプロファイリングすると、次のような問題が見つかるかもしれません:

  1. すべての単語をall_wordsリストに収集しているため、メモリ使用量が多い
  2. 単語の頻度を数える際にディクショナリの検索を繰り返しているため、処理が遅い

これらの問題を解決するために、以下のように最適化できます:

# 最適化後のコード
from collections import Counter

def optimized_count_word_frequencies(file_paths):
    word_freq = Counter()
    
    # ファイルごとに処理し、直接Counterに追加
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            # 行ごとに処理してメモリ使用量を削減
            for line in f:
                words = line.lower().split()
                word_freq.update(words)
    
    # 結果をソートして返す
    return word_freq.most_common()

最適化のポイント:

  • all_wordsリストを作らず、直接Counterオブジェクトに追加することでメモリ使用量を削減
  • ファイルを一度に読み込むのではなく、行ごとに処理することでメモリ使用量をさらに削減
  • Counterクラスを使用して単語のカウントを効率化

実践例2:数値計算の最適化

行列の乗算を行う関数を最適化してみましょう:

# 最適化前のコード
def matrix_multiply(a, b):
    rows_a = len(a)
    cols_a = len(a[0])
    cols_b = len(b[0])
    
    # 結果行列を初期化
    result = [[0 for _ in range(cols_b)] for _ in range(rows_a)]
    
    # 行列の乗算
    for i in range(rows_a):
        for j in range(cols_b):
            for k in range(cols_a):
                result[i][j] += a[i][k] * b[k][j]
    
    return result

line_profilerを使ってこのコードを分析すると、三重のforループが非常に遅いことがわかるでしょう。NumPyを使って最適化します:

# 最適化後のコード
import numpy as np

def optimized_matrix_multiply(a, b):
    # NumPy配列に変換
    a_array = np.array(a)
    b_array = np.array(b)
    
    # NumPyの行列乗算を使用
    result = np.matmul(a_array, b_array)
    
    return result

最適化のポイント:

  • PythonのネイティブなリストではなくNumPy配列を使用
  • NumPyの最適化された行列乗算関数を利用
  • Cで実装された高速なライブラリを活用

一般的な最適化テクニック

プロファイリング結果に基づいた最適化の一般的なテクニックをまとめます:

  1. ループの最適化

    • リスト内包表記を使う
    • ネストしたループを避ける
    • ループ内で不要な計算を避ける
  2. データ構造の選択

    • 検索が頻繁な場合は辞書やセットを使う
    • 順序付きデータの場合はリストよりもcollections.dequeを検討
    • 多くの集計操作を行う場合はcollections.Counterを使う
  3. メモリ最適化

    • ジェネレータ式を使って大きなリストの生成を避ける
    • 一時変数を再利用する
    • 不要なオブジェクトは明示的に削除する
  4. 外部ライブラリの活用

    • 数値計算にはNumPy
    • データ処理にはPandas
    • 行列演算にはSciPy
  5. 並列処理と非同期処理

    • multiprocessingモジュールでCPUバウンドな処理を並列化
    • asyncioモジュールでI/Oバウンドな処理を非同期化

「シンプルなコードが最速のコードではない。しかし、複雑なコードは理解しにくく、バグが入りやすい。最適化と可読性のバランスを常に意識しよう。」- Pythonの格言(筆者による)

最後に、プロファイリングと最適化のプロセスをまとめます:

  1. 測定: まずプロファイリングツールを使って問題を特定する
  2. 分析: 結果を分析し、ボトルネックを見つける
  3. 改善: 特定された問題に対して最適化を行う
  4. 検証: 再度プロファイリングを行い、改善を確認する
  5. 繰り返し: 必要に応じてこのプロセスを繰り返す

このサイクルを繰り返すことで、Pythonプログラムのパフォーマンスを大幅に向上させることができます。常に測定に基づいて最適化を行い、推測に頼らないようにしましょう。

おすすめの書籍

おすすめコンテンツ