Node.jsのStreamで大きなファイルを効率的に処理する方法

Node.jsのStreamで大きなファイルを効率的に処理する方法
大きなファイル処理でよくあるメモリ問題
Node.jsでファイル処理を行う際、最もよく使われる方法はfs.readFile()
メソッドです。しかし、これには大きな落とし穴があります。このメソッドはファイル全体をメモリに読み込むため、大きなファイルを処理しようとすると「JavaScript heap out of memory」というエラーに悩まされることになります。
例えば、以下のようなコードで大きなファイルを読み込もうとしてみましょう:
const fs = require('fs');
fs.readFile('large-file.csv', 'utf8', (err, data) => {
if (err) throw err;
// ファイル全体がメモリに読み込まれる
console.log(`ファイルサイズ: ${data.length} バイト`);
// 何らかの処理を行う...
const lines = data.split('\n');
console.log(`行数: ${lines.length}`);
});
1GBを超えるファイルでこのコードを実行すると、Node.jsのデフォルトのメモリ制限(約1.7GB)を超えてしまい、プログラムがクラッシュします。これは特に本番環境で大きなログファイルやデータセットを処理する際に深刻な問題となります。
この問題を解決するのがStream API — ファイルを一度に全部ではなく、小さな「チャンク」に分けて処理する仕組みです。
Stream APIの基本概念と仕組み
Node.jsのStreamは、データを小さな塊(チャンク)で扱うためのインターフェースです。Streamは基本的に次の4種類があります:
- Readable Stream - データを読み込むためのストリーム
- Writable Stream - データを書き込むためのストリーム
- Duplex Stream - 読み込み・書き込みの両方が可能なストリーム
- Transform Stream - データを変換するための特殊なDuplex Stream
Streamの最大の利点は、一度にメモリに読み込むデータ量を制限できることです。これにより、Node.jsアプリケーションは非常に少ないメモリ使用量で大量のデータを処理できます。
以下はStreamの基本的な動作を表した図です:
ファイル全体 → [チャンク1] → 処理 → [チャンク2] → 処理 → ... → 完了
Stream処理では、チャンクが利用可能になったときにイベントが発火し、そのチャンクのみを処理することができます。これにより、ファイル全体がメモリに読み込まれるのを待つ必要がなくなります。
Node.jsでは、多くの組み込みモジュールがStreamインターフェースを実装しています:
fs
モジュールは、ファイルの読み書きにストリームを提供http
モジュールでは、リクエストとレスポンスがストリームzlib
モジュールは、圧縮・解凍処理にストリームを使用
最も一般的なのは、ファイル操作におけるストリームの使用です。
読み取りストリームでファイルを少しずつ処理する
Readable Stream(読み取りストリーム)を使うと、ファイルを少しずつメモリに読み込みながら処理できます。以下に基本的な使い方を示します:
const fs = require('fs');
// 読み取りストリームを作成
const readStream = fs.createReadStream('large-file.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KBのチャンクサイズを指定
});
// 統計情報の初期化
let totalBytes = 0;
let lineCount = 0;
let buffer = '';
// dataイベント:新しいチャンクが利用可能になったときに発火
readStream.on('data', (chunk) => {
totalBytes += chunk.length;
// 不完全な行を扱うためのバッファリング処理
buffer += chunk;
const lines = buffer.split('\n');
// 最後の行は不完全かもしれないので取っておく
buffer = lines.pop();
// 完全な行だけをカウント/処理
lineCount += lines.length;
// 各行に対して何らかの処理を行う
for (const line of lines) {
// ここで行ごとの処理を実装
// 例:CSVの解析、特定のパターンの検索、データの集計など
}
});
// endイベント:ストリームの終了時に発火
readStream.on('end', () => {
// 残りのバッファを処理
if (buffer.length > 0) {
lineCount++;
// バッファの最後の行を処理
}
console.log(`処理完了!合計 ${totalBytes} バイト、${lineCount} 行を処理しました。`);
});
// errorイベント:エラー処理
readStream.on('error', (err) => {
console.error('ストリーム処理エラー:', err);
});
このコードの重要なポイント:
highWaterMark
オプションでチャンクサイズを設定できます(デフォルトは64KB)data
イベントで各チャンクを受け取り処理します- CSVなどの行指向データでは、行が複数のチャンクにまたがる可能性があるため、バッファリング処理が必要です
end
イベントでストリームの終了を検知し、最終処理を行います
このアプローチを使えば、数GBのファイルでも数MBのメモリで効率的に処理できます。
書き込みストリームでメモリ効率の良い出力
大きなファイルを作成する場合も、ストリームを使うことでメモリ効率を高めることができます。Writable Stream(書き込みストリーム)を使うと、データを小さなチャンクで書き込み、メモリ使用量を抑えられます。
const fs = require('fs');
// 書き込みストリームを作成
const writeStream = fs.createWriteStream('output-large-file.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KBのチャンクサイズ
});
// 大量のデータを生成して書き込む例
function writeLotsOfData() {
// 例えば100万行のデータを書き込む
const ROWS_TO_WRITE = 1000000;
let rowsWritten = 0;
// メモリ効率の良い書き込み関数
function writeNextBatch() {
let ok = true;
// 1000行ずつバッチ処理
while (rowsWritten < ROWS_TO_WRITE && ok) {
const data = `Row ${rowsWritten},Value ${Math.random()},TimeStamp ${new Date().toISOString()}\n`;
rowsWritten++;
// バックプレッシャーを処理
// writeStream.write()はバッファが満杯になるとfalseを返す
ok = writeStream.write(data);
}
// バッファが満杯になった場合、drainイベントを待ってから続行
if (rowsWritten < ROWS_TO_WRITE) {
writeStream.once('drain', writeNextBatch);
} else {
// 全データ書き込み完了
writeStream.end();
}
}
// 書き込み開始
writeNextBatch();
}
// 書き込み完了イベント
writeStream.on('finish', () => {
console.log('ファイルへの書き込みが完了しました!');
});
// エラーイベント
writeStream.on('error', (err) => {
console.error('書き込みエラー:', err);
});
// 実行
writeLotsOfData();
このコードの重要なポイント:
- バックプレッシャー処理:
write()
メソッドがfalse
を返したら書き込みを一時停止し、drain
イベントを待ちます - これにより、書き込みバッファがオーバーフローすることなく、メモリ使用量を一定に保てます
end()
メソッドでストリームを閉じることを忘れないようにしましょうfinish
イベントで書き込み完了を検知できます
このアプローチは、大量のログ出力、レポート生成、データエクスポートなどで役立ちます。
パイプを使った効率的なデータ変換処理
Node.jsのStreamの最も強力な機能の一つが「パイプ」です。パイプを使うと、あるストリームの出力を別のストリームの入力に直接接続できます。複数のストリームを連結させることで、データ処理パイプラインを作成できます。
基本的なパイプの構文は次のとおりです:
readableStream.pipe(writableStream);
複数のストリームをチェーンすることも可能です:
readableStream
.pipe(transformStream1)
.pipe(transformStream2)
.pipe(writableStream);
実際の例を見てみましょう。大きなCSVファイルを読み込み、特定の条件でフィルタリングして新しいCSVに書き出す処理です:
const fs = require('fs');
const { Transform } = require('stream');
// 読み取りストリームを作成
const readStream = fs.createReadStream('large-data.csv', {
encoding: 'utf8'
});
// 書き込みストリームを作成
const writeStream = fs.createWriteStream('filtered-data.csv', {
encoding: 'utf8'
});
// 変換ストリームを作成
const filterTransform = new Transform({
// テキストモードで処理
decodeStrings: false,
encoding: 'utf8',
// 変換関数
transform(chunk, encoding, callback) {
// チャンクを行に分割
const lines = chunk.toString().split('\n');
let result = '';
// 各行をフィルタリング(例:金額が1000以上の行だけを抽出)
for (const line of lines) {
const columns = line.split(',');
// 2列目が数値で1000以上の行だけを通過させる
if (columns.length > 1 && !isNaN(columns[1]) && Number(columns[1]) >= 1000) {
result += line + '\n';
}
}
// 変換結果をプッシュ
callback(null, result);
}
});
// ストリームをパイプでつなげる
readStream
.pipe(filterTransform)
.pipe(writeStream);
// 完了イベント
writeStream.on('finish', () => {
console.log('フィルタリング処理が完了しました!');
});
// エラー処理
readStream.on('error', err => console.error('読み込みエラー:', err));
filterTransform.on('error', err => console.error('変換エラー:', err));
writeStream.on('error', err => console.error('書き込みエラー:', err));
このコードの重要なポイント:
- Transform Streamを使ってデータの変換処理を実装しています
- パイプで複数のストリームを連結することで、データの流れがシンプルになります
- 各ストリームの責任が明確に分かれているため、コードの見通しがよくなります
- メモリ使用量が抑えられます - 一度に処理されるのはチャンク単位だけです
パイプを使うと、GZip圧縮やJSON変換など、複雑なデータ変換パイプラインも簡単に構築できます。
実践的なケーススタディ:10GBのログファイル分析
ここまでの知識を統合して、現実的なユースケースを考えてみましょう。たとえば、10GBのWebサーバーアクセスログファイルから有用な情報を抽出したいケースです。
以下の要件を考えます:
- 特定のURLパターンへのアクセスだけを抽出
- ステータスコード 500(サーバーエラー)を含むリクエストを特定
- 時間帯ごとのアクセス数をカウント
- メモリ使用量を最小限に保つ
以下のコードでこれを実現できます:
const fs = require('fs');
const { Transform } = require('stream');
const zlib = require('zlib');
// 大きなログファイルが gzip 圧縮されている想定
const logFilePath = '/var/log/access.log.gz';
// 結果を保存するファイル
const filteredLogPath = 'filtered-errors.log';
const statsFilePath = 'hourly-stats.json';
// 時間帯別アクセス統計用のオブジェクト
const hourlyStats = Array(24).fill(0);
const urlErrorCounts = new Map();
// URL パターンフィルター
const URL_PATTERN = '/api/';
// 最小限のメモリでログ行を処理するための変換ストリーム
const logProcessor = new Transform({
decodeStrings: false,
encoding: 'utf8',
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
let filteredLines = '';
for (const line of lines) {
// 簡易ログ解析(実際のログ形式に応じて調整が必要)
if (line.includes(URL_PATTERN)) {
// 時間帯の抽出(例: [10/May/2025:14:23:45 +0900] のような形式を想定)
const timeMatch = line.match(/\[.*?:(\d{2}):/);
if (timeMatch) {
const hour = parseInt(timeMatch[1], 10);
hourlyStats[hour]++;
}
// エラーログの抽出(500 エラー)
if (line.includes(' 500 ')) {
filteredLines += line + '\n';
// URLごとのエラー数カウント
const urlMatch = line.match(/"[A-Z]+ ([^ ]+) HTTP/);
if (urlMatch) {
const url = urlMatch[1];
urlErrorCounts.set(url, (urlErrorCounts.get(url) || 0) + 1);
}
}
}
}
// フィルタリングされたログ行を次のストリームに渡す
callback(null, filteredLines);
},
// ストリーム終了時の処理
flush(callback) {
// 時間帯別統計を JSON ファイルに書き出す
const stats = {
hourlyAccess: hourlyStats,
topErrorUrls: [...urlErrorCounts.entries()]
.sort((a, b) => b[1] - a[1])
.slice(0, 10)
.map(([url, count]) => ({ url, count }))
};
// 統計情報をファイルに書き出す処理は別のストリームで
fs.writeFile(statsFilePath, JSON.stringify(stats, null, 2), 'utf8', (err) => {
if (err) console.error('統計データ保存エラー:', err);
console.log(`統計データを ${statsFilePath} に保存しました`);
});
callback();
}
});
// 処理開始時間
const startTime = Date.now();
// パイプラインの構築
fs.createReadStream(logFilePath)
.pipe(zlib.createGunzip()) // gzip 圧縮解除
.pipe(logProcessor) // ログ処理
.pipe(fs.createWriteStream(filteredLogPath)) // フィルタリング結果の保存
.on('finish', () => {
const elapsed = (Date.now() - startTime) / 1000;
console.log(`処理が完了しました!所要時間: ${elapsed.toFixed(2)}秒`);
console.log(`エラーログを ${filteredLogPath} に保存しました`);
})
.on('error', (err) => {
console.error('ストリーム処理エラー:', err);
});
このコードは次のような優れた特性を持っています:
- 効率的なメモリ使用: 10GBのファイルでも数十MBのメモリで処理可能
- モジュール化: 各処理が明確に分離されている
- パイプライン: データの流れが直感的で理解しやすい
- 拡張性: 新しい変換ステップを追加するのが簡単
この例では、リアルタイムで統計情報を集めながら、エラーログのみを別ファイルに抽出しています。タスク全体が完了するまで待たずに、データの処理を開始できるところがStreamの大きな利点です。
10GBのログファイルを従来の方法で処理しようとすると、おそらくメモリ不足でクラッシュするか、非常に遅くなるでしょう。Stream APIを使えば、メモリ使用量を抑えながら効率的に処理できます。
このトピックはこちらの書籍で勉強するのがおすすめ!
この記事の内容をさらに深く理解したい方におすすめの一冊です。実践的な知識を身につけたい方は、ぜひチェックしてみてください!
おすすめコンテンツ
おすすめPython2025/5/12Pythonでのデータ変換・クリーニング:大規模データセットを効率的に処理する方法
大規模データセットを効率的に処理するためのPythonテクニックを解説します。メモリエラーや処理速度の問題を解決し、Pandas、NumPy、Dask、Vaexを活用した実践的なデータクリーニング手法...
続きを読む Docker2025/5/20Dockerコンテナ内Node.jsアプリをChromeDevToolsでリモートデバッグする方法
Dockerコンテナ内で動作するNode.jsアプリケーションをChromeDevToolsでリモートデバッグする具体的な手順と設定例を紹介します。コードの実行を一時停止して変数を調査したい開発者に役...
続きを読む Docker2025/5/20Docker環境でNode.jsのホットリロードが効かない問題の解決法
Docker環境でNode.jsアプリケーションを開発中にホットリロードが効かない問題に悩んでいませんか?この記事では、この特定の問題を解決するための具体的な対処法をシンプルに解説します。
続きを読む Docker2025/5/20Dockerコンテナ内Node.jsアプリの環境変数トラブル解決法
Dockerコンテナ内でNode.jsアプリケーションを実行すると、環境変数が正しく読み込まれない問題に遭遇することがあります。この記事では、具体的な原因と解決策を実用的なコード例で解説します。
続きを読む Flutter2025/5/14Flutter開発入門:クロスプラットフォームアプリを効率的に作る方法
Flutterをこれから始める方に向けた開発入門ガイド。環境構築から基本概念、UIコンポーネント、状態管理まで実践的なコード例で解説します。初心者でもスムーズに開発を始められる実用的な情報が満載です。
続きを読む IT技術2023/9/8あなたもReact JSXのマスターに!効率的な学習法とデプロイ方法を紹介
Reactアプリケーションのデプロイメントは、開発したアプリケーションをユーザーが利用できる状態にするための重要なフェーズです。ここでは、基本的なReactアプリケーションのデプロイメント手順について...
続きを読む IT技術2023/9/8ReactとJSXをマスターしたいあなたへ!デバッグのコツと効率的な学習方法
Reactのデバッグは、Reactアプリケーション開発における重要なプロセスの一つです。このパートでは、その重要性を強調し、実際のプロジェクトでのデバッグが発生する可能性のある問題について説明します。...
続きを読む AI2025/5/12【2025年最新】MLOpsの実践ガイド:機械学習モデルの運用を効率化する方法
機械学習モデルの開発から本番運用までを自動化・効率化するMLOpsの基本概念から実践的な導入方法まで解説します。初心者でもわかるCI/CDパイプラインの構築方法や監視ツールの選定など、具体的な実装例も...
続きを読む