メインコンテンツに移動

Boostで通信

インターネットがこんなに進歩しているのに,俺っちのプログラムは知らんぷり・・・てなこた,ねーだろう.と思うんだけど.というわけで,対応してみましょう.

Boost.Asio


Boost.Asio

ネットワークを含めた各種非同期通信のライブラリーです.まあこりゃわかるだろう

Boost.Beast

Boost.Asioは通信の基本が可能になりましたが,いや,大変基礎的すぎて,情報分野以外のユーザーは使えません.そこで,もう少し楽チンに通信するのがBeastです.具体的には, ブラウザとの http 通信に限定したライブラリです.

サンプルプログラムの作成

とりあえず役に立つプログラムを作成しよう. それは次のようなものである:

  • プログラムの作成者は,boostだとかasioだとか,知るわけがない,と仮定する.なぜならば,彼らの専門は情報工学ではなく, 情報通信の細かいことを学ぶ余裕がないからである.
  • それでも大丈夫,ブラウザと通信できるようになる.
    • 通信内容は非常に限定し,せいぜいテキスト文字列の交換であるとする.
      • ま,キーボードで入力できる程度のもの,ってことさ
    • ユーザーが知る必要がないことであるが,通信には「WebSocket」とかいうやつを利用する.
      • プログラム間の通信がブラウザでできるってやつなー
    • 現状暗号化SSLに非対応であるので,安全な室内とかでならOK.それ以外なら傍受されまくると思うよ
  • まず,TCPクラスを作成する.
    • TCP server(logger,port,N);
      • loggerは, サーバーの更新記録を残すlog4cplusのインスタンス
      • portはC++プログラムを実行するマシンで利用できる1024以上の数値. Firewallでブロックされるのはダメ
        • 安全な屋内でFirewallする必要はないでしょ
      • Nはスレッド数で, Nクライアントが同時にアクセス可能になる. あんたのCore数より多くしても無駄
  • TCPクラスに, 通信チャンネルを好きなだけ登録する
    • auto& NHK=server.Protocols["NHK"];
      auto& TBS=server.Protocols["TBS"];
      • あんたのテレビがアンテナ1つで裏番組が録画できるように,1ポートで同時並行独立して通信できる.
      • 各チャンネルで「こういう命令がきたら,このように反応する」という通信の段取り(プロトコル)を規定する.
      • テレビのようなブロードキャストとは異なり,同じチャンネルで,通信相手ごとに独立した通信である.
      • 正式にはWebSocketの「サブプロトコル」という.
        • WebSocke自身tが「プロトコル」なんで・・・その下じゃけん「サブプロトコル」なんよ
    • NHK.on_open=[](){....}   ブラウザが接続してきた時に,なんか余計な作業が必要なら,このラムダ式に書く.
      • ラムダ式のキャプチャ部分[]に,必要な変数を並べても良い.
      • 同様に, on_close, on_error も定義可能である.
    • TBS.on_message=[](TCP::Session& session,const std::string& message){.....}
      • ブラウザがなんか送ってきた時, それがmessageに入って,この関数が起動する.
      • 同じチャンネルを複数のブラウザが開いていた時でも, sessionの部分が異なるので,区別可能ってわけだ
        • 特定のブラウザが特定のチャンネルを開いて,なんか通信している,ってのを「セッション」という.ブラウザが通信を終了するとセッションは終了する.
        • TCPクラスには, 自分からブラウザを切断する機能は装備されていない.
      • session.send(reply)    受け取った message で何か処理したら,ブラウザに返事をしたいと思うだろう. それはsessionの関数send()で行える.送信できるのはconst std::stringのreply変数である.

もうめんどくさいのでサンプルに書いちゃった

 

 

TCPクラスの定義

TCPクラスの役目は,

  • 指定された数だけスレッド(受付嬢)を用意
  • 受信するチャンネルを設定

プロトコルクラスの定義

セッションクラスの定義

着信ベルの定義

 


Beast

