2019-09-29に更新

Rustの並列処理を学ぼうぜ☆(^~^)?<その4>

読了目安:13分

前回の記事

Getting asynchronous

Getting asynchronous

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ↑次は 非同期 のサンプル・プログラムを読んでいこう☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 今までは 必ず準備ができているコード だったからな☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 必ず準備ができている 非同期のコードは、 同期とおんなじなのね」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 非同期 というのは、 相手が待ち構えていないこちらも相手が準備できてなければ他のことをやる ことを指す言葉だからな☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 まず、TCP通信で ローカルIPアドレスに ポートに接続する非同期のコードを載せよう☆」

Cargo.toml

[dependencies]
tokio = "0.1"
futures = "0.1.26"

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 Cargo.toml の書き方は crates.io で自分で調べような☆」

gett-asyn/main.rs

//!
//! cargo new gett-asyn
//! cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\gett-asyn
//! cargo check
//! cargo build
//! cargo run
//! 
//! [Getting asynchronous](https://tokio.rs/docs/futures/getting_asynchronous/)
//!

extern crate tokio;
extern crate futures;

use tokio::net::{TcpStream, tcp::ConnectFuture};
use futures::{Future, Async, Poll};

struct GetPeerAddr {
    connect: ConnectFuture,
}

impl Future for GetPeerAddr {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.connect.poll() {
            Ok(Async::Ready(socket)) => {
                println!("peer address = {}", socket.peer_addr().unwrap());
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                println!("failed to connect: {}", e);
                Ok(Async::Ready(()))
            }
        }
    }
}

fn main() {
    let addr = "192.168.0.1:1234".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let get_peer_addr = GetPeerAddr {
        connect: connect_future,
    };

    tokio::run(get_peer_addr);
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ソースコードの全文はこんな感じだが、実行してみると……☆」

Terminal:

   Compiling gett-asyn v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\gett-asyn)
    Finished dev [unoptimized + debuginfo] target(s) in 27.94s
     Running `target\debug\gett-asyn.exe`
failed to connect: 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接
続済みのホストが応答しなかったため、確立された接続は失敗しました。 (os error 10060)

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 存在しないポートだから そもそも落ちる……☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 この前作ったエコー・サーバーを立てましょう!」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 エコー・サーバーに 何も送信しなければ だんまりサーバー というわけかだぜ☆」

echo server

echo-serv/main.rs

//!
//! Echo server - poppo.
//!
//! A simple server that writes "poppo\n" when reads any.
//! 
//! cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv
//! cargo check
//! cargo run
//! 
//! See: [Rustにっき/8日目・TCPサーバ](https://cha-shu00.hatenablog.com/entry/2019/03/02/174532)
//! 
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::io::{BufRead, BufReader, BufWriter, Error, Write};
use std::thread;

fn main() {
    let host = "localhost";
    let port = 9696;
    let url = format!("{}:{}", host, port);
    let mut addrs = url.to_socket_addrs().unwrap();

    // Change to ip v4.
    if let Some(addr) = addrs.find(|x| (*x).is_ipv4()) {
        // Server standup  | 127.0.0.1:9696
        println!("Server standup  | {}", addr);

        // Wait for connection.
        let listener = TcpListener::bind(addr).expect("Error. failed to bind.");

        for streams in listener.incoming() {
            match streams {
                Err(e) => { eprintln!("error: {}", e)},
                Ok(stream) => {
                    println!("Create the thread.");
                    // Create the thread.
                    thread::spawn(move || {
                        handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                    });
                }
            }
        }
    } else {
        eprintln!("Invalid Host:Port Number");
    }
}

fn handle_client(stream: TcpStream) -> Result<(), Error> {
    println!("Connection from {}", stream.peer_addr()?);

    // Buffering.
    let mut reader = BufReader::new(&stream);
    let mut writer = BufWriter::new(&stream);

    let mut line = String::new();

    loop {
        println!("Info            | Waiting...");
        if let Err(err) = reader.read_line(&mut line) {
            panic!("error during receive a line: {}", err);
        }
        println!("Read            | {}", line);

        if line == "quit" {
            return Ok(());
        }

        // 改行で送信完了。
        let msg = "poppo";
        println!("Write           | {}", msg);
        writer.write(format!("{}\n", msg).as_bytes())?;
        writer.flush()?;            
    }
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 エコー・サーバー走らせたが☆、」

Terminal:

PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
   Compiling ep1 v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv)
    Finished dev [unoptimized + debuginfo] target(s) in 0.78s
     Running `target\debug\ep1.exe`
