Tasuke Hubのロゴ

ITを中心に困っている人を助けるメディア

分かりやすく解決策を提供することで、あなたの困ったをサポート。 全ての人々がスムーズに生活できる世界を目指します。

Golangの並行処理パターン:効率的なアプリケーション開発のための実践ガイド

記事のサムネイル

Golangの並行処理:シンプルかつパワフルな実装方法

Goプログラミング言語(Golang)の最大の魅力の一つは、並行処理(concurrency)のシンプルさです。従来の言語では複雑な実装が必要だった並列処理を、Golangではgoroutinechannelという2つの核となる概念によって、非常に簡潔に記述できます。

並行処理を効率的に実装できることは、現代のマルチコアCPUを最大限に活用し、高パフォーマンスなアプリケーションを構築する上で大きなアドバンテージとなります。特にWebサーバーやバックエンドサービスなど、多数のリクエストを同時に処理する必要があるシステムでは不可欠な技術です。

TH

Tasuke Hub管理人

東証プライム市場上場企業エンジニア

情報系修士卒業後、大手IT企業にてフルスタックエンジニアとして活躍。 Webアプリケーション開発からクラウドインフラ構築まで幅広い技術に精通し、 複数のプロジェクトでリードエンジニアを担当。 技術ブログやオープンソースへの貢献を通じて、日本のIT技術コミュニティに積極的に関わっている。

🎓情報系修士🏢東証プライム上場企業💻フルスタックエンジニア📝技術ブログ執筆者
// 基本的なgoroutineの例
package main

import (
    "fmt"
    "time"
)

func main() {
    // goroutineを起動
    go func() {
        fmt.Println("別のgoroutineで実行中...")
        time.Sleep(1 * time.Second)
        fmt.Println("goroutine完了")
    }()
    
    fmt.Println("メインgoroutineは継続して実行中...")
    time.Sleep(2 * time.Second) // goroutineが完了するのを待つ
    fmt.Println("プログラム終了")
}

この例では、匿名関数をgoキーワードで起動することで、メイン処理と並行して実行しています。この簡潔さがGolangの大きな強みです。しかし、並行処理の実装には適切なパターンとプラクティスを理解することが重要で、そうでなければデッドロックやレースコンディションなどの問題が発生する可能性があります。

この記事では、Golangの並行処理の基本から、実務で使える実践的なパターンまで、コード例を交えて解説します。

goroutineの基本と効率的な使い方

goroutineはGoの並行処理の基本単位で、従来の言語におけるスレッドに似ていますが、より軽量に設計されています。goroutineはGoランタイムによって管理され、OSスレッドにマップされます。通常のスレッドが数MBのメモリを消費するのに対し、goroutineは初期状態で2KBほどしか消費せず、数千、数万のgoroutineを同時に実行することが可能です。

goroutineの起動は非常に簡単で、任意の関数の前にgoキーワードを付けるだけです:

func someFunction() {
    // 何か時間のかかる処理
}

func main() {
    // 並行実行
    go someFunction()
    
    // メイン処理は続行する
}

goroutineの待機とsync.WaitGroup

上記の例では、メイン関数がgoroutineの終了を待たずに終了してしまう可能性があります。goroutineの完了を待つには、sync.WaitGroupを使用します:

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 関数終了時にDone()を呼び出し
    
    fmt.Printf("Worker %d: 処理開始\n", id)
    time.Sleep(time.Second) // 何か時間のかかる処理
    fmt.Printf("Worker %d: 処理完了\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    // 5つのワーカーを起動
    for i := 1; i <= 5; i++ {
        wg.Add(1) // カウンタをインクリメント
        go worker(i, &wg)
    }
    
    // すべてのgoroutineの完了を待つ
    wg.Wait()
    fmt.Println("すべてのワーカーが処理を完了しました")
}

この例では、WaitGroupというカウンタを使用しています。Addメソッドでカウンタを増やし、goroutineが完了したらDoneメソッドでカウンタを減らします。Waitメソッドはカウンタが0になるまでブロックするため、すべてのgoroutineが完了するまでメイン関数が終了しません。

goroutineのメモリ使用と適切な数の選定

goroutineは軽量ですが、無制限に作成すると結果的にメモリを大量に消費する可能性があります。特に大量のgoroutineが長時間実行される場合や、各goroutineが大きなメモリ領域を確保する場合に注意が必要です。