BoostでHTTPとWebSocketを実現しようというものです.2015年頃に乱立してたWebSocketライブラリーの姿が消えたと思ってたら,正規軍がやってきてたんですね・・・そりゃアマチュアは撤退するわな・・・

  • HTTP サーバーおよびクライアントに対応
  • WebSocket サーバーおよびクライアントに対応

仕事で嫌々ながらプログラム作っているわけですから,さっさと正規軍に乗り換えましょう.

アマチュアさんたちは作業放棄して逃亡しているので,何か更新があるたびに俺のプログラムがビルドできなくなる・・・面倒というより,放棄されたライブラリを自力で維持するのは無駄骨で不可能

WebSocketサーバー・クライアント:理解する前に試してみる

Boostにサンプルがついている:

.../boost/libs/beast/example/websocket/server/async/

まあ他のも適当にゲットしてきた.CMakeLists.txt もまあ適当に用意する〜

CMakeLists.txt:
cmake_minimum_required(VERSION 3.9)
set(CMAKE_BUILD_TYPE Release CACHE STRING "None Debug Relase ...")
project(websocket CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_STANDARD_REQUIRED ON)
find_package(Boost 1.71.0 REQUIRED COMPONENTS
    date_time coroutine date_time filesystem graph log_setup program_options random regex serialization system thread timer
)
include_directories(
    ${Boost_INCLUDE_DIRS}
)
link_directories(
    ${Boost_LIBRARY_DIRS}
)
add_executable (server
    websocket_server_async.cpp
)
add_executable (client
    websocket_client_async.cpp

Linux_gcc.cmake:
set(CMAKE_C_COMPILER gcc CACHE STRING "GCC compiler" FORCE)
set(CMAKE_CXX_COMPILER g++ CACHE STRING "G++ compiler" FORCE)
set(CMAKE_CXX_FLAGS "-std=c++14" CACHE STRING "G++ CXX_FLAGS" FORCE)
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g" CACHE STRING "G++ CXX_FLAGS_DEBUG" FORCE)
set(CMAKE_CXX_FLAGS_RELEASE "-fopenmp -O3" CACHE STRING "G++ CXX_FLAGS_RELEASE" FORCE)
set(CMAKE_EXE_LINKER_FLAGS "-Ofast -lpthread" CACHE STRING "GCC LD_FLAGS" FORCE)

こんな感じでいいか?

mkdir Linux_gcc
cd Linux_gcc
cmake -C ../Linux_gcc.cmake ..
make server client

さあ動くかな? まずはサーバーマシン 10.249.229.225 でサーバーを稼働させる:

./server 10.249.229.225 5963 3  ← 5963ポートで3スレッド起動

ほんで別のマシン10.249.229.223からアクセスしてみる:

./client 10.249.229.225 5963 KABA
KABA

ほうほう. お返事を返してくれるのね.

サーバープログラムの構造

動作することがわかったので, 理解してみよう. まずはサーバーである. ドキュメントは「おまえ, ASIO知ってるんだよな」「お前,とーぜん,RFC6455には詳しいんですよね?」とか言っている.無茶言うな.そんな妙なもん知るわけないが,俺は使わないといけないんだタコ.完全無視でレッツゴー(なんか実験でも計算でも,いつもこのノリが必要なのはどうかと思うが)

・・・ん?他の注意もあるらしいぞ

  • あー,IOなんですからスレッドセーフではありませんよ ← そらな

さて,RFC6455を読んでおけと主張している割には,サンプルプログラムの問題設定の記述が欠落している.我々流体力学者が情報工学を知るわけがないので,そこを説明してくれないと困るのじゃが. 大前提は

  • Facebookのユーザー数(同時に書き込んでくる奴らの総数)よりも,サーバーのCPU数が圧倒的に小さい

ということのようですね. CPU数〜スレッド数ですので,次のことが従います:

  • 1スレッドで多数のコネクションを管理する必要がある
  • 多数のスレッドは同様の構造を持つ.
  • スレッド数の増加は同時並行処理能力の増大を意味するだけ

・・・これ,大多数の数値解析者には該当しないと思うんですけど.ガチの流体シミュレーションを何万ユーザーが並走させるなんてありえない.人の顔を勝手に並べてそこらの画面に表示するのとはワケが違うのでね.とはいえ,サンプルがそうなっているので,とりあえず話に乗って理解する必要があります.

Nスレッドで多数のサーバ・クライアント間コネクションを管理しようとすると,会話の順番を保持する仕組みが必要になります.順序に無関係に各スレッドが会話を行うと,

  1. コネクション1の会話
    • 1Aクライアント「おはよう!」
    • 1Bサーバー「あーXXちゃん,きのうは大変だったねー」
    • 1Cクライアント「ほんっと,たまんねえなー」
  2. コネクション2の会話
    • 2Aクライアント「昨日頼んでた書類できた?」
    • 2Bサーバー「ええっと,最後のページが欠けてる以外はできてますけど」
    • 2Cクライアント「ぜーんぜん大丈夫だよ!」

であるはずのものが,

  1. スレッド1の会話
    • 1Aクライアント「おはよう!」
    • 2Bサーバー「ええっと,最後のページが欠けてる以外はできてますけど」
    • 1Cクライアント「ほんっと,たまんねえなー」
  2. スレッド2の会話
    • 2Aクライアント「昨日頼んでた書類できた?」
    • 1Bサーバー「あーXXちゃん,きのうは大変だったねー」
    • 2Cクライアント「ぜーんぜん大丈夫だよ!」

になってしまうからです.というわけで,マルチスレッドで通信するときにはstrandってのを使うそうな.

例えばコネクション一つにstrandを1つ割り当てるとする. コネクションが5個なら,strandも5個だ.strand一つづつイベントとコールバックを設定できる. サンプルプログラムでは

  • listenオブジェクトが, 1つのスレッドで受信を待ち受ける. そのコールバックは
    • strandを一つ作成して,下の読み込みを待ち受け
  • strandでデータ読み込みを待つ.コールバックは
    • 読み込んだデータをそのまま返送し,その後再び読み込みを待ち受け

となるとですね,こんな感じだ.

スレッドの代わりに電話受付嬢が2人用意してあるとしましょう.受付嬢1に,あるクライアント1が接続し「ば〜かば〜かば〜かば〜か」とクレームを送信すると,受付嬢1がstrand1を一つ作成して, クレームを読み込んでから,クライアントに「おまえがば〜かば〜かば〜かば〜か」と返送したらクライアント1が「何それ.なめてんのかてめー」と返送したら受付嬢1は「それが仕様となっております」と返送してガチャっと受話器を戻した(セッション終了).この後,怒り狂ったクライアント1が「マジ信じらんない,ありえない,ば〜かば〜かば〜かば〜か」と送信してくるとしよう.さて,このゴタゴタの間に別のクライアント2が「タコ」と送信してきたとしよう.これは,暇している受付嬢2が同じようにstrand2を作成して作業を行うが, 文字数が小さいので早く終了する.その後,クライアント1の「マジ信じらんない,ありえない,ば〜かば〜かば〜かば〜か」が到着する(strand3になる).ここで受信処理をするのは受付嬢1とは限らない.受付嬢2が暇しているから,受付嬢2がstrand3を処理しても問題はない.

このようにして,多数の受付嬢が複数のコネクションを矛盾なく処理できるって寸法のようだ. 上の例では,受付嬢1は受話器を戻してセッション終了しているが,そうならない場合(strand3にならずstrand1のままの場合)でも受付嬢2が「マジ信じらんない,ありえない,ば〜かば〜かば〜かば〜か」を処理しても良いらしい.

もちろん受付嬢が多い方が,同時に多数のクレームを処理できるが,そりゃ予算(CPUパワー)の話であって,動作原理としては,受付嬢が1人であっても稼働する.受付嬢が1人であれば,strandの山が彼女の机に積み上がり,彼女が1つづつ処理する.要するに反応の遅いサーバー(Nコネクションで速度が1/Nになる),となるが,反応しないサーバー(2コネクション以降は永遠に反応を返さない),ではない.

BOOST-BEASTのサンプルを解読

さて,ここまで理解できたところで,サンプルプログラムに取り組もう.サンプルサーバーは次のような構造をしている:

boost::asio::io_context ioc{threads};  ← 念のため,()の代わりに{} でコンストラクタを呼び出している
std::make_shared<listener>(ioc,boost::asio::ip::tcp::endpoint{address,port})->run();

はて? io_contextにthreads引数を与えて初期化すると何が起こるのか知らないんだけれども.てゆうか,ちょっと前まで io_service とかゆうてた奴やよなおめえ?ここに引数を与えて良いなんて・・・誰も知らないのでは.で,何が起こるんですかねコレ?

listenerは上の方で定義したユーザーのサーバー本体で, スマートポインターで確保して, 直ちにrun()させていることはわかりますわ. endpointについてはASIOの部ですでに説明した. listner.run()は後で出るが, 着信時のコールバックを設定して終了する関数です.まあつまりio_contextに着信した時の設定が,ここまでで終了するわけ.

std::vector<std::thread> v; v.reserve(threads-1);   ← 指定されたスレッド数のスレッドの枠を準備してますね
for(auto i=threads-1;i>0;--i) v.emplace_back([&ioc]{   ← 枠にラムダを当てはめてはるようですな. io_contextは共有する気だ
     ioc.run();     ←  io_contextでrunすると通信を開始するってのはASIOのところで説明した通り
  });
ioc.run();  ← 親分も走る気だ
}   ← みんな死んだ.親分も自決する

