Tasuke Hubのロゴ

ITを中心に困っている人を助けるメディア

分かりやすく解決策を提供することで、あなたの困ったをサポート。 全ての人々がスムーズに生活できる世界を目指します。

Node.jsのStreamで大きなファイルを効率的に処理する方法

記事のサムネイル
TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者

Node.jsのStreamで大きなファイルを効率的に処理する方法

大きなファイル処理でよくあるメモリ問題

Node.jsでファイル処理を行う際、最もよく使われる方法はfs.readFile()メソッドです。しかし、これには大きな落とし穴があります。このメソッドはファイル全体をメモリに読み込むため、大きなファイルを処理しようとすると「JavaScript heap out of memory」というエラーに悩まされることになります。

例えば、以下のようなコードで大きなファイルを読み込もうとしてみましょう:

const fs = require('fs');

fs.readFile('large-file.csv', 'utf8', (err, data) => {
  if (err) throw err;
  
  // ファイル全体がメモリに読み込まれる
  console.log(`ファイルサイズ: ${data.length} バイト`);
  
  // 何らかの処理を行う...
  const lines = data.split('\n');
  console.log(`行数: ${lines.length}`);
});

1GBを超えるファイルでこのコードを実行すると、Node.jsのデフォルトのメモリ制限(約1.7GB)を超えてしまい、プログラムがクラッシュします。これは特に本番環境で大きなログファイルやデータセットを処理する際に深刻な問題となります。

この問題を解決するのがStream API — ファイルを一度に全部ではなく、小さな「チャンク」に分けて処理する仕組みです。

Stream APIの基本概念と仕組み

Node.jsのStreamは、データを小さな塊(チャンク)で扱うためのインターフェースです。Streamは基本的に次の4種類があります:

  1. Readable Stream - データを読み込むためのストリーム
  2. Writable Stream - データを書き込むためのストリーム
  3. Duplex Stream - 読み込み・書き込みの両方が可能なストリーム
  4. Transform Stream - データを変換するための特殊なDuplex Stream

Streamの最大の利点は、一度にメモリに読み込むデータ量を制限できることです。これにより、Node.jsアプリケーションは非常に少ないメモリ使用量で大量のデータを処理できます。

以下はStreamの基本的な動作を表した図です:

ファイル全体 → [チャンク1] → 処理 → [チャンク2] → 処理 → ... → 完了

Stream処理では、チャンクが利用可能になったときにイベントが発火し、そのチャンクのみを処理することができます。これにより、ファイル全体がメモリに読み込まれるのを待つ必要がなくなります。

Node.jsでは、多くの組み込みモジュールがStreamインターフェースを実装しています:

  • fs モジュールは、ファイルの読み書きにストリームを提供
  • http モジュールでは、リクエストとレスポンスがストリーム
  • zlib モジュールは、圧縮・解凍処理にストリームを使用

最も一般的なのは、ファイル操作におけるストリームの使用です。

読み取りストリームでファイルを少しずつ処理する

Readable Stream(読み取りストリーム)を使うと、ファイルを少しずつメモリに読み込みながら処理できます。以下に基本的な使い方を示します:

const fs = require('fs');

// 読み取りストリームを作成
const readStream = fs.createReadStream('large-file.csv', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KBのチャンクサイズを指定
});

// 統計情報の初期化
let totalBytes = 0;
let lineCount = 0;
let buffer = '';

// dataイベント:新しいチャンクが利用可能になったときに発火
readStream.on('data', (chunk) => {
  totalBytes += chunk.length;
  
  // 不完全な行を扱うためのバッファリング処理
  buffer += chunk;
  const lines = buffer.split('\n');
  
  // 最後の行は不完全かもしれないので取っておく
  buffer = lines.pop();
  
  // 完全な行だけをカウント/処理
  lineCount += lines.length;
  
  // 各行に対して何らかの処理を行う
  for (const line of lines) {
    // ここで行ごとの処理を実装
    // 例:CSVの解析、特定のパターンの検索、データの集計など
  }
});

// endイベント:ストリームの終了時に発火
readStream.on('end', () => {
  // 残りのバッファを処理
  if (buffer.length > 0) {
    lineCount++;
    // バッファの最後の行を処理
  }
  
  console.log(`処理完了!合計 ${totalBytes} バイト、${lineCount} 行を処理しました。`);
});

// errorイベント:エラー処理
readStream.on('error', (err) => {
  console.error('ストリーム処理エラー:', err);
});

このコードの重要なポイント:

  1. highWaterMark オプションでチャンクサイズを設定できます(デフォルトは64KB)
  2. dataイベントで各チャンクを受け取り処理します
  3. CSVなどの行指向データでは、行が複数のチャンクにまたがる可能性があるため、バッファリング処理が必要です
  4. endイベントでストリームの終了を検知し、最終処理を行います