以下は、goroutineの数を制限する簡単な例です:

func processItems(items []string, maxConcurrency int) {
    semaphore := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup
    
    for _, item := range items {
        wg.Add(1)
        semaphore <- struct{}{} // 空いているスロットがなければブロック
        
        go func(item string) {
            defer func() {
                <-semaphore // スロットを解放
                wg.Done()
            }()
            
            // itemの処理
            processItem(item)
        }(item)
    }
    
    wg.Wait() // すべての処理が完了するまで待つ
}

func processItem(item string) {
    // 時間のかかる処理
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Processed:", item)
}

この例では、チャネルをセマフォとして使用し、同時に実行するgoroutineの数をmaxConcurrencyに制限しています。これにより、リソース使用量を制御しながら並行処理のメリットを活かせます。

実務では、CPUコア数に基づいて最適なgoroutine数を決定することが多いです:

import "runtime"

func main() {
    // 論理CPUの数を取得
    numCPU := runtime.NumCPU()
    
    // 並行度をCPU数に基づいて設定
    // I/O待ちが多い場合はCPU数の数倍が効果的
    concurrency := numCPU * 2
    
    // 処理を実行
    processItems(items, concurrency)
}

この方法により、システムリソースを効率的に使用しながら最大限のパフォーマンスを得ることができます。

チャネルを使ったgoroutine間のコミュニケーション

Goにおける並行処理のもう一つの重要な概念が「チャネル(channel)」です。チャネルはgoroutine間でデータを安全に共有するための通信パイプラインです。Go言語の設計思想「メモリを共有してコミュニケーションするのではなく、コミュニケーションすることでメモリを共有する」を実現する中心的な機能です。

チャネルの基本

チャネルの作成と使用は以下のように行います:

package main

import "fmt"

func main() {
    // 整数型のチャネルを作成
    ch := make(chan int)
    
    // goroutineを起動してチャネルに値を送信
    go func() {
        ch <- 42 // チャネルに値を送信
    }()
    
    // チャネルから値を受信
    value := <-ch
    fmt.Println("受信した値:", value)
}

この例では、匿名goroutineがチャネルを通じて整数値42をメインgoroutineに送信しています。<-演算子はチャネルの方向を示し、ch <- 42はチャネルに値を送信、value := <-chはチャネルから値を受信することを意味します。

バッファ付きチャネルとブロッキング

デフォルトでは、チャネルは「バッファなし」で作成され、送信と受信が同期します。つまり、受信側が準備できていない限り送信はブロックし、送信側が値を提供するまで受信はブロックします。

バッファ付きチャネルを作成すると、指定したサイズまでのメッセージをバッファリングでき、バッファが満杯になるまで送信はブロックしません:

// バッファサイズ3のチャネルを作成
bufCh := make(chan string, 3)

// バッファが満杯になるまでブロックせずに送信可能
bufCh <- "first"
bufCh <- "second"
bufCh <- "third"

// バッファが満杯になると、次の送信は受信があるまでブロック
// bufCh <- "fourth" // これを実行するとデッドロック

// チャネルから値を受信
fmt.Println(<-bufCh) // "first"
fmt.Println(<-bufCh) // "second"
fmt.Println(<-bufCh) // "third"

select文を使った複数チャネルの監視

複数のチャネルを同時に監視するには、select文を使用します。これは、複数のチャネル操作のうち、最初に準備ができたものを実行する機能を提供します:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    // 2つのgoroutineを起動、それぞれ異なる時間で値を送信
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "チャネル1からのメッセージ"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "チャネル2からのメッセージ"
    }()
    
    // 両方のチャネルからメッセージを受信
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        }
    }
}

この例では、select文が2つのチャネルからの受信を同時に待機しています。どちらのチャネルが先に値を送信しても、その値を受信して処理を続行します。

チャネルのクローズとレンジループ

データの送信が完了したら、チャネルを閉じることができます。閉じられたチャネルからは値を受信できますが、閉じられたチャネルに送信するとパニックが発生します:

package main

import "fmt"

func generateNumbers(n int) <-chan int {
    out := make(chan int)
    
    go func() {
        defer close(out) // goroutine終了時にチャネルを閉じる
        
        for i := 0; i < n; i++ {
            out <- i
        }
    }()
    
    return out
}