というわけで,本体は,io_contextにthread引数を入れるってのが初耳なくらいで, まあ,ありがちな感じですね.ただ単に, threadが1より増えれば作業が捗る,というだけである. 

listenerコンストラクタ

  • コンストラクタはio_contextとendpointを受け取り, boost::asio::ip::tcp::acceptorを作成する.
  • acceptor.open(endpoint.protocol(),error_code);    ←ポート開いてー
  • acceptor.set_option(boost::asio::socket_base::reuse_address(true),error_code)  ←終わったら解放する
  • acceptor.bind(endpoint,error_code);                    ← TCPに結合して
  • acceptor.listen(boost::asio::socket_base::max_listen_connections,error_code)    ←従来は引数なしで使ってましたわ

・・・いや?普通なんですけど. Beastってどこで使うのかにゃー

listener.run()

これは listener.do_acccept()するだけである.

listener.do_accept()

  • acceptor.async_accept( boost::asio::make_strand(io_context),   ← 通信が到来したら,strandを作成
           boost::beast::bind_front_handler(       ← ようやく boost.Beast の登場
                   &listener::on_accept,             ← なにやらコールバックを設定しちょる
                   shared_from_this()));

ようやく出てきました.async_accept()は, 非同期着信にコールバックを設定する関数だそうです.設定したらすぐに終了する. Boost.Beastは, ここでbind_front_handler()を呼び出す.これは

