メインコンテンツに移動

C++MPI

C++でMPI

ここではMPIを用いたプログラムの書き方を説明しています. クラスターでMPIプログラムを実行する方法はこちらを参照してください.

MPIは複数のプロセス(実行中のプログラム)で通信を行う「メッセージ・パッシング」の仕組みです.適切な通信を行えば,並列計算が可能になります.例えば A[i]のi=0,1,2,...,Nの合計を計算する場合,

  1. A[0],...A[N]がプロセス0に存在する.
  2. プロセス0は, A[N/2+1],...A[N]をプロセス1宛の封筒に入れ郵送する.
  3. プロセス1は, 到着した封筒を開封し, A[N/2+1],...A[N]を得る. 
  4. プロセス0はA[0],...,A[N/2]の合計を計算し,これと並列に,プロセス1はA[N/2+1],...,A[N]の合計を計算する
  5. プロセス1は合計をプロセス0宛の封筒に入れ郵送する.
  6. プロセス0は,到着した封筒を開封し, 自分の合計に加える.これでプロセス0は合計値を取得する.
  7. 必要であれば,プロセス0は合計値をプロセス1に郵送する.

となります.この例から分かるように,メッセージ・パッシングでは

  • 複数のプロセスを整数値(rankと呼ぶ)で識別する.
  • プロセス同士が,送信と受信をタイミングよく行って通信を行う.
  • タイミングを崩すと,誤動作する(上の例では, 2の封筒と7の封筒を取り違えると,間違った結果になる)
  • 郵送タイミングが合ってさえいれば,プロセス同士が全く違う作業を行っていても問題はない.
  • プロセスは別のコンピュータで実行してても良いし,地球の裏側で動作していても(郵便が届くのが遅い以外は)問題ない.
  • プロセスのどれかがエラーすると,他のプロセスが郵便の到着を待ち続けてしまう

といった特徴があります.メッセージ・パッシンングを実現するには

  • 古生代のパラレル仮想マシン(PVM)
  • 現代のメッセージ・パッシンング・インターフェース(MPI)

ソフトを利用することが多いです.さらにC++では, MPIの難解なナンジャコリャ仕様を使いやすくした

が利用可能です.ここではBoost.MPIの使い方を説明します.

Boost.MPIの特徴

Boost.MPIは, 既存のMPIライブラリーにC++用の皮を被せたもので,

  • 初期化や終了が無意識にできて楽チン
  • 型宣言や転送する配列の数は自動になって楽チン
  • 自分のクラス型変数や,その配列を自動で(場合によっては最適化付きで)一発転送で楽チン

という特徴を有します.

Boost.MPIを利用する

以下では, Boost.MPIの使い方を練習しましょう. 面倒くさい人は,出来合いのプロジェクトをここからダウンロードしてください.

MPIの利用設定

MPIを利用するには, OSによって設定の必要があります. OSXの場合はこちら. Linuxの場合はこちら

MPIの起動と終了

MPIを利用するプログラムの簡単な形は

#include <iostream>
#include <boost/mpi.hpp>
int main(int argc, const char * argv[]) {
 boost::mpi::environment env;
 boost::mpi::communicator com;

 std::cout << "Hello, Boost.MPI CPU-" << com.rank() << std::endl;
 return 0;
}

これで起動と終了が実現します.communicatorオブジェクトの生成と消滅は,実行ファイルで1回だけにしてください.

とはいえ,通常のC++プログラムでmain()に細かい作業を記述することはありませんよね. 例えばCaseクラスを作成し, 

  • Caseの初期化はコンストラクタで行う
  • Caseの終了はデストラクタで行う
  • データロード作業 Case::Load()
  • 1ステップ作業 Case::Do()
  • データ保存作業 Case::Save()

といった風にプログラムするのが常識的です. この場合は

class Case {
  boost::mpi::communicator& com;
public:
  Case(boost::mpi::communicator& given_com):com(given_com){//初期化処理
    std::cout << "Hello, Boost.MPI CPU-" << com.rank() << std::endl;
  }
};
int main(int argc, const char * argv[]) {
  boost::mpi::environment env;
  boost::mpi::communicator com;
  Case myCase(com);//ある計算
  Case hisCase(com);//別の計算(必要なら)
  std::vector<Case> Cases;//計算の配列(必要なら)
 Cases.emplace_back(com);
}

