2019-09-29に更新

Rustの並列処理を学ぼうぜ☆(^~^)?<その5>

読了目安:14分

前回の記事

Chaining computations

Chaining computations

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 次のサンプル・プログラムのお題は ↑ 計算の連鎖 だぜ☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 また休みが終わる……☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 休みが終わる前に プログラミングの独習 が終わればいいのよ!」

chai-comp/main.rs

extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;

use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};

// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}

impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        use self::HelloWorld::*;

        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));
                }
            }
        }
    }
}

fn main() {
    let addr = "127.0.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);

    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ソースコードを見ていこうぜ☆」

extern crate tokio;
extern crate bytes;
#[macro_use]
extern crate futures;

Cargo.toml

[dependencies]
tokio = "0.1"
futures = "0.1.26"
bytes = "0.4.12"

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 Cargo.toml の書き方は crates.io で自分で調べような☆」

use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 std::io::Cursor というのは初見だぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 self がなんで std::io::{☆} の中にいるの?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 std::io そのものという意味じゃないか☆?」

20190929comp45a1.png

// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ↑Rust言語での enum 列挙型は、あとで match 構文というのが出てきて、それと対応する☆」

impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        ☆
    }
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ↑Futureトレイトの impl (実装)の書き方は お決まりだったな☆」

20190929comp45a2.png

        use self::HelloWorld::*;

        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));
                }
            }
        }

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 その中に 前回まで 準備できているItem、準備できていない、エラーError の Poll型マッチが あったが、
今回は 接続しているFuture、接続できたTcpStreamとCursor という2択だな☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 そんなん変えていいのかだぜ☆?」

        use self::HelloWorld::*;

        loop {
            match self {
                ☆
            }
        }

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 前回までは プロパティに Future を持っていてPoll型マッチをしていたが、
今回は 自分自身を使ってマッチを行うみたいだな☆ 全体をループで囲んでいるのも 今までと違う☆」

            match self {

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 なんで こんな書き方ができるの?
self って HelloWorld 列挙型じゃない。
HelloWorld 列挙型が 自分に poll メソッドを付けて、 自分自身を条件にしてマッチ構文で分岐するの?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 そうだぜ☆
列挙型のインスタンスは、自身のプロパティのどれか1つでもあるんだぜ☆ これを State(ステート; 状態)と考えることもできるぜ☆」

接続中のとき Connecting(ref mut f):

                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);

接続完了のとき Connected(ref mut socket, ref mut data):

                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));

KIFUWARABE_80x100x8_01_Futu.gif
「 片方は *self = Connected(socket, data); 自身を変更していて、
もう片方は return Ok(Async::Ready(())); リターンしている、何だぜこれ☆?」

20190929comp46a1.png

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 全体を loop で囲んでいたことを思い出せだぜ☆
return しなければ抜けないし、
何もしなければ ループするぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 このプログラムは 必ず準備ができている前提なんで、 準備中であるケースがあるのか知らないが……☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 じゃあ イベント・ループ とか、 メイン・ループ ってやつに近いわね」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ゲーム・プログラミングするやつの基本テクニックだぜ☆」

                    let socket = try_ready!(f.poll());

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 f.pool() って結局 フューチャーから 準備できているItem、準備できていない、エラーError を取ることよね。
try_ready! マクロは、準備できているItem 以外の結果はエラーにするってことでしょ」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 その Poll 型が socket変数に入るぜ☆ あるいは 準備できているItem の Item を取り出して入れるのかもしらん☆」

                    let data = Cursor::new(Bytes::from_static(b"hello world"));

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 これは なんだか分からないけど 文字列 hello world をバイト列にして Bytes オブジェクトを作り、
それを引数に取った Cursor オブジェクトよね」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 なんだか分からないが、そうだな☆」

                    *self = Connected(socket, data);

KIFUWARABE_80x100x8_01_Futu.gif
「 それで自分自身を 接続完了というステータスにして socket と data を持つわけかだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 流れるようなコーディングだぜ☆」

                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));

KIFUWARABE_80x100x8_01_Futu.gif
「 接続完了すると、接続完了のステータスになって ここに入ってくる☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 data には Cursor オブジェクトが入っているわよ」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 Cursor に残りがある限り ぐるぐる回るように見える……☆
try_ready!(socket.write_buf(data)); というのは、結果を捨てているだけで、やっていることは
socket.write_buf(data) だぜ☆ ソケットに向かって何かを書きだしているのだろう☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 全部 吐き出すと Ok 準備でけた空っぽ、だな☆」

fn main() {
    let addr = "127.0.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);

    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 最後は メイン・プログラム だが、見どころは……☆」

    let hello_world = HelloWorld::Connecting(connect_future);

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 列挙型の初期状態を Connecting にしているところだな☆」

    tokio::run(hello_world.map_err(|e| println!("{0}", e)))

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 最後に パイプラインに水を流して始まりだぜ☆
エラー処理を チェーンしてから run するのは なるほど こうやるものか、といったところだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 1記事につき 1サンプル・プログラムで進めていこう☆ 次☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 その前にプログラムを実行してくれだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 エコー・サーバーにつないでみるかだぜ……☆」

Output:

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
   Compiling ep1 v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv)
    Finished dev [unoptimized + debuginfo] target(s) in 0.86s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Connection from | 127.0.0.1:59067
   4: std::panicking::default_hook

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 なんか知らんが エコー・サーバーは落ちた……☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 あんたの クライアント側のプログラムが すぐ終了するからじゃないの?」

use std::thread;
use std::time::Duration;

