doraemon

doraemon

let's go write some rusty code and revolutionize the world!

Rust Crate: crossbeam_channel

原文档:crossbeam_channel - Rust (docs.rs)

Rust 標準ライブラリはチャネルstd::sync::mpscを提供しており、ここでmpscは multiple producer, single consumer の略であり、このチャネルが複数の送信者をサポートすることを示していますが、唯一の受信者のみをサポートします。一方、crossbeam_channel は mpmc(複数送信者、複数受信者)の Rust ライブラリであり、Go 言語のチャネルと同様の特性を持っています。

こんにちは、世界!#

use crossbeam_channel::unbounded;

// 無制限の容量を持つチャネルを作成します。
let (s, r) = unbounded();

// チャネルにメッセージを送信します。
s.send("こんにちは、世界!").unwrap();

// チャネルからメッセージを受信します。
assert_eq!(r.recv(), Ok("こんにちは、世界!"));

チャネルの種類#

チャネルを作成する方法は 2 つあります:

  • bounded:制限付きバッファチャネル

    pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
    
  • unbounded:無制限バッファチャネル

    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
    

両方の関数は、メッセージの送信と受信に使用されるSenderReceiverを返します。

制限付きチャネル

use crossbeam_channel::bounded;

// 最大5つのメッセージを同時に保持できるチャネルを作成します。
let (s, r) = bounded(5);

// ブロックせずに5つのメッセージのみを送信できます。
for i in 0..5 {
    s.send(i).unwrap();
}

// バッファサイズを超えて送信しようとするとブロックされます。
// s.send(5).unwrap();

無制限チャネル

use crossbeam_channel::unbounded;

// 無制限のバッファチャネルを作成します。
let (s, r) = unbounded();

// メモリがいっぱいになるまで任意の数のメッセージを送信できます。
for i in 0..1000 {
    s.send(i).unwrap();
}

制限付きバッファチャネルのバッファ値が 0 に設定されると、それは ** 同期チャネル(無バッファチャネル)** に変わります。これは、受信者が値を受け取ることができるときにのみ送信が成功することを意味し、そうでない場合は送信待機の状態になります。

use std::thread;
use crossbeam_channel::bounded;

// 無バッファチャネルを作成します。
let (s, r) = bounded(0);

// 送信は受信されるまでブロックされます。
thread::spawn(move || s.send("こんにちは!").unwrap());

// 受信は誰かが送信するまでブロックされます。
assert_eq!(r.recv(), Ok("こんにちは!"));

マルチスレッド間でのチャネルの共有#

Senders と receivers は、clone を介して複数のスレッドで保持できます。

use std::thread;
use crossbeam_channel::bounded;

let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());

// スレッドを開始し、メッセージを受信してからメッセージを送信します。
thread::spawn(move || {
    r2.recv().unwrap();
    s2.send(2).unwrap();
});

// メッセージを送信し、その後メッセージを受信します。
s1.send(1).unwrap();
r1.recv().unwrap();

注意してください、clone によって生成された新しい Senders と receivers は新しいメッセージストリームを作成するのではなく、元のチャネルと同じメッセージストリームを共有します。チャネル内のメッセージは先入先出(FIFO)の原則に従います。

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

Senders と receivers は、参照を介してマルチスレッド間で共有することもできます。

use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;

let (s, r) = bounded(0);

// thread::scopeにクロージャを渡し、その範囲内でspawn操作を行い、最後に統一してjoinします。
// すべてのスレッドがjoinに成功するとOkを返し、そうでない場合はErrを返します。
scope(|scope| {
    // スレッドを開始し、メッセージを受信してからメッセージを送信します。
    scope.spawn(|_| {
        r.recv().unwrap();
        s.send(2).unwrap();
    });

    // メッセージを送信し、その後メッセージを受信します。
    s.send(1).unwrap();
    r.recv().unwrap();
}).unwrap();

チャネルの閉鎖(切断)#

チャネルに関連付けられたすべての senders またはすべての receivers がdropされると、そのチャネルは閉じられ(切断され)、メッセージを送信できなくなりますが、残りのメッセージは受信できます。切断されたチャネルでの送信および受信操作は決してブロックされません。

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// 唯一の送信者が破棄され、チャネルが閉じられます。
drop(s);