こんな感じでしょうか.

MPIで通信

実際にrank=dst_rank番CPUへのデータ通信を行ってみましょう. 通信を行うには, 通信の配列

std::vector<boost::mpi::request> requests;

を用意し, そこに通信を追加していきます. 配列データ std::vector<double> Vをdst_rankに送信する場合

int dst_rank=0;//CPU-0に送信
int my_tag=123;//123番通信
requests.push_back(com.isend(dst_rank,my_tag,V.data(),V.size()));

なお, MPIでNプロセスで併走する場合, rankは0からN-1までの整数値を取ります. プログラム中でNが必要になる場合には, com.size()で取得できます. MPIでは送信データを整数で区別することができます(例えば 0が流速で1が密度,といった風に).上の例では, これがmy_tagです. isend()は, 送信データのコピーをとったら終了します. 実際の送信作業はバックグラウンドで行われますので, 送信が終了するまでは, なにか別の計算でもして暇つぶしをしておいてください.

これに対応する受信を開始するには, 受信側のCPUで次のコードを実行します:

int src_rank=1;//CPU-1から受信
int my_tag=123;//123番通信
std::vector<double> V; V.resize(20);//配列要素数20個まで受信できるように準備
requests.push_back(com.irecv(src_rank,my_tag,V.data(),V.size()));
  • boost Version 1.69のBoost.MPIの仕様では, isend(dst_rank,my_tag,V), irecv(src_rank,my_tag,V) で十分なはずですが, まだ未完成なようです.
  • 送信データよりも受信の配列数が小さいとエラーします. 受信側で要素数を与えていますが, その数だけ受信する丁寧な対応は無しです.
  • std::vectorなどの長さも込めた自動転送は, のちに述べるSerializationを使うと実現できます.
  • 上の例では倍精度浮動小数点データの配列を通信しまたしが, 配列でないなら要素数は不要です:

    requests.push_back(com.isend(dst_rank,my_tag,V));

irecv()は受信準備ができると終了します. 実際の受信は, 次に説明するwait()系命令までは実施されません. 送信CPUやTAGを指定して受信していることに注意してください. これらが送信側の実際と異なる場合, 受信はおこなわれません . 自分が作ったクラスのオブジェクトであるとか, その配列であるとか, でも同様に通信できます.ただし, 自分の作ったクラスの場合,後で述べるSerializationに対応する必要があります.

送信側でも受信側でも, 開始したリクエストは終了しなければなりません. 特に受信は,実際の作業はリクエストの終了時に行われます. 例えばこれは次のように行います:

auto DONE=boost::mpi::wait_any(requests.begin(), requests.end());

この例では, requestsの何れかが終了すると, その通信情報がDONEに代入されます. 通信が全部終わるのを待つには, 例えば

while (requests.size()>0) {//通信が全部終わるまでループ湯
  auto DONE=boost::mpi::wait_any(requests.begin(), requests.end());
  auto ST=DONE.first;//ステータスを取得しても良い
  requests.erase(DONE.second);//すんだ通信は削除しとかないとダメ
  auto src_rank=ST.source();//送信者を特定しても良い
  if (src_rank<0) continue;//送信リクエストが完了した場合は, 他にすることがねーよ
  auto tag=ST.tag();//タグを特定しても良い. もちろん, しなくてもいいんだが
  boost::optional<int> count=ST.count<double>();//もし欲しければ受信データ数を得る
  if (!count) throw(std::exception("おお・・・数ではないぞ"));
  std::cout << count << "こ,ついたよ♪" << std::endl;
}

この例の boost::optional を利用するには, #include <boost/optional/optional_io.hpp> が必要です. wait系命令が終了した時には, irecv()で指定した受信変数にデータが入っています.

Serialization: お前のクラスをそのまんまMPIに突っ込め

あなたの送るべきデータがクラスのメンバー変数である場合, メンバーをいちいちsend/recvしていたらプログラムを書くのが大変です. そこで, クラスのメンバー変数を一発で送りましょう. これはSerializationで可能になります. 例えばあんたのクラスが次の形であったとします:

