読者です 読者をやめる 読者になる 読者になる

yohhoyの日記(別館)

もうちょい長めの技術的メモをしていきたい日記

条件変数 Step-by-Step入門

C++ C POSIX Boost

多くのプログラミング言語では、マルチスレッド処理向け同期プリミティブとして「ミューテックス(mutex)」と「条件変数(condition variable)」を提供しています*1 *2ミューテックス排他制御機構として有名ですし、その動作もロック(lock)/アンロック(unlock)と比較的理解しやすいため、不適切に利用されるケースはあまり無いと思います*3。一方、条件変数の動作仕様はしばしば誤解され、不適切な利用による並行性バグに悩まされるケースが多いようです。

本記事では、スレッドセーフなFIFO(First-In-First-Out)キューを段階的に実装していく事例を通して、条件変数の適切な使い方について説明していきます。例示コードではC++11標準ライブラリを用いますが、Pthreads(POSIX Threads)やC11標準ライブラリへは単純に読み替え可能です(→Pthreads/C++11/C11対比表)。また、C++11標準ライブラリからBoost.Threadライブラリへの対応関係は自明なはずです。

本記事中のソースコードはBoost Software License 1.0で公開しています。

Pthreadsに馴染みがある人向けの補足:pthread_cond_signalnotify_onepthread_cond_broadcastnotify_allへと読み替えてください。

さっそく

次の仕様を満たす、スレッドセーフ・FIFOキューmt_queueを実装していきます。

クラス仕様
int型要素を最大10個まで格納可能なFIFO操作コンテナ。各メンバ関数を異なるスレッドから同時に呼び出してもよい(スレッドセーフ)。*4
push操作
キューに新しい要素を1つ追加する。キューに空き容量が無ければ、空きができるまで呼び出し元スレッドをブロッキングする。
pop操作
キューから最も古い要素を1つ取り出す。キュー内に要素が存在しなれば、要素が追加されるまで呼び出し元スレッドをブロッキングする。

とりあえずロジックだけ実装すると、下記のような感じでしょうか。

#include <queue>

class mt_queue {
  static const int capacity = 10;
  std::queue<int> q_;
public:
  void push(int data) {
    while (q_.size() == capacity)
      ;
    q_.push(data);
  }
  int pop() {
    while (q_.empty())
      ;
    int data = q_.front();
    q_.pop();
    return data;
  }
};

このコードはマルチスレッド動作について一切考慮していないため、異なるスレッドからpush()/pop()メンバ関数が同時に呼び出されると、データ競合(data race)により正常に動作しません(データ競合についてはこちらの記事も参照下さい)。偶然に正常動作する可能性もゼロではありませんが、本質的に“壊れた”コードであると認識すべきです。マルチスレッド処理プログラミングでは並行処理設計が何よりも重要です。1万回に1回といった低頻度でのプログラムクラッシュ、システム高負荷状態でのみ発生するデッドロックデバッグビルドは問題無いのにリリースビルドだと処理ストールといった、ステキなデバッグ体験をしたいのでない限り、“とりあえず動けばOK”という方針はお勧めできません。

Step0: ミューテックス+ビジーループ

