Another post in the Redis in Rust series. It’s really starting to come together now!
In this post: updating the state to store expiration times, adding a thread to handle eviction, and implementing a small pile more of the general Redis functions.
Better Redis function definitions
Where we left off last time, I was doing a lot to manually parse the parameters of more complicated functions (such as SET, which amusingly ends up being one of the most complicated). But doing all that by hand is a bit annoying, not to mention long and error prone.
We’re writing in Rust, let’s write some macros to handle that for us!
To start with, all of these macros are currently part of the same lazy_static! block that I’ve been using to define the COMMANDS:
lazy_static! {
static ref COMMANDS: HashMap<&'static str, Command> = {
let mut m = HashMap::new();
macro_rules! assert_n_args {
($args:ident, $n:literal) => {
if $args.len() != $n {
return Err(String::from(format!("Expected {} args, got {}", $n, $args.len())));
}
}
}
// ...
m
};
}
And therein, we have the first. Essentially, when we are processing the args sent to a command, we can call assert_n_args!(args, 2). If args doesn’t have exactly 2 values, it returns an error. That turns three lines into one much simpler one.
Likewise, if we have to have at least 2:
macro_rules! assert_n_or_more_args {
($args:ident, $n:literal) => {
if $args.len() < $n {
return Err(String::from(format!("Expected at least {} args, got {}", $n, $args.len())));
}
}
}
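One detail worth spelling out: the return inside these macros is expanded in place, so it returns from whatever function or closure invoked the macro. Here is a minimal standalone sketch of that behavior; check_arity is a made-up function for illustration, and the macro drops the redundant String::from around format!.

```rust
// Stand-alone copy of the exact-arity check.
macro_rules! assert_n_args {
    ($args:ident, $n:literal) => {
        if $args.len() != $n {
            // Expanded in place: this returns from the *calling* function.
            return Err(format!("Expected {} args, got {}", $n, $args.len()));
        }
    };
}

// Hypothetical caller (not part of the server): succeeds only with exactly 2 args.
fn check_arity(args: &[&str]) -> Result<usize, String> {
    assert_n_args!(args, 2);
    Ok(args.len())
}
```

This is also why the macros only work inside something that returns a Result with a String error, which every command closure here does.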
What starts getting more interesting is when we want to pull a specific kind of value out of the args. In this case, what if we want a string? Because we want to automatically convert types, we are going to handle two different cases here: RedisType::String (use the value directly) and RedisType::Integer (convert to a string first):
macro_rules! get_string_arg {
($args:ident, $index:expr) => {
{
if $index >= $args.len() {
return Err(String::from("Not enough args"));
}
match $args[$index].clone() {
RedisType::String{value} => value,
RedisType::Integer{value} => value.to_string(),
_ => return Err(String::from(format!("Attempted to use {} as a string", $args[$index]))),
}
}
}
}
Now we’re really getting into the savings. This not only checks that the requisite arg is actually there (handling index out of bounds for me) but also that we can match on the type. All in one line:
let key = get_string_arg!(args, 0);
let value = get_string_arg!(args, 1);
Two lines instead of ~20. Not bad.
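To try the macro outside the lazy_static! block, here is a rough self-contained sketch. The RedisType below is a cut-down stand-in with only three variants, the error message uses Debug formatting instead of Display, and key_value is a made-up command body; all of those are assumptions for illustration. It also matches on a reference rather than cloning the whole argument first.

```rust
// Cut-down stand-in for the post's RedisType; the real enum has more variants.
#[derive(Clone, Debug, PartialEq)]
enum RedisType {
    String { value: String },
    Integer { value: i64 },
    NullString,
}

macro_rules! get_string_arg {
    ($args:ident, $index:expr) => {{
        if $index >= $args.len() {
            return Err(String::from("Not enough args"));
        }
        // Borrow the argument and clone only the string we actually need.
        match &$args[$index] {
            RedisType::String { value } => value.clone(),
            RedisType::Integer { value } => value.to_string(),
            other => return Err(format!("Attempted to use {:?} as a string", other)),
        }
    }};
}

// Hypothetical command body: joins the first two arguments with '='.
fn key_value(args: &[RedisType]) -> Result<String, String> {
    let key = get_string_arg!(args, 0);
    let value = get_string_arg!(args, 1);
    Ok(format!("{}={}", key, value))
}
```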
Next, we often want to do string comparisons (for keywords, such as SET key value NX). So a macro for that:
macro_rules! is_string_eq {
($args:ident, $index:expr, $value:literal) => {
get_string_arg!($args, $index).to_ascii_uppercase() == $value.to_ascii_uppercase()
}
}
And it even uses get_string_arg! behind the scenes!
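As a possible tweak, the comparison itself doesn’t need to build two uppercase copies: the standard library’s eq_ignore_ascii_case compares in place. A minimal sketch, where is_keyword is a made-up helper name rather than one of the macros above:

```rust
// Case-insensitive keyword check without allocating uppercase copies.
// `is_keyword` is a hypothetical helper for illustration.
fn is_keyword(arg: &str, keyword: &str) -> bool {
    arg.eq_ignore_ascii_case(keyword)
}
```

Inside the macro, that would presumably look like get_string_arg!($args, $index).eq_ignore_ascii_case($value).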
Okay, what about other types? Well, we’re dealing with casting again, so how about integers and floats:
macro_rules! get_integer_arg {
($args:ident, $index:expr) => {
{
if $index >= $args.len() {
return Err(String::from("Not enough args"));
}
match $args[$index].clone() {
RedisType::String{value} => {
match value.parse() {
Ok(value) => value,
Err(_) => return Err(String::from(format!("Attempted to use {} as an integer", $args[$index]))),
}
},
RedisType::Integer{value} => value,
_ => return Err(String::from(format!("Attempted to use {} as an integer", $args[$index]))),
}
}
}
}
macro_rules! get_float_arg {
($args:ident, $index:expr) => {
{
if $index >= $args.len() {
return Err(String::from("Not enough args"));
}
match $args[$index].clone() {
RedisType::String{value} => {
match value.parse() {
Ok(value) => value,
Err(_) => return Err(String::from(format!("Attempted to use {} as a float", $args[$index]))),
}
},
RedisType::Integer{value} => value as f64,
_ => return Err(String::from(format!("Attempted to use {} as a float", $args[$index]))),
}
}
}
}
Again, if it’s the right type, we can just convert it, but if we have something that’s a string and can be a number, try to convert it. If not, error with details.
Not bad!
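One subtle point in those two macros: value.parse() never names its target type. It’s inferred from the other match arm (value for an integer, value as f64 for a float). Written as plain functions, the types become explicit. This is only a sketch with a cut-down RedisType and made-up function names (to_integer, to_float), not the code the server uses:

```rust
// Cut-down stand-in for the post's RedisType; the real enum has more variants.
#[derive(Debug)]
enum RedisType {
    String { value: String },
    Integer { value: i64 },
    NullString,
}

// Hypothetical function form of get_integer_arg!.
fn to_integer(arg: &RedisType) -> Result<i64, String> {
    match arg {
        RedisType::String { value } => value
            .parse::<i64>()
            .map_err(|_| format!("Attempted to use {:?} as an integer", arg)),
        RedisType::Integer { value } => Ok(*value),
        _ => Err(format!("Attempted to use {:?} as an integer", arg)),
    }
}

// Hypothetical function form of get_float_arg!.
fn to_float(arg: &RedisType) -> Result<f64, String> {
    match arg {
        RedisType::String { value } => value
            .parse::<f64>()
            .map_err(|_| format!("Attempted to use {:?} as a float", arg)),
        RedisType::Integer { value } => Ok(*value as f64),
        _ => Err(format!("Attempted to use {:?} as a float", arg)),
    }
}
```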
I do have one more, but that’s mostly specific to SET, so let’s leave it for that.
Re-parsing SET
Okay, so how do we handle SET?
m.insert("SET", Command {
help: String::from("\
SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
Sets key to a given value.
NX|XX - only set if the key does not / does already exist.
EX|PX|EXAT|PXAT - key expires after seconds/milliseconds or at a Unix timestamp in seconds/milliseconds
KEEPTTL - retain the previously set TTL
GET - return the previous value, or nil if the key wasn't previously set
Returns OK if SET succeeded, nil if SET was not performed for NX|XX or because of GET, the old value if GET was specified.
"),
f: Box::new(|state, args| {
assert_n_or_more_args!(args, 2);
let key = get_string_arg!(args, 0);
let value = get_string_arg!(args, 1);
let mut nx = false;
let mut xx = false;
let mut keepttl = false;
let mut get = false;
let mut expiration = None;
let mut i = 2;
loop {
if i >= args.len() {
break;
} else if is_string_eq!(args, i, "NX") {
nx = true;
i += 1;
} else if is_string_eq!(args, i, "XX") {
xx = true;
i += 1;
} else if is_string_eq!(args, i, "KEEPTTL") {
keepttl = true;
i += 1;
} else if is_string_eq!(args, i, "GET") {
get = true;
i += 1;
} else if let Some(ex) = get_expiration!(args, i) {
expiration = Some(ex);
i += 2;
} else {
return Err(String::from(format!("Unexpected parameter: {:?}", args[i])));
}
}
if nx && xx {
return Err(String::from("SET: Cannot set both NX and XX"));
}
if keepttl && expiration.is_some() {
return Err(String::from("SET: Cannot set more than one of EX/PX/EXAT/PXAT/KEEPTTL"));
}
// Check NX/XX first so that a rejected SET leaves any existing TTL untouched
if nx && state.keystore.contains_key(&key) {
return Ok(RedisType::NullString);
}
if xx && !state.keystore.contains_key(&key) {
return Ok(RedisType::NullString);
}
if expiration.is_some() {
tracing::debug!("Setting expiration for key {} to {:?}", key, expiration);
state.ttl.push(key.clone(), expiration.unwrap());
} else if keepttl {
// do nothing
} else {
state.ttl.remove(&key);
}
let result = if get {
Ok(match state.keystore.get(&key) {
Some(value) => RedisType::String { value: value.to_owned() },
None => RedisType::NullString,
})
} else {
Ok(RedisType::String { value: "OK".to_owned() })
};
state.keystore.insert(key, value);
result
})
});
There are a few changes there (help and state.ttl) that I’ll have to come back to in a moment. I expect you can guess what they do. But here already, you can see how helpful it is to have these macros. I can pull out key and value in two lines, then start processing the keywords (NX/XX etc.) directly.
But then we come to the pile of arguments that set expiration. We’ll use them again, so how about we macro that as well:
macro_rules! get_expiration {
($args:ident, $index:expr) => {
if is_string_eq!($args, $index, "EX") {
// Seconds from now
let value = get_integer_arg!($args, $index + 1);
Some((
SystemTime::now()
+ Duration::from_secs(value as u64)
))
} else if is_string_eq!($args, $index, "PX") {
// Milliseconds from now
let value = get_integer_arg!($args, $index + 1);
Some((
SystemTime::now()
+ Duration::from_millis(value as u64)
))
} else if is_string_eq!($args, $index, "EXAT") {
// Seconds since epoch
let value = get_integer_arg!($args, $index + 1);
Some(UNIX_EPOCH + Duration::from_secs(value as u64))
} else if is_string_eq!($args, $index, "PXAT") {
// Milliseconds since epoch
let value = get_integer_arg!($args, $index + 1);
Some(UNIX_EPOCH + Duration::from_millis(value as u64))
} else {
None
}
}
}
Now this is an interesting one, since there are four ways to set expiration: in seconds or milliseconds, and either relative to ’now’ or as an absolute timestamp (since the epoch). In all cases, we can parse them into an absolute SystemTime of when the key will expire.
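One thing the macro glosses over: value as u64 turns a negative number into a huge one, and adding a huge Duration to a SystemTime can panic. Below is a rough function-shaped sketch of the same four cases with those edges handled. The function name, the error text, and passing now in as a parameter (to keep the result deterministic) are my assumptions, not how the server does it.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Hypothetical function form of the four expiration keywords.
// Returns Ok(None) when `keyword` is not an expiration keyword at all.
fn expiration(keyword: &str, amount: i64, now: SystemTime) -> Result<Option<SystemTime>, String> {
    fn invalid() -> String {
        String::from("invalid expire time")
    }
    // Pick the base instant and how many milliseconds one unit is worth.
    let (base, millis_per_unit) = match keyword.to_ascii_uppercase().as_str() {
        "EX" => (now, 1000u64),
        "PX" => (now, 1),
        "EXAT" => (UNIX_EPOCH, 1000),
        "PXAT" => (UNIX_EPOCH, 1),
        _ => return Ok(None),
    };
    // Reject negatives instead of letting `as u64` wrap them around.
    if amount < 0 {
        return Err(invalid());
    }
    let millis = (amount as u64).checked_mul(millis_per_unit).ok_or_else(invalid)?;
    // checked_add avoids the panic that `+` would raise on overflow.
    base.checked_add(Duration::from_millis(millis))
        .map(Some)
        .ok_or_else(invalid)
}
```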
Implementing TTL on the state
Okay, we can parse the expiration time for SET, now we need a place to store it. You might have noticed that we’re setting the ttl in the State, so what’s that?
#[derive(Debug, Default)]
pub struct State {
keystore: HashMap<String, String>,
ttl: PriorityQueue<String, SystemTime>,
}
That’s it. A PriorityQueue in this case is implemented so that items in it are sorted by priority (the expiration times). That means that the next to expire will be first. It also has efficient O(log(n)) removal by key, which is nice.
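A word of caution on ordering, since the post doesn’t say which crate PriorityQueue comes from. If it is the priority-queue crate, my understanding is that it hands back the greatest priority first, in which case the priority would need wrapping in std::cmp::Reverse for the soonest deadline to come out first; that’s worth checking against the crate’s docs. The same idea with only the standard library looks roughly like this (BinaryHeap is a stand-in here and has no removal by key; expiry_order is a made-up name):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::SystemTime;

// Returns the keys in the order they would expire, soonest first.
fn expiry_order(entries: Vec<(String, SystemTime)>) -> Vec<String> {
    // BinaryHeap is a max-heap; Reverse flips the comparison so the
    // earliest deadline is the one popped first.
    let mut heap: BinaryHeap<Reverse<(SystemTime, String)>> = entries
        .into_iter()
        .map(|(key, at)| Reverse((at, key)))
        .collect();
    let mut order = Vec::new();
    while let Some(Reverse((_, key))) = heap.pop() {
        order.push(key);
    }
    order
}
```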
Expiration thread
So now that we have that, how do we actually expire stored values?
Well, one option would be to expire them on demand: when we fetch a key, check if it’s expired. But that’s not super elegant, so instead let’s do the expected thing: make a thread that periodically checks the next-to-expire keys and expires any that have passed their date.
Something like this:
#[tokio::main]
async fn main() -> std::io::Result<()> {
tracing_subscriber::fmt::init();
let addr = "0.0.0.0:6379";
let listener = TcpListener::bind(addr).await?;
tracing::info!("Listening on {addr}");
let state = Arc::new(Mutex::new(State::default()));
let ttl_state = state.clone();
tokio::spawn(async move {
loop {
let now = SystemTime::now();
loop {
let evict = match ttl_state.lock().await.ttl.peek() {
Some((_, eviction_time)) => *eviction_time < now,
None => false,
};
if evict {
let mut ttl_state = ttl_state.lock().await;
let (key, _) = ttl_state.ttl.pop().unwrap();
tracing::debug!("Evicting {key} from keystore");
ttl_state.keystore.remove(&key);
} else {
break;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
loop {
let (stream, addr) = listener.accept().await?;
let thread_state = state.clone();
tracing::debug!("Accepted connection from {addr:?}");
tokio::spawn(async move {
if let Err(e) = handle(stream, addr, thread_state).await {
tracing::warn!("An error occurred: {e:?}");
}
});
}
}
One difference here is that we’re using Arc<Mutex<State>>. An Arc is an atomically reference-counted pointer to the item held inside, so as long as at least one copy still exists, the item stays alive. In this case, we have one copy for the eviction thread and then one more per connected client.
This actually does make our server truly multi-threaded and thread safe, since we’re passing the same state to each.
The Arc alone though means we can’t actually modify the state, just read it in multiple threads. Not super helpful. The Mutex, on the other hand, allows us to request a lock in each thread and (once given the lock) read or modify. If we had more readers than writers, we could instead have used a RwLock, and I may try that in the future. But for now, it works great.
All that being said, now we have the ability to state.lock().await to get a lock on the Mutex so that we can read or write to it. While we have it, any other thread will await the release.
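To see the Arc plus Mutex pattern on its own, here is a small sketch using only the standard library. Note the swap: it uses std::sync::Mutex and OS threads rather than tokio’s async Mutex and tasks, so lock() blocks instead of awaiting, but the sharing works the same way. shared_inserts is a made-up function for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Spawns `workers` threads that each insert one key into a shared map,
// then returns how many keys the map ended up with.
fn shared_inserts(workers: usize) -> usize {
    let state = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            // Cloning the Arc bumps the reference count; the map is not copied.
            let state = Arc::clone(&state);
            thread::spawn(move || {
                // Blocks until no other thread holds the lock.
                let mut guard = state.lock().unwrap();
                guard.insert(format!("key{}", i), i);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let len = state.lock().unwrap().len();
    len
}
```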
And with that, we can run a thread once per second to check for eviction. It’s not nanosecond perfect for eviction, but I don’t think that’s a guarantee that we need.
The one last gotcha is in the eviction loop:
loop {
let now = SystemTime::now();
loop {
let evict = match ttl_state.lock().await.ttl.peek() {
Some((_, eviction_time)) => *eviction_time < now,
None => false,
};
if evict {
let mut ttl_state = ttl_state.lock().await;
let (key, _) = ttl_state.ttl.pop().unwrap();
tracing::debug!("Evicting {key} from keystore");
ttl_state.keystore.remove(&key);
} else {
break;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Why do I calculate evict first and then get another lock to actually evict it? Well, that’s Rust’s fault. It doesn’t want to let me borrow ttl_state as mutable (since we’re writing to it) if we’ve already borrowed it as read only (with the peek). And we do want to peek, since if we actually did a pop, then we’d have to always put the item back on.
Took a bit to get that working.
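As far as I can tell, the same check-then-pop shape also works under a single lock, as long as the result of the peek is reduced to a plain bool before the pop. That would also close the small window between the two lock calls, where another task could change the queue (say, a SET that removes the TTL that was just peeked). A sketch under some loud assumptions: std::sync::Mutex instead of tokio’s, a BinaryHeap with Reverse standing in for the PriorityQueue, and a cut-down State.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Mutex;
use std::time::SystemTime;

// Cut-down stand-in for the post's State.
#[derive(Default)]
struct State {
    keystore: HashMap<String, String>,
    // Soonest deadline first, thanks to Reverse.
    ttl: BinaryHeap<Reverse<(SystemTime, String)>>,
}

// Evicts every key whose deadline is before `now`, taking the lock once.
// Returns the evicted keys in eviction order.
fn evict_expired(state: &Mutex<State>, now: SystemTime) -> Vec<String> {
    let mut guard = state.lock().unwrap();
    let mut evicted = Vec::new();
    loop {
        // The shared borrow from peek() ends once `due` is a plain bool...
        let due = match guard.ttl.peek() {
            Some(Reverse((at, _))) => *at < now,
            None => false,
        };
        if !due {
            break;
        }
        // ...so the same guard can be borrowed mutably here.
        if let Some(Reverse((_, key))) = guard.ttl.pop() {
            guard.keystore.remove(&key);
            evicted.push(key);
        }
    }
    evicted
}
```

With tokio the guard would come from ttl_state.lock().await instead, and it would be dropped before the sleep so clients aren’t blocked for the whole second.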
With all that together, we have eviction!
[server] $ RUST_LOG=debug cargo run --bin server
[server] Compiling redis-rs v0.1.0 (/Users/jp/Projects/redis-rs)
[server] Finished dev [unoptimized + debuginfo] target(s) in 3.29s
[server] Running `target/debug/server`
[server] 2023-03-26T18:35:23.329020Z INFO server: Listening on 0.0.0.0:6379
[client] $ redis-cli
[server] 2023-03-26T18:35:24.660161Z DEBUG server: Accepted connection from 127.0.0.1:49923
[server] 2023-03-26T18:35:24.660314Z INFO server: [127.0.0.1:49923] Accepted connection
[server] 2023-03-26T18:35:24.660388Z DEBUG server: [127.0.0.1:49923] Received 27 bytes
[server] 2023-03-26T18:35:24.660441Z DEBUG server: [127.0.0.1:49923 Received: COMMAND [String { value: "DOCS" }]
[client] 127.0.0.1:6379> SET test value EX 5
[server] 2023-03-26T18:35:30.420285Z DEBUG server: [127.0.0.1:49923] Received 49 bytes
[server] 2023-03-26T18:35:30.420417Z DEBUG server: [127.0.0.1:49923 Received: SET [String { value: "test" }, String { value: "value" }, String { value: "EX" }, String { value: "5" }]
[server] 2023-03-26T18:35:30.420585Z DEBUG server: Setting expiration for key test to Some(SystemTime { tv_sec: 1679891735, tv_nsec: 420567000 })
[client] "OK"
[client] 127.0.0.1:6379> GET test
[server] 2023-03-26T18:35:31.539128Z DEBUG server: [127.0.0.1:49923] Received 23 bytes
[server] 2023-03-26T18:35:31.539222Z DEBUG server: [127.0.0.1:49923 Received: GET [String { value: "test" }]
[client] "value"
[server] 2023-03-26T18:35:36.353723Z DEBUG server: Evicting test from keystore
[client] 127.0.0.1:6379> GET test
[server] 2023-03-26T18:35:40.465655Z DEBUG server: [127.0.0.1:49923] Received 23 bytes
[server] 2023-03-26T18:35:40.465713Z DEBUG server: [127.0.0.1:49923 Received: GET [String { value: "test" }]
[client] (nil)
[server] 2023-03-26T18:35:41.746856Z INFO server: [127.0.0.1:49923] Ending connection
Still need to figure out a better way to format that.
But in any case, we set test = value with an expiration of 5 seconds. A GET test within those five seconds returns value and after: (nil). Nice!
So we’ve got eviction.
Cool.
A few more examples
That’s enough to write all of the string commands. Let’s do it. You can see them all in the full source, but I’ll point out a few interesting ones here:
DECR
m.insert("DECR", Command {
help: String::from("\
DECR key
Decrement the number stored at key by one.
If the key does not exist, it is set to 0 before performing the operation. An error is returned if the key contains a value of the wrong type or contains a string that can not be represented as integer. This operation is limited to 64 bit signed integers.
"),
f: Box::new(|state, args| {
assert_n_args!(args, 1);
let key = get_string_arg!(args, 0);
if let Some(current) = state.keystore.get_mut(&key) {
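// checked_sub reports i64::MIN - 1 as out of range instead of overflowing
match current.parse::<i64>().ok().and_then(|value| value.checked_sub(1)) {
Some(value) => {
*current = value.to_string();
Ok(RedisType::Integer{ value })
},
None => Err(String::from("Value is not an integer or out of range")),
}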
} else {
state.keystore.insert(key.clone(), "-1".to_owned());
Ok(RedisType::Integer{ value: -1 })
}
})
});
Primarily interesting because of the typing. Values are always stored as strings, so we parse the string to an integer, subtract one, and convert it back. Ugly, but that’s how it must be.
127.0.0.1:6379> SET test 10
"OK"
127.0.0.1:6379> DECR test
(integer) 9
127.0.0.1:6379> DECR test
(integer) 8
127.0.0.1:6379> DECR test2
(integer) -1
GETRANGE
m.insert("GETRANGE", Command {
help: String::from("\
GETRANGE key start end
Get a substring of the string stored at a key."
),
f: Box::new(|state, args| {
assert_n_args!(args, 3);
let key = get_string_arg!(args, 0);
let mut start = get_integer_arg!(args, 1);
let mut end = get_integer_arg!(args, 2);
Ok(match state.keystore.get(&key) {
Some(value) => {
start = start.max(0).min(value.len() as i64 - 1);
end = end.max(0).min(value.len() as i64 - 1);
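if value.is_empty() || start > end {
RedisType::String { value: String::new() }
} else {
// end is inclusive in Redis; note that slicing panics if an index lands inside a multi-byte character
RedisType::String { value: value[start as usize..=end as usize].to_owned() }
}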
},
None => RedisType::NullString,
})
})
});
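Fetches substrings (although I think it’s supposed to work on arrays too? Not yet, anyway). Negative offsets, which Redis counts back from the end of the string, are just clamped to 0 for now. I think the start.max(0).min(value.len() as i64 - 1) is particularly neat.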
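For reference, a fuller take on the range logic might look like the sketch below: offsets are byte positions, end is inclusive, and negative offsets count back from the end of the string. get_range is a made-up helper, and I haven’t checked every edge case against the official server, so treat it as an approximation rather than the exact Redis behavior.

```rust
// Hypothetical helper: GETRANGE-style slicing on bytes with an inclusive
// `end` and negative offsets counted from the end of the string.
fn get_range(value: &str, start: i64, end: i64) -> String {
    let bytes = value.as_bytes();
    let len = bytes.len() as i64;
    // Translate negative offsets, flooring at the start of the string.
    let start = if start < 0 { (len + start).max(0) } else { start };
    let end = if end < 0 { (len + end).max(0) } else { end };
    // Clamp the inclusive end to the last valid byte.
    let end = end.min(len - 1);
    if len == 0 || start > end {
        return String::new();
    }
    // Lossy conversion avoids a panic if the range splits a multi-byte character.
    String::from_utf8_lossy(&bytes[start as usize..=end as usize]).into_owned()
}
```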
MGET
m.insert("MGET", Command {
help: String::from("\
MGET key [key ...]
Get the values of all the given keys.
For every key that does not hold a string value or does not exist, the special value nil is returned.
"),
f: Box::new(|state, args| {
assert_n_or_more_args!(args, 1);
let mut values = Vec::new();
for i in 0..args.len() {
let key = get_string_arg!(args, i);
match state.keystore.get(&key) {
Some(value) => values.push(RedisType::String { value: value.to_owned() }),
None => values.push(RedisType::NullString),
}
}
Ok(RedisType::Array { value: values })
})
});
It’s pretty cool to be able to use get_string_arg!(args, i) in a loop and it just works. And matching on the Option of getting a value and pushing (nil) if it’s not there. Elegant.
GitHub Copilot
Full disclosure time. Implementing all of those functions is a lot of busy work. I wasn’t particularly looking forward to it. But there’s been a big (and absolutely fascinating) push around AI/ML tools recently, so I decided to actually stop putting it off and give GitHub Copilot a try.
Holy crap.
That’s kind of awesome.
All I had to do was start typing m.insert and it would already somehow figure out from context that I was implementing Redis commands and start filling out the next one. Including help text (right in all but one case) and a description. Then in most cases, it would generate code for the function for me.
And the absolutely craziest part (to me)?
It used the helper macros that I’d already written.
So it not only understands the problem, but also understands the structure that I’ve already put in place.
Now don’t get me wrong, the code isn’t perfect. A few of them I’ve rewritten in part or in whole. In a few cases, it suggested additional macros (like for MSET it wanted assert_even_args! which, while cool, is just not necessary). And in a few cases, it just did something that wouldn’t work at all.
But for the moment… that’s really impressive.
Now there are also ethical concerns around GitHub Copilot: did Microsoft train it on private or GPL code? Probably. Will the code it generates have either no copyright (if I don’t modify it) or step on someone else’s? Possibly. I feel like these are solvable problems though.
Am I still going to play with this more? Absolutely.
Full source
As always (although I don’t know if I’ve said it), the full source is on GitHub with tags available for the code at the state of each blog post.
Next steps
- Refactor commands into multiple files
- Implement another data type (sets, hashes, sorted sets, streams, etc)
- Implement persistence to disk / backups
- Improve the client to handle quoted strings
- Test the improved client against the official server
- Try redis-benchmark
So many things!