// Omitted

    thread::sleep(Duration::from_millis(100));

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 試しにスリープを入れてみるかだぜ☆」

Output:

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Connection from | 127.0.0.1:59101
Info            | Waiting...
thread '<unnamed>' panicked at 'error during receive a line: 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)', src\main.rs:58:13
stack backtrace:
   0: backtrace::backtrace::trace_unsynchronized

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 うーむ、なんだか分からんな☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 接続の確立までは できてんのよ」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 \n を送信していないとか、そういうことなんだろうか☆?」

Before:

                    let data = Cursor::new(Bytes::from_static(b"hello world"));

After

                    let data = Cursor::new(Bytes::from_static(b"hello world\n"));

Output:

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Connection from | 127.0.0.1:59151
Info            | Waiting... 
Read            | hello world

Write           | poppo      
Info            | Waiting... 
thread '<unnamed>' panicked at 'error during receive a line: 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)', src\main.rs:58:13
stack backtrace:
   0: backtrace::backtrace::trace_unsynchronized

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 反応がちょっと変わったような……☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 改行は要らなかったのでは☆?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 なんか ターミナルから 行が消える……☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 キャラクター単位で 送信するなんて 成立してないんじゃないか☆?」

                    let data = Cursor::new(Bytes::from_static(b"h\ne\nl\nl\no\n \nw\no\nr\nl\nd\n"));

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 1文字ずつ改行を付けましょう」

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Connection from | 127.0.0.1:59183
Info            | Waiting...
Read            | h

Write           | poppo
Info            | Waiting...
Read            | h
e

Write           | poppo
Info            | Waiting...
Read            | h
e
l

Write           | poppo
Info            | Waiting...
Read            | h
e
l
l

Write           | poppo
Info            | Waiting...
Read            | h
e
l
l
o

Write           | poppo
Info            | Waiting...
Read            | h
e
l
l
o


Write           | poppo
Info            | Waiting...
Read            | h
e
l
l
o

w

Write           | poppo
Info            | Waiting...
Read            | h
e
l
l
o

w
o

Write           | poppo
Info            | Waiting...
Read            | h
e
l
l
o

w
o
r

Write           | poppo
Info            | Waiting...
Read            | h
e
d

Write           | poppo
Info            | Waiting...
thread '<unnamed>' panicked at 'error during receive a line: 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)', src\main.rs:58:13      
stack backtrace:
   0: backtrace::backtrace::trace_unsynchronized

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 確かに ぽっぽサーバー(エコー・サーバー)は バッファーのクリアーをしてないから こうなるか……☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 クリアーしろだぜ☆」

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
   Compiling ep1 v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv)
    Finished dev [unoptimized + debuginfo] target(s) in 0.78s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Connection from | 127.0.0.1:59205
Info            | Waiting...
Read            | h

Write           | poppo
Info            | Waiting...
Read            | e

Write           | poppo
Info            | Waiting...
Read            | l

Write           | poppo
Info            | Waiting...
Read            | l

Write           | poppo
Info            | Waiting...
Read            | o

Write           | poppo
Info            | Waiting...
Read            |

Write           | poppo
Info            | Waiting...
Read            | w

Write           | poppo
Info            | Waiting...
Read            | o

Write           | poppo
Info            | Waiting...
Read            | r

Write           | poppo
Info            | Waiting...
Read            | l

Write           | poppo
Info            | Waiting...
Read            | d

Write           | poppo
Info            | Waiting...
   4: std::panicking::default_hook
             at /rustc/eae3437dfe991621e8afdc82734f4a172d7ddf9b\/src\libstd\panicking.rs:214

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ぽっぽサーバー(エコー・サーバー)の方が 改行で1コマンドとして読み取っているのか……☆?」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 これ エコーしてないわよね。 poppo サーバーなのよ」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 もっと細かく エラーを出力してみよう……☆」

Output:

Write           | poppo
Info            | Waiting...
Read            | r

Write           | poppo
Info            | Waiting...
Read            | l

Write           | poppo
Info            | Waiting...
Read            | d

Write           | poppo
Info            | Waiting...
Error           | 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)
Read            | 
Write           | poppo
Os { code: 10054, kind: ConnectionReset, message: "既存の接続はリモート ホストに強制的に切断されました。" }

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 おかしいな、スリープ入れたはずなのに……、あー、そうか☆!」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 今のプログラムだと、通信の最後は 強制的に切断 しか無いんだぜ☆
それを エラー扱いしてるだけで☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 ややこしいから、正常な通信の切断方法はないの?」

fn shutdown(&mut self) -> Poll

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 シャットダウンしてみるかだぜ☆」

                    try_ready!(socket.shutdown());

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 これをしても変わらなかった☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 通信終了時は全部 エラー判定なんじゃないか☆?」

<次の記事へ:書きかけ>

ツイッターでシェア
みんなに共有、忘れないようにメモ

むずでょ

光速のアカウント凍結されちゃったんで……。ゲームプログラムを独習中なんだぜ☆電王戦IIに出た棋士もコンピューターもみんな好きだぜ☆▲(パソコン将棋)WCSC29一次予選36位、SDT5予選42位▲(パソコン囲碁)AI竜星戦予選16位

Crieitは誰でも投稿できるサービスです。 是非記事の投稿をお願いします。どんな軽い内容でも投稿できます。

また、「こんな記事が読みたいけど見つからない!」という方は是非記事投稿リクエストボードへ!

有料記事を販売できるようになりました!

こじんまりと作業ログやメモ、進捗を書き残しておきたい方はボード機能をご利用ください。
ボードとは?

コメント