マルチスレッド処理プログラミングでは、ミューテックスによる排他制御が全ての基本となります。複数スレッドから同時更新される可能性のある変数アクセスは、ミューテックスによるロックで全て保護する必要があります。まずは、最も単純なビジーループ(busy-loop; busy-wait)方式で仕様通り実装してみましょう。[mt_queue0.cpp

// mt_queue0.cpp抜粋
#include <mutex>

class mt_queue {
  //...
  std::mutex mtx_;  // ★ミューテックスを導入
public:
  void push(int data) {
    std::unique_lock<std::mutex> lk(mtx_);  // ロック獲得
    while (q_.size() == capacity) {
      lk.unlock();  // ロック一時解放
      std::this_thread::yield();
      lk.lock();    // ロック再獲得
    }
    q_.push(data);
  }  // ロック解放
  int pop() {
    std::unique_lock<std::mutex> lk(mtx_);  // ロック獲得
    while (q_.empty()) {
      lk.unlock();  // ロック一時解放
      std::this_thread::yield();
      lk.lock();    // ロック再獲得
    }
    int data = q_.front();
    q_.pop();
    return data;
  }  // ロック解放
};

メンバ関数実装のwhileループ中で、ロック一時解放lk.unlock()→ロック再獲得lk.lock()を忘れずに記述しましたか?このロック一時解放を行わないと、push()/pop()の同時呼び出しでデッドロックが発生します。例えば、1)キューが空っぽのときpop()呼び出し→2)ロックを保持したままwhile (q_.empty())ループ→3)別スレッドのpush()呼び出し先頭で永久にロック獲得待ちという状況です。もう1パターン自明なデッドロックシナリオがありますが、こちらは練習問題として考えてみて下さい。ヒント:push/popの状況を逆転
また、ロック一時解放領域でyield()を呼び出していますが、この処理を省略してもmt_queueクラスは仕様通り動きます。処理系によってはyield()の有無で動作性能が改善するでしょうが、ビジーループがCPUリソースを浪費する非効率な方式であることに変わりはありません。(ビジーループが常に不適切ということではなく、OSカーネルやハードウェア制御ドライバなどの低レイヤ・プログラミングでは有用なケースもあります。ただし、通常のアプリケーションプログラミングでは基本的に避けるべきでしょう。)

Step1: 条件変数による待機/通知処理