Server standup  | 127.0.0.1:9696
Create the thread.
Connection from 127.0.0.1:57874
Info            | Waiting...
thread '<unnamed>' panicked at 'error during receive a line: 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)', src\main.rs:59:13     
stack backtrace:

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 gett-asyn の方が さっさとコネクション確立して終了するから、エコー・サーバーの方から見れば ガチャ切りだぜ☆」

pub fn connect(addr: A) -> Result

KIFUWARABE_80x100x8_01_Futu.gif
「 接続が確立するかどうかは ↑ TcpStream::connect メソッドがやってくれてるんじゃないか☆?」

    let connect_future = TcpStream::connect(&addr);

KIFUWARABE_80x100x8_01_Futu.gif
「 この代入式の左辺は、 connect_future じゃなくて、 Result型なのでは☆?」

pub fn connect(addr: &SocketAddr) -> ConnectFuture

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 いや、tokio の TcpStream なんだぜ☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 なんでぇ☆!」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 tokio の TcpStream と、 std の TcpStream で どんな違いがあるか分からずに 使っていていいの?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 首が突っ込めるまで 待っていては数年 待たされる☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 非同期のテストにはなっていないけど、接続は確立したようね」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 大問題だぜ☆ 非同期のテストになっていない☆
非同期のループに入る前に connect メソッドでタイムアウトするか、常にすぐ成功するかのどちらかだぜ☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 いや、よく見ると☆」

            Err(e) => {
                println!("failed to connect\\(^q^)/: {}", e);
                Ok(Async::Ready(()))
            }
failed to connect\(^q^)/: 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでし
た。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。 (os error 10060)

KIFUWARABE_80x100x8_01_Futu.gif
「 このメッセージは 非同期処理のエラー部が出力しているのでは?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 じゃあ 動いてたのかだぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 タイムアウトする時間とか どこで設定したの?」

KIFUWARABE_80x100x8_01_Futu.gif
「 それは分からないが……☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 ソースを解説していきましょう!」

use tokio::net::{TcpStream, tcp::ConnectFuture};

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 tokio の TcpStream を使うということに注意だな☆」

struct GetPeerAddr {
    connect: ConnectFuture,
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 Rust は クラス型言語じゃないんで、
オブジェクト指向プログラムから来た人は クラスを作る気分で 構造体を作れだぜ☆
プロパティは1つで ConnectFuture という型名を見ると、フューチャー型を実装してそうだよな☆」

impl Future for GetPeerAddr {
    type Item = ☆;
    type Error = ☆;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        ☆
    }
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 GetPeerAddr構造体に Futureトレイト を実装するぜ☆ この形はお決まりだぜ☆」

impl Future for GetPeerAddr {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        ☆
    }
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 ItemとErrorを見ると、成功しても、失敗しても、何も値を返さないことが分かるな☆
次は pollメソッドの中を見ていこう☆」

