Building a pub/sub server with Sonr (part 1) 10 Apr 2019

Intro

This is the first of two posts, where we will go over how to build a basic pub/sub server in Rust using Sonr.

By the end we’ll have a multithreaded pub/sub server, where you can subscribe to channels and publish plain text messages to other subscribers.

When it comes to network programming it seems like a chat is the equivalent of “Hello, world!”, but rather than making yet another chat I want to take a slightly different approach: a pub/sub server.

Assumptions about the reader: you should be somewhat familiar with the Rust programming language. If you are not, I recommend The Book.

For an introduction to Sonr see this post.

The full source for this post can be found here, and every code listing with an icon in the top right corner links to the corresponding file on GitHub.

There are probably performance improvements that could be made to the code, but I have tried to keep it as simple as possible. On my machine (Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz with 16 GB of RAM), running the send benchmark, I get an average of 2,423,872 messages ack’ed per second.

Setup

Create a new Rust project:

$ cargo new --bin pubsub

and add the following dependencies to Cargo.toml:

[dependencies]
sonr = { git = "https://github.com/hagsteel/sonr" }
serde = { version = "1.0.90", features = ["derive"] }
serde_json = "1.0.39"
bytes = "0.4.12"

We’ll use the bytes crate for passing bytes around, serde and serde_json for message serialization/deserialization, and finally sonr for networking.

The next step is to add the following files to the src directory.

src/
    codec.rs
    connection.rs
    lib.rs
    main.rs
    messages.rs
    publisher.rs
    subscriber.rs
    timer.rs

Open up src/lib.rs and add every file listed above as a module except for lib and main.

src/lib.rs
pub mod codec;
pub mod connection;
pub mod messages;
pub mod publisher;
pub mod subscriber;
pub mod timer;

const BUFFER_SIZE: usize = 1024 * 8;

By making the modules public we can use them later if we want to write benchmarks, clients, etc.

Note that we also added BUFFER_SIZE. This is the default buffer size we’ll use for each connection’s read/write buffer, and for each subscriber to buffer messages to be written.

We will start editing the files in the order they are listed above, starting with codec.rs.

Codec

Since messages are read and written as bytes we need a way to encode and decode the messages from bytes to the concrete types defined in messages.rs.

Open up src/codec.rs in an editor and add the following code:

src/codec.rs
use bytes::{Bytes, BytesMut};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::error::Result;

use crate::BUFFER_SIZE;

pub struct LineCodec;

impl LineCodec {
    pub fn decode<T: DeserializeOwned>(buf: &mut BytesMut) -> Option<T> {
        let p = buf.iter().position(|b| b == &b'\n');
        match p {
            None => None,
            Some(n) => {
                let res = serde_json::from_slice(&buf.split_to(n).freeze()).ok();
                buf.advance(1); // Skip the newline char
                buf.reserve(BUFFER_SIZE); // Make sure the buffer can hold more data
                res
            }
        }
    }

    pub fn encode<T: Serialize>(t: T) -> Result<Bytes> {
        let mut payload = serde_json::to_vec(&t)?;
        payload.push(b'\n');
        Ok(Bytes::from(payload))
    }
}

This will allow us to decode bytes into messages, and encode messages into bytes.
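
To make the framing rule concrete, here is a minimal sketch that uses only the standard library. It is not the LineCodec above: drain_frames is a hypothetical helper that works on a plain Vec<u8> and leaves the JSON parsing out, so it only shows how complete lines are split off while a trailing partial line stays in the buffer.

```rust
/// Split every complete, newline-terminated frame off the front of `buf`.
/// Bytes after the last newline stay in `buf` until more data arrives.
pub fn drain_frames(buf: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut frames = Vec::new();
    while let Some(pos) = buf.iter().position(|b| *b == b'\n') {
        // Take the frame plus its trailing newline out of the buffer...
        let mut frame: Vec<u8> = buf.drain(..=pos).collect();
        // ...then drop the newline so only the payload remains.
        frame.pop();
        frames.push(frame);
    }
    frames
}
```

Feeding it two complete lines followed by half a line yields two frames, and the half line remains in the buffer for the next read.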

Now that we can encode and decode messages we can move on to the connection.

Connection

We wrap every tcp connection, for subscribers and publishers alike, in a Connection to conveniently read and write messages.

The connection is a bit more complicated than the codec, so we will break it down into smaller pieces.

First we add all our use statements and define the connection struct.

src/connection.rs
use bytes::{Bytes, BufMut, BytesMut};
use serde::de::DeserializeOwned;
use sonr::net::tcp::ReactiveTcpStream;
use sonr::reactor::{Reaction, Reactor};
use std::io::{ErrorKind::WouldBlock, Read, Write};

use crate::codec::LineCodec;
use crate::BUFFER_SIZE;

pub struct Connection {
    stream: ReactiveTcpStream,
    read_buffer: BytesMut,
    write_buffer: BytesMut,
}

The read_buffer is used to store data when reading from the tcp stream and the write_buffer is where all the data that will eventually be written is stored, and finally the stream is what we’ll use to read and write data over the network.

Next we will add a new method to the Connection.

src/connection.rs
impl Connection {
    pub fn new(stream: ReactiveTcpStream) -> Self {
        Self {
            stream,
            read_buffer: BytesMut::with_capacity(BUFFER_SIZE),
            write_buffer: BytesMut::with_capacity(BUFFER_SIZE),
        }
    }

    // ...
}

Then we need a method to read data from the stream into the read_buffer and finally attempt to decode this data into something more tangible like a message.

src/connection.rs
impl Connection {
    // ...

    pub fn recv<T: DeserializeOwned>(&mut self) -> Option<Result<Vec<T>, ()>> {
        if !self.stream.readable() {
            return None;
        }

        let res = {
            let mut b = unsafe { self.read_buffer.bytes_mut() };
            self.stream.read(&mut b)
        };

        match res {
            // The connection was closed by the peer.
            Ok(0) => Some(Err(())),

            // Try to decode messages from the read data
            Ok(n) => {
                let buf_len = self.read_buffer.len() + n;
                unsafe { self.read_buffer.set_len(buf_len); }

                let mut v = Vec::new();
                while let Some(val) = LineCodec::decode(&mut self.read_buffer) {
                    v.push(val);
                }

                Some(Ok(v))
            }
            
            // Not an actual error
            Err(ref e) if e.kind() == WouldBlock => None,

            // Connection closed. Ignoring the reason
            // for simplicity
            Err(_) => Some(Err(())),
        }
    }
}

We read data from the stream into the read buffer.

There are two unsafe blocks in this function and this is the only unsafe code we will add:

self.read_buffer.bytes_mut() will return a mutable slice we can read into, and self.read_buffer.set_len(n) sets the length of the buffer. Once data has been placed in the mutable slice the length can be set to equal the number of bytes read + existing length.

If we read N bytes we can start processing messages using the line codec we defined earlier.

In the event of a WouldBlock error we don’t treat this as an actual error, but rather as a way for the connection to say that it can no longer perform read operations until a readable event is triggered. We’ll go into more detail on when such an event is received when we create the publisher.

It’s important to keep reading from the connection until it blocks, as no new read events will be issued until the connection returns a WouldBlock error.

In the case of any other error we will simply return Some(Err(())) and treat the connection as broken.

This means we can keep calling recv for as long as it returns Some, however if it’s an error (any error) the connection should be dropped.

This makes it possible to write code like

while let Some(res) = connection.recv() {
    if res.is_err() {
        // drop connection
        return
    }
}
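
The "read until it blocks" rule can be tried out without a socket. The sketch below uses only the standard library; Drained, read_until_blocked and ChunkedReader are hypothetical names made up for illustration, and the reader is a test double that reports WouldBlock once it runs out of data, the way an idle non-blocking socket would.

```rust
use std::io::{self, ErrorKind, Read};

/// Outcome of draining a non-blocking reader.
#[derive(Debug, PartialEq)]
pub enum Drained {
    /// The reader would block; this many bytes were appended first.
    Blocked(usize),
    /// The peer closed the connection or a real error occurred.
    Closed,
}

/// Keep reading into `buf` until the reader reports `WouldBlock`.
pub fn read_until_blocked<R: Read>(reader: &mut R, buf: &mut Vec<u8>) -> Drained {
    let mut scratch = [0u8; 1024];
    let mut total = 0;
    loop {
        match reader.read(&mut scratch) {
            // Zero bytes read means the peer closed the connection.
            Ok(0) => return Drained::Closed,
            Ok(n) => {
                buf.extend_from_slice(&scratch[..n]);
                total += n;
            }
            // Not an actual error: nothing more to read for now.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Drained::Blocked(total),
            Err(_) => return Drained::Closed,
        }
    }
}

/// Hypothetical test double: hands out one chunk per read, then blocks.
/// Each chunk is assumed to fit in the caller's buffer.
pub struct ChunkedReader {
    pub chunks: Vec<Vec<u8>>,
}

impl Read for ChunkedReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.chunks.is_empty() {
            return Err(io::Error::new(ErrorKind::WouldBlock, "no data yet"));
        }
        let chunk = self.chunks.remove(0);
        out[..chunk.len()].copy_from_slice(&chunk);
        Ok(chunk.len())
    }
}
```

Unlike recv, this version copies through a scratch array instead of reading straight into the buffer, which avoids the unsafe blocks at the cost of an extra copy.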

The next method is write. This will attempt to write the contents of the write_buffer to the stream.

src/connection.rs
impl Connection {
    // ...

    pub fn write(&mut self) -> Option<Result<usize, ()>> {
        if !self.stream.writable() {
            return None
        } 

        if self.write_buffer.is_empty() {
            return None
        }

        match self.stream.write(&self.write_buffer) {
            Ok(n) => {
                self.write_buffer.split_to(n); // Remove sent data
                Some(Ok(n))
            }
            Err(ref e) if e.kind() == WouldBlock => None,
            Err(_) => Some(Err(())),
        }
    }
}

This method resembles recv to some degree: if the underlying stream is not writable we return None. Same applies if there is nothing in the write buffer.

Unlike reading, writing zero bytes does not imply that the connection was closed by the peer (trying to write to a closed socket would return an actual error).

For every successful write, the written part of the buffer can be removed by simply splitting the buffer and only keeping the unwritten data.
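
A short write is easy to simulate. The following sketch assumes a plain Vec<u8> as the pending buffer rather than a BytesMut; write_once and LimitedWriter are hypothetical names, and the writer is a test double that accepts only a few bytes per call.

```rust
use std::io::{self, Write};

/// Try to write `pending` once and drop whatever was accepted.
/// Returns the number of bytes written.
pub fn write_once<W: Write>(writer: &mut W, pending: &mut Vec<u8>) -> io::Result<usize> {
    let n = writer.write(pending.as_slice())?;
    // Remove the sent prefix; the unsent tail stays for the next attempt.
    pending.drain(..n);
    Ok(n)
}

/// Hypothetical test double: accepts at most `limit` bytes per call,
/// the way a socket with a nearly full send buffer might.
pub struct LimitedWriter {
    pub limit: usize,
    pub written: Vec<u8>,
}

impl Write for LimitedWriter {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        let n = data.len().min(self.limit);
        self.written.extend_from_slice(&data[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Calling write_once repeatedly shrinks the pending buffer from the front until nothing is left, which is the same effect split_to has on the write buffer.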

Finally the two remaining methods to add are add_payload and react.

src/connection.rs
impl Connection {
    pub fn add_payload(&mut self, payload: Bytes) {
        if payload.len() > self.write_buffer.remaining_mut() {
            self.write_buffer.reserve(payload.len());
        }
        self.write_buffer.put_slice(&payload);
    }

    // Convenience, saving us from having to make the stream public
    pub fn react(&mut self, reaction: Reaction<()>) -> Reaction<()> {
        self.stream.react(reaction)
    }
}

add_payload simply adds the bytes to the write buffer. This keeps us from having to make the write_buffer public.

react passes the Reaction to the stream. This is a convenience method so we don’t have to make the stream public, however it’s worth noting what react does under the hood:

Since stream is a ReactiveTcpStream, which is a type alias for Stream<TcpStream>, we can find out more here: sonr::net::stream::Stream

The first paragraph in the docs states:

When a Stream reacts the inner evented reactor is marked as either readable and / or writable depending on the Ready state of the Event.

Given this we now know that before recv or write can ever yield anything but None we first need to call react with a Reaction. To be more precise: with a Reaction::Event(event). More on this once we start building one of the reactors: the publisher.

Now that we have a connection, we need something we can send and receive: messages!

Messages

We need three kinds of messages:

  • Subscribe: subscribe to a channel
  • PubMessage: a message published by a publisher to all subscribers
  • AckMessage: once a PubMessage has been received by the server, we send an “ack” back.

Open src/messages.rs and add the following code.

src/messages.rs
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PubMessage {
    pub channel: String,
    pub payload: String,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct AckMessage {
    ack: bool,
}

impl AckMessage {
    pub fn new() -> Self {
        Self { ack: true }
    }
}

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Subscribe {
    pub channel: String,
}

There is not much to say about the messages as they are plain data types.

The last addition we’ll make in this part will be the Publisher.

Publisher

This is a big one, and the first reactor we’ll write.

We start by adding all the use statements and define the Publisher:

src/publisher.rs
use std::collections::HashMap;

use sonr::reactor::{Reactor, Reaction};
use sonr::sync::broadcast::Broadcast;
use sonr::net::tcp::{ReactiveTcpStream, TcpStream};
use sonr::Token;
use sonr::errors::Result;
use bytes::{Bytes, BytesMut, BufMut};

use crate::connection::Connection;
use crate::codec::LineCodec;
use crate::messages::{PubMessage, AckMessage};
use crate::timer::{TimerNotifier, ReactiveTimerNotifier};


pub struct Publisher {
    connections: HashMap<Token, Connection>,
    broadcast: Broadcast<Bytes>,
    buffer_threshold: usize, // buffer messages
    publish_payload: BytesMut,
    timer: ReactiveTimerNotifier,
}

The publisher keeps track of all the connections that can publish messages:

connections: HashMap<Token, Connection>

The Connection is not new here, however the Token is.

A Token is what Mio (and subsequently Sonr) uses to track which resource is the recipient of an Event.

A token is just a newtype wrapping a usize, so you can look at the token as a numerical id. Since an Event (more on what an event is when we get to the react method) is a combination of a token and a ready state, which is readable and / or writable, we can use said Event to find which connection should be the recipient of the event in the form of a Reaction::Event(Event). Therefore it makes sense to use the token as the key for the HashMap tracking connections. This will become more apparent once we implement the react method.
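
As a rough illustration of token-keyed lookup, here is a sketch built on the standard library only. Token, Event and Conn below are simplified stand-ins written for this example, not the Mio or Sonr types, and dispatch is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Stand-in for a token: a newtype around `usize`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Token(pub usize);

/// Stand-in for an event: which resource, and what it is ready for.
#[derive(Debug, Clone, Copy)]
pub struct Event {
    pub token: Token,
    pub readable: bool,
    pub writable: bool,
}

/// Stand-in for a connection that only remembers its ready state.
#[derive(Debug, Default, PartialEq)]
pub struct Conn {
    pub readable: bool,
    pub writable: bool,
}

/// Route an event to the connection registered under its token.
/// Returns `false` when no connection owns the token.
pub fn dispatch(connections: &mut HashMap<Token, Conn>, event: Event) -> bool {
    match connections.get_mut(&event.token) {
        Some(conn) => {
            // Mark the connection as ready for whatever the event reports.
            conn.readable |= event.readable;
            conn.writable |= event.writable;
            true
        }
        None => false,
    }
}
```

Because the token is the map key, finding the recipient of an event is a single lookup, and an event for an unknown token is easy to detect.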

The next field is the broadcast:

broadcast: Broadcast<Bytes>

The broadcast is rather simple: it has recipients and a method to publish data using channels (meaning data can be sent across thread boundaries). In this case the broadcast can publish Bytes.

It’s important to note that any data published by a broadcast has to implement Clone as the data has to be cloned to be sent to multiple recipients.

It stands to reason that given the ownership model in Rust, multiple consumers can not receive and own the same data.
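
The Clone requirement shows up in even the smallest broadcast. The sketch below is built on std::sync::mpsc and is not Sonr's Broadcast; SimpleBroadcast and its methods are hypothetical names chosen for this example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Minimal broadcast: one channel sender per recipient.
pub struct SimpleBroadcast<T: Clone> {
    recipients: Vec<Sender<T>>,
}

impl<T: Clone> SimpleBroadcast<T> {
    pub fn new() -> Self {
        Self { recipients: Vec::new() }
    }

    /// Register a new recipient and hand back its receiving end.
    pub fn subscriber(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.recipients.push(tx);
        rx
    }

    /// Send a clone of `value` to every recipient.
    /// Recipients whose receiver has been dropped are removed.
    pub fn publish(&mut self, value: T) {
        self.recipients.retain(|tx| tx.send(value.clone()).is_ok());
    }
}
```

Each recipient ends up owning its own copy of the value, which is why the published type has to be Clone. Cloning Bytes is cheap since it is reference counted, which makes it a good fit for this.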

The next two fields, buffer_threshold and publish_payload, are used to buffer messages. Publishing every message on its own makes for “chatty” network traffic, which is less performant, so we buffer the messages instead. The buffer_threshold defines when we should publish messages to all subscribers: once the size of publish_payload is greater than or equal to the buffer_threshold, the publisher should publish the buffer.

This leads us to the final field: timer.

In the event of the payload never reaching the threshold it would make for a poor experience if the messages were never delivered on account of no new messages arriving to fill the buffer. To remedy this we add a timer which will cause the publisher to try to publish messages if there are any in the buffer.
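
The interplay between the threshold and the timer can be sketched in isolation. Batcher is a hypothetical type using a plain Vec<u8>, and unlike the publisher in this post it checks the threshold on every push rather than after a whole batch of reads.

```rust
/// Collects encoded messages and decides when a batch should go out.
pub struct Batcher {
    buffer: Vec<u8>,
    threshold: usize,
}

impl Batcher {
    pub fn new(threshold: usize) -> Self {
        Self { buffer: Vec::with_capacity(threshold * 2), threshold }
    }

    /// Buffer one message; returns a batch once the threshold is reached.
    pub fn push(&mut self, message: &[u8]) -> Option<Vec<u8>> {
        self.buffer.extend_from_slice(message);
        if self.buffer.len() >= self.threshold {
            Some(self.take())
        } else {
            None
        }
    }

    /// Called on a timer tick: flush whatever is waiting, if anything.
    pub fn tick(&mut self) -> Option<Vec<u8>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(self.take())
        }
    }

    /// Hand out the current buffer and start a fresh one.
    fn take(&mut self) -> Vec<u8> {
        std::mem::replace(&mut self.buffer, Vec::with_capacity(self.threshold * 2))
    }
}
```

With a threshold of 8 bytes, two short messages trigger a batch on the second push, while a single short message only goes out on the next tick.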

Let’s add a new method to the publisher.

src/publisher.rs
impl Publisher {
    pub fn new(broadcast: Broadcast<Bytes>, buffer_threshold: usize, timer: TimerNotifier) -> Result<Self> {
        let timer = ReactiveTimerNotifier::new(timer)?;

        Ok(Self {
            connections: HashMap::new(),
            broadcast,
            buffer_threshold,
            publish_payload: BytesMut::with_capacity(buffer_threshold * 2),
            timer,
        })
    }
}

We’ll see more about the timer when we implement it in part two, but know that we have to create the ReactiveTimerNotifier in the same thread as the Publisher, as it’s not possible to send reactors across thread boundaries.

Now for the final (and quite large) bit of code: making the publisher into a reactor - a Sonr object which reacts to events and input. For more information see “an introduction to Sonr”.

First set the input and output types of the reactor:

src/publisher.rs
impl Reactor for Publisher {
    type Input = TcpStream;
    type Output = ();

    // ...
}

Since we don’t intend to return anything from the publisher we can set the output to return a unit.

As for our input we will set that to be a TcpStream since the publisher will eventually be connected to the output of a work stealing queue that produces tcp streams.

tcp listener -> queue -> deque -> publisher

We can now start implementing the react method:

src/publisher.rs
impl Reactor for Publisher {
    // ...

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(stream) => {
                if let Ok(stream) = ReactiveTcpStream::new(stream) {
                    self.connections.insert(stream.token(), Connection::new(stream));
                }
                Continue
            }
            Event(event) => // ...
            Continue => Continue,
        }
    }
}

The first variant we’ll deal with is Value. Since type Input = TcpStream, this means we get a tcp stream. The tcp stream itself is not a reactor, so we wrap it in a ReactiveTcpStream.

This will register the stream with the System (more on this in part two), track the state of the tcp stream, whether it’s readable and / or writable, and finally it will notify the System in the event of a blocking call and automatically re-register for new events.

In other words: we can read from and write to the stream until it blocks and not have to worry about tracking the state of the stream.

We can also deal with the Continue variant right away, as all it does is return Continue.

Note that some reactors might return a Value, and if the reactor can return more than one value, like a queue, then the Continue variant has a bigger impact.

Finally we can deal with the remaining variant, Event(Event), and reach the end of the first part.

This is a big one as it deals with both the timer and the connections as they both respond to events.

src/publisher.rs
impl Reactor for Publisher {
    // ...

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(stream) => // ...

            Event(event) => {
                // Timer tick event:
                if event.token() == self.timer.token() {
                    // We can ignore the result as it's simply a unit,
                    // however we should get the result out to make room 
                    // for the next result.
                    let _ = self.timer.try_recv();

                    // Only publish if we have actual data
                    if !self.publish_payload.is_empty() {
                        self.broadcast.publish(self.publish_payload.take().freeze());
                    }
                }

                // Connection event:
                if let Some(con) = self.connections.get_mut(&event.token()) {
                    con.react(event.into()); // Mark the underlying stream as readable / writable

                    // Read messages to publish.
                    while let Some(msg_result) = con.recv::<PubMessage>() {
                        match msg_result {
                            Ok(messages) => {
                                for message in messages {
                                    if let Ok(bytes) = LineCodec::encode(&message) {
                                        if bytes.len() > self.publish_payload.remaining_mut() {
                                            self.publish_payload.reserve(self.buffer_threshold);
                                        }
                                        self.publish_payload.put_slice(&bytes);
                                    }

                                    // ack message
                                    let _ = LineCodec::encode(AckMessage::new())
                                        .map(|payload| con.add_payload(payload));
                                }
                            }
                            Err(_) => {
                                self.connections.remove(&event.token());
                                
                                // Publish the payload
                                self.broadcast.publish(self.publish_payload.take().freeze());
                                return Continue
                            }
                        }
                    }

                    // If enough data is buffered then publish the messages.
                    if self.publish_payload.len() >= self.buffer_threshold {
                        self.broadcast.publish(self.publish_payload.take().freeze());
                    }

                    // Write all ack messages
                    while let Some(wrt_res) = con.write() {
                        if wrt_res.is_err() {
                            self.connections.remove(&event.token());
                            return Continue
                        }
                    }

                    Continue
                } else {
                    event.into()
                }
            }

            Continue => Continue,
        }
    }
}

Let’s start with the timer:

As mentioned above, an event is a combination of a ready state (readable / writable) and a token. So the first thing we do is check if the event belongs to one of our connections or the timer.

If it belongs to the timer (which is a type alias for ReactiveSignalReceiver):

  • Get the value out and instantly drop it by assigning it to _.
  • Check if the publish payload is empty…
  • … and if it’s not we drain it and publish it.

We then move on to the connections. There is a chance the event doesn’t belong to one of the connections either; in that case we simply return Reaction::Event(event) (or in this case event.into(), as From<Event> is implemented for Reaction<T>).
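
Passing an unclaimed event along can be modelled in a few lines. The Event and Reaction types below are simplified stand-ins written for this example, not Sonr's definitions (the real ones carry more information), and react here is a free function over a hypothetical set of owned tokens.

```rust
use std::collections::HashSet;

/// Simplified stand-in for an event: only the token is modelled.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Event {
    pub token: usize,
}

/// Simplified stand-in for a reaction with the three variants used above.
#[derive(Debug, PartialEq)]
pub enum Reaction<T> {
    Value(T),
    Event(Event),
    Continue,
}

impl<T> From<Event> for Reaction<T> {
    fn from(event: Event) -> Self {
        Reaction::Event(event)
    }
}

/// Handle events for tokens we own; pass every other event along unchanged.
pub fn react(owned: &HashSet<usize>, event: Event) -> Reaction<()> {
    if owned.contains(&event.token) {
        // A real reactor would read from / write to the connection here.
        Reaction::Continue
    } else {
        // Not ours: hand the event back so something else can claim it.
        event.into()
    }
}
```

The From implementation is what lets the else branch be written as event.into() instead of spelling out Reaction::Event(event).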

If the event belongs to one of the connections the first thing we do is call react(event.into()) on the connection. As mentioned before this marks the connection as readable / writable.

Since the connection doesn’t return anything of value we can simply ignore the return value from the react call.

Now we can keep reading messages from the connection until it either blocks or errors out.

We’ll serialize every message and put it in the publish payload buffer. It might seem wasteful to first deserialize each message and then serialize it again, however this ensures that we are actually publishing PubMessages and not some other type, or indeed, rubbish data.

For every message we receive we also put an AckMessage on the write buffer of the connection. We can use this later to write a publisher that counts the number of messages ack’ed by the server.

If the connection errors out we remove the connection and publish whatever is in the buffer.

Once we have received all the messages we’ll get from the connection during the execution of the reaction we check if the publish payload has reached the threshold, and if that is the case we publish the buffer.

Then we need to actually tell the connection to write all the ack messages we put in the write buffer, and we do this by calling con.write() until it no longer has anything to write (or errors out, in which case we simply remove it).

Finally we return Continue to signal that we have nothing left to do.

This is all for this part. In the second part we’ll create the subscriber and the timer and finally put it all together.

Part 2