func main() {
    // 数値を生成するチャネルを取得
    ch := generateNumbers(5)
    
    // チャネルからすべての値を受信
    for num := range ch {
        fmt.Println("受信した数値:", num)
    }
    
    fmt.Println("チャネルが閉じられました")
}

この例では、generateNumbers関数が整数を生成するgoroutineを起動し、チャネルを返します。メイン関数ではrangeループを使用してチャネルからすべての値を受信しています。チャネルが閉じられると、ループは自動的に終了します。

チャネルの方向制限

関数のシグネチャでチャネルの方向を制限することで、意図しない使用を防ぎ、コードの安全性を高めることができます:

// 受信専用チャネルを返す関数
func producer() <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 5; i++ {
            ch <- i
        }
    }()
    return ch
}

// 送信専用チャネルを引数に取る関数
func consumer(in <-chan int, done chan<- bool) {
    for num := range in {
        fmt.Println("処理:", num)
    }
    done <- true
}

func main() {
    numbers := producer()
    done := make(chan bool)
    
    go consumer(numbers, done)
    
    <-done // 処理完了を待つ
    fmt.Println("すべての処理が完了しました")
}

この例では、producer関数は<-chan int(受信専用チャネル)を返し、consumer関数は<-chan int(受信専用)とchan<- bool(送信専用)チャネルを引数に取ります。この方向制限により、関数内でチャネルを誤って使用することを防ぎます。

並行処理のパターン:Worker Pool実装のベストプラクティス

並行処理の代表的なパターンの一つがWorker Poolです。これは、一定数のワーカー(goroutine)を事前に作成し、タスクをそれらのワーカーに分配するパターンです。このアプローチにより、大量のタスクを効率的に処理しながら、リソース使用量を制御することができます。

基本的なWorker Poolの実装

以下は、シンプルなWorker Pool実装の例です:

package main

import (
    "fmt"
    "sync"
    "time"
)

// タスクのタイプを定義
type Task struct {
    ID      int
    JobType string
    Data    interface{}
}

// Worker Poolを実装
func WorkerPool(numWorkers int, tasks <-chan Task, results chan<- string) {
    var wg sync.WaitGroup
    
    // 指定された数のワーカーを起動
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, tasks, results, &wg)
    }
    
    wg.Wait()        // すべてのワーカーの完了を待つ
    close(results)   // 結果チャネルを閉じる
}

// 個々のワーカーの実装
func worker(id int, tasks <-chan Task, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for task := range tasks {
        fmt.Printf("Worker %d: タスク %d (%s) の処理を開始\n", id, task.ID, task.JobType)
        
        // タスクの処理(ここでは単純なスリープで模擬)
        time.Sleep(100 * time.Millisecond)
        
        // 結果を送信
        result := fmt.Sprintf("タスク %d の結果: 成功", task.ID)
        results <- result
        
        fmt.Printf("Worker %d: タスク %d の処理を完了\n", id, task.ID)
    }
}

func main() {
    // タスクと結果のチャネルを作成
    taskQueue := make(chan Task, 100)
    resultQueue := make(chan string, 100)
    
    // バックグラウンドでWorker Poolを起動
    go WorkerPool(5, taskQueue, resultQueue)
    
    // タスクをキューに追加
    for i := 1; i <= 20; i++ {
        taskQueue <- Task{
            ID:      i,
            JobType: fmt.Sprintf("JobType-%d", i%4),
            Data:    fmt.Sprintf("Data-%d", i),
        }
    }
    close(taskQueue) // これ以上タスクがないことを示す
    
    // すべての結果を収集
    for result := range resultQueue {
        fmt.Println("結果:", result)
    }
}

この実装では、タスクチャネルとレスポンスチャネルを作成し、指定数のワーカーgoroutineを起動します。各ワーカーはタスクチャネルからタスクを取得し、処理後に結果をレスポンスチャネルに送信します。

Fan-OutとFan-Inパターン

より複雑な処理フローでは、「Fan-Out, Fan-In」パターンが有効です。これは、一つの入力ストリームを複数のワーカーに分散(Fan-Out)し、それらの結果を一つのストリームに集約(Fan-In)するパターンです:

package main