        match self.connect.poll() {
            Ok(Async::Ready(socket)) => {
                println!("peer address = {}", socket.peer_addr().unwrap());
                Ok(Async::Ready(()))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(e) => {
                println!("failed to connect\\(^q^)/: {}", e);
                Ok(Async::Ready(()))
            }
        }

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 match構文だけがあるぜ☆」

        match self.connect.poll() {

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 self って何なの?
self.connect はどこにあるの?
self.connect.poll メソッドを持っているから Future なの?」

impl Future for GetPeerAddr {
                ^^^^^^^^^^^

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 self は GetPeerAddr構造体のインスタンスだな☆ つまり……☆」

    let get_peer_addr = GetPeerAddr { connect: connect_future, };
        ^^^^^^^^^^^^^

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 あとで こういうコードが出てくるが、 GetPeerAddr構造体のインスタンスは get_peer_addr に1つ入っている☆ self はそいつだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 self.connect は、インスタンス生成時に指定しているが……☆、」

    let get_peer_addr = GetPeerAddr { connect: connect_future, };
                                      ^^^^^^^^^^^^^^^^^^^^^^^

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 connect_future の中に入っているやつが、 self.connect だぜ☆
Rust言語の場合、C言語と違って 値渡しとか参照渡しではなく、所有権の移動になる☆ 同時に唯一存在する参照渡し みたいなやつだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 connect_future は Futureトレイトを実装しているのだろう☆ .poll メソッドは持っている☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 フーン」

            Ok(Async::Ready(socket)) => {
                ☆
            }
            Ok(Async::NotReady) => ☆,
            Err(e) => {
                ☆
            }

KIFUWARABE_80x100x8_01_Futu.gif
「 準備できているItem、準備できていない、エラーError の三択は定番だな☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 .poll メソッドの返却値がそういう形をしているわけだな☆ pollメソッドの返却値は Polll型という糖衣構文のような形をしている☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 この形は 覚えてしまうのがいいのね。2か月後に仕様変更されるかもしれないけど」

Ok(Async::Ready(☆)) のとき:

                println!("peer address = {}", socket.peer_addr().unwrap());
                Ok(Async::Ready(()))

Ok(Async::NotReady) のとき:

Ok(Async::NotReady)

Err(e) のとき:

                println!("failed to connect\\(^q^)/: {}", e);
                Ok(Async::Ready(()))

KIFUWARABE_80x100x8_01_Futu.gif
「 準備できていないとき Ok(Async::NotReady) を返せば 多分 バブル・アップ するのだろう☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 むずかしい言葉を知ってるな☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 準備できているとき、 Ok(Async::Ready(())) を返せば 空の値を持ったOk を バブル・アップ するのも想像できる☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 しかし、エラーのとき Ok(Async::Ready(())) を返すのは なぜだぜ☆!」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 サンプル・プログラムなんで さっさとOk扱いにして次に行けばいいや、ぐらいの気持ちだな☆ メッセージももう出したし☆」

fn main() {
    let addr = "192.168.0.1:1234".parse().unwrap();
    // let addr = "127.0.0.1:9696".parse().unwrap();
    println!("Connect         | {}", addr);
    let connect_future = TcpStream::connect(&addr);
    let get_peer_addr = GetPeerAddr {
        connect: connect_future,
    };

    tokio::run(get_peer_addr);
}

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 メイン・プログラムは ↑このようになっている☆」

    let addr = "192.168.0.1:1234".parse().unwrap();

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 こういうの Rust言語の型推論が利いている☆
addr変数は TcpStream::connect(☆) に渡されるから、std::net::SocketAddr 型だと推論して、 .parse() メソッドは 文字列を
SocketAddr 型に変換してくれるぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 上から下に 読みづらくない?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 結末を予想しながら読む☆ それが 型推論プログラミングの予定調和だぜ☆
結末から遡って 今目の前にあるコードを解き明かせだぜ☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 ライプニッツは哲学者だったんだな☆ 知らなかったぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 じゃあ ファイルの後ろの方が違ったら、上の方のコードの意味は全部変わるの?」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 変わる☆ それが型推論だぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 下から読んだ方がいいのかしら……、めんどくさ……」

    let connect_future = TcpStream::connect(&addr);

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 Future は だいぶ前の記事で説明した☆
非同期のコードは パイプラインの設計図を作っているだけで まだ水を流してはいないぜ☆」

    let get_peer_addr = GetPeerAddr {
        connect: connect_future,
    };

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 これも Future をプロパティに持っている Future だよな☆」

KIFUWARABE_80x100x8_01_Futu.gif
「 Future は入れ子にして使うのかだぜ☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 逆向きのパイプラインになっているはずだぜ☆
井戸に例えれば 非同期処理のプログラマーは パイプラインの底から設計図を 組み立てていっている☆」

    tokio::run(get_peer_addr);

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 最後は お決まりの run だぜ☆ パイプラインに水が流れ出すぜ☆」

OKAZAKI_Yumemi_80x80x8_02_Syaberu.gif
「 同期処理は インタープリター型 だったのに、
非同期処理は なにやら コンパイル型 みたいな動きをするわねぇ」

KIFUWARABE_80x100x8_01_Futu.gif
「 インタープリターも コンパイルも知らないんで☆」

KITASHIRAKAWA_Chiyuri_80x100x8_01_Futu.gif
「 まだ記事が短いが、サンプル・プログラム1つに 1記事 で区切っていこう☆」

次の記事へ

ツイッターでシェア
みんなに共有、忘れないようにメモ

むずでょ

光速のアカウント凍結されちゃったんで……。ゲームプログラムを独習中なんだぜ☆電王戦IIに出た棋士もコンピューターもみんな好きだぜ☆▲(パソコン将棋)WCSC29一次予選36位、SDT5予選42位▲(パソコン囲碁)AI竜星戦予選16位

Crieitは誰でも投稿できるサービスです。 是非記事の投稿をお願いします。どんな軽い内容でも投稿できます。

また、「こんな記事が読みたいけど見つからない!」という方は是非記事投稿リクエストボードへ!

有料記事を販売できるようになりました!

こじんまりと作業ログやメモ、進捗を書き残しておきたい方はボード機能をご利用ください。
ボードとは?

コメント