それでは、さっそく条件変数を使っていきましょう。排他制御用のミューテックスはそのままに、新しく条件変数を追加することに注意してください。ここはよく誤解されるポイントなのですが、セマフォのようにそれ単体で機能する同期プリミティブと異なり、条件変数は必ずミューテックスと紐付けて利用しなければ意味をなしません。[mt_queue1.cpp

// mt_queue1.cpp抜粋
#include <mutex>
#include <condition_variable>
 
class mt_queue {
  //...
  std::mutex mtx_;
  std::condition_variable cv_;  // ★条件変数を追加
public:
  void push(int data) {
    std::unique_lock<std::mutex> lk(mtx_);
    while (q_.size() == capacity) {
      cv_.wait(lk);    // 要素数変更があるまで待機
    }
    q_.push(data);
    cv_.notify_all();  // 要素数変更を通知
  }
  int pop() {
    std::unique_lock<std::mutex> lk(mtx_);
    while (q_.empty()) {
      cv_.wait(lk);    // 要素数変更があるまで待機
    }
    int data = q_.front();
    q_.pop();
    cv_.notify_all();  // 要素数変更を通知
    return data;
  }
};

このコードでは条件変数オブジェクトcv_を、「キュー内の要素数に何らかの変更があった」イベントの待機/通知処理に利用しています。このときCPUリソース利用の観点からは、条件変数を用いた待機ループwhile (条件式) { cv_.wait(lk); }は、ビジーループの効率的な実装と解釈できます。つまりクラス利用者の視点からは、ビジーループ実装と条件変数実装とでFIFOキューの動作仕様に違いはなく、プログラム実行時の振る舞いだけが異なるものとなります。

// Step0(mt_queue0.cpp)
while (条件式) {
  lk.unlock();
  std::this_thread::yield();
  lk.lock();
}
// Step1(mt_queue1.cpp)
while (条件式) {
  cv_.wait(lk);
}

条件変数に対する待機cv_.wait(lk)は、内部的には“ロック一時解除lk.unlock()+条件変数へ通知があるまで自スレッドを休止+ロック再取得lk.lock()”処理を行います。ビジーループ版では他スレッドに実行機会のヒントを与える(yield())だけの処理が、条件変数版では論理的に意味のあるイベントが起きるまで、つまり明示的に条件変数への通知が行われるまで自スレッドを休止状態とし、CPUリソースを他スレッドへ明け渡す処理となります。*5

Step2: 実行時オーバーヘッドの削減

Step1では待機処理に条件変数を利用する事で、CPUリソースの浪費を抑えることができました。Step2では、より効率的な条件変数の利用について検討しましょう。ここでは下記4つを順番に適用していきます。

  1. イベント種別に応じた条件変数の分離
  2. 冗長な条件変数通知の削減
  3. 条件変数通知先スレッドの限定
  4. 条件変数待機を述語(Predicate)版に書き換え

Step2-1: 条件変数の分離

Step1では条件変数オブジェクトcv_を、「キュー内の要素数に何らかの変化があった」イベントに対応付けていました。この設計では、あらゆるキュー操作によって条件変数への通知が発生するため、必要のない冗長な通知処理が行われています。push操作は「キューが満杯でない」状態を/pop操作は「キューが空でない」状態を待機すれば良いので、両イベントに直接対応する条件変数に分離しましょう。新しい設計では1つのミューテックスmtx_に対して、2つの条件変数オブジェクトcv_nofull_, cv_noempty_を紐付けます。

// Step1(mt_queue1.cpp)
std::mutex mtx_;
std::condition_variable cv_;
// Step2-1
std::mutex mtx_;
std::condition_variable cv_nofull_;   // 満杯でなくなった
std::condition_variable cv_noempty_;  // 空キューでなくなった

条件変数の設計では、待機側でどんなデータ状態を満たすべきかに基づいて抽出することをお勧めします。

Step2-2: 冗長な条件変数通知の削減

push()実装の通知処理に着目すると、Step1では条件変数への通知を無条件に行っていました。本質的には、push操作では「キューが空でなくなった=少なくとも1個要素が存在する」イベントのみをpop操作へ通知すれば十分なため、通知の発火条件を限定することで無駄な処理を減らし、より効率的な実装とできます。同様にpop操作では「キューが満杯でなくなった=少なくとも1要素を追加できる」イベントのみを通知すれば十分です。

// Step1: push()メンバ関数
q_.push(data);
cv_.notify_all();  // 要素数変更を通知
// Step2-2: push()メンバ関数
bool do_signal = q_.empty();  // 操作前が空キューのときのみ
q_.push(data);
if (do_signal)
  cv_noempty_.notify_all();   // 「空キューでなくなった」通知

条件変数への通知処理は、待機側の条件式を満足するよう変更を行ったタイミングを抽出すると、実行時オーバーヘッドを最小化することができます。

Step2-3: 通知先スレッドの限定

Step1では条件変数への通知処理にnotify_all()(broadcast)を用いていました。空キューへの1回のpush操作によって、pop操作を待機中の全スレッドが起こされますが、実際に要素の取り出しに成功するのは1スレッドだけで、それ以外のpop操作要求スレッドは全て待機処理に再突入します。つまり、push操作が通知すべきpop操作待機スレッドは1つで十分であり、それ以上のスレッドへ通知が行われても冗長ということです。このようなケースではnotify_one()(signal)を利用することで、通知先スレッドを限定して実行効率の改善をはかることができます。

// Step1: push()メンバ関数
cv_.notify_all();  // 待機中の全スレッドへ通知
// Step2-3: push()メンバ関数
cv_noempty_.notify_one();  // pop()待機中の任意の1スレッドへ通知

このようなnotify_all()からnotify_one()への変更は、i)通知先が最大で1スレッド かつ ii)通知を受信するスレッドの待機条件を常に満たす場合 にのみ安全に行えます。本記事で設計したFIFOキューの場合、push操作からの通知を受信した任意のpop操作スレッドは、必ず待機ループを抜けてpop操作を完了すると保証できるため、notify_one()へと安全に変更できます。特に条件ii)を正しく考慮していないと、デッドロックといった並行性バグの原因となるため十分注意してください。*6

追記:notify_one()により生じるデッドロックについては、おまけ記事「条件変数とデッドロック・パズル(出題編)(解答編)」も参考にしてください。