// 残りのメッセージはまだ受信できます。
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// チャネル内にもうメッセージはありません。
assert!(r.is_empty());

// 注意、`r.recv()`を呼び出してもブロックされません。
// メッセージを受信し終わった後、`Err(RecvError)`が即座に返されます。
assert_eq!(r.recv(), Err(RecvError));

ブロッキングチャネル#

送信および受信操作には 3 つの状況があります:

  • 非ブロッキング(成功または失敗を即座に返す)
  • ブロッキング(操作が成功するまでまたはチャネルが切断されるまで待機する)
  • タイムアウト付きのブロッキング(一定の時間だけブロックする)

以下は、非ブロッキング操作とブロッキング操作の違いを示す簡単な例です:

use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// チャネルにメッセージを送信します。
s.send("foo").unwrap();

// この呼び出しはブロックされます。なぜならチャネルが満杯だからです。
// s.send("bar").unwrap();

// メッセージを受信します。
assert_eq!(r.recv(), Ok("foo"));

// この受信操作はブロックされます。なぜならチャネルが空だからです。
// r.recv();

// try_recv()メソッドはメッセージを受信しようとし、ブロックせず、チャネルが空であればエラーを返します。
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

// チャネルを閉じます。
drop(s);

// この受信操作はブロックされません。なぜならチャネルが既に閉じているからです。
assert_eq!(r.recv(), Err(RecvError));

イテレータ#

メッセージを受信するためにイテレータを使用できます。たとえば、iterメソッドは、チャネルが空になり、閉じるまでメッセージを受信するイテレータを作成します。注意すべきは、チャネルが空の場合、イテレーションはチャネルが閉じるか次のメッセージが到着するまでブロックされることです。

use std::thread;
use crossbeam_channel::unbounded;

let (s, r) = unbounded();

thread::spawn(move || {
    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();
    drop(s); // チャネルを閉じます。
});

// チャネルからすべてのメッセージを受信します。
// 注意、`collect`を呼び出すと、チャネルが閉じるまでブロックされます。
let v: Vec<_> = r.iter().collect();

assert_eq!(v, [1, 2, 3]);

try_iter を使用して非ブロッキングイテレータを作成し、チャネル内のすべての利用可能なメッセージを受信することもできます。チャネルが空になると、イテレーションは終了します:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// チャネルを閉じません

// チャネル内の現在のすべてのメッセージを受信します
let v: Vec<_> = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);

選択#

select!マクロを使用すると、一連のチャネル操作を定義し、その中のいずれかの操作が準備が整うのを待ってから実行できます。複数の操作が同時に準備が整った場合、ランダムに 1 つが選択されます。

また、準備が整っている操作がない場合に即座に実行されるデフォルトの操作を定義することもできます。

操作がブロックされない場合、それは準備が整っていると見なされます。たとえチャネルが閉じてエラーを返してもです。

以下は、2 つのチャネルからメッセージを受信する例です:

use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};

let (s1, r1) = unbounded();
let (s2, r2) = unbounded();

thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());

// 最大で1つの受信操作のみが実行されます。
select! {
    recv(r1) -> msg => assert_eq!(msg, Ok(10)),
    recv(r2) -> msg => assert_eq!(msg, Ok(20)),
    default(Duration::from_secs(1)) => println!("タイムアウト"),
}

動的に作成されたチャネル操作のリストを選択する必要がある場合は、Selectを使用してください。select! マクロは Select の便利なラッパーに過ぎません。

特殊なチャネル#

3 つの関数は特殊なタイプのチャネルを作成でき、すべて 1 つの Receiver のみを返します:

  • after

    一定の時間後にメッセージを送信するチャネルを作成します。

  • tick

    定期的にメッセージを送信するチャネルを作成します。

  • never

    メッセージを決して送信しないチャネルを作成します。

これらのチャネルは非常に効率的で、メッセージは受信操作が行われるときにのみ生成されます。

以下は、50 ミリ秒ごとに時間を印刷し、1 秒間続ける例です:

use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};

let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();

loop {
    select! {
        recv(ticker) -> _ => println!("経過時間: {:?}", start.elapsed()),
        recv(timeout) -> _ => break,
    }
}
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。