import (
    "fmt"
    "sync"
)

// 入力を生成する関数
func generateItems(count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            out <- i
        }
    }()
    return out
}

// 各アイテムを処理するワーカー関数(Fan-Out部分)
func processItem(in <-chan int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for n := range in {
            // 何らかの処理を行う
            result := fmt.Sprintf("処理結果 %d: %d^2 = %d", n, n, n*n)
            out <- result
        }
    }()
    return out
}

// 複数のチャネルからの結果を一つにマージ(Fan-In部分)
func mergeResults(cs ...<-chan string) <-chan string {
    var wg sync.WaitGroup
    out := make(chan string)
    
    // 各入力チャネルからの出力を単一のチャネルにコピー
    output := func(c <-chan string) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    // すべての出力処理が完了したらチャネルをクローズ
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    // 入力データを生成
    items := generateItems(10)
    
    // 複数のワーカーに分散処理させる(Fan-Out)
    numWorkers := 4
    workers := make([]<-chan string, numWorkers)
    for i := 0; i < numWorkers; i++ {
        workers[i] = processItem(items)
    }
    
    // 結果をマージする(Fan-In)
    results := mergeResults(workers...)
    
    // 結果を表示
    for result := range results {
        fmt.Println(result)
    }
}

このパターンは、CPUバウンドな作業を複数のCPUコアに分散したり、I/O操作を並行して行うのに効果的です。

Pipeline(パイプライン)パターン

複数の処理段階を連結する「パイプライン」パターンも、データ処理タスクでよく使われます:

package main

import (
    "fmt"
    "strings"
)

// パイプラインの最初のステージ:数値を生成
func generate(count int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= count; i++ {
            out <- i
        }
    }()
    return out
}

// 2番目のステージ:数値を2倍にする
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

// 3番目のステージ:数値を文字列に変換
func stringify(in <-chan int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for n := range in {
            out <- fmt.Sprintf("%d", n)
        }
    }()
    return out
}

// 最後のステージ:文字列を結合
func concatenate(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        var result strings.Builder
        for s := range in {
            result.WriteString(s + ", ")
        }
        out <- result.String()
    }()
    return out
}

func main() {
    // パイプラインを構築
    numbers := generate(5)
    doubled := double(numbers)
    stringified := stringify(doubled)
    result := concatenate(stringified)
    
    // 結果を表示
    fmt.Println(<-result) // "2, 4, 6, 8, 10, "
}

このパターンでは、各ステージが独立したgoroutineで実行され、チャネルを通じてデータが次のステージに渡されます。これにより、各ステージが並行して実行され、データが流れるように処理されます。

タイムアウトの処理

Worker Poolでよくある課題の一つは、タイムアウト処理です。以下の例では、タスク処理にタイムアウトを設定する方法を示しています:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context, id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d: 処理開始 %d\n", id, job)
        
        // タイムアウト付きの処理
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: タスク %d はコンテキストキャンセルにより中断\n", id, job)
            return
        case <-time.After(time.Millisecond * time.Duration(job * 100)):
            // 処理完了
            fmt.Printf("Worker %d: 処理完了 %d\n", id, job)
            results <- job * 2
        }
    }
}

func main() {
    // タイムアウト付きコンテキストを作成
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    jobs := make(chan int, 10)
    results := make(chan int, 10)
    
    // ワーカーを起動
    for w := 1; w <= 3; w++ {
        go worker(ctx, w, jobs, results)
    }
    
    // タスクを送信
    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 結果を収集(タイムアウトまでに完了した分のみ)
    for a := 1; a <= 9; a++ {
        select {
        case <-ctx.Done():
            fmt.Println("処理がタイムアウトしました")
            return
        case result := <-results:
            fmt.Println("結果:", result)
        }
    }
}

この例では、context.WithTimeoutを使用して、1秒後にタイムアウトするコンテキストを作成しています。各ワーカーはこのコンテキストをモニタリングし、キャンセルされた場合は処理を中断します。

エラーハンドリングと並行処理におけるデッドロック回避

並行処理を実装する際に最も注意すべき点の一つが、エラーハンドリングとデッドロック(複数のgoroutineが互いに待ち合わせて処理が進まない状態)の回避です。

エラーハンドリングのベストプラクティス