class Data {
private:
   ふにゃふにゃ
public:
 int id;
   double p;
   double T;
   double v[3];
  もごもご
};

boost::serialization::accessとおともだちになり, ひみつのガジェット「serialize()」を身につけると, メンバー変数id, p, T, v[3]を一発で転送できるようになります. 具体的には

#include <boost/serialization/serialization.hpp>
class Data {
private:
   ふにゃふにゃ
 friend class boost::serialization::access;
 template<class _U_> void serialize(_U_& ar,const unsigned int version){
     ar & id;
     ar & p;
     ar & T;
     ar & v;     //    ← ここ, v[3]ぢゃねえよ
  }

public:
 int id;
   double p;
   double T;
   double v[3];
  もごもご
};

これでOK! お前のクラスのメンバーがstd::string, std::vectorを含んでいても, Boost Version 1.64あたり?以降では

#include <boost/serialization/serialization.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/vector.hpp>

...
class Data {
    ほげほげ
    std::string Fuck;
    std::vector<double> AssHole;

    もにゃにゃ
template<class U> void serialize(U& ar,const unsigned int version){
    ふがふが
    ar & Fuck;
    ar & AssHole;

   ぽぽぽ
}

これでおーけー. 文字列の長さと内容, std::vectorの長さと内容は自動で設定されます. Serialization対応のクラスであれば, その配列 std::vector<Serialization対応クラス> の転送方法は, 通常の変数と同じです. まあこんな感じですかね:

switch (com.rank()) {
case 1:  requests.push_back(com.isend(0,this_tag , DS.data(), DS.size()));//Dataの配列を送信
    break;
default: requests.push_back(com.irecv(1,this_tag , DS.data(), DS.size()));//Dataの配列を受信
    break;
}
....
while (requests.size()>0) {
  auto DONE=boost::mpi::wait_any(requests.begin(), requests.end());
  auto ST=DONE.first;
  requests.erase(DONE.second);
  auto src_rank=ST.source();
  auto tag=ST.tag();
  boost::optional<int> count=ST.count<Data>();
  if (src_rank<0) continue;
  if (!count) continue;
  ...(必要なら何か処理)
}

これでお前の全てのクラスが自動的に配列サイズまで含めて別のCPUにコピーできる.

Serialization: お前のクラスを皮で包んでMPIに突っ込め

上で述べたSerializationの方法では, クラスにお友達を作成しました. しかし, システムや外部のライブラリーで定義されたクラスは, この方法では転送できません. あなたがクラスの定義を変更できないからです. そのような場合には, クラスを皮で包んでMPIに投入することが可能です. その方法は・・・ここの方が丁寧に書いてあります.

その他の便利命令

MPIには, いくつかの便利機能があります. 

com.size() 現在のプロセス数を取得
com.rank() 自分のプロセス番号を取得
com.barrier()

全てのプロセスがbarrier()を実行するまで待機します. 注意しなければならないのは, 「全てのプロセスが, プログラムの,この行のbarrier()を実行する」ことは保証されないことです.保証されるのは, 「全てのプロセスが, プログラム中のどこかのbarrier()を実行する」ことだけです.

当然ですが, barrier()は計算速度を著しく低下させます.
いや, barrier()が遅いのではなく, barrier()を使いたくなる貴方が遅いんですが.

本当にbarrier()が必要か,よく検討して利用してください.

com.abort() もし可能ならば,他のプロセスを道連れにして自爆します.暴走しているプロセスに自爆命令を出しても,聞き入れてもらえないでしょう.

特殊な通信

送信の基本はisend()してwait()することです:

requests.push_back(com.isend(dst_rank,my_tag,V));
他の計算
auto DONE=boost::mpi::wait_any(requests.begin(), requests.end());

通信が1個しかないなら, ちょっと面倒ですね. そこで, 面倒なので他の計算はしないことにして, 手短に書くことが可能です:

com.send(dst_rank,my_tag,V);

これでは通信中に計算を進めることができないので計算速度が低下してしまいます. しかし, まあ, 1行ですので, プログラマーの速度が向上します.

おいおい,そのためにエディターにはCode Snipet機能が付いているんじゃねえか?とはいえ,Code Snipet機能を理解する気がないユーザーも多いので, いたしかたありません

同様に受信も1行です.もちろんデータが到着するまで,ただ,ボケーっと待つ命令です.電気の無駄遣いですね.

com.recv(src_rank,my_tag,V);

見た目が簡単ですね♪ これを応用して,いろいろな命令が準備されています. コーディングの速度を向上させ,実行速度を低下させることが可能になります.まあでも, あまりちびちびしたデータをisend/waitするとスレッドの発生消滅が増えるので,場合によっては,速度が向上するかもです.でも, send(), recv()は後に説明するデッドロックが発生しやすいので, 気をつけましょう.

いやいや,ちびちびしたデータを通信するとオーバーヘッドが多くなるくらいは当然としてプログラム書くだろ普通?だいたいEthernetのヘッダーって20バイトくらいあるしな. 1バイトでも送れば21バイト通信だぞ.100本10円のネジを一本送料320円で買ってたら破産するし.どうせ送るなら,20バイトが無視できるくらい(パケットサイズ1500バイト未満の)でかいサイズで送るだろ?

boost::mpi::broadcast(com,V,src_rank) src_rankノードのデータVをcomのみんなにばらまく. Vは配列std::vector<T>でも良いが, std::vector<T*>はダメです. ポインターの転送はSerializationを利用してくださいね.
boost::mpi::gather(com,V,vec,dst_rank) データVをdst_rankに集める. 収集先はstd::vector<Type>のVec(それ以外のCPUでは省略可). これでVec[rank]にデータが得られる.
boost::mpi::reduce(com,V,op,val,dst_rank) gather後に演算opを行った結果を与える. dst_rank以外では演算結果valは省略しても良い. 演算子opは, 例えばstd::plus<T>()のとかboost::mpi::min<T>()のように指定する.
boost::mpi::all_reduce(com,V,op,val) reduceして結果をbroadcastする. どこに集めるか, は意味がないので指定しない. valを省略して戻り値を利用しても良い.

いくつかの例を示す:

boost::mpi::reduce(com, dat, output, std::plus<int>(), 0);
std::cout << com.rank() << " output(+)= " << output << std::endl;
boost::mpi::reduce(com, dat, output, boost::mpi::minimum<int>(), 0);
std::cout << com.rank() << " output(min)= " << output << std::endl;
boost::mpi::reduce(com, dat, output, boost::mpi::maximum<int>(), 0);

----

1 output(+)= -272636304     ← 収集先であるrank=0ではないので,outputの値はメチャクチャ
1 output(min)= -272636304
0 output(+)= 5
0 output(min)= 1

こんなかんじですわ.

デッドロック通信

1対1通信の基本は, 両者が送信及び受信を行うことです.では,両者が送信したらどうなるのでしょうか?答えは,データサイズに依存します.

小さなデータでは, 如何にsend()/recv()であっても送信用一時メモリー(バッファー)に入ってしまうので,まず両者の送信が終了し,次に,両者の受信が終了します.

データが大きくなると,バッファーが溢れます.すると,両者が「おまえ,さっさとrecv()しろよ」「いやいや,おれも漏れそうなんだよ.お前が先にrecv()しろよ」「いやいやいや,そこはあなたが先にry)」となって,動かなくなります.これをデッドロックと言います.

この対策として,世間では,「1方が送信→受信,別の方が受信→送信」でプログラムせよと指示が出ています.いい考えですね!では,3台のノードがそれぞれ1対1通信する場合は・・・32台のノードのいくつかのペアが1対1通信する場合は・・・どうするんでしょうね.まさか492通りの通信を順番を定めて行う・・・ほど暇ではないですよね?

この対策は大変です.表面上は簡単なsend()/recv()は,実際には, 大変な苦労が伴います.非同期通信 isend()/irecv()ではどうでしょうか?1234台のノードが同時に送信を開始してもブロックされません.なぜならば,それが仕様だからです.各ノードが1233通の受信(1e+3277通りの受信パターン)を待ち受けてもブロックとは無縁です.受信した順に,処理すれば良いのですから.