Original Document: crossbeam_channel - Rust (docs.rs)
The Rust standard library provides the channel std::sync::mpsc
, where mpsc
stands for multiple producer, single consumer, representing a channel that supports multiple senders but only one receiver. On the other hand, crossbeam_channel is a Rust library for mpmc (multiple producer, multiple consumer) channels, similar to the channel in the Go language.
Hello, world!#
use crossbeam_channel::unbounded;
// Create a channel of unbounded capacity.
let (s, r) = unbounded();
// Send a message into the channel.
s.send("Hello, world!").unwrap();
// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));
Channel Types#
There are two ways to create a channel:
-
bounded: bounded buffer channel
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
-
unbounded: unbounded buffer channel
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
Both functions return a Sender
and a Receiver
for sending and receiving messages.
bounded channel:
use crossbeam_channel::bounded;
// Create a channel with a maximum capacity of 5 messages.
let (s, r) = bounded(5);
// Can send only 5 messages without blocking.
for i in 0..5 {
s.send(i).unwrap();
}
// Sending beyond the buffer size will block.
// s.send(5).unwrap();
unbounded channel:
use crossbeam_channel::unbounded;
// Create an unbounded buffer channel.
let (s, r) = unbounded();
// Can send any number of messages until you run out of memory.
for i in 0..1000 {
s.send(i).unwrap();
}
When the buffer value of a bounded buffer channel is set to 0, it becomes a synchronous channel (unbuffered channel), which means that sending can only succeed when there is a receiver ready to receive the value, otherwise it will remain in the waiting-to-send state.
use std::thread;
use crossbeam_channel::bounded;
// Create an unbuffered channel.
let (s, r) = bounded(0);
// Sending will block until it is received.
thread::spawn(move || s.send("Hi!").unwrap());
// Receiving will block until someone sends.
assert_eq!(r.recv(), Ok("Hi!"));
Sharing Channels Between Threads#
Senders and receivers can be held by multiple threads through cloning.
use std::thread;
use crossbeam_channel::bounded;
let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());
// Spawn a thread to receive a message and then send a message.
thread::spawn(move || {
r2.recv().unwrap();
s2.send(2).unwrap();
});
// Send a message and then receive a message.
s1.send(1).unwrap();
r1.recv().unwrap();
Note that the new Senders and receivers produced by cloning do not create new message flows, but share the same message flow as the original channel, and the messages in the channel follow the first-in-first-out principle.
use crossbeam_channel::unbounded;
let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());
s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();
assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));
Senders and receivers can also be shared between threads through references.
use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;
let (s, r) = bounded(0);
// The thread::scope takes a closure and does spawn operations within this scope,
// and finally does join. If all threads join successfully, it returns Ok, otherwise it returns Err.
scope(|scope| {
// Spawn a thread to receive a message and then send a message.
scope.spawn(|_| {
r.recv().unwrap();
s.send(2).unwrap();
});
// Send a message and then receive a message.
s.send(1).unwrap();
r.recv().unwrap();
}).unwrap();
Closing Channels (Disconnection)#
When all senders or all receivers associated with a channel are drop
, the channel will be closed (disconnected), and no more messages can be sent, but any remaining messages can still be received. Sending and receiving operations on a disconnected channel will never block.
use crossbeam_channel::{unbounded, RecvError};
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// The only sender is dropped, and the channel is closed.
drop(s);
// The remaining messages can still be received.
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));
// There are no more messages in the channel.
assert!(r.is_empty());
// Note that calling `r.recv()` does not block.
// After receiving all the messages, `Err(RecvError)` is immediately returned.
assert_eq!(r.recv(), Err(RecvError));
Blocking Channels#
There are three cases for sending and receiving operations:
- Non-blocking (returns immediately with success or failure)
- Blocking (waits until the operation succeeds or the channel is disconnected)
- Blocking with timeout (blocks for a certain period of time)
Here is a simple example that demonstrates the difference between non-blocking and blocking operations:
use crossbeam_channel::{bounded, RecvError, TryRecvError};
let (s, r) = bounded(1);
// Send a message to the channel
s.send("foo").unwrap();
// This call will block because the channel is full
// s.send("bar").unwrap();
// Receive a message
assert_eq!(r.recv(), Ok("foo"));
// This receive operation will block because the channel is empty
// r.recv();
// The try_recv() method attempts to receive a message without blocking, and returns an error if the channel is empty
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
// Close the channel
drop(s);
// This receive operation will not block because the channel is already closed
assert_eq!(r.recv(), Err(RecvError));
Iterators#
Receiving messages can be used as iterators. For example, the iter
method creates an iterator that receives messages until the channel is empty and closed. Note that if the channel is empty, the iteration will block until the channel is closed or the next message arrives.
use std::thread;
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
thread::spawn(move || {
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
drop(s); // Close the channel.
});
// Receive all messages from the channel.
// Note that calling `collect` will block until the channel is closed.
let v: Vec<_> = r.iter().collect();
assert_eq!(v, [1, 2, 3]);
You can also use try_iter
to create a non-blocking iterator that receives all available messages until the channel is empty:
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// Don't close the channel
// Receive all current messages in the channel
let v: Vec<_> = r.try_iter().collect();
assert_eq!(v, [1, 2, 3]);
Selection#
The select!
macro allows you to define a set of channel operations, wait for any of them to become ready, and then execute it. If multiple operations become ready at the same time, one of them will be randomly selected.
You can also define a default operation that will be executed immediately or after a certain period of time if no operation becomes ready.
If an operation is non-blocking, it is considered ready even if the channel is closed and an error is returned.
Here is an example that receives messages from two channels:
use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};
let (s1, r1) = unbounded();
let (s2, r2) = unbounded();
thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());
// Only one receive operation will be executed at most.
select! {
recv(r1) -> msg => assert_eq!(msg, Ok(10)),
recv(r2) -> msg => assert_eq!(msg, Ok(20)),
default(Duration::from_secs(1)) => println!("timed out"),
}
If you need to select a dynamically created list of channel operations, use Select
. The select! macro is just a convenient wrapper for Select.
Special Channels#
Three functions can be used to create special types of channels, all of which only return a Receiver:
-
after
Creates a channel that passes a message after a certain period of time.
-
tick
Creates a channel that passes messages at regular intervals.
-
never
Creates a channel that never passes messages.
These channels are very efficient because messages are only generated when receiving operations are performed.
Here is an example that prints the time every 50 milliseconds for 1 second:
use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};
let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();
loop {
select! {
recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
recv(timeout) -> _ => break,
}
}