goroutineでエラーが発生した場合、メイン処理にそれを適切に伝える必要があります。一般的なアプローチとしては、エラー情報をチャネルで返す方法があります:

package main

import (
    "errors"
    "fmt"
    "time"
)

// 結果とエラーを含む構造体
type Result struct {
    Value int
    Err   error
}

func doWork(id int) <-chan Result {
    resultCh := make(chan Result)
    
    go func() {
        defer close(resultCh)
        
        // 何らかの処理
        time.Sleep(100 * time.Millisecond)
        
        // エラー発生シミュレーション(IDが偶数の場合)
        if id%2 == 0 {
            resultCh <- Result{
                Err: errors.New(fmt.Sprintf("処理 %d でエラーが発生しました", id)),
            }
            return
        }
        
        // 正常処理
        resultCh <- Result{
            Value: id * 10,
            Err:   nil,
        }
    }()
    
    return resultCh
}

func main() {
    // 複数のタスクを実行
    tasks := []int{1, 2, 3, 4, 5}
    results := make([]<-chan Result, len(tasks))
    
    for i, task := range tasks {
        results[i] = doWork(task)
    }
    
    // すべての結果を処理
    for i, resCh := range results {
        res := <-resCh
        if res.Err != nil {
            fmt.Printf("タスク %d でエラー: %v\n", tasks[i], res.Err)
        } else {
            fmt.Printf("タスク %d の結果: %d\n", tasks[i], res.Value)
        }
    }
}

この例では、各goroutineがエラーの有無を含む結果構造体をチャネルで返します。メイン処理は、それぞれのチャネルから結果を取得し、エラーがあれば適切に処理します。

errgroup パッケージの活用

標準ライブラリのgolang.org/x/sync/errgroupパッケージを使うと、複数のgoroutineのエラーハンドリングがより簡潔になります:

package main

import (
    "context"
    "fmt"
    "time"
    
    "golang.org/x/sync/errgroup"
)

func main() {
    // コンテキストとエラーグループを作成
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    g, ctx := errgroup.WithContext(ctx)
    
    // 複数のgoroutineを起動
    for i := 1; i <= 5; i++ {
        id := i // ループ変数をキャプチャ
        
        g.Go(func() error {
            return processTask(ctx, id)
        })
    }
    
    // すべてのgoroutineが終了するのを待ち、最初のエラーがあればそれを取得
    if err := g.Wait(); err != nil {
        fmt.Printf("エラーが発生しました: %v\n", err)
    } else {
        fmt.Println("すべてのタスクが正常に完了しました")
    }
}

func processTask(ctx context.Context, id int) error {
    fmt.Printf("タスク %d を処理中...\n", id)
    
    // 処理時間をシミュレート
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(time.Millisecond * time.Duration(id * 300)):
        // ID 4のタスクでエラーを発生させる
        if id == 4 {
            return fmt.Errorf("タスク %d で意図的なエラー", id)
        }
        fmt.Printf("タスク %d が完了しました\n", id)
        return nil
    }
}

errgroupパッケージは、複数のgoroutineのエラーを管理し、最初のエラーが発生した時点で他のgoroutineをキャンセルすることができます。これは、「最初のエラーで失敗」というポリシーを実装する場合に便利です。

デッドロックの回避

デッドロックはチャネル操作で特に注意が必要です。以下はデッドロックを避けるためのいくつかのベストプラクティスです:

  1. チャネルのバッファサイズの適切な設定:送信側が多く受信側が少ない場合は、十分なバッファを確保します。
// バッファなしのチャネル - 送信と受信が同期化
unbufferedCh := make(chan int)

// バッファありのチャネル - 指定したサイズまでブロックしない
bufferedCh := make(chan int, 10)
  1. タイムアウトの設定:チャネル操作が永遠にブロックすることを防ぎます。
select {
case data := <-ch:
    // データを処理
case <-time.After(500 * time.Millisecond):
    // タイムアウト処理
}
  1. チャネルの明示的なクローズ:送信が完了したらチャネルを閉じることで、受信側が無限に待機することを防ぎます。
func producer(ch chan<- int) {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // これ以上データがないことを示す
}
  1. default節を使ったノンブロッキング操作:select文でdefault節を使うと、チャネル操作がブロックされない実装が可能です。
