<前回の記事>
「 通信は 常に2者間 で行われる☆
話しかける方、聞く方だぜ☆ このペアが通話できる状態になったときのことを、接続が確立する、と言う☆」
「 話しかける方が 話し終わるのを待って、
反対側のやつが 話始めるのが 同期通信(Sync) だぜ☆ 電話で会話 しているのに似ている☆」
「 間に、溜めて置くところ、Pool(プール)を置いておくことで、
反対側のやつは 自分の都合のいいときに取り出すことができるのが 非同期通信(Async) だぜ☆
E-メール に似ている☆」
「 上図の緑色のところが 非同期 だぜ☆
水道管がつながって 水を受け渡ししているのに似ているので パイプライン とも呼ばれる☆」
「 相手の レスポンス(Response; 応答)が まだ来なさそうな間 に 別のことをしていられるところを指して
非同期 と言っている☆」
「 同期通信は 相手の応答を じっと監視しているので すぐ対応できるが、
ある用途に使いたいとき、まるで役に立たない場合があるぜ☆」
「 前のやつが話し終わるまで何もできないし、他のこともできない☆」
「 このような同期通信の形を Request - Response (リクエスト レスポンス) と呼ぶ☆
日本では 会話のキャッチボール とか、アジアでは ボールはこちらの手にある とか言う☆」
「 リクエスト といのは コンピューターから見れば 口を開けてしゃべるのが ゆっくりな人間の話しを
録音して送信するところで……☆ ネットワーク上をデータが飛んでいる時間は この絵には表されていない☆」
「 レスポンス というのは 届いているものだが、
いつ届くのか 分からないので 同期通信では ずっと張り付いて 待ち構えている☆
レスポンスには また別の呼び方もある☆ ブロッキングだぜ☆」
「 待っている間(Blocking) 相手してくれてもよさそうだが、
相手してくれないし、本人も何もできない☆ 待つのが仕事だからだぜ☆」
「 いらいらいら☆! 仕事してないじゃないか☆!
マンガでも読んでたらダメなのかだぜ☆!」
「 同期通信のリクエスト レスポンスに比べ、非同期通信の場合……☆」
「 非同期通信の場合、リクエストではなく、プロミス と呼ぶ☆」
「 プロミス というのは 実行時間を ほぼ無視できるからだぜ☆」
「 例えば マルチ・コア・プログラミングであれば、他のコアに実行を 丸投げするからだぜ☆」
「 フューチャー (Future; 先物)は ここらへん☆」
「 フューチャーは、その何かやるべき処理が まだ何もやられていなくても 先に受け取ることができるぜ☆」
「 なんで そんなことができるの? 受け取った それは 何なの?」
# Immediately.
var future <= createAnimal();
# Please wait.
var name <= future.Name;
「 フューチャーは 先にもらっといて、 フューチャーを読み取りしたときに 待ち が発生するんだぜ☆」
「 そんなん、フィーチャーを読み取っているところで ただの同期処理じゃないの?」
「 計算を先に 別のコアでスタートさせておくことができる☆
フィーチャーからその計算の答えを読み取る頃には うまくいけば ノン・ブロッキングだろうという目論見だぜ☆
非同期処理で フィボナッチ数列の最大値を数えていたら 永遠にブロッキングだが……☆」
「 プロミスは丸投げ、フィーチャーは時間稼ぎか……☆ 最悪だな☆」
「 プログラマーは 立体的な複雑なツリーを 平面上の複数本のシーケンス に落とし込むこと が仕事だが☆」
「 非同期処理のプログラマーは 葉から根に向かって集約する逆向きのツリー構造をしたパイプライン を書くことが仕事になる☆」
「 非同期な逆さ向きのツリーを、どうやって プログラムで書いて並べるの?」
「 コールバック関数を使う☆ その関数はクロージャにする☆」
「 説明するのがめんどくさいんで 雑に進めると、
関数というのは 引数(ひきすう; パラメーター)が入ってきて、返り値が出ていくものだった☆」
「 関数の中で また 何か関数が呼び出されているのは ざらに見かけるだろう☆」
「 このとき、関数を実行した返り値ではなく、関数は実行せず、そのまま関数を渡すことができる☆
このような あとで使えよ、と渡す関数のことを コールバック関数 と呼ぶ☆」
「 コールバック関数を入れると、関数は こんな絵図になるとしよう☆
しかし これだけでは まだ不便で☆」
「 片方は 先手番用のコールバック関数、 もう片方は 後手番用のコールバック関数にしたいことがあるだろ☆」
「 配線が めちゃくちゃになる☆
関数の中に 初期値を埋め込む感じにしてから、コールバック関数として 関数に渡したいんだぜ☆」
「 ローカル変数を、関数に渡している見た目をしているだろ☆
袋に初期値を閉じ込んでるだろ☆ これが クロージャーだぜ☆」
「 コールバック関数に クロージャーを使うと、
引数を介さずに 中身に直接 初期値を注ぎ込んだみたいな感じになるわねぇ」
「 そして、電話がかかってきたときに この関数を呼び出すとか、
相手が電話を切った時に この関数を呼び出すとか、
エラーが起こったらこの関数を呼び出すとか、決めておくわけだぜ☆」
「 こういう使い方をすると イベント駆動 と呼ばれる☆
何かが起こったらそのときに関数が実行されるんだぜ☆ 非同期 に おあつらえ向きだろ☆」
「 コールバック関数の中には クロージャーを使って グローバル変数への参照でも埋め込んでおくの?」
「 コールバック関数の中で 共通の変数を持ち合いたいと 思わない?」
「 Rust では 所有権を move する☆ 参照と似たようなもんだぜ☆
たしかに3回も move するのは おかしいよな☆ エラーになるのかだぜ☆?」
「 Rust 言語で 所有権が2つや 3つに分かれることはないだろう☆」
Future パターン
Futures and promises
「 Rust言語で Future が実装されるの 2019年11月7日の Rust 1.39 以降なんで、おさらいしてようぜ☆?」
「 Promiseって何だぜ☆? Futureって何だぜ☆?」
「 Promise は容れ物で、Future は出てくるものだぜ☆ 形は関数だが、使われ方が ふつうの関数と違うということだな☆」
「 関数を ふつうに使う以外の 異常な使い方って どんなのよ?」
「 ふつうの関数は 同期 していて、 ふつうじゃない関数は 非同期 しているというわけだぜ☆
説明しよう☆
数学の非可換の群を使えば一発で説明できるだろう☆」
「 しかし絵を描くのが嫌になったので さっきの画像を使い回すぜ☆」
「 Fが子どもを生んで、Hが緑玉を作って、子どもが青玉を作ったの?」
「 そう☆ FとHは 順を入れ替えることができるが、Gは最後でなければならない☆
それが 同期処理 だぜ☆」
「 それに比べて 非同期処理 では、 F、H、G は 同時に並べる書き方が可能☆ 絵にすると……☆」
「 いわゆるふつうのプログラム、 同期のプログラム は実行順に並べられていた☆
非同期のプログラムは 実行順で並べられているわけではない ということだな☆
別の考え方に基づいた頭の使い方に変える必要がある☆」
「 ↑非同期プログラミングのプログラマーは、 実行順序を 気にすることができないんだぜ☆
実行順序は コンピューターに お任せ、譲ることになる……☆」
「 関数は Function だが、 ノン・ブロッキングしたいとき Promise と呼び方が変わると考えよう☆
Promise を中で使っている親の関数は 自分も Promise になる☆
すると どんどん親を遡って みんな Promise だらけになるわけだが……まとめて 同列になる☆」
「 ↑非同期プログラミングのプログラマーにできることは、
プロミス型の非同期処理では await を置くことだけだぜ☆
フィーチャー型の非同期処理では、フィーチャーから計算結果を読み取るタイミングだぜ☆」
「 await のタイミングで コンピューターが実行できる順で勝手にプログラムが走り、レースが始まる☆
そして最終結果を受け取ることができる☆」
「 その話だけで1記事書けてしまう☆
そろそろ Rust に戻ろうぜ☆」
// コマンド プロンプトからの入力があるまで待機します。
io::stdin()
.read_line(&mut line)
.expect("info Failed to read_line"); // OKでなかった場合のエラーメッセージ。
「 こう書けば 標準入力から テキストを受け取れるが、
子スレッドからメッセージが飛んできても 割り込めないよな☆」
「 じゃあ、メッセージが飛び交っている4つ目の子スレッドでも作ればいいのかだぜ☆?」
「 別ターミナルが必要じゃない。通信書かないとできないわよ」
mainBall = child1Exit.recv().unwrap();
println!("Main | Expected: 11, Got: {}.", mainBall);
「 こんな感じで 特定のスレッドからメッセージを受け取れるが、ここで待機してしまうだろ☆
メッセージが無ければ スルーできないのかだぜ☆?」
Rust in Detail: Writing Scalable Chat Service from Scratch
「 手始めにチャット・プログラムを作った方がいいのかだぜ☆?」
「 チャネルにタイムアウトの機能が無ければ 永遠に待ち続けるのでは☆?」
Are we async yet?
2018 年の非同期 Rust の動向調査
tokio
romio
「 通信の安全性をウリにしている Rust が、非同期通信の整備がまだできてないだなんて 考えられない……☆」
「 romio や、 tokio を調べたらいいんじゃないの?」
「 うーん☆ 理解することを目的としてやるなら tokio だな☆」
「 非同期通信やるのに rust 選ぶ方も頭がどうかしているのかもしれないが、
ドキュメントを読んでいこうぜ☆」
Cargo.toml
[dependencies]
tokio = "0.1"
「 Cargo.toml の働きや 書き方は 勝手に調べてこいだぜ☆」
src/main.rs
extern crate tokio;
「 extern crate
は src/main.rs の冒頭に書けだぜ☆」
examples/tokio-1.rs
use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;
examples/tokio-1.rs
/**
* cargo new ep1
* cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\ep1
* cargo check --example tokio-1
* cargo build --example tokio-1
* cargo run --example tokio-1
*/
use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;
fn main() {
// Parse the address of whatever server we're talking to
let addr = "127.0.0.1:6142".parse().unwrap();
let client = TcpStream::connect(&addr);
println!("Info | Finished.");
}
Command:
cargo run --example tokio-1
Output:
Compiling ep1 v0.1.0 (C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\ep1)
warning: unused import: `tokio::io`
--> examples\tokio-1.rs:8:5
|
8 | use tokio::io;
| ^^^^^^^^^
|
= note: #[warn(unused_imports)] on by default
warning: unused import: `tokio::prelude::*`
--> examples\tokio-1.rs:10:5
|
10 | use tokio::prelude::*;
| ^^^^^^^^^^^^^^^^^
warning: unused variable: `client`
--> examples\tokio-1.rs:16:9
|
16 | let client = TcpStream::connect(&addr);
| ^^^^^^ help: consider prefixing with an underscore: `_client`
|
= note: #[warn(unused_variables)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 1.37s
Running `target\debug\examples\tokio-1.exe`
Info | Finished.
「 TCP通信の クライアントのストリームは すぐに確立できるな☆」
let client = TcpStream::connect(&addr).and_then(|stream| {
println!("created stream");
// Process stream here.
Ok(())
})
.map_err(|err| {
// All tasks must have an `Error` type of `()`. This forces error
// handling and helps avoid silencing failures.
//
// In our example, we are only going to log the error to STDOUT.
println!("connection error = {:?}", err);
});
「 説明によると、コールバック関数を2つ渡す方法があるようだぜ☆」
「 じゃあ クライアントに何か通信したいときは コールバック関数の中に処理を書けばいいのかだぜ☆?」
let client = TcpStream::connect(&addr).and_then(|stream| {
println!("created stream");
io::write_all(stream, "hello world\n").then(|result| {
println!("wrote to stream; success={:?}", result.is_ok());
Ok(())
})
})
「 説明によると そんな感じだな☆ .and_then
関数の引数に渡しているコールバック関数の引数を見ると、 stream があるな☆」
「 その日本語が わからない! 関数の引数に渡している関数の引数 って何なのよ!」
tokio/tokio/examples/hello_world.rs
PS C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\tokio\ep1> cargo run --example tokio-2
Finished dev [unoptimized + debuginfo] target(s) in 0.27s
Running `target\debug\examples\tokio-2.exe`
About to create the stream and write to it...
connection error = Os { code: 10061, kind: ConnectionRefused, message: "対象のコンピューターによって拒否さ
れたため、接続できませんでした。" }
Stream has been created and written to.
Microsoft Windows [Version 10.0.18362.356]
(c) 2019 Microsoft Corporation. All rights reserved.
C:\WINDOWS\system32>ncat -l 6142
'ncat' は、内部コマンドまたは外部コマンド、
操作可能なプログラムまたはバッチ ファイルとして認識されていません。
C:\WINDOWS\system32>nc -l 6142
'nc' は、内部コマンドまたは外部コマンド、
操作可能なプログラムまたはバッチ ファイルとして認識されていません。
C:\WINDOWS\system32>
「 Windows 用の ncat コマンドを探しましょう!」
main.rs
//!
//! Poppo server.
//!
//! A simple server that writes "Poppo\n" when reads any.
//!
//! cd C:\Users\むずでょ\OneDrive\ドキュメント\practice-rust\server\ep1
//! cargo check
//! cargo run
//!
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::io::{BufRead, BufReader, BufWriter, Error, Read, Write};
use std::thread;
/**
* See: [Rustにっき/8日目・TCPサーバ](https://cha-shu00.hatenablog.com/entry/2019/03/02/174532)
*/
fn main() {
let host = "localhost";
let port = 9696;
let url = format!("{}:{}", host, port);
let mut addrs = url.to_socket_addrs().unwrap();
// Change to ip v4.
if let Some(addr) = addrs.find(|x| (*x).is_ipv4()) {
// Address | 127.0.0.1:9696
println!("Address | {}", addr);
// Wait for connection.
let listener = TcpListener::bind(addr).expect("Error. failed to bind.");
// Server standup | 127.0.0.1:9696
println!("Server standup | {}", addr);
for streams in listener.incoming() {
match streams {
Err(e) => { eprintln!("error: {}", e)},
Ok(stream) => {
println!("Create the thread.");
// Create the thread.
thread::spawn(move || {
handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
}
}
} else {
eprintln!("Invalid Host:Port Number");
}
}
fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
println!("Connection from {}", stream.peer_addr()?);
// Buffering.
let mut reader = BufReader::new(&stream);
let mut writer = BufWriter::new(&stream);
let mut line = String::new();
loop {
println!("Read | ...");
if let Err(err) = reader.read_line(&mut line) {
panic!("error during receive a line: {}", err);
}
println!("Read | {}", line);
if line == "quit" {
return Ok(());
}
// 改行までがメッセージ。
let msg = "poppo";
println!("Write | {}", msg);
writer.write(format!("{}\n", msg).as_bytes())?;
writer.flush()?;
}
}
「 これで ポート 9696 に接続して何か文字を送れば、poppo
と返ってくるはずだぜ☆」
「 'hello, world' を送り付けて、返事を待たずに切断したな☆」
io::write_all(stream, "hello world\n").then(|result| {
println!("wrote to stream; success={:?}", result.is_ok());
Ok(())
})
「 じゃあ 'hello, world' を送り付けている部分は ここかだぜ☆(^~^)」
「 11/7(金)の Rust 1.39 で async/await 構文が追加されるらしいぜ☆?」
「 そこで延期になれば 次は来年の1月ぐらいかだぜ☆ 大会には間に合わないな☆」
Example: A Chat Server
Crate tokio
「 チャット・サーバーの例もあるが むずかしすぎる☆
もう1記事必要だぜ☆」
<次の記事へ>
Crieitは誰でも投稿できるサービスです。 是非記事の投稿をお願いします。どんな軽い内容でも投稿できます。
また、「こんな記事が読みたいけど見つからない!」という方は是非記事投稿リクエストボードへ!
こじんまりと作業ログやメモ、進捗を書き残しておきたい方はボード機能をご利用ください。
ボードとは?
コメント