オリジナルのコールバック (listner::on_accept) を, 引き続く引数を与えて呼び出すコールバックを作成する

のだそうだ.shared_from_this()が何なのかも不明だ・・・って,これはリンク先に良い説明がある.ようするにshared_ptrで確保したオブジェクトを指し示すスマートポインターを1個余分に作って渡す仕組みですね.ええっとつまり, io_contextに着信があると, listner.on_accept()が走るのであるが, そのthis(オブジェクト)は,受話器を取った奴,となるように設定されているんだろうな..

listener.on_accept(error_code, boost::asio::ip::tcp::socket socket)

お客様センターの電話機が鳴動し,担当の受付嬢(thread)が受話器を取り上げた(ので,取り上げた受付嬢がthisで特定された)ところまできました.この後の作業は

std::make_shared<session>(std::move(socket))->run();
do_accept()

sessionクラスのオブジェクトが作成され, 引数のsocket(さっきshared_from_this()で作成したスマートポインタ)の所有権がsessionオブジェクトに移管される.ほんで,session.run()が行われる. session.run()が終了したら,明け渡したスマートポインタは消える

なんでstd::move()するのかというと,引数に書いたら (1) run()の終了で1個参照が減る (2) on_accept()の終了で1個参照が減る,で2個減ってしまって勘定が合わないから,だろうな