このアプローチを使えば、数GBのファイルでも数MBのメモリで効率的に処理できます。

書き込みストリームでメモリ効率の良い出力

大きなファイルを作成する場合も、ストリームを使うことでメモリ効率を高めることができます。Writable Stream(書き込みストリーム)を使うと、データを小さなチャンクで書き込み、メモリ使用量を抑えられます。

const fs = require('fs');

// 書き込みストリームを作成
const writeStream = fs.createWriteStream('output-large-file.csv', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KBのチャンクサイズ
});

// 大量のデータを生成して書き込む例
function writeLotsOfData() {
  // 例えば100万行のデータを書き込む
  const ROWS_TO_WRITE = 1000000;
  let rowsWritten = 0;
  
  // メモリ効率の良い書き込み関数
  function writeNextBatch() {
    let ok = true;
    
    // 1000行ずつバッチ処理
    while (rowsWritten < ROWS_TO_WRITE && ok) {
      const data = `Row ${rowsWritten},Value ${Math.random()},TimeStamp ${new Date().toISOString()}\n`;
      rowsWritten++;
      
      // バックプレッシャーを処理
      // writeStream.write()はバッファが満杯になるとfalseを返す
      ok = writeStream.write(data);
    }
    
    // バッファが満杯になった場合、drainイベントを待ってから続行
    if (rowsWritten < ROWS_TO_WRITE) {
      writeStream.once('drain', writeNextBatch);
    } else {
      // 全データ書き込み完了
      writeStream.end();
    }
  }
  
  // 書き込み開始
  writeNextBatch();
}

// 書き込み完了イベント
writeStream.on('finish', () => {
  console.log('ファイルへの書き込みが完了しました!');
});

// エラーイベント
writeStream.on('error', (err) => {
  console.error('書き込みエラー:', err);
});

// 実行
writeLotsOfData();

このコードの重要なポイント:

  1. バックプレッシャー処理:write()メソッドがfalseを返したら書き込みを一時停止し、drainイベントを待ちます
  2. これにより、書き込みバッファがオーバーフローすることなく、メモリ使用量を一定に保てます
  3. end()メソッドでストリームを閉じることを忘れないようにしましょう
  4. finishイベントで書き込み完了を検知できます

このアプローチは、大量のログ出力、レポート生成、データエクスポートなどで役立ちます。

パイプを使った効率的なデータ変換処理

Node.jsのStreamの最も強力な機能の一つが「パイプ」です。パイプを使うと、あるストリームの出力を別のストリームの入力に直接接続できます。複数のストリームを連結させることで、データ処理パイプラインを作成できます。

基本的なパイプの構文は次のとおりです:

readableStream.pipe(writableStream);

複数のストリームをチェーンすることも可能です:

readableStream
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(writableStream);

実際の例を見てみましょう。大きなCSVファイルを読み込み、特定の条件でフィルタリングして新しいCSVに書き出す処理です:

const fs = require('fs');
const { Transform } = require('stream');

// 読み取りストリームを作成
const readStream = fs.createReadStream('large-data.csv', { 
  encoding: 'utf8' 
});

// 書き込みストリームを作成
const writeStream = fs.createWriteStream('filtered-data.csv', { 
  encoding: 'utf8' 
});

// 変換ストリームを作成
const filterTransform = new Transform({
  // テキストモードで処理
  decodeStrings: false,
  encoding: 'utf8',
  
  // 変換関数
  transform(chunk, encoding, callback) {
    // チャンクを行に分割
    const lines = chunk.toString().split('\n');
    let result = '';
    
    // 各行をフィルタリング(例:金額が1000以上の行だけを抽出)
    for (const line of lines) {
      const columns = line.split(',');
      
      // 2列目が数値で1000以上の行だけを通過させる
      if (columns.length > 1 && !isNaN(columns[1]) && Number(columns[1]) >= 1000) {
        result += line + '\n';
      }
    }
    
    // 変換結果をプッシュ
    callback(null, result);
  }
});

// ストリームをパイプでつなげる
readStream
  .pipe(filterTransform)
  .pipe(writeStream);

// 完了イベント
writeStream.on('finish', () => {
  console.log('フィルタリング処理が完了しました!');
});

// エラー処理
readStream.on('error', err => console.error('読み込みエラー:', err));
filterTransform.on('error', err => console.error('変換エラー:', err));
writeStream.on('error', err => console.error('書き込みエラー:', err));

このコードの重要なポイント:

  1. Transform Streamを使ってデータの変換処理を実装しています
  2. パイプで複数のストリームを連結することで、データの流れがシンプルになります
  3. 各ストリームの責任が明確に分かれているため、コードの見通しがよくなります
  4. メモリ使用量が抑えられます - 一度に処理されるのはチャンク単位だけです