select {
case data := <-ch:
    // データを処理
default:
    // データがなければすぐに次の処理へ
}

Race Conditionの検出

Goには、ランタイム時のレースコンディション(複数のgoroutineが同じメモリに同時にアクセスする競合状態)を検出する強力なツールが組み込まれています:

go run -race main.go  # Race Detectorを有効にして実行
go test -race ./...   # テスト実行時にRace Detectorを有効化

以下は、Race Detectorによって検出される典型的な問題の例です:

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // データ競合が発生!
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", counter)
}

上記のコードでは、複数のgoroutineが同時にcounter変数にアクセスし、Race Conditionが発生します。これを修正するには、sync.Mutexを使用して排他制御を行います:

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var wg sync.WaitGroup
    var mu sync.Mutex
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            counter++ // 排他制御されているので安全
            mu.Unlock()
        }()
    }
    
    wg.Wait()
    fmt.Println("Counter:", counter)
}

コンテキストを使った並行処理の制御とタイムアウト管理

Goの標準ライブラリには、処理のキャンセルや期限設定、値の伝播などを行うためのcontextパッケージが用意されています。このパッケージは、特に複数のgoroutineにまたがる処理フローの制御に役立ちます。

コンテキストの基本

コンテキストは、リクエストのスコープに関連するデータを保持し、キャンセルシグナルを伝播するためのメカニズムを提供します:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // バックグラウンドコンテキストを作成
    rootCtx := context.Background()
    
    // タイムアウト付きのコンテキスト
    ctx, cancel := context.WithTimeout(rootCtx, 2*time.Second)
    defer cancel() // リソースリークを防ぐためのキャンセル
    
    // 長時間実行される処理をgoroutineで実行
    go doSlowOperation(ctx)
    
    // メイン処理は続行
    fmt.Println("メイン処理がコンテキストの終了を待機しています...")
    <-ctx.Done() // コンテキストがキャンセルまたはタイムアウトするまで待機
    fmt.Println("コンテキストが終了しました:", ctx.Err())
}

func doSlowOperation(ctx context.Context) {
    select {
    case <-time.After(3 * time.Second):
        fmt.Println("処理が完了しました") // このケースは実行されない
    case <-ctx.Done():
        fmt.Println("処理が中断されました:", ctx.Err())
    }
}

この例では、2秒のタイムアウトを持つコンテキストを作成し、3秒かかる処理を実行しています。タイムアウトが先に発生するため、処理は中断されます。

コンテキストの種類

contextパッケージは、以下のようなコンテキスト作成関数を提供しています:

  1. WithCancel:明示的にキャンセルできるコンテキスト
  2. WithDeadline:指定した時刻にキャンセルされるコンテキスト
  3. WithTimeout:指定した時間が経過するとキャンセルされるコンテキスト
  4. WithValue:キーと値のペアを保持するコンテキスト
// キャンセル可能なコンテキスト
ctx, cancel := context.WithCancel(context.Background())

// 期限付きのコンテキスト
deadline := time.Now().Add(5 * time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

// タイムアウト付きのコンテキスト
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)

// 値を持つコンテキスト
ctx := context.WithValue(parentCtx, "user_id", "12345")

HTTPサーバーでのコンテキスト活用

コンテキストは、HTTPリクエストの処理においても特に役立ちます:

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func slowHandler(w http.ResponseWriter, r *http.Request) {
    // リクエストからコンテキストを取得
    ctx := r.Context()
    
    // 長時間の処理をシミュレート
    select {
    case <-time.After(2 * time.Second):
        fmt.Fprintln(w, "処理が完了しました")
    case <-ctx.Done():
        // クライアントが接続を切断した場合など
        fmt.Println("処理がキャンセルされました:", ctx.Err())
        http.Error(w, "処理がキャンセルされました", http.StatusInternalServerError)
    }
}

func main() {
    http.HandleFunc("/slow", slowHandler)
    http.ListenAndServe(":8080", nil)
}

この例では、HTTPリクエストのコンテキストを使用して、クライアントが接続を切断した場合などに処理をキャンセルできるようにしています。

複数のgoroutineにコンテキストを伝播する

