<前回の記事>
「 ↑次は 非同期 のサンプル・プログラムを読んでいこう☆」
「 必ず準備ができている 非同期のコードは、 同期とおんなじなのね」
「 非同期 というのは、 相手が待ち構えていない、 こちらも相手が準備できてなければ他のことをやる ことを指す言葉だからな☆」
「 まず、TCP通信で ローカルIPアドレスに ポートに接続する非同期のコードを載せよう☆」
Cargo.toml
[dependencies]
tokio = "0.1"
futures = "0.1.26"
「 Cargo.toml の書き方は crates.io で自分で調べような☆」
gett-asyn/main.rs
//!
//! cargo new gett-asyn
//! cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\gett-asyn
//! cargo check
//! cargo build
//! cargo run
//!
//! [Getting asynchronous](https://tokio.rs/docs/futures/getting_asynchronous/)
//!
extern crate tokio;
extern crate futures;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use futures::{Future, Async, Poll};
struct GetPeerAddr {
connect: ConnectFuture,
}
impl Future for GetPeerAddr {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
println!("peer address = {}", socket.peer_addr().unwrap());
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
println!("failed to connect: {}", e);
Ok(Async::Ready(()))
}
}
}
}
fn main() {
let addr = "192.168.0.1:1234".parse().unwrap();
let connect_future = TcpStream::connect(&addr);
let get_peer_addr = GetPeerAddr {
connect: connect_future,
};
tokio::run(get_peer_addr);
}
「 ソースコードの全文はこんな感じだが、実行してみると……☆」
Terminal:
Compiling gett-asyn v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\gett-asyn)
Finished dev [unoptimized + debuginfo] target(s) in 27.94s
Running `target\debug\gett-asyn.exe`
failed to connect: 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでした。または接
続済みのホストが応答しなかったため、確立された接続は失敗しました。 (os error 10060)
「 エコー・サーバーに 何も送信しなければ だんまりサーバー というわけかだぜ☆」
echo-serv/main.rs
//!
//! Echo server - poppo.
//!
//! A simple server that writes "poppo\n" when reads any.
//!
//! cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv
//! cargo check
//! cargo run
//!
//! See: [Rustにっき/8日目・TCPサーバ](https://cha-shu00.hatenablog.com/entry/2019/03/02/174532)
//!
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::io::{BufRead, BufReader, BufWriter, Error, Write};
use std::thread;
fn main() {
let host = "localhost";
let port = 9696;
let url = format!("{}:{}", host, port);
let mut addrs = url.to_socket_addrs().unwrap();
// Change to ip v4.
if let Some(addr) = addrs.find(|x| (*x).is_ipv4()) {
// Server standup | 127.0.0.1:9696
println!("Server standup | {}", addr);
// Wait for connection.
let listener = TcpListener::bind(addr).expect("Error. failed to bind.");
for streams in listener.incoming() {
match streams {
Err(e) => { eprintln!("error: {}", e)},
Ok(stream) => {
println!("Create the thread.");
// Create the thread.
thread::spawn(move || {
handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
}
}
} else {
eprintln!("Invalid Host:Port Number");
}
}
fn handle_client(stream: TcpStream) -> Result<(), Error> {
println!("Connection from {}", stream.peer_addr()?);
// Buffering.
let mut reader = BufReader::new(&stream);
let mut writer = BufWriter::new(&stream);
let mut line = String::new();
loop {
println!("Info | Waiting...");
if let Err(err) = reader.read_line(&mut line) {
panic!("error during receive a line: {}", err);
}
println!("Read | {}", line);
if line == "quit" {
return Ok(());
}
// 改行で送信完了。
let msg = "poppo";
println!("Write | {}", msg);
writer.write(format!("{}\n", msg).as_bytes())?;
writer.flush()?;
}
}
Terminal:
PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv> cargo run
Compiling ep1 v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\another-terminal\echo-serv)
Finished dev [unoptimized + debuginfo] target(s) in 0.78s
Running `target\debug\ep1.exe`
Server standup | 127.0.0.1:9696
Create the thread.
Connection from 127.0.0.1:57874
Info | Waiting...
thread '<unnamed>' panicked at 'error during receive a line: 既存の接続はリモート ホストに強制的に切断されました。 (os error 10054)', src\main.rs:59:13
stack backtrace:
「 gett-asyn の方が さっさとコネクション確立して終了するから、エコー・サーバーの方から見れば ガチャ切りだぜ☆」
pub fn connect(addr: A) -> Result
「 接続が確立するかどうかは ↑ TcpStream::connect
メソッドがやってくれてるんじゃないか☆?」
let connect_future = TcpStream::connect(&addr);
「 この代入式の左辺は、 connect_future
じゃなくて、 Result型なのでは☆?」
pub fn connect(addr: &SocketAddr) -> ConnectFuture
「 tokio の TcpStream と、 std の TcpStream で どんな違いがあるか分からずに 使っていていいの?」
「 非同期のテストにはなっていないけど、接続は確立したようね」
「 大問題だぜ☆ 非同期のテストになっていない☆
非同期のループに入る前に connect メソッドでタイムアウトするか、常にすぐ成功するかのどちらかだぜ☆」
Err(e) => {
println!("failed to connect\\(^q^)/: {}", e);
Ok(Async::Ready(()))
}
failed to connect\(^q^)/: 接続済みの呼び出し先が一定の時間を過ぎても正しく応答しなかったため、接続できませんでし
た。または接続済みのホストが応答しなかったため、確立された接続は失敗しました。 (os error 10060)
「 このメッセージは 非同期処理のエラー部が出力しているのでは?」
use tokio::net::{TcpStream, tcp::ConnectFuture};
「 tokio の TcpStream を使うということに注意だな☆」
struct GetPeerAddr {
connect: ConnectFuture,
}
「 Rust は クラス型言語じゃないんで、
オブジェクト指向プログラムから来た人は クラスを作る気分で 構造体を作れだぜ☆
プロパティは1つで ConnectFuture という型名を見ると、フューチャー型を実装してそうだよな☆」
impl Future for GetPeerAddr {
type Item = ☆;
type Error = ☆;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
☆
}
}
「 GetPeerAddr構造体に Futureトレイト を実装するぜ☆ この形はお決まりだぜ☆」
impl Future for GetPeerAddr {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
☆
}
}
「 ItemとErrorを見ると、成功しても、失敗しても、何も値を返さないことが分かるな☆
次は pollメソッドの中を見ていこう☆」
match self.connect.poll() {
Ok(Async::Ready(socket)) => {
println!("peer address = {}", socket.peer_addr().unwrap());
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
println!("failed to connect\\(^q^)/: {}", e);
Ok(Async::Ready(()))
}
}
match self.connect.poll() {
「 self
って何なの?
self.connect
はどこにあるの?
self.connect
は .poll
メソッドを持っているから Future なの?」
impl Future for GetPeerAddr {
^^^^^^^^^^^
「 self は GetPeerAddr構造体のインスタンスだな☆ つまり……☆」
let get_peer_addr = GetPeerAddr { connect: connect_future, };
^^^^^^^^^^^^^
「 あとで こういうコードが出てくるが、 GetPeerAddr構造体のインスタンスは get_peer_addr に1つ入っている☆ self はそいつだぜ☆」
「 self.connect は、インスタンス生成時に指定しているが……☆、」
let get_peer_addr = GetPeerAddr { connect: connect_future, };
^^^^^^^^^^^^^^^^^^^^^^^
「 connect_future の中に入っているやつが、 self.connect だぜ☆
Rust言語の場合、C言語と違って 値渡しとか参照渡しではなく、所有権の移動になる☆ 同時に唯一存在する参照渡し みたいなやつだぜ☆」
「 connect_future は Futureトレイトを実装しているのだろう☆ .poll
メソッドは持っている☆」
Ok(Async::Ready(socket)) => {
☆
}
Ok(Async::NotReady) => ☆,
Err(e) => {
☆
}
「 準備できているItem、準備できていない、エラーError の三択は定番だな☆」
「 .poll
メソッドの返却値がそういう形をしているわけだな☆ pollメソッドの返却値は Polll型という糖衣構文のような形をしている☆」
「 この形は 覚えてしまうのがいいのね。2か月後に仕様変更されるかもしれないけど」
Ok(Async::Ready(☆)) のとき:
println!("peer address = {}", socket.peer_addr().unwrap());
Ok(Async::Ready(()))
Ok(Async::NotReady) のとき:
Ok(Async::NotReady)
Err(e) のとき:
println!("failed to connect\\(^q^)/: {}", e);
Ok(Async::Ready(()))
「 準備できていないとき Ok(Async::NotReady)
を返せば 多分 バブル・アップ するのだろう☆」
「 準備できているとき、 Ok(Async::Ready(()))
を返せば 空の値を持ったOk を バブル・アップ するのも想像できる☆」
「 しかし、エラーのとき Ok(Async::Ready(()))
を返すのは なぜだぜ☆!」
「 サンプル・プログラムなんで さっさとOk扱いにして次に行けばいいや、ぐらいの気持ちだな☆ メッセージももう出したし☆」
fn main() {
let addr = "192.168.0.1:1234".parse().unwrap();
// let addr = "127.0.0.1:9696".parse().unwrap();
println!("Connect | {}", addr);
let connect_future = TcpStream::connect(&addr);
let get_peer_addr = GetPeerAddr {
connect: connect_future,
};
tokio::run(get_peer_addr);
}
let addr = "192.168.0.1:1234".parse().unwrap();
「 こういうの Rust言語の型推論が利いている☆
addr変数は TcpStream::connect(☆)
に渡されるから、std::net::SocketAddr
型だと推論して、 .parse()
メソッドは 文字列を
SocketAddr 型に変換してくれるぜ☆」
「 結末を予想しながら読む☆ それが 型推論プログラミングの予定調和だぜ☆
結末から遡って 今目の前にあるコードを解き明かせだぜ☆」
「 じゃあ ファイルの後ろの方が違ったら、上の方のコードの意味は全部変わるの?」
let connect_future = TcpStream::connect(&addr);
「 Future は だいぶ前の記事で説明した☆
非同期のコードは パイプラインの設計図を作っているだけで まだ水を流してはいないぜ☆」
let get_peer_addr = GetPeerAddr {
connect: connect_future,
};
「 これも Future をプロパティに持っている Future だよな☆」
「 逆向きのパイプラインになっているはずだぜ☆
井戸に例えれば 非同期処理のプログラマーは パイプラインの底から設計図を 組み立てていっている☆」
tokio::run(get_peer_addr);
「 最後は お決まりの run
だぜ☆ パイプラインに水が流れ出すぜ☆」
「 同期処理は インタープリター型 だったのに、
非同期処理は なにやら コンパイル型 みたいな動きをするわねぇ」
「 まだ記事が短いが、サンプル・プログラム1つに 1記事 で区切っていこう☆」
<次の記事へ>
Crieitは誰でも投稿できるサービスです。 是非記事の投稿をお願いします。どんな軽い内容でも投稿できます。
また、「こんな記事が読みたいけど見つからない!」という方は是非記事投稿リクエストボードへ!
こじんまりと作業ログやメモ、進捗を書き残しておきたい方はボード機能をご利用ください。
ボードとは?
コメント