sessionが終わったら,新しいsessionを受け入れるべく, do_acceptを行う.

  • と,いうことは.このスレッドは,session.run()をやっている間は, 新規sessionを受け入れないってことなんですね.

あとは, sessionを理解するだけだ!あーもーいやだ,なんでupdateのたびにこうなる?!って,ガラクタライブラリはWebSocketが最後だから,これで,このゴタゴタともおさらばだ.もう少しだ!

Sessionコンストラクタ

こいつあ,なんとも通信らしいクラスである.メンバー変数には

boost::beast::websocket::stream<boost::beast::tcp_stream> ws;

が含まれている. これはもちろんTCPストリームからWebSocketストリームを作成する. こいつの初期化はコンストラクタのリストで

explicit session(boost::asio::tcp::socket&& socket) : ws(std::move(socket))

となっているワケね. 

session.run()

ではrun()は何するのか?

boost::asio::dispatch(ws.get_executor(),
     boost::beast::bind_front_handler(
        &session::on_run,
        shared_from_this()
     )
);

get_executor()とは? 多分, wsストリームを駆動しているひとーつまり, io_contextのこと.dispatch()とは? 多分,自分のスレッドに仕事をサブミットする(後で暇だったらやっちまえ,と依頼する)みたいな感じか?

・・・・不明なことも多いが, まずは session.on_run()が実行されるのではないだろうか.

session.on_run()

ではon_run()は何するのか?

