Async Channels
22 min read
Channels are how independent async tasks talk to each other without sharing memory directly. If you have reached for EventEmitter, a MessageChannel, an RxJS Subject, or a hand-rolled queue in Node.js, Tokio’s channel family covers the same ground — but with compile-time guarantees about who can send, who can receive, and what happens on shutdown.
Quick Overview
Section titled “Quick Overview”A channel is a typed pipe: one or more producers push values in, one or more consumers pull them out. Tokio ships four flavors — mpsc (multi-producer, single-consumer queue), oneshot (a single value, once), broadcast (every subscriber sees every message), and watch (subscribers see only the latest value). They are the idiomatic way to coordinate tasks, because passing ownership of a value through a channel sidesteps the shared-mutable-state problems that Arc<Mutex<T>> solves at a higher cost.
The crucial contrast for a JavaScript developer: these are async channels whose recv/send operations .await and yield to the runtime instead of blocking a thread. The standard library’s std::sync::mpsc looks similar but blocks the OS thread, which would stall a Tokio worker — so inside async code you almost always want the Tokio versions.
Note: Every runnable Rust snippet on this page was compiled and executed with
rustc/cargo1.96.0 (current stable; 2024 edition). Examples usetokio = { version = "1.52", features = ["full"] }. Rust ships no built-in async runtime — see Tokio Setup.
TypeScript/JavaScript Example
Section titled “TypeScript/JavaScript Example”JavaScript has no single “channel” primitive, so the same job is done with a grab-bag of tools. A common pattern is an async queue built on Promises, or an EventEmitter for fan-out. Here is a producer/consumer queue and a pub/sub emitter — the two shapes you will most want to translate:
import { EventEmitter } from "node:events";
// --- Producer/consumer: a hand-rolled async queue (mpsc-ish) ---class AsyncQueue<T> { private items: T[] = []; private resolvers: Array<(value: T | null) => void> = []; private closed = false;
send(item: T): void { if (this.closed) throw new Error("queue closed"); const resolve = this.resolvers.shift(); if (resolve) resolve(item); else this.items.push(item); }
close(): void { this.closed = true; // Wake any pending consumers with `null` to signal end-of-stream. for (const resolve of this.resolvers) resolve(null); this.resolvers = []; }
// Resolves to the next item, or null once closed and drained. recv(): Promise<T | null> { const item = this.items.shift(); if (item !== undefined) return Promise.resolve(item); if (this.closed) return Promise.resolve(null); return new Promise((resolve) => this.resolvers.push(resolve)); }}
// --- Pub/sub fan-out: every listener sees every event (broadcast-ish) ---const bus = new EventEmitter();bus.on("user.created", (id: number) => console.log(`audit log: ${id}`));bus.on("user.created", (id: number) => console.log(`send welcome email: ${id}`));bus.emit("user.created", 42);Notice what JavaScript does not give you: there is no static guarantee that exactly one consumer drains the queue, no automatic “the channel is closed because every producer went away,” and EventEmitter is untyped (any payload, any event name). Rust’s channels encode all of that in the type system.
Rust Equivalent
Section titled “Rust Equivalent”The mpsc channel is the workhorse and maps directly onto the producer/consumer queue above:
use tokio::sync::mpsc;
#[derive(Debug)]struct Job { id: u32, payload: String,}
#[tokio::main]async fn main() { // Bounded channel: at most 8 messages buffered before send() awaits. let (tx, mut rx) = mpsc::channel::<Job>(8);
// Producer task. let producer = tokio::spawn(async move { for id in 1..=3 { let job = Job { id, payload: format!("data-{id}"), }; // send() is async: it waits if the buffer is full (backpressure). if tx.send(job).await.is_err() { eprintln!("receiver dropped, stopping producer"); break; } } // tx dropped here when the task ends -> channel closes. });
// Consumer: recv() yields None once all senders are dropped and the buffer drains. while let Some(job) = rx.recv().await { println!("processing job {} with {}", job.id, job.payload); } println!("channel closed, all jobs done");
producer.await.unwrap();}Real output:
processing job 1 with data-1processing job 2 with data-2processing job 3 with data-3channel closed, all jobs doneTwo structural facts fall out of the type system for free: mpsc::Receiver is not Clone, so the compiler guarantees a single consumer; and when the last Sender is dropped, recv() returns None, so the while let loop ends on its own. No closed flag, no sentinel null.
Detailed Explanation
Section titled “Detailed Explanation”mpsc: multi-producer, single-consumer
Section titled “mpsc: multi-producer, single-consumer”mpsc::channel::<T>(capacity) returns a (Sender<T>, Receiver<T>) pair. The name encodes the contract: the Sender is Clone (many producers), the Receiver is not (one consumer).
tx.send(value).awaitmovesvalueinto the channel. On a bounded channel it.awaits when the buffer is full — this is backpressure, a built-in flow-control mechanism that JavaScript queues lack by default. It returnsResult<(), SendError<T>>; theErrarm hands your value back if the receiver is gone.rx.recv().awaitreturnsOption<T>:Some(value)for each message, thenNoneonce all senders are dropped and the buffer is empty.
To get multiple producers, clone the sender — but remember to drop the original so the channel can actually close:
use tokio::sync::mpsc;
#[tokio::main]async fn main() { let (tx, mut rx) = mpsc::channel::<String>(32);
// Spawn 3 producers, each gets its own clone of the sender. for worker_id in 1..=3 { let tx = tx.clone(); tokio::spawn(async move { tx.send(format!("worker {worker_id} reporting in")) .await .unwrap(); }); }
// Drop the original sender so the channel can close once the clones finish. drop(tx);
let mut received = Vec::new(); while let Some(msg) = rx.recv().await { received.push(msg); }
received.sort(); // task ordering is nondeterministic; sort for a stable print. for msg in received { println!("{msg}"); }}Real output:
worker 1 reporting inworker 2 reporting inworker 3 reporting inTip: There is also
mpsc::unbounded_channel(). Itssend()is synchronous (no.await, never waits) because the buffer is unlimited — but that means no backpressure, so a fast producer can grow the queue until you run out of memory. Prefer the boundedchannel(capacity)unless you have a specific reason not to.
oneshot: exactly one value, once
Section titled “oneshot: exactly one value, once”oneshot::channel::<T>() is for a single reply. The Sender::send method takes self by value, so it can only ever be called once — the type system enforces “exactly one message.”
use tokio::sync::oneshot;
#[tokio::main]async fn main() { let (tx, rx) = oneshot::channel::<u64>();
// Worker computes a single value and sends it back. tokio::spawn(async move { let result = (1..=100).sum::<u64>(); // send() takes self by value -> can only be called once. Not async. let _ = tx.send(result); });
// Await the single reply. Errors only if the sender was dropped without sending. match rx.await { Ok(value) => println!("computed sum = {value}"), Err(_) => println!("worker dropped the sender without replying"), }}Real output:
computed sum = 5050Note that the Receiver is itself a future — you rx.await it directly rather than calling .recv(). A oneshot is the Rust analogue of a single Promise whose resolve you hand to another task. Its most powerful use is embedding the reply channel inside a request message, which gives you request/response over an mpsc (see the actor pattern below).
broadcast: every subscriber sees every message
Section titled “broadcast: every subscriber sees every message”broadcast::channel::<T>(capacity) is fan-out: clone-free subscription via tx.subscribe(), and each receiver gets a copy of every value sent after it subscribed. The value type must be Clone.
use tokio::sync::broadcast;
#[tokio::main]async fn main() { // Capacity 16: each receiver has its own 16-slot ring buffer. let (tx, _rx) = broadcast::channel::<String>(16);
// Two independent subscribers. Each gets EVERY message sent after it subscribed. let mut rx1 = tx.subscribe(); let mut rx2 = tx.subscribe();
let sub1 = tokio::spawn(async move { while let Ok(event) = rx1.recv().await { println!("sub1 saw: {event}"); } }); let sub2 = tokio::spawn(async move { while let Ok(event) = rx2.recv().await { println!("sub2 saw: {event}"); } });
// send() returns the number of currently-subscribed receivers. tx.send("user.created".to_string()).unwrap(); tx.send("user.updated".to_string()).unwrap();
// Dropping the sender closes the channel; receivers then get RecvError::Closed. drop(tx);
sub1.await.unwrap(); sub2.await.unwrap();}Real output:
sub1 saw: user.createdsub1 saw: user.updatedsub2 saw: user.createdsub2 saw: user.updatedThis is the typed, ownership-aware replacement for EventEmitter. The capacity matters: broadcast uses a ring buffer per receiver, so a slow consumer that falls more than capacity messages behind will lag and skip the oldest messages (covered under Common Pitfalls).
watch: only the latest value matters
Section titled “watch: only the latest value matters”watch::channel(initial) is for state that changes over time where consumers only care about the current value, not the history — config reloads, a “current health” flag, the latest sensor reading. The sender overwrites; receivers borrow() the newest value.
use tokio::sync::watch;use tokio::time::{Duration, sleep};
#[derive(Clone, Debug, PartialEq)]struct Config { log_level: String,}
#[tokio::main]async fn main() { // watch holds a single latest value; receivers see the most recent one. let (tx, mut rx) = watch::channel(Config { log_level: "info".to_string(), });
let watcher = tokio::spawn(async move { loop { // changed() resolves when the value is updated since the last observation. if rx.changed().await.is_err() { println!("config sender dropped, watcher exiting"); break; } // borrow() gives a read guard to the latest value. let cfg = rx.borrow(); println!("config changed -> log_level = {}", cfg.log_level); } });
sleep(Duration::from_millis(20)).await; tx.send(Config { log_level: "debug".to_string() }).unwrap(); sleep(Duration::from_millis(20)).await; tx.send(Config { log_level: "trace".to_string() }).unwrap(); sleep(Duration::from_millis(20)).await;
drop(tx); // closes the channel; watcher's changed() returns Err and it exits. watcher.await.unwrap();}Real output:
config changed -> log_level = debugconfig changed -> log_level = traceconfig sender dropped, watcher exitingIf you send three times before a watcher calls changed(), it only ever sees the last value — intermediate values are coalesced. That is the defining difference from broadcast, which delivers every message.
Warning: Do not hold the guard returned by
watch::Receiver::borrow()across an.await. It is a read lock; keeping it alive while suspended can block senders. Copy out what you need, drop the guard, then await. The same hazard applies totokio::sync::RwLock— see sync-primitives.md.
Request/response: oneshot inside an mpsc message (the actor pattern)
Section titled “Request/response: oneshot inside an mpsc message (the actor pattern)”This is the single most useful composition. One task owns some state; everyone else talks to it by sending a request that carries its own private oneshot reply channel:
use tokio::sync::{mpsc, oneshot};
/// A request carries a oneshot sender for its individual reply.struct Request { key: String, reply_to: oneshot::Sender<Option<String>>,}
#[tokio::main]async fn main() { let (tx, mut rx) = mpsc::channel::<Request>(32);
// The "actor": owns the state, processes requests one at a time. let actor = tokio::spawn(async move { let store = std::collections::HashMap::from([ ("alice".to_string(), "admin".to_string()), ("bob".to_string(), "user".to_string()), ]); while let Some(req) = rx.recv().await { let answer = store.get(&req.key).cloned(); // Reply on this request's private oneshot channel. let _ = req.reply_to.send(answer); } });
// A client sends a request and awaits its dedicated reply. let role = lookup(&tx, "alice").await; println!("alice -> {role:?}"); let missing = lookup(&tx, "carol").await; println!("carol -> {missing:?}");
drop(tx); actor.await.unwrap();}
async fn lookup(tx: &mpsc::Sender<Request>, key: &str) -> Option<String> { let (reply_to, reply_rx) = oneshot::channel(); tx.send(Request { key: key.to_string(), reply_to, }) .await .expect("actor is alive"); reply_rx.await.expect("actor replied")}Real output:
alice -> Some("admin")carol -> NoneBecause the actor is the only task that touches store, there is no lock and no data race — the channel serializes access. This is how you replace Arc<Mutex<HashMap<..>>> with message passing.
Key Differences
Section titled “Key Differences”| Concern | JavaScript | Rust / Tokio |
|---|---|---|
| Producer/consumer queue | hand-rolled, or a userland library | tokio::sync::mpsc (built in) |
| One-shot reply | a single Promise you resolve | tokio::sync::oneshot |
| Pub/sub fan-out | EventEmitter, RxJS Subject | tokio::sync::broadcast |
| ”Latest value” state | a variable + manual notify | tokio::sync::watch |
| Type safety of payloads | none (any) | fully typed T, checked at compile time |
| Who can receive | not enforced | mpsc::Receiver is not Clone → single consumer guaranteed |
| Backpressure | manual | bounded mpsc::send().await waits when full |
| ”Channel closed” signal | manual flag / sentinel | automatic when all senders or the receiver drop |
| Blocking vs async | event loop never blocks | Tokio channels .await; std::sync::mpsc blocks the thread |
std::sync::mpsc vs tokio::sync::mpsc
Section titled “std::sync::mpsc vs tokio::sync::mpsc”The standard library has its own mpsc that looks like Tokio’s, and reaching for it inside async code is a classic mistake.
use std::sync::mpsc; // standard library, NOT tokiouse std::thread;
fn main() { // std channel: multi-producer, single-consumer, BLOCKING (no async). let (tx, rx) = mpsc::channel::<i32>();
for n in 0..3 { let tx = tx.clone(); thread::spawn(move || { tx.send(n * 10).unwrap(); // blocks the OS thread if needed }); } drop(tx);
let mut total = 0; // recv() blocks the thread until a value arrives or all senders drop. for value in rx { total += value; } println!("total = {total}");}Real output:
total = 30The decision rule:
std::sync::mpsc | tokio::sync::mpsc | |
|---|---|---|
recv | blocks the OS thread | .awaits, yields to the runtime |
send | synchronous | .await on bounded; sync on unbounded |
| Use it for | plain threads (05-ownership territory) | async tasks under a runtime |
| Inside a Tokio task? | no — it stalls a worker thread | yes |
Note: If you genuinely must call a blocking
stdchannel from async (e.g. bridging a synchronous library), do it on a thread that is allowed to block viatokio::task::spawn_blocking— see spawning-tasks.md.
Common Pitfalls
Section titled “Common Pitfalls”1. Forgetting to drop the extra Sender, so recv() never returns None
Section titled “1. Forgetting to drop the extra Sender, so recv() never returns None”recv() returns None only when every sender is dropped. If you clone a sender for workers but keep the original alive in scope, the consumer loop hangs forever waiting for more. The fix is the explicit drop(tx) shown in the multi-producer example above (or letting the original go out of scope before the loop).
2. Trying to clone the mpsc::Receiver
Section titled “2. Trying to clone the mpsc::Receiver”mpsc is single-consumer by design. Cloning the receiver to fan work out to several tasks does not compile:
use tokio::sync::mpsc;
#[tokio::main]async fn main() { let (_tx, rx) = mpsc::channel::<i32>(8); // mpsc is multi-producer, SINGLE-consumer: the Receiver is not Clone. let rx2 = rx.clone(); // does not compile (error[E0599]: no method named `clone`) drop((rx, rx2));}Real compiler error:
error[E0599]: no method named `clone` found for struct `tokio::sync::mpsc::Receiver` in the current scope --> src/bin/err_clone_rx.rs:7:18 |7 | let rx2 = rx.clone(); | ^^^^^ |help: there is a method `close` with a similar nameTo share one receiver across several worker tasks, wrap it in Arc<tokio::sync::Mutex<Receiver<T>>> (the worker pool below does exactly that), or restructure so each worker has its own channel.
3. Calling oneshot::Sender::send twice
Section titled “3. Calling oneshot::Sender::send twice”send consumes the sender, so a second call references a moved value:
use tokio::sync::oneshot;
#[tokio::main]async fn main() { let (tx, _rx) = oneshot::channel::<i32>(); tx.send(1).unwrap(); // send consumes `tx` by value tx.send(2).unwrap(); // does not compile (error[E0382]: use of moved value)}Real compiler error (abridged):
error[E0382]: use of moved value: `tx` --> src/bin/err_send_twice.rs:7:5 | 5 | let (tx, _rx) = oneshot::channel::<i32>(); | -- move occurs because `tx` has type `tokio::sync::oneshot::Sender<i32>`, which does not implement the `Copy` trait 6 | tx.send(1).unwrap(); // send consumes `tx` by value | ------- `tx` moved due to this method call 7 | tx.send(2).unwrap(); // ERROR: tx already moved | ^^ value used here after move |note: `tokio::sync::oneshot::Sender::<T>::send` takes ownership of the receiver `self`, which moves `tx`If you need to send more than once, you wanted mpsc (a stream of messages) or watch (the latest value), not oneshot.
4. A slow broadcast receiver lagging and skipping messages
Section titled “4. A slow broadcast receiver lagging and skipping messages”broadcast keeps a fixed-size ring buffer per receiver. A consumer that falls behind by more than the capacity loses the oldest messages and recv() returns Err(RecvError::Lagged(n)). Unlike a closed channel, lag is recoverable — you keep receiving after it:
use tokio::sync::broadcast::{self, error::RecvError};
#[tokio::main]async fn main() { // Tiny capacity of 2 to force lag. let (tx, mut rx) = broadcast::channel::<u32>(2);
// Send 4 values before the slow receiver reads anything. for n in 1..=4 { tx.send(n).unwrap(); }
// The first two values were overwritten in the ring buffer -> Lagged(2). loop { match rx.recv().await { Ok(value) => println!("received {value}"), Err(RecvError::Lagged(skipped)) => { println!("lagged: skipped {skipped} messages"); } Err(RecvError::Closed) => { println!("channel closed"); break; } } }}Real output:
lagged: skipped 2 messagesreceived 3received 4Always handle Lagged explicitly: log it, increase the capacity, or speed up the consumer. Treating it as a fatal error is usually wrong.
5. Using std::sync::mpsc::recv() inside an async task
Section titled “5. Using std::sync::mpsc::recv() inside an async task”Because std::sync::mpsc::recv() blocks the calling OS thread, calling it directly in a Tokio task parks one of the runtime’s worker threads. On a single-threaded runtime this deadlocks; on a multi-threaded runtime it silently degrades throughput. There is no compiler error — it just behaves badly. Use tokio::sync::mpsc in async code, or hop to spawn_blocking for genuinely blocking work.
Best Practices
Section titled “Best Practices”- Pick the channel by communication shape, not habit. Stream of work →
mpsc; single reply →oneshot; fan-out events →broadcast; latest-value state →watch. - Prefer bounded
mpsc. Backpressure protects you from unbounded memory growth. Choose a capacity, and treat a full channel as a real signal about load. - Drop senders you do not need. A lingering
Senderclone keeps the channel open and hangs the consumer. Let clones move into tasks;dropthe original explicitly when the borrow checker keeps it alive. - Always handle
broadcast’sLagged. A slow subscriber should degrade gracefully, not crash. - Use message passing instead of shared state when you can. The actor pattern (an
mpscof requests, each carrying aoneshotreply) replacesArc<Mutex<T>>and removes a whole class of locking bugs. When you do need shared state, see arc-mutex-pattern.md. - Keep
std::sync::mpscfor plain threads. Insideasync, default to the Tokio family. - Combine channels with
select!for shutdown. Awatch<bool>shutdown flag plustokio::select!lets a task react to either work or a stop signal — see select-join.md.
Real-World Example
Section titled “Real-World Example”A production graceful-shutdown worker pool brings the whole family together: an mpsc carries jobs (with backpressure), each job carries a oneshot for its individual reply, and a watch<bool> broadcasts the shutdown signal to every worker. The single mpsc::Receiver is shared across workers behind an Arc<tokio::sync::Mutex<_>>, and tokio::select! lets each worker react to either a new job or the shutdown flag.
use tokio::sync::{mpsc, oneshot, watch};use tokio::time::{Duration, sleep};
/// A unit of work submitted to the pool, carrying a private reply channel.struct Task { id: u32, reply: oneshot::Sender<String>,}
#[tokio::main]async fn main() { // Jobs flow worker-ward over a bounded mpsc (backpressure at 64). let (job_tx, job_rx) = mpsc::channel::<Task>(64); // A single shutdown flag broadcast to every worker via watch. let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Spawn a small pool. mpsc is single-consumer, so we share the Receiver // behind an async Mutex; each worker pulls the next available job. let job_rx = std::sync::Arc::new(tokio::sync::Mutex::new(job_rx)); let mut workers = Vec::new(); for worker_id in 0..3 { let job_rx = job_rx.clone(); let mut shutdown_rx = shutdown_rx.clone(); workers.push(tokio::spawn(async move { loop { let task = { let mut guard = job_rx.lock().await; tokio::select! { // Stop promptly when the shutdown flag flips to true. _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { break; } continue; } maybe = guard.recv() => match maybe { Some(task) => task, None => break, // all senders dropped }, } }; // Simulate work, then answer this task's private oneshot. sleep(Duration::from_millis(5)).await; let _ = task .reply .send(format!("task {} handled by worker {worker_id}", task.id)); } })); }
// Submit 5 jobs and collect their individual replies. let mut replies = Vec::new(); for id in 0..5 { let (reply, reply_rx) = oneshot::channel(); job_tx.send(Task { id, reply }).await.unwrap(); replies.push(reply_rx); }
let mut results = Vec::new(); for rx in replies { results.push(rx.await.unwrap()); } results.sort(); for line in &results { println!("{line}"); }
// Graceful shutdown: stop accepting work, signal workers, await them. drop(job_tx); shutdown_tx.send(true).unwrap(); for w in workers { w.await.unwrap(); } println!("pool drained and shut down cleanly");}Real output (one representative run — which worker handles which task varies between runs, but the task ordering is deterministic because we sort by task id):
task 0 handled by worker 0task 1 handled by worker 1task 2 handled by worker 2task 3 handled by worker 2task 4 handled by worker 0pool drained and shut down cleanlyNote: This pattern — bounded
mpscfor work,oneshotfor replies,watchfor shutdown,select!to combine them — is the backbone of countless Rust services. It composes from small, independently testable pieces and never touches a shared lock for the business data.
Further Reading
Section titled “Further Reading”- Tokio Tutorial — Channels — the official walkthrough this page parallels.
tokio::syncmodule docs — overview of all four channels and when to use each.tokio::sync::mpsc·oneshot·broadcast·watch— per-channel API references.std::sync::mpsc— the blocking standard-library channel, for plain threads.- Actors with Tokio — Alice Ryhl’s canonical write-up of the request/response actor pattern.
Related sections of this guide:
- 11-async/00_promises-vs-futures.md — why Rust futures are lazy and need a runtime at all.
- 11-async/01_async-await.md —
async/awaitsyntax and?inside async, whichrecv().awaitbuilds on. - 11-async/03_tokio-setup.md — adding
tokiowithfeatures = ["full"]and#[tokio::main]. - 11-async/09_spawning-tasks.md —
tokio::spawn,JoinHandle, andspawn_blockingfor the producer/consumer tasks here. - 11-async/07_select-join.md —
tokio::select!andjoin!, used in the worker pool for shutdown. - 11-async/06_streams.md — turning a channel into a
Streamyou can iterate withwhile let. - 11-async/11_sync-primitives.md — the async
Mutex/RwLockused to share anmpsc::Receiver. - 11-async/12_arc-mutex-pattern.md — when shared state beats message passing, and how to do it safely.
- 05-ownership/README.md — move semantics, which explain why senders/receivers behave as they do.
- 02-basics/README.md — Rust fundamentals if you need a refresher.
- Next section: 12-modules-packages/README.md — organizing crates and modules.
Exercises
Section titled “Exercises”Exercise 1: Sum over an mpsc
Section titled “Exercise 1: Sum over an mpsc”Difficulty: Easy
Objective: Wire up a basic producer/consumer with a bounded mpsc channel.
Instructions:
- Create a bounded
mpsc::channel::<i32>(16). - Spawn a producer task that sends the numbers
1..=5. - In
main, drain the receiver withwhile let Some(n) = rx.recv().awaitand accumulate a sum. - Make sure the consumer loop actually terminates, then print the total.
Solution
use tokio::sync::mpsc;
#[tokio::main]async fn main() { let (tx, mut rx) = mpsc::channel::<i32>(16);
let producer = tokio::spawn(async move { for n in 1..=5 { tx.send(n).await.unwrap(); } // tx moved into the task and dropped here -> channel closes, // so rx.recv() will return None and the loop below ends. });
let mut sum = 0; while let Some(n) = rx.recv().await { sum += n; } producer.await.unwrap(); println!("sum = {sum}");}Output:
sum = 15The key insight: the producer task owns tx after the async move, so when the task finishes the sender is dropped and recv() returns None.
Exercise 2: One-shot square
Section titled “Exercise 2: One-shot square”Difficulty: Medium
Objective: Use oneshot to get a single computed value back from a spawned task, wrapped in a reusable async function.
Instructions:
- Write
async fn compute(input: u64) -> u64. - Inside it, create a
oneshotchannel, spawn a task that computesinput * inputand sends it, and.awaitthe receiver. - Call
compute(9)frommainand print the result.
Solution
use tokio::sync::oneshot;
async fn compute(input: u64) -> u64 { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let _ = tx.send(input * input); }); // The Receiver is itself a Future; await it directly (no .recv()). rx.await.expect("worker replied")}
#[tokio::main]async fn main() { let result = compute(9).await; println!("9 squared = {result}");}Output:
9 squared = 81rx.await yields Result<T, RecvError>; expect is fine here because we know the task always sends before dropping its sender.
Exercise 3: Fan-out with broadcast
Section titled “Exercise 3: Fan-out with broadcast”Difficulty: Medium–Hard
Objective: Build a pub/sub where two independent subscribers each collect every message, then verify they saw the same sequence.
Instructions:
- Create a
broadcast::channel::<i32>(8). - Subscribe twice (
tx.subscribe()), and spawn a task per subscriber that loops onrecv().await, pushing each value into aVecuntil the channel closes. - From
main, send1..=3, then drop the sender. - Await both tasks and print each subscriber’s collected
Vec. Both must be[1, 2, 3].
Solution
use tokio::sync::broadcast;
#[tokio::main]async fn main() { let (tx, _) = broadcast::channel::<i32>(8); let mut a = tx.subscribe(); let mut b = tx.subscribe();
let ta = tokio::spawn(async move { let mut seen = Vec::new(); // recv() returns Err(Closed) once the sender is dropped -> loop ends. while let Ok(v) = a.recv().await { seen.push(v); } seen }); let tb = tokio::spawn(async move { let mut seen = Vec::new(); while let Ok(v) = b.recv().await { seen.push(v); } seen });
for n in 1..=3 { tx.send(n).unwrap(); } drop(tx); // close the channel so both receivers stop
println!("a saw {:?}", ta.await.unwrap()); println!("b saw {:?}", tb.await.unwrap());}Output:
a saw [1, 2, 3]b saw [1, 2, 3]Both subscribers must subscribe before any message is sent — broadcast only delivers messages sent after a receiver subscribes. With capacity 8 and only three messages, no lag occurs.