パイプを使うと、GZip圧縮やJSON変換など、複雑なデータ変換パイプラインも簡単に構築できます。

実践的なケーススタディ:10GBのログファイル分析

ここまでの知識を統合して、現実的なユースケースを考えてみましょう。たとえば、10GBのWebサーバーアクセスログファイルから有用な情報を抽出したいケースです。

以下の要件を考えます:

  1. 特定のURLパターンへのアクセスだけを抽出
  2. ステータスコード 500(サーバーエラー)を含むリクエストを特定
  3. 時間帯ごとのアクセス数をカウント
  4. メモリ使用量を最小限に保つ

以下のコードでこれを実現できます:

const fs = require('fs');
const { Transform } = require('stream');
const zlib = require('zlib');

// 大きなログファイルが gzip 圧縮されている想定
const logFilePath = '/var/log/access.log.gz';

// 結果を保存するファイル
const filteredLogPath = 'filtered-errors.log';
const statsFilePath = 'hourly-stats.json';

// 時間帯別アクセス統計用のオブジェクト
const hourlyStats = Array(24).fill(0);
const urlErrorCounts = new Map();

// URL パターンフィルター
const URL_PATTERN = '/api/';
// 最小限のメモリでログ行を処理するための変換ストリーム
const logProcessor = new Transform({
  decodeStrings: false,
  encoding: 'utf8',
  
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    let filteredLines = '';
    
    for (const line of lines) {
      // 簡易ログ解析(実際のログ形式に応じて調整が必要)
      if (line.includes(URL_PATTERN)) {
        // 時間帯の抽出(例: [10/May/2025:14:23:45 +0900] のような形式を想定)
        const timeMatch = line.match(/\[.*?:(\d{2}):/);
        if (timeMatch) {
          const hour = parseInt(timeMatch[1], 10);
          hourlyStats[hour]++;
        }
        
        // エラーログの抽出(500 エラー)
        if (line.includes(' 500 ')) {
          filteredLines += line + '\n';
          
          // URLごとのエラー数カウント
          const urlMatch = line.match(/"[A-Z]+ ([^ ]+) HTTP/);
          if (urlMatch) {
            const url = urlMatch[1];
            urlErrorCounts.set(url, (urlErrorCounts.get(url) || 0) + 1);
          }
        }
      }
    }
    
    // フィルタリングされたログ行を次のストリームに渡す
    callback(null, filteredLines);
  },
  
  // ストリーム終了時の処理
  flush(callback) {
    // 時間帯別統計を JSON ファイルに書き出す
    const stats = {
      hourlyAccess: hourlyStats,
      topErrorUrls: [...urlErrorCounts.entries()]
        .sort((a, b) => b[1] - a[1])
        .slice(0, 10)
        .map(([url, count]) => ({ url, count }))
    };
    
    // 統計情報をファイルに書き出す処理は別のストリームで
    fs.writeFile(statsFilePath, JSON.stringify(stats, null, 2), 'utf8', (err) => {
      if (err) console.error('統計データ保存エラー:', err);
      console.log(`統計データを ${statsFilePath} に保存しました`);
    });
    
    callback();
  }
});

// 処理開始時間
const startTime = Date.now();

// パイプラインの構築
fs.createReadStream(logFilePath)
  .pipe(zlib.createGunzip())  // gzip 圧縮解除
  .pipe(logProcessor)         // ログ処理
  .pipe(fs.createWriteStream(filteredLogPath)) // フィルタリング結果の保存
  .on('finish', () => {
    const elapsed = (Date.now() - startTime) / 1000;
    console.log(`処理が完了しました!所要時間: ${elapsed.toFixed(2)}秒`);
    console.log(`エラーログを ${filteredLogPath} に保存しました`);
  })
  .on('error', (err) => {
    console.error('ストリーム処理エラー:', err);
  });

このコードは次のような優れた特性を持っています:

  1. 効率的なメモリ使用: 10GBのファイルでも数十MBのメモリで処理可能
  2. モジュール化: 各処理が明確に分離されている
  3. パイプライン: データの流れが直感的で理解しやすい
  4. 拡張性: 新しい変換ステップを追加するのが簡単

この例では、リアルタイムで統計情報を集めながら、エラーログのみを別ファイルに抽出しています。タスク全体が完了するまで待たずに、データの処理を開始できるところがStreamの大きな利点です。

10GBのログファイルを従来の方法で処理しようとすると、おそらくメモリ不足でクラッシュするか、非常に遅くなるでしょう。Stream APIを使えば、メモリ使用量を抑えながら効率的に処理できます。

このトピックはこちらの書籍で勉強するのがおすすめ!

この記事の内容をさらに深く理解したい方におすすめの一冊です。実践的な知識を身につけたい方は、ぜひチェックしてみてください!

おすすめ記事

おすすめコンテンツ