コンテキストは、親から子に伝播させることで、複数のgoroutineにまたがる処理フローを制御できます:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// 親ワーカー関数
func parentWorker(ctx context.Context) {
    // キャンセル可能な子コンテキストを作成
    childCtx, cancel := context.WithCancel(ctx)
    defer cancel() // 親の終了時に子をキャンセル
    
    var wg sync.WaitGroup
    
    // 複数の子ワーカーを起動
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go childWorker(childCtx, i, &wg)
    }
    
    // 親の処理(例:3秒後にキャンセル)
    select {
    case <-time.After(3 * time.Second):
        fmt.Println("親ワーカーの処理が完了しました")
    case <-ctx.Done():
        fmt.Println("親ワーカーがキャンセルされました:", ctx.Err())
    }
    
    // 子ワーカーの完了を待機
    wg.Wait()
    fmt.Println("すべての子ワーカーが完了しました")
}

// 子ワーカー関数
func childWorker(ctx context.Context, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 子の処理(最大5秒)
    select {
    case <-time.After(time.Duration(id) * 2 * time.Second):
        fmt.Printf("子ワーカー %d の処理が完了しました\n", id)
    case <-ctx.Done():
        fmt.Printf("子ワーカー %d がキャンセルされました: %v\n", id, ctx.Err())
    }
}

func main() {
    // ルートコンテキストを作成
    ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
    defer cancel()
    
    // 親ワーカーを実行
    parentWorker(ctx)
}

この例では、親コンテキストがタイムアウトすると、すべての子コンテキストも連鎖的にキャンセルされます。

コンテキストの使用上の注意点

コンテキストを効果的に使用するためのベストプラクティスは以下の通りです:

  1. 常にDoneチャネルをモニタリングする:長時間実行される処理では、キャンセルシグナルを適切に処理する必要があります。

  2. キャンセル関数を適切に呼び出すcontext.WithCancelWithDeadlineWithTimeoutから返されるキャンセル関数は、リソースリークを防ぐために必ず呼び出す必要があります。

  3. コンテキストを関数の最初の引数として渡す:標準ライブラリのような一貫性を保つため、コンテキストは関数の最初の引数として渡すことが推奨されています。

  4. コンテキストの値の使用は最小限にWithValueで渡す値は、リクエストスコープのメタデータに限定し、依存関係注入のために使用しないほうが良いです。

// 良い例
func ProcessRequest(ctx context.Context, req *Request) (*Response, error) {
    // ctx.Doneをモニタリング
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
        // 通常の処理
    }
    
    // リクエストIDを取得(必要な場合のみ)
    if reqID, ok := ctx.Value("requestID").(string); ok {
        // リクエストIDを使用
    }
    
    return &Response{}, nil
}

まとめ:実務で活かせるGolangの並行処理テクニック

Golangの並行処理は、その簡潔さと効率性から、多くの開発者に支持されています。本記事では、goroutineとチャネルの基本から、実践的な並行処理パターン、エラーハンドリング、デッドロック回避、コンテキストによる制御まで幅広く解説しました。

実務でGolangの並行処理を効果的に活用するためのポイントをまとめると:

  1. goroutineの適切な管理:goroutineは軽量ですが、無制限に作成せず、sync.WaitGroupなどを使って適切に終了を管理しましょう。

  2. チャネルの正しい使用:バッファサイズの適切な設定、明示的なクローズ、方向制限などを活用し、安全なデータ共有を実現します。

  3. 並行処理パターンの活用:Worker Pool、Fan-Out/Fan-In、パイプラインなどのパターンを理解し、状況に応じて適切に使い分けることが重要です。

  4. エラーハンドリングとデッドロック回避:結果とエラーを含む構造体の使用やerrgroupパッケージの活用、タイムアウト設定などでロバストな並行処理を実現します。

  5. コンテキストによる処理制御contextパッケージを使って、タイムアウトやキャンセル処理を適切に実装しましょう。

上記の基本原則とパターンを理解し、適切に組み合わせることで、効率的で堅牢な並行プログラミングが可能になります。また、Race Detectorなどのツールを積極的に活用し、潜在的な問題を早期に発見することも重要です。

Golangの並行処理は、高いパフォーマンスが求められるバックエンドサービス、データ処理アプリケーション、WebサーバーなどのI/O集約型アプリケーションの開発に特に威力を発揮します。ぜひこの記事で紹介したテクニックを実際のプロジェクトで試してみてください。

おすすめコンテンツ