I’ve been working in the Rust space for about a year now, using tokio and async/await with DNS. The result of this work is a sizeable from-scratch tokio server built on 0.2 (that’s now in production, yay! Hopefully I can share more about this later). Along the way I’ve gotten to know the UDP API of tokio quite well, and have even gotten to submit a few PRs (double yay). I’d like to highlight some interesting changes that have happened in tokio’s API between 0.2 and 0.3/1.0. These are all externally facing changes, and they will tend to focus on UDP since that’s what I know. I won’t discuss internals, since I’m not a tokio team member and don’t know them, but I’ll do my best to point to relevant issues or PRs so you can read more. And if this prompts some discussion that further elucidates some of the details, all the better.
With that out of the way, let’s tuck in…
Types are no longer &mut self
As a result of #2779, net types in tokio (UdpSocket, TcpStream, etc.) no longer require &mut self to recv/send (or read/write in TcpStream’s case). This is a great thing for writing code that needs to concurrently read and write on the same socket. If you want to read/write from the same task, you can just use a regular &UdpSocket reference, and if you want to read in one task and write in another, a simple Arc<UdpSocket> will do. TcpStream keeps the split method, but under the hood it’s doing just what I mentioned.
Before
Now, for concurrent send/recv, I’m not sure how others used to handle this case, but I would always set up a channel and create a dedicated “sender” task; then I’d recv, do some work, and send the response back over the channel. Putting that all together with split looked more or less like this:
use std::{io::Result, net::SocketAddr};
use tokio::{net::UdpSocket, sync::mpsc};

// tokio 0.2
async fn run() -> Result<()> {
    let udp = UdpSocket::bind("0.0.0.0:8080").await?;
    // call split so we can give ownership of each 'half' to its own task
    let (mut r, mut s) = udp.split();
    let (mut tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);

    // dedicated "sender" task: drains the channel and writes to the socket
    tokio::spawn(async move {
        while let Some((msg, addr)) = rx.recv().await {
            let len = s.send_to(&msg, &addr).await.unwrap();
            println!("{:?} bytes sent", len);
        }
    });

    // read loop: pull a datagram off the socket and queue the reply
    let mut buf = [0; 1024];
    loop {
        let (len, addr) = r.recv_from(&mut buf).await?;
        println!("{:?} bytes received from {:?}", len, addr);
        tx.send((buf[..len].to_vec(), addr)).await.unwrap();
    }
}
You can see here that split gets around the &mut self requirement of recv and send. The split API lets us get each “half” as a value that can be moved into its own fully-owned task, so we can then send and recv concurrently.
After
Now, with the above changes in 1.0, split is gone for UdpSocket and we can just use regular types from the std lib to accomplish the same task. If you’re using TcpStream or UnixStream, you can choose to use split/into_split or &/Arc. For these types, I’d recommend split if you only need two “halves”, and the std types if you need more than two.
use std::{io, net::SocketAddr, sync::Arc};
use tokio::{net::UdpSocket, sync::mpsc};

// tokio 1.0
#[tokio::main]
async fn main() -> io::Result<()> {
    let sock = UdpSocket::bind("0.0.0.0:8080").await?;
    // Arc instead of split, make two refs
    let r = Arc::new(sock);
    let s = r.clone();
    let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);

    // dedicated "sender" task: drains the channel and writes to the socket
    tokio::spawn(async move {
        while let Some((bytes, addr)) = rx.recv().await {
            let len = s.send_to(&bytes, addr).await.unwrap();
            println!("{:?} bytes sent", len);
        }
    });

    // read loop: pull a datagram off the socket and queue the reply
    let mut buf = [0; 1024];
    loop {
        let (len, addr) = r.recv_from(&mut buf).await?;
        println!("{:?} bytes received from {:?}", len, addr);
        tx.send((buf[..len].to_vec(), addr)).await.unwrap();
    }
}
Another nice thing about &self methods is that we can send/recv from BOTH tasks, whereas before there was a dedicated “sending” half and “receiving” half. You could also imagine a more complicated setup where we give a reference to more than two tasks.
Note that in both the before and after cases here, we could easily tokio::spawn in the loop and pass in a cloned Sender in order to handle each message in its own task (if we were doing something actually useful). That way we’re not doing anything in our read loop but pulling data off the socket.
Poll API
If you want to use a Stream implementation for UdpSocket, your previous option was pretty much just UdpFramed in tokio-util. If you haven’t used any of the util crates, you write a “decoder” which understands how to transform a buffer into a “frame” of your chosen type.
pub trait Decoder {
    type Item;
    type Error: From<io::Error>;