Step2-4: 条件変数待機の書き換え

最後に、条件変数の待機処理wait()を述語(Predicate)をとる2引数オーバーロードに書き換えておきます。この変更は効率化とは無関係ですが、条件変数オブジェクトの名前(cv_nofull_, cv_noempty_)と、待機条件式(q_.size() < capacity, !q_.empty())の意味を一致させることができます。また後述Step3でFIFOキュー機能拡張を行うときに、待機条件の論理式を拡張しやすいというメリットもあります。

// Step1: push()メンバ関数
while (q_.size() == capacity) {
  cv_.wait(lk);
}
// Step1: pop()メンバ関数
while (q_.empty()) {
  cv_.wait(lk);
}
// Step2-4: push()メンバ関数
cv_nofull_.wait(lk, [&]{
  return (q_.size() < capacity);
});
// Step2-4: pop()メンバ関数
cv_noempty_.wait(lk, [&]{
  return !q_.empty();
});

Step2のまとめ

Step2での最終的なコードは次の通りです。[mt_queue2.cpp

// mt_queue2.cpp抜粋
class mt_queue {
  //...
  std::mutex mtx_;
  std::condition_variable cv_nofull_;   // 満杯でなくなった条件
  std::condition_variable cv_noempty_;  // 空キューでなくなった条件
public:
  void push(int data) {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_nofull_.wait(lk, [&]{     // 「満杯でない」ことを待機
      return (q_.size() < capacity);
    });
    bool do_signal = q_.empty();
    q_.push(data);
    if (do_signal)
      cv_noempty_.notify_one();  // 「空キューでなくなった」通知
  }
  int pop() {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_noempty_.wait(lk, [&]{   // 「空キューではない」ことを待機
      return !q_.empty();
    });
    bool do_signal = (q_.size() == capacity);
    int data = q_.front();
    q_.pop();
    if (do_signal)
      cv_nofull_.notify_one();  // 「満杯でなくなった」通知
    return data;
  }
};

さらなる改善策として、通知処理をミューテックスのロック解放後まで遅延させるという手法もあります。ただし、この変更によりコードが壊れるコーナーケースが存在すること、また処理系の実装品質が十分ならばあまり効果を期待できないため、本記事では適用しないこととします。詳細はこちらの記事を参考にしてください。

Step3: 提供機能の再考

Step2までで、性能面について十分考慮されたスレッドセーフ・FIFOキューを実装できました。Step3では機能面について改めて考えてみましょう。ここでは下記2点を検討します。

  1. データ列の終端処理
  2. 強制的な処理中断

Step3-1: データ列の終端処理

最初に設定したFIFOキュー仕様では、pushデータ列の終端処理について特別の考慮をしませんでした。いくぶん不恰好ですが、例えば-1を特殊な“データ終端マーカ”値とみなすことも出来るでしょう。

// mt_queueクラス利用コード
void producer_thread(mt_queue& mq) {
  for (int i = 1; i <= N; ++i)
    mq.push(i);
  mq.push(-1);  // データ終端を通知
}
void consumer_thread(mt_queue& mq) {
  int v;
  while ((v = mq.pop()) > 0)
    process_data(v);
}

生産者スレッド(producer_thread)と消費者スレッド(consumer_thread)が1つづつのSingle-Producer/Single-Consumerモデルならこの設計でも十分ですが、Multi-Producer/Multi-Consumerモデルでは面倒な問題が浮上します。例えば2生産者スレッド+3消費者スレッドで動作させた場合、FIFOキューには終端マーカが2個しかpushされないため、3番目の消費者スレッドはデータ終端到達を知ることが出来ません。生産者/消費者スレッド数が動的に変化する場合はもはやお手上げです。
結局、“終端マーカ”方式のようなクラス利用側コード設計に頼るのではなく、FIFOキュー自体に終端到達通知close()メンバ関数を追加するのが良いでしょう。これに伴って、push()は終端到達検知時に例外closed_queueを送出し、またpop()は終端到達状態を返すよう関数シグニチャを変更します。

class mt_queue {
  //...
  bool closed_ = false;  // 終端到達通知を保持
public:
  void close() {  // ★終端到達通知
    std::lock_guard<std::mutex> lk(mtx_);
    closed_ = true;
    cv_nofull_.notify_all();
    cv_noempty_.notify_all();
  }
  void push(int data) {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_nofull_.wait(lk, [&]{
      return (q_.size() < capacity) || closed_;  // 条件追加
    });
    if (closed_)
      throw closed_queue();  // 例外送出
    //...
  }
  bool pop(int& data) {      // 関数シグニチャ変更
    std::unique_lock<std::mutex> lk(mtx_);
    cv_noempty_.wait(lk, [&]{
      return !q_.empty() || (q_.empty() && closed_);  // 条件追加
    });
    if (q_.empty() && closed_)
      return false;          // 終端到達を返す
    data = q_.front();
    //...
    return true;             // 終端ではない
  }
};

pop()実装の待機条件では... || (q_.empty() && closed_)、つまり“キューが空でかつ終端通知がされているとき”に始めてデータ終端と判定すべきです。またclose()実装では、全ての条件変数に対してnotify_all()通知が必要になります。

Step3-2: 強制的な処理中断

終端到達通知close()によって通常のデータ終端処理を表現できるようになりましたが、ブロッキング中のpush/pop操作に対する処理中断要求abort()があるとより実用的になります。push()/pop()では中断要求検知時に例外abort_exceptionを送出するよう拡張します。なおclose()ブロッキング操作ではないため、中断すべき処理をもちません。*7

class mt_queue {
  //...
  bool aborted_ = false;  // 処理中断要求を保持
public:
  void abort() {  // ★処理中断要求
    std::lock_guard<std::mutex> lk(mtx_);
    aborted_ = true;
    cv_nofull_.notify_all();
    cv_noempty_.notify_all();
  }
  void push(int data) {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_nofull_.wait(lk, [&]{
      return (q_.size() < capacity) || closed_ || aborted_;  // 条件追加
    });
    if (closed_)
      throw closed_queue();
    if (aborted_)
      throw abort_exception();  // 例外送出
    q_.push(data);
    //...
  }
  //...
};

push()実装の待機条件式(q_.size() < capacity) || closed_ || aborted_と、後続の条件分岐式とを比べてみてください。下記の対応関係を読み取れたでしょうか?1つの条件変数オブジェクトに複合的な条件を設定する場合、この構造が基本形となるはずです。

条件変数.wait(lk, [&]{
  return 条件A || 条件B || 条件C;
}):
if (条件A) { 処理A; }
if (条件B) { 処理B; }
if (条件C) { 処理C; }

ゴール!

本記事で実装したスレッドセーフ・FIFOキューの最終コードは次の通りです。[mt_queue3.cpp

// mt_queue3抜粋
struct closed_queue : std::exception {};
struct abort_exception : std::exception {};

class mt_queue {
  static const int capacity = 10;
  std::queue<int> q_;
  std::mutex mtx_;
  std::condition_variable cv_nofull_;
  std::condition_variable cv_noempty_;
  bool closed_ = false;
  bool aborted_ = false;
public:
  void push(int data)
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_nofull_.wait(lk, [&]{
      return (q_.size() < capacity) || closed_ || aborted_;
    });
    if (closed_)
      throw closed_queue();
    if (aborted_)
      throw abort_exception();
    bool do_signal = q_.empty();
    q_.push(data);
    if (do_signal)
      cv_noempty_.notify_one();
  }
  bool pop(int& data)
  {
    std::unique_lock<std::mutex> lk(mtx_);
    cv_noempty_.wait(lk, [&]{
      return !q_.empty() || (q_.empty() && closed_) || aborted_;
    });
    if (q_.empty() && closed_)
      return false;
    if (aborted_)
      throw abort_exception();
    bool do_signal = (q_.size() == capacity);
    data = q_.front();
    q_.pop();
    if (do_signal)
      cv_nofull_.notify_one();
    return true;
  }
  void close()
  {
    std::lock_guard<std::mutex> lk(mtx_);
    closed_ = true;
    cv_nofull_.notify_all();
    cv_noempty_.notify_all();
  }
  void abort()
  {
    std::lock_guard<std::mutex> lk(mtx_);
    aborted_ = true;
    cv_nofull_.notify_all();
    cv_noempty_.notify_all();
  }
};

これでもう条件変数は怖くありません…よね?

*1:スレッド間での通知/待機を行う同期プリミティブとしては、A)「セマフォ(semaphore)」に代表されるイベント変数方式、B)ミューテックス+条件変数ペアによる方式があり、これらを提供する/しないはライブラリ設計ポリシーに依存します。ただし、比較的新しいライブラリ仕様では方式B)条件変数のみを提供する事が多いようです。例えばBoost.Threadライブラリではその設計論拠に基づき、セマフォ同期プリミティブを提供しません。Boost.Threadをベースに標準化されたC++11標準ライブラリも、同様に条件変数のみを提供します。なお、セマフォを用いて条件変数を実装する/条件変数を用いてセマフォを実装することが可能なため、両同期プリミティブは本質的には互換性があるといえます。

