Following up from Cloning Redis in Rust: RESP [Part 1], we can parse the protocol. So now… let’s do something with it.
The obvious(ish) next step, in my mind? Make a server. It’s all going to be over the network eventually, so it was either this or storing data. Server it is!
Specifically, my goal is not to build the networking and data structures for this project from scratch. Where there are primitives or libraries that will do something like networking for me, I’m going to use them.
Ergo:
So, how do I write a simple server in Tokio?
I’m listening!
Okay, very first version. Let’s start up Tokio, listen on a port, get a client, and say hello.
use tokio::net::{TcpListener, TcpStream};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let addr = "0.0.0.0:6379";
    let listener = TcpListener::bind(addr).await?;

    loop {
        let (stream, _) = listener.accept().await?;
        let _result = handle(stream).await;
    }
}

async fn handle(mut stream: TcpStream) -> std::io::Result<()> {
    stream.write_all("Hello world!\n".as_bytes()).await?;
    Ok(())
}
One thing to note is that this does need a cargo add tokio --features full first:
cargo add tokio --features full
Updating crates.io index
Adding tokio v1.25.0 to dependencies.
Features:
+ bytes
+ fs
+ full
+ io-std
+ io-util
+ libc
+ macros
+ memchr
+ net
+ num_cpus
+ parking_lot
+ process
+ rt
+ rt-multi-thread
+ signal
+ signal-hook-registry
+ socket2
+ sync
+ time
+ tokio-macros
- mio
- stats
- test-util
- tracing
- windows-sys
That gives us the macro for #[tokio::main] (which allows us to make main async and handles setting up a task runner for us), the Tokio-specific TcpListener and TcpStream types, and also AsyncWriteExt, which lets us call stream.write_all. I’m not 100% sure why it’s all broken up into features like that. Binary size? Compilation time?
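I haven’t actually trimmed it down, but going by the feature list above, I believe a narrower set would cover everything this code uses (treat this as an untested sketch rather than a vetted minimal set):

# macros + rt-multi-thread -> #[tokio::main] and the default multi-threaded runtime
# net                      -> TcpListener / TcpStream
# io-util                  -> AsyncWriteExt (and AsyncReadExt later)
cargo add tokio --features macros,rt-multi-thread,net,io-util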
In any case, it’s not much code. It just starts up a server with TcpListener::bind, waiting for connections. For each connection, just say hi.
# Server
$ cargo run --bin server
Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
Finished dev [unoptimized + debuginfo] target(s) in 1.02s
Running `target/debug/server`
# Client
$ nc localhost 6379
Hello world!
Sweet.
What did you say again?
Okay, so the server can talk to the client. What about the reverse? Can we read from the client?
To be able to stream.read, we need a buffer for it to read into. Handle that, then go ahead and do the .await? thing so it can run async (although we’re not actually doing that well yet; I’ll come back to that…) and return errors immediately (that’s the ?).
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let addr = "0.0.0.0:6379";
    let listener = TcpListener::bind(addr).await?;

    loop {
        let (stream, _) = listener.accept().await?;
        let _result = handle(stream).await;
    }
}

async fn handle(mut stream: TcpStream) -> std::io::Result<()> {
    stream.write_all("Hello world!\n".as_bytes()).await?;

    let mut buf = [0; 1024];
    loop {
        let bytes_read = stream.read(&mut buf).await?;
        if bytes_read == 0 {
            break;
        }

        println!("From client: {:?}", &buf[0..bytes_read]);
        stream.write_all(&buf[0..bytes_read]).await?;
    }

    Ok(())
}
Trying it out?
# Server
$ cargo run --bin server
Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
Finished dev [unoptimized + debuginfo] target(s) in 0.52s
Running `target/debug/server`
From client: [72, 111, 119, 32, 97, 114, 101, 32, 121, 111, 117, 63, 10]
From client: [71, 111, 111, 100, 32, 98, 121, 101, 10]
# Client
$ nc localhost 6379
Hello world!
How are you?
How are you?
Good bye
Good bye
^C
Sweet.
But we have a bit of a problem:
# Server
$ cargo run --bin server
Finished dev [unoptimized + debuginfo] target(s) in 0.44s
Running `target/debug/server`
# Client 1
$ nc localhost 6379
# Client 2
$ nc localhost 6379
# Client 1 RECV: Hello world!
# Client 1 SEND: Hello from 1
# Server: From client: [72, 101, 108, 108, 111, 32, 102, 114, 111, 109, 32, 49, 10]
# Client 1 RECV: Hello from 1
# Client 2 SEND: Hello from 2
# ...
# Client 2 SEND: Hello? Are you listening?
# ...
# Client 1 SEND: Hello again from 1
# Server: From client: [72, 101, 108, 108, 111, 32, 50, 32, 102, 114, 111, 109, 32, 49, 10]
# Client 1 RECV: Hello again from 1
# Client 1 CLOSE
# Client 2 RECV: Hello world!
# Server: From client: [72, 101, 108, 108, 111, 32, 102, 114, 111, 109, 32, 50, 10, 72, 101, 108, 108, 111, 32, 50, 32, 102, 114, 111, 109, 32, 50, 10]
# Client 2 RECV: Hello from 2
# Client 2 RECV: Hello? Are you listening?
In a nutshell, we’re not actually handling these clients asynchronously at all. Luckily, the OS buffered the second connection and its data for us, so when the first client let go, the second client got accepted and the server responded. But what we really want is to be able to talk to 2 (or even more?!) clients at the same time.
Actually asynchronously
It’s actually amazing how easy this part is with Tokio:
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let addr = "0.0.0.0:6379";
    let listener = TcpListener::bind(addr).await?;

    loop {
        let (stream, _) = listener.accept().await?;
        tokio::spawn(async move {
            let _result = handle(stream).await;
        });
    }
}

async fn handle(mut stream: TcpStream) -> std::io::Result<()> {
    // ...
}
That’s it. That’s the difference. We just need to wrap the call to handle in tokio::spawn, and essentially for free, the two clients can run alongside one another:
# Server
$ cargo run --bin server
Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
Finished dev [unoptimized + debuginfo] target(s) in 1.71s
Running `target/debug/server`
$ nc localhost 6379
# Client 1 RECV: Hello world!
# Client 1 SEND: Hello from 1
# Server: From client: [72, 101, 108, 108, 111, 32, 102, 114, 111, 109, 32, 49, 10]
# Client 1 RECV: Hello from 1
$ nc localhost 6379
# Client 2 RECV: Hello world!
# Client 1 SEND: Hello again from 1
# Server: From client: [72, 101, 108, 108, 111, 32, 97, 103, 97, 105, 110, 32, 102, 114, 111, 109, 32, 49, 10]
# Client 1 RECV: Hello again from 1
# Client 2 SEND: Hello from 2
# Server: From client: [72, 101, 108, 108, 111, 32, 102, 114, 111, 109, 32, 50, 10]
# Client 2 RECV: Hello from 2
# Client 2 SEND: Hello again from 2
# Server: From client: [72, 101, 108, 108, 111, 32, 97, 103, 97, 105, 110, 32, 102, 114, 111, 109, 32, 50, 10]
# Client 2 RECV: Hello again from 2
# Client 2 SEND: Good bye
# Server: From client: [71, 111, 111, 100, 32, 98, 121, 101, 10]
# Client 2 RECV: Good bye
# Client 2 CLOSE
# Client 1 SEND: Good bye
# Server: From client: [71, 111, 111, 100, 32, 98, 121, 101, 10]
# Client 1 RECV: Good bye
# Client 1 CLOSE
Nice!
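One aside before moving on (nothing the demo above depends on): tokio::spawn hands back a JoinHandle, and right now both that handle and the Result from handle(stream) get dropped, so any errors disappear silently. A sketch of what’s being ignored:

// Sketch: the spawned task's outcome is only observable through the JoinHandle.
let task = tokio::spawn(async move {
    handle(stream).await // std::io::Result<()>
});
// Awaiting it would yield Result<std::io::Result<()>, JoinError>, but awaiting
// inside the accept loop would serialize clients again, so we just drop it:
// let _ = task.await;

That’s part of why the next step wires errors into logging instead of ignoring them.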
Doing the lumberjack thing.
One thing that I always want to build in relatively early is good logging. For Python, I almost always use the built-in logging module + coloredlogs. For Rust, the equivalent would seem to be tracing.
So let’s add that!
cargo add tracing tracing_subscriber
Updating crates.io index
warning: translating `tracing_subscriber` to `tracing-subscriber`
Adding tracing v0.1.37 to dependencies.
Features:
+ attributes
+ std
+ tracing-attributes
- async-await
- log
- log-always
- max_level_debug
- max_level_error
- max_level_info
- max_level_off
- max_level_trace
- max_level_warn
- release_max_level_debug
- release_max_level_error
- release_max_level_info
- release_max_level_off
- release_max_level_trace
- release_max_level_warn
- valuable
Adding tracing-subscriber v0.3.16 to dependencies.
Features:
+ alloc
+ ansi
+ fmt
+ nu-ansi-term
+ registry
+ sharded-slab
+ smallvec
+ std
+ thread_local
+ tracing-log
- env-filter
- json
- local-time
- matchers
- once_cell
- parking_lot
- regex
- serde
- serde_json
- time
- tracing
- tracing-serde
- valuable
- valuable-serde
- valuable_crate
It’s actually pretty cool that you can use --features max_level_info to cap logging at info, for free, at compile time. That’s neat. But for now, I’ll configure tracing_subscriber manually.
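If I did want that, my understanding is it’s just a feature flag on the tracing dependency (untried here):

# Statically caps the level, so debug!/trace! calls compile down to no-ops.
cargo add tracing --features max_level_info

Anyway, here’s the manual setup: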
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing_subscriber;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    tracing_subscriber::fmt::init();

    let addr = "0.0.0.0:6379";
    let listener = TcpListener::bind(addr).await?;
    tracing::info!("[server] Listening on {addr}");

    loop {
        let (stream, addr) = listener.accept().await?;
        tracing::debug!("[server] Accepted connection from {addr:?}");

        tokio::spawn(async move {
            if let Err(e) = handle(stream, addr).await {
                tracing::warn!("[server] An error occurred: {e:?}");
            }
        });
    }
}

async fn handle(mut stream: TcpStream, addr: SocketAddr) -> std::io::Result<()> {
    tracing::info!("[{addr}] Accepted connection");

    let mut buf = [0; 1024];
    loop {
        let bytes_read = stream.read(&mut buf).await?;
        if bytes_read == 0 {
            break;
        }
        tracing::debug!("[{addr}] Received {bytes_read} bytes");

        let string = String::from_utf8_lossy(&buf[0..bytes_read]);
        tracing::debug!("[{addr}] Received {string:?}");

        stream.write_all(string.as_bytes()).await?;
    }

    tracing::info!("[{addr}] Ending connection");
    Ok(())
}
Mostly, we just have a bunch of tracing::info! and tracing::debug! calls around. I did try to get spans working, but they seemed to act funny with multiple threads, so this is enough for now.
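For reference, my understanding is that the usual way to make spans follow a task across worker threads is to attach the span to the spawned future with tracing::Instrument. A sketch of what that might look like in the accept loop above (not what I’m actually doing here):

use tracing::Instrument;

// Attach a per-connection span to the spawned future. The span is entered
// every time the task is polled, on whichever worker thread polls it.
tokio::spawn(
    async move {
        if let Err(e) = handle(stream, addr).await {
            tracing::warn!("An error occurred: {e:?}");
        }
    }
    .instrument(tracing::info_span!("connection", %addr)),
);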
Checking it out:
# Server
$ RUST_LOG=debug cargo run --bin server
Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
Finished dev [unoptimized + debuginfo] target(s) in 0.56s
Running `target/debug/server`
# Client
$ nc localhost 6379
# Client I/O
# > Hello
# < Hello
# > Goodbye
# < Goodbye
# Server logs
2023-02-05T02:23:44.647267Z INFO server: [server] Listening on 0.0.0.0:6379
2023-02-05T02:23:47.556100Z DEBUG server: [server] Accepted connection from 127.0.0.1:53986
2023-02-05T02:23:47.556184Z INFO server: [127.0.0.1:53986] Accepted connection
2023-02-05T02:23:49.148031Z DEBUG server: [127.0.0.1:53986] Received 6 bytes
2023-02-05T02:23:49.148187Z DEBUG server: [127.0.0.1:53986] Received "Hello\n"
2023-02-05T02:23:52.525595Z DEBUG server: [127.0.0.1:53986] Received 8 bytes
2023-02-05T02:23:52.525690Z DEBUG server: [127.0.0.1:53986] Received "Goodbye\n"
2023-02-05T02:23:53.329806Z INFO server: [127.0.0.1:53986] Ending connection
That’s pretty cool. And it does work fine with multiple clients as well:
$ RUST_LOG=debug cargo run --bin server
Finished dev [unoptimized + debuginfo] target(s) in 0.10s
Running `target/debug/server`
2023-02-05T02:26:03.249739Z INFO server: [server] Listening on 0.0.0.0:6379
2023-02-05T02:26:07.012916Z DEBUG server: [server] Accepted connection from 127.0.0.1:54523
2023-02-05T02:26:07.013035Z INFO server: [127.0.0.1:54523] Accepted connection
2023-02-05T02:26:08.963769Z DEBUG server: [127.0.0.1:54523] Received 13 bytes
2023-02-05T02:26:08.964088Z DEBUG server: [127.0.0.1:54523] Received "Hello from 1\n"
2023-02-05T02:26:10.421350Z DEBUG server: [server] Accepted connection from 127.0.0.1:54527
2023-02-05T02:26:10.421475Z INFO server: [127.0.0.1:54527] Accepted connection
2023-02-05T02:26:12.230319Z DEBUG server: [127.0.0.1:54527] Received 13 bytes
2023-02-05T02:26:12.230440Z DEBUG server: [127.0.0.1:54527] Received "Hello from 2\n"
2023-02-05T02:26:15.365158Z DEBUG server: [127.0.0.1:54523] Received 11 bytes
2023-02-05T02:26:15.365239Z DEBUG server: [127.0.0.1:54523] Received "Still here\n"
2023-02-05T02:26:17.203135Z DEBUG server: [127.0.0.1:54527] Received 9 bytes
2023-02-05T02:26:17.203212Z DEBUG server: [127.0.0.1:54527] Received "Good bye\n"
2023-02-05T02:26:19.171721Z DEBUG server: [127.0.0.1:54523] Received 9 bytes
2023-02-05T02:26:19.171809Z DEBUG server: [127.0.0.1:54523] Received "Good bye\n"
2023-02-05T02:26:19.507445Z INFO server: [127.0.0.1:54523] Ending connection
2023-02-05T02:26:20.473141Z INFO server: [127.0.0.1:54527] Ending connection
So… weren’t we doing something with Redis?
Right. Redis. This is already getting a bit long, so for now, let’s just assume we have a client that knows how to speak RESP (see Cloning Redis in Rust: RESP [Part 1]).
Take that, parse it, log the parsed version, turn it back into a string, and send it back.
use std::net::SocketAddr;
use std::str::FromStr;
use redis_rs::RedisType;
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing_subscriber;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    tracing_subscriber::fmt::init();

    let addr = "0.0.0.0:6379";
    let listener = TcpListener::bind(addr).await?;
    tracing::info!("[server] Listening on {addr}");

    loop {
        let (stream, addr) = listener.accept().await?;
        tracing::debug!("[server] Accepted connection from {addr:?}");

        tokio::spawn(async move {
            if let Err(e) = handle(stream, addr).await {
                tracing::warn!("[server] An error occurred: {e:?}");
            }
        });
    }
}

async fn handle(mut stream: TcpStream, addr: SocketAddr) -> std::io::Result<()> {
    tracing::info!("[{addr}] Accepted connection");

    let mut buf = [0; 1024];
    loop {
        let bytes_read = stream.read(&mut buf).await?;
        if bytes_read == 0 {
            break;
        }
        tracing::debug!("[{addr}] Received {bytes_read} bytes");

        let string = String::from_utf8_lossy(&buf[0..bytes_read]);
        let data = match RedisType::from_str(&string) {
            Ok(data) => data,
            Err(err) => {
                tracing::warn!("[{addr}] Error parsing input: {err:?}");
                continue;
            }
        };
        tracing::debug!("[{addr}] Received {data:?}");

        stream.write_all(data.to_string().as_bytes()).await?;
    }

    tracing::info!("[{addr}] Ending connection");
    Ok(())
}
The main difference is that we’re taking the buf, converting it into a string (for reasons?), turning that into a RedisType, logging it, and then turning it back with to_string().as_bytes() to send back to the client.
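As a quick sanity check of that round trip outside the server (assuming the RedisType from Part 1 behaves as it does above, i.e. FromStr for parsing and Display for re-serializing):

use std::str::FromStr;
use redis_rs::RedisType;

fn main() {
    // Parse a RESP integer, like the server does with the client's bytes...
    let data = RedisType::from_str(":42\r\n").expect("valid RESP");
    println!("{data:?}"); // something like: Integer { value: 42 }

    // ...then serialize it back to the exact bytes we send to the client.
    assert_eq!(data.to_string(), ":42\r\n");
}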
Whew.
# Server
$ RUST_LOG=debug cargo run --bin server
Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
Finished dev [unoptimized + debuginfo] target(s) in 1.02s
Running `target/debug/server`
# Client
$ nc localhost 6379
> :42
# Server logs
2023-02-05T02:29:40.849183Z INFO server: [server] Listening on 0.0.0.0:6379
2023-02-05T02:29:43.191540Z DEBUG server: [server] Accepted connection from 127.0.0.1:55112
2023-02-05T02:29:43.191725Z INFO server: [127.0.0.1:55112] Accepted connection
2023-02-05T02:29:44.694273Z DEBUG server: [127.0.0.1:55112] Received 4 bytes
2023-02-05T02:29:44.694439Z WARN server: [127.0.0.1:55112] Error parsing input: InvalidSuffix
Hmm. It turns out that the version of nc I have doesn’t have a --crlf flag to send \r\n as we’re expecting. So… we have to cheat a bit with echo:
# Server
$ RUST_LOG=debug cargo run --bin server
Finished dev [unoptimized + debuginfo] target(s) in 0.10s
Running `target/debug/server`
# Client
$ echo -ne ':42\r\n' | nc localhost 6379
:42
# Server logs
2023-02-05T02:30:45.463949Z INFO server: [server] Listening on 0.0.0.0:6379
2023-02-05T02:30:50.970001Z DEBUG server: [server] Accepted connection from 127.0.0.1:55312
2023-02-05T02:30:50.970090Z INFO server: [127.0.0.1:55312] Accepted connection
2023-02-05T02:30:50.970109Z DEBUG server: [127.0.0.1:55312] Received 5 bytes
2023-02-05T02:30:50.970130Z DEBUG server: [127.0.0.1:55312] Received Integer { value: 42 }
2023-02-05T02:30:50.970145Z INFO server: [127.0.0.1:55312] Ending connection
Much better!
So now, we can receive proper RESP data and parse it! Sweet.
Is that it?
For now, that’s it.
Next up though, I think I’ll probably want to actually add a few commands to the server. It looks like the expected input is always a RESP Array of Bulk Strings (sketched below). And I’ll probably write a simple client that can turn commands into that form, so that I don’t have to keep using echo.
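For reference, that framing means a command like SET greeting "Hello world!" goes over the wire as an Array of three Bulk Strings, something like this (lengths are my own arithmetic; today’s server should just parse it and echo it back):

$ echo -ne '*3\r\n$3\r\nSET\r\n$8\r\ngreeting\r\n$12\r\nHello world!\r\n' | nc localhost 6379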
My goal:
$ RUST_LOG=debug cargo run --bin server
...
$ RUST_LOG=debug cargo run --bin client
> SET greeting "Hello world!"
"OK"
> GET greeting
"Hello world!"
But that is for another day.