    fn decode(
        &mut self,
        src: &mut BytesMut
    ) -> Result<Option<Self::Item>, Self::Error>;
    ...
}
This API has remained much the same in the new tokio-util, although the internals of how it pulls down a frame have changed. Under the hood, UdpFramed used a set of methods prefixed with poll_; these mirror their async counterparts but return Poll. These methods have now been committed to the public API. In other words, UdpSocket now supports Poll-returning methods for recv/recv_from/send/send_to.
So now we have an alternative to UdpFramed for writing manual Stream impls, and a committed API for Future impls. This is a welcome addition if you don’t want to be tied to other parts of the tokio ecosystem or you just need the extra flexibility.
If you’re unfamiliar with this poll_ / async dichotomy, count yourself lucky. It’s nice that most users will only ever write async methods and never manually implement a Future or Stream. But sometimes in library code you need to do these things, and so tokio, as a low-level library, now provides both building blocks for each read/write action: an async method, and a method that takes a Context and returns Poll.
To make things more concrete, UdpSocket has the async method recv:
pub async fn recv(&self, buf: &mut [u8]) -> Result<usize>
If you’re in async/await land this is great; use these methods. It’s only when you’re stuck writing Stream or Future impls that they don’t do you any good. Look at the type signature of Future:
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Stream has a similar shape. If all you’ve got are async methods, calling them from poll can be difficult and often produces bugs, because you can easily end up creating a new future on every invocation of poll rather than holding on to a single future and driving it to completion.
In short, we need a Poll-returning method that takes a Context. This is where the poll_* variants come in:
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<Result<()>>
This is a bit off-topic, but you can quickly turn a poll_* method into an async fn with poll_fn; indeed, the async methods used to be written this way.
poll_fn(|cx| self.poll_recv_from(cx, buf)).await
However, there is an edge case to be aware of here, and it’s outlined in the docs on the poll_* methods:
/// Note that on multiple calls to a `poll_*` method in the send direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
and on recv:
/// Note that on multiple calls to a `poll_*` method in the recv direction, only the
/// `Waker` from the `Context` passed to the most recent call will be scheduled to
/// receive a wakeup.
You should not try to use the poll_recv or poll_send methods concurrently in the same direction, and if you ignore this advice, expect weirdness.
ReadBuf
You may have noticed that the poll_recv method takes a ReadBuf and not a &mut [u8]. This is a new type that AsyncRead also uses in order to read into possibly uninitialized memory (#2716). ReadBuf is a low-level type that uses MaybeUninit<u8> under the hood and tracks which parts of the buffer have been filled and initialized. Why would we want to do this? The main issue with plain byte buffers like [u8; 1024], as I understand it, is that the buffer must always be zero-initialized. This is a potentially costly operation, since you have to zero out all of that memory before the read overwrites it anyway. Because this is a divergence from the std types, there is a corresponding RFC to merge ReadBuf into std.
tokio-stream
tokio-stream is a new crate: tokio::stream::Stream and related traits have been moved out of tokio and into their own crate. There’s also a bunch of helpful tokio_stream::wrappers which can turn a TcpListener or UnixListener into a Stream. Other wrappers include one for Interval, for working with a stream of timer ticks. These wrapper types often use the poll_* APIs I mentioned earlier.
Sundries
- UdpSocket and its Tcp counterparts have gotten async readiness checking methods #3138.
- Everything that uses SocketAddr now takes it by value; since the type is Copy, we don’t need the extra layer of indirection. Old tokio APIs would often take &SocketAddr. async methods are still generic <T: ToSocketAddrs>.
- watch::Sender<_> uses send instead of broadcast now
- watch::Receiver<_> uses changed now instead of recv, and changed only shows you readiness. You need to explicitly borrow() and clone() yourself now if you want a fresh T
- acquire() for Semaphore returns a Result<SemaphorePermit, _> now instead of just SemaphorePermit
- time::delay_for and time::delay_until have been renamed to time::sleep and time::sleep_until to be more std-like in naming
- Is there more missing here? Send me a message and let me know!
Something I’d like to see in the future
I’d like to see some way to bound on a type that can send/recv, in the same way we can write <T: AsyncRead + AsyncWrite>. I think this probably won’t be added until there is a zero-cost way to have async methods in traits, and even then I’m not sure the tokio team desires that behaviour. Personally, I think it would make writing Framed impls much more generic and allow a function to take either a UdpSocket or an Arc<UdpSocket> generically. This would help when you want to make a Decoder for a socket that is shared among tasks concurrently.
A helpful reddit comment suggested that UdpFramed could take a Borrow<UdpSocket> and thus be generic over UdpSocket and Arc<UdpSocket>. I think this is a great idea and added it to my PR of tokio-util here.
Wrapping up
As an early user of tokio and the futures ecosystem, I remember the slog of futures 0.1 and tokio 0.1. Seeing that evolve into the more mature std::future and tokio 0.2, and now 1.0, has been fascinating. It’s hard to believe it’s been something like 4 years since this all started. I’m happy that most will not have to experience a manual Future impl or working with nested combinators. I’m even happier, as a user of tokio, that they’ve decided to commit to this API for a good length of time. That keeps some of the burden off of application developers having to “keep up” with everything that’s going on in the ecosystem. Not everyone has the desire to follow all of the PRs or subreddits and message boards. I’m not speaking for myself; personally I can’t get enough, but I understand that’s not how others want to spend their free time.
Anyway, congrats tokio team, you’ve done a wonderful job!