*2:スレッド間の排他制御は2値セマフォ(binary semapohore)を用いても実現可能ですが、一般的にミューテックスではスレッド間“排他”処理という目的に合わせて、スレッドによる“ロックの所有”という概念が導入されます。このため、あるスレッドがミューテックスのロック処理(所有権を獲得)を行った場合、必ず同じスレッドでアンロック処理(ロック所有を解放)を行う必要があります。一方でセマフォには所有権という概念は無く、制約のあるミューテックスよりも便利に思えるかもしれません。しかし、特にマルチスレッド処理プログラミングにおいては、バグ検知可能性や実行時効率などの観点から、セマフォのように自由度の高い原始的な機構よりも、ミューテックスのように目的に特化した専用機構を用いるべきです。

*3:ミューテックスの不適切な利用の一例として、“ロック獲得のFIFO動作を期待したスレッド・スケジューリング”が挙げられます。これは、複数スレッドから同時にロック獲得要求を行い、ロック獲得成功順序がFIFO動作であると仮定し、各スレッドが順次実行されることを期待した処理を指します。通常、ミューテックスのロック獲得はFIFO順を保証しないため(unfair)、このような利用は適切ではありません。

*4:オブジェクト破棄処理をスレッドセーフとするには大きな制限事項を伴うため、通常、スレッドセーフの議論からはクラスのデストラクタを除外します。つまりオブジェクト破棄時には、他のあらゆるメンバ関数と同時に呼び出さないことを、クラス利用側が保証すべきです。

*5:条件変数の動作仕様では“Spurious Wakeup”が起こりうると明記されており、誰も通知を行っていないのに待機が解除されることがあります。本記事のようにビジーループを起点に考えた場合は、既にループ構造+条件変数の待機処理となっており、Spurious Wakeupも考慮済みとなるため、本文中では特に言及しません。詳細は条件変数とspurious wakeupを参照ください。

*6:具体例:片方は奇数のみを/もう一方は偶数のみを取り出す選択的pop操作に対して、1)pop要求で2スレッドとも待機→2)偶数値をpushし任意1スレッドへ通知→3)奇数待機スレッドが通知受信して再待機→4)偶数待機スレッドは休眠状態のまま。

*7:close処理の内部実装では、ロック獲得のために呼び出しスレッドがブロッキングされる場合があります。この排他制御FIFOキューのデータ一貫性を維持するため機構であり、いずれのスレッドも有限ステップ内にロック獲得することが保証できます。このため、クラス利用者視点ではcloseメンバ関数はノンブロッキング操作とみなせます。