//   なんだか知らんけどtimeoutをサーバー的な値にでも設定している感じ
ws_.set_option( boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
//  なんかデコデコとか言っているんだが,なんだこれ
ws_.set_option( boost::beast::websocket::stream_base::decorator(
     []( boost::beast::websocket::response_type& res)
            {
                  res.set(http::field::server,
                         std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
            }
       )
);
//   ここで普通にaccept
ws_.async_accept(
    boost::beast::bind_front_handler(
        &session::on_accept,
        shared_from_this()));

謎のデコって野郎はここだな. たしかWebsocketはHTTPの規約に則って文字列を通信するのだが, そこで「おいこら以降はWebSocketだupgradeしやがれこのタコ」「了解したupgradeしたぞクズ野郎」とかなんとか,罵り合っているという話を聞いたことがある.それをupgrade要求と,返答というらしい.ここに,なんの役に立つのか不明なオプション文字列をつけられるらしい.なんの役に立つんだ?まあええか.

session.on_accept()

do_read()します. んまあ,read/writeの無間地獄への入り口ですねー

session.do_read()

ws_.async_read(
  buffer_,    ← あ,これメンバー変数ね.boost::beast::flat_buffer と宣言されている. ここに受信データが入る
  boost::beast::bind_front_handler(    ← いい加減こればっかで,飽きてきた
      &session::on_read,      
      shared_from_this()));

ま,要するに, on_read()するってことで.

session.on_read()

on_read( boost::beast::error_code ec, std::size_t bytes_transferred){
   boost::ignore_unused(bytes_transferred);
   ...エラー処理....
ws.text(ws.got_text());  ← 到来したデータが文字列ならws.got_text()=trueになる.送信データも同じに設定 
ws_.async_write(
   buffer_.data(),  ← ここに送信データを指定.この例では, 受信データを指定しているな
   boost::beast::bind_front_handler(      ← げっぷ
      &session::on_write,
      shared_from_this()));
}

受け取った文字列をそのまま突き返しているね.で,on_write()をやる.

session.on_write()

on_write(beast::error_code ec,std::size_t bytes_transferred){
        boost::ignore_unused(bytes_transferred);
     ・・・エラー処理してる・・・
  buffer_.consume(buffer_.size());
  do_read();    ← はい,無間地獄に戻るワケです
}

ん.一応,サーバープログラムは読んだ.疑問はあるが大体動きがわかった.で,面白いから改造してみる.

  • 数値シミュレーションでやりたいのは,まあちょっとWebページとおしゃべりしたいだけである.だからスレッドは1つで良い.
  • スレッド1つでも,この実装では,一人がセッションやっている間は,絶対2人目は待たされることになるのだろう.
    • それを確認する.
    • 待たされるだけで,一人目が立ち去ったらちゃんと二人目の面倒を見てくれるのなら,問題ない
    • そうでない場合,2スレッドで実行することになる
      • 現在の何ちゃってWebSocketライブラリーは,意味不明だが3スレッド使ってたと思う
  • 1つのソケットで多数チャンネルの通信ができたと思うんだけど,その機能がない気がする.
    • たとえばJavaScriptでWebSocketを作成するときには, http://アドレス:ポート/プロトコル  で指定するのだが,この「プロトコル」部分がない.Sec-Websocket-Protocol とかいうヘッダーのやつな,
    • それは,これだわ
    • てか,そりゃクライアントの話でした.サーバーは・・・お・・・Beastの作者は興味がないみたいね.それは,たぶん,方針として,かなり間違っていると思いますよ.みんなWebSocketは http://サーバー名:ポート/サブプロトコル である,と信じてますからね.

サーバー側でサブプロトコル検出

サブプロトコルの指定は,クライアントとサーバーの罵り合いの最中に叫ばれる.ので,それを別途読み込んでからwebsocketのacceptをすれば良いのだ.つまりこうだ:

listner::on_accept:
    on_accept(beast::error_code ec, tcp::socket socket) {
        if(ec) fail(ec, "accept");
        else {
            // Inspect header
            http::request<http::string_body> req;//ここでrequestを入れる場所を確保
            beast::flat_buffer req_buffer;
            http::read(socket,req_buffer,req);//ここでreqに色々入る
            if (!websocket::is_upgrade(req)) return;
            std::make_shared<session>(std::move(socket),req)->run();
        }
        // Accept another connection
        do_accept();
}

どこをいじったのかというと,listnerに着信があったとき, とりあえずhttpのリクエストなんじゃ?と読み込み, reqに入れました.はてそれがwebsocketのupdateリクエストであれば, sessionをmake_sharedで作成するんですが, そこに, reqも渡しちゃうわけよ. もちろんsessionのコンストラクタも変更だ:

class session:public std::enable_shared_from_this<session>{
    boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
    boost::beast::flat_buffer buffer_;
    http::request<http::string_body> req_;//session headers
public:// Take ownership of the socket
    explicit session(tcp::socket&& socket,http::request<http::string_body>& req) : ws_(std::move(socket)), req_(req) {
        std::cout << "Session open with:" << std::endl;
        for(auto const& field:req) std::cout << "\t" << field.name() << " = " << field.value() << std::endl;
}

コンストラクタで, まあ面白いからという理由で, セッションリクエストの中身を画面に表示している. さて,これでupgradeをacceptするのであるが, 肝心のリクエストはreq_変数に入ってしまっているので, それを使うように変更:

void session::on_run() {
   ....
   ws_.async_accept(req_, beast::bind_front_handler(&session::on_accept,shared_from_this()));
}

これで, クライアントが接続してきたら, そのリクエストの詳細をドバッと画面に出してからacceptするように変更できたって訳だ.

クライアント側でサブプロトコル指定

クライアント側でのサブプロトコル指定は,handshakeする前にこんな感じでOK

...
ws_.set_option(websocket::stream_base::decorator(
    [](websocket::request_type& req) {
       req.set(http::field::sec_websocket_protocol,"oeeee");   ←サブプロトコル「oeeee」を指定
       req.set(http::field::user_agent,std::string(BOOST_BEAST_VERSION_STRING) +" websocket-client-async");
}));
...

実行してみる:

$ tcp2 127.0.0.1 59634 1
Session open with:
    Host = 127.0.0.1
    Upgrade = websocket
    Connection = upgrade
    Sec-WebSocket-Key = YzwNCEAAA/l5OjaFy3s4pw==
    Sec-WebSocket-Version = 13
    Sec-WebSocket-Protocol = oeeee  ←吐きそうなサブプロトコルだな・・
    User-Agent = Boost.Beast/248 websocket-client-async
GOT 8[aho-baka]
Session ended.

サーバーのsessionインスタンスでreqがわかっているので, サブプロトコルに応じた処理を記述すればオーケーだよん.