Intro
This is the first of two posts, where we will go over how to build a basic pub/sub server in Rust using Sonr.
By the end of the series we’ll have a multi-threaded pub/sub server where you can subscribe to channels and publish plain-text messages to other subscribers.
When it comes to network programming, a chat server seems to be the equivalent of “Hello, world!”, but rather than making yet another chat I want to take a slightly different approach: a pub/sub server.
The full source code is available on GitHub, and each code listing has a link (in its top-right corner) to the corresponding file.
There are probably performance improvements that could be made, but I have tried to keep the code as simple as possible.
On my machine (Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz, 16 GB RAM), running the send benchmark, I get an average of 2,423,872 messages ack’ed per second.
Setup
Create a new Rust project:
$ cargo new --bin pubsub
Then add the following dependencies to Cargo.toml:
We’ll use the bytes crate for passing bytes around, serde and serde_json for message serialization and deserialization, and finally sonr for networking.
The next step is to add the following files to the src directory.
src/
    codec.rs
    connection.rs
    lib.rs
    main.rs
    messages.rs
    publisher.rs
    subscriber.rs
    timer.rs
Open up src/lib.rs and add every file listed above as a module except for
lib and main.
Note that we also added BUFFER_SIZE. This is the default buffer size we’ll use for each connection’s read and write buffers, and for each subscriber’s buffer of messages waiting to be written.
We will start editing the files in the order they are listed above, starting with
codec.rs.
Codec
Since messages are read and written as bytes, we need a way to decode bytes into the concrete message types defined in messages.rs, and to encode those messages back into bytes.
Open up src/codec.rs in an editor and add the following code:
This will allow us to decode bytes into messages, and encode messages into
bytes.
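The codec is line-based (it is referred to later as the line codec), so a frame is one newline-terminated run of bytes. The sketch below shows only that framing step and is std-only by assumption: it uses Vec<u8> instead of the bytes crate and leaves the serde_json part out. decode_line and encode_line are hypothetical names, not the functions in the post’s codec.rs.

```rust
/// Hypothetical newline-delimited ("line") framing, std-only.
/// The post's codec also turns each frame into a message via serde_json;
/// here a frame is just the raw bytes of one line.

/// Remove and return the first complete line (without its trailing '\n')
/// from `buf`, or `None` if no complete line has arrived yet.
pub fn decode_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    let pos = buf.iter().position(|&b| b == b'\n')?;
    // drain(..=pos) takes the line *and* its newline out of the buffer,
    // leaving any partial next line in place.
    let mut line: Vec<u8> = buf.drain(..=pos).collect();
    line.pop(); // drop the '\n'
    Some(line)
}

/// Append `payload` followed by a newline to `out`.
pub fn encode_line(payload: &[u8], out: &mut Vec<u8>) {
    out.extend_from_slice(payload);
    out.push(b'\n');
}
```

Because a partial line stays in the buffer, the caller can keep appending freshly read bytes and call decode_line again once more data arrives.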
Now that we can encode and decode messages we can move on to the connection.
Connection
We wrap every TCP connection, for both subscribers and publishers, in a Connection so we can conveniently read and write messages.
The connection is a bit more complicated than the codec, so we will break it down into smaller pieces.
First we add all our use statements and define the connection struct.
The read_buffer stores data read from the TCP stream, the write_buffer holds all the data waiting to be written, and the stream is what we’ll use to read and write data over the network.
Next up, we’ll add a new method to the Connection.
Then we need a method to read data from the stream into the read_buffer and
finally attempt to decode this data into something more tangible like a message.
We read data from the stream into the read buffer.
There are two unsafe blocks in this function, and they are the only unsafe code we will write:
self.read_buffer.bytes_mut() returns a mutable slice of the buffer’s unused space that we can read into, and
self.read_buffer.set_len(n) sets the length of the buffer.
Once data has been read into that slice, the length is set to the existing length plus the number of bytes read.
If we read N bytes we can start processing messages using the line codec we defined earlier.
A WouldBlock error is not treated as an actual error: it is the connection’s way of saying that it cannot perform any more reads until a readable event is triggered, so in that case recv returns None. We’ll see when such an event is received once we create the publisher.
In the case of any other error we simply return Some(Err(())) and treat the connection as broken.
This means we can keep calling recv for as long as it returns Some; however, if it returns an error (any error), the connection should be dropped.
This makes it possible to write code like
    while let Some(res) = connection.recv() {
        if res.is_err() {
            // Any error means the connection is broken: drop it.
            return;
        }
        // Otherwise `res` is Ok(...) and holds a decoded message.
    }
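To make the recv contract concrete, here is a minimal std-only sketch of a connection’s read side, assuming newline-delimited framing. ReadHalf is a hypothetical type, not the post’s Connection: it reads into a stack buffer and copies into a Vec<u8> instead of using BytesMut with the unsafe bytes_mut/set_len pair, so it needs no unsafe code. It also treats reading zero bytes as the peer closing the connection, which is an assumption on my part.

```rust
use std::io::{self, Read};

/// Hypothetical read half of a connection (std-only sketch).
pub struct ReadHalf<S> {
    read_buffer: Vec<u8>,
    stream: S,
}

impl<S: Read> ReadHalf<S> {
    pub fn new(stream: S) -> Self {
        ReadHalf { read_buffer: Vec::new(), stream }
    }

    /// Same shape as the post's `recv`:
    /// - `Some(Ok(line))`: one decoded newline-delimited frame,
    /// - `None`: the stream would block; wait for the next readable event,
    /// - `Some(Err(()))`: peer closed or a real error; drop the connection.
    pub fn recv(&mut self) -> Option<Result<Vec<u8>, ()>> {
        loop {
            // Hand out an already-buffered complete line before touching the stream.
            if let Some(pos) = self.read_buffer.iter().position(|&b| b == b'\n') {
                let mut line: Vec<u8> = self.read_buffer.drain(..=pos).collect();
                line.pop(); // strip the '\n'
                return Some(Ok(line));
            }
            let mut chunk = [0u8; 4096];
            match self.stream.read(&mut chunk) {
                Ok(0) => return Some(Err(())), // zero bytes read: peer closed
                Ok(n) => self.read_buffer.extend_from_slice(&chunk[..n]),
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return None,
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(_) => return Some(Err(())),
            }
        }
    }
}
```

The while let loop above works unchanged against this type. With a std::net::TcpStream switched to non-blocking mode as S, reads that would block surface as WouldBlock, which recv turns into None.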
The next method is write. This will attempt to write the contents of the write_buffer to
the stream.
This method resembles recv to some degree: if the underlying stream is not
writable we return None. Same applies if there is nothing in the write buffer.
Unlike reading, writing zero bytes does not imply that the connection was closed by the peer (trying to write to a closed socket would return an actual error).
For every successful write, the written part of the buffer can be removed by simply splitting the buffer and only keeping the unwritten data.
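A minimal std-only sketch of the write side, under the same assumptions as the read side: WriteHalf, pending, and the Option<Result<usize, ()>> return type are hypothetical, chosen to mirror recv. Vec::drain removes the written prefix, which is the job that splitting the BytesMut does in the post.

```rust
use std::io::{self, Write};

/// Hypothetical write half of a connection (std-only sketch).
pub struct WriteHalf<S> {
    write_buffer: Vec<u8>,
    stream: S,
}

impl<S: Write> WriteHalf<S> {
    pub fn new(stream: S) -> Self {
        WriteHalf { write_buffer: Vec::new(), stream }
    }

    /// Queue bytes for writing (the role of the post's `add_payload`).
    pub fn add_payload(&mut self, bytes: &[u8]) {
        self.write_buffer.extend_from_slice(bytes);
    }

    /// Bytes still waiting to be written.
    pub fn pending(&self) -> &[u8] {
        &self.write_buffer
    }

    /// - `None`: nothing buffered, or the stream would block,
    /// - `Some(Ok(n))`: `n` bytes were written and removed from the buffer,
    /// - `Some(Err(()))`: a real error; the connection should be dropped.
    pub fn write(&mut self) -> Option<Result<usize, ()>> {
        if self.write_buffer.is_empty() {
            return None;
        }
        match self.stream.write(&self.write_buffer) {
            Ok(n) => {
                // A write may be partial: keep only the unwritten tail.
                self.write_buffer.drain(..n);
                Some(Ok(n))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(_) => Some(Err(())),
        }
    }
}
```

A caller flushes it the way the publisher does later on: call write() in a loop until it returns None (buffer drained or stream full) or Some(Err(())) (drop the connection).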
Finally the two remaining methods to add are add_payload and react.
add_payload simply adds the bytes to the write buffer. This keeps us from
having to make the write_buffer public.
react passes the Reaction to the stream. This is a convenience method
so we don’t have to make the stream public, however it’s worth noting what
react does under the hood:
Since stream is a ReactiveTcpStream, which is a type alias for Stream<TcpStream>, we can find out more in the documentation for sonr::net::stream::Stream.
The first paragraph in the docs states:
“When a Stream reacts the inner evented reactor is marked as either readable and / or writable depending on the Ready state of the Event.”
Given this we now know that before recv or write can ever yield anything but
None we first need to call react with a Reaction. To be more precise:
with a Reaction::Event(event). More on this once we start building one of the
reactors: the publisher.
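The readiness behaviour quoted from the docs can be modelled in a few lines. This is not Sonr’s implementation: Ready and Gated are hypothetical stand-ins that show why nothing is read before an event marks the stream readable, and why a WouldBlock puts the stream back into the waiting state until the next event.

```rust
use std::io::{self, Read};

/// Hypothetical readiness carried by an event (not mio's or Sonr's type).
#[derive(Clone, Copy, Debug, Default)]
pub struct Ready {
    pub readable: bool,
    pub writable: bool,
}

/// Hypothetical wrapper: an event marks the inner stream readable and/or
/// writable; a WouldBlock clears the readable flag until the next event.
/// (Writing is omitted, so `writable` is only recorded here.)
pub struct Gated<R> {
    inner: R,
    readable: bool,
    writable: bool,
}

impl<R: Read> Gated<R> {
    pub fn new(inner: R) -> Self {
        Gated { inner, readable: false, writable: false }
    }

    /// Record the readiness reported by an event.
    pub fn react(&mut self, ready: Ready) {
        self.readable |= ready.readable;
        self.writable |= ready.writable;
    }

    pub fn is_writable(&self) -> bool {
        self.writable
    }

    /// `None` until an event has marked the stream readable.
    pub fn read(&mut self, buf: &mut [u8]) -> Option<io::Result<usize>> {
        if !self.readable {
            return None;
        }
        match self.inner.read(buf) {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.readable = false; // wait for the next readable event
                None
            }
            other => Some(other),
        }
    }
}
```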
Now that we have a connection, we need something we can send and receive: messages!
Messages
We need three kinds of messages:
- Subscribe: subscribe to a channel
- PubMessage: a message published by a publisher to all subscribers
- AckMessage: once a PubMessage has been received by the server, we send an “ack” back.
Open src/messages.rs and add the following code.
There is not much to say about the messages as they are plain data types.
The last addition we’ll make in this part will be the Publisher.
Publisher
This is a big one, and the first reactor we’ll write.
We start by adding all the use statements and defining the Publisher:
The publisher keeps track of all the connections that can publish messages:
connections: HashMap<Token, Connection>
The Connection is not new here, however the Token is.
A Token is what Mio (and, by extension, Sonr) uses to track which resource is the recipient of an Event.
A token is just a newtype wrapping a usize, so you can think of it as a numerical id. Since an Event (more on events when we get to the react method) is a combination of a token and a ready state (readable and / or writable), we can use the event’s token to find the connection that should receive it in the form of a Reaction::Event(Event). It therefore makes sense to use the token as the key of the HashMap tracking connections. This will become more apparent once we implement the react method.
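A small sketch of token-keyed dispatch, assuming simplified types: Token has the same shape as Mio’s Token (a newtype around usize), while Event, MockConn, and dispatch are hypothetical. An event whose token has no entry is handed back to the caller, much like the publisher returning Reaction::Event(event) for events that are not its own.

```rust
use std::collections::HashMap;

/// Same shape as Mio's `Token`: a newtype around `usize`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(pub usize);

/// Hypothetical event: which resource it is for, plus its readiness.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Event {
    pub token: Token,
    pub readable: bool,
    pub writable: bool,
}

/// Hypothetical connection that only counts the events routed to it.
#[derive(Debug, Default)]
pub struct MockConn {
    pub events: usize,
}

/// Route `event` to the connection stored under its token.
/// Unknown tokens come back as `Err(event)` so the caller can pass them on.
pub fn dispatch(conns: &mut HashMap<Token, MockConn>, event: Event) -> Result<(), Event> {
    match conns.get_mut(&event.token) {
        Some(con) => {
            con.events += 1;
            Ok(())
        }
        None => Err(event),
    }
}
```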
The next field is the broadcast:
broadcast: Broadcast<Bytes>
The broadcast is rather simple: it has recipients and a method to
publish data using channels (meaning data can be sent across thread boundaries).
In this case the broadcast can publish Bytes.
It’s important to note that any data published by a broadcast has to implement Clone, since the data has to be cloned to be sent to multiple recipients.
This follows from Rust’s ownership model: multiple consumers cannot each receive and own the same value.
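To show why Clone is required, here is a broadcaster built on std::sync::mpsc channels. This is a hypothetical sketch, not Sonr’s Broadcast: each recipient owns a Receiver (which can live on another thread), and publish sends every live recipient its own clone of the value.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical broadcaster on top of std channels (not Sonr's `Broadcast`).
pub struct Broadcaster<T> {
    senders: Vec<Sender<T>>,
}

impl<T: Clone> Broadcaster<T> {
    pub fn new() -> Self {
        Broadcaster { senders: Vec::new() }
    }

    /// Register a recipient and return its receiving end.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.senders.push(tx);
        rx
    }

    /// Send each recipient its own clone of `value`; recipients whose
    /// receiver has been dropped make `send` fail and are removed.
    pub fn publish(&mut self, value: T) {
        self.senders.retain(|tx| tx.send(value.clone()).is_ok());
    }
}
```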
The publisher also buffers messages in publish_payload before publishing them. Buffering gives us less “chatty” network traffic; many small writes perform worse than fewer, larger ones.
The buffer_threshold defines when we should publish messages to all subscribers: once the size of publish_payload is greater than or equal to buffer_threshold, the publisher publishes the buffer.
This leads us to the final field: timer.
If the payload never reaches the threshold, for example because no new messages arrive to fill the buffer, the messages already in it would never be delivered, which would make for a poor experience. To remedy this we add a timer that, when it fires, causes the publisher to publish any messages in the buffer.
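The threshold and timer rules can be sketched with a hypothetical PublishBuffer type (not from the post): push appends a message, flush_if_full returns a batch only once the buffer has reached the threshold, and flush returns whatever is buffered, which is what both the timer and a failing connection need.

```rust
/// Hypothetical publish buffer: messages accumulate in `payload`
/// and are handed out as one batch.
pub struct PublishBuffer {
    payload: Vec<u8>,
    threshold: usize,
}

impl PublishBuffer {
    pub fn new(threshold: usize) -> Self {
        PublishBuffer { payload: Vec::new(), threshold }
    }

    /// Append one encoded message.
    pub fn push(&mut self, msg: &[u8]) {
        self.payload.extend_from_slice(msg);
    }

    /// After a batch of reads: return the buffer once it has reached the threshold.
    pub fn flush_if_full(&mut self) -> Option<Vec<u8>> {
        if self.payload.len() >= self.threshold {
            self.flush()
        } else {
            None
        }
    }

    /// Timer tick (or broken connection): return whatever is buffered, if anything.
    pub fn flush(&mut self) -> Option<Vec<u8>> {
        if self.payload.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.payload))
        }
    }
}
```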
Let’s add a new method to the publisher.
We’ll see more of the timer when we implement it in part two, but note that the ReactiveTimerNotifier has to be created on the same thread as the Publisher, since reactors can’t be sent across thread boundaries.
Now for the final (and quite large) bit of code: turning the publisher into a reactor, a Sonr object that reacts to events and input. For more information see “an introduction to Sonr”.
First set the input and output types of the reactor:
Since we don’t intend to return anything from the publisher we can set the output to return a unit.
As for the input, we set it to TcpStream, since the publisher will eventually be connected to the output of a work-stealing queue that produces TCP streams.
(Diagram: tcp listener → queue (deque) → publisher)
We can now start implementing the react method:
The first variant we’ll deal with is Value, which carries our input: a TcpStream (since type Input = TcpStream;).
The TCP stream itself is not a reactor, so we wrap it in a ReactiveTcpStream.
This registers the stream with the System (more on this in part two), tracks whether the stream is readable and / or writable, and finally notifies the System in the event of a blocking call and automatically re-registers for new events.
In other words: we can read from and write to the stream until it blocks, without having to track the state of the stream ourselves.
We can also deal with the Continue variant right away, as all we do with it is return Continue.
For a reactor that can return more than one Value, like a queue, the Continue variant has a bigger impact.
Finally we can deal with the last variant, Event(Event), which brings us to the end of the first part.
This is a big one, as it handles both the timer and the connections, since both respond to events.
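Before the details, a skeleton of the three-way match on the reaction may help. This is a simplified, hypothetical stand-in, not Sonr’s Reaction or Reactor: the event is reduced to a bare token id, the input is a String standing in for a TcpStream, tokens come from a counter rather than from registering with the System, and returning Continue after storing a new Value is an assumption.

```rust
use std::collections::HashMap;

/// Simplified stand-in for Sonr's `Reaction<T>` (variant names from the post).
#[derive(Debug, PartialEq)]
pub enum Reaction<T> {
    Value(T),
    Event(usize), // hypothetical: just the event's token id
    Continue,
}

/// Hypothetical publisher skeleton; a `String` plays the role of the stream.
pub struct Skeleton {
    next_token: usize,
    connections: HashMap<usize, String>,
}

impl Skeleton {
    pub fn new() -> Self {
        Skeleton { next_token: 0, connections: HashMap::new() }
    }

    pub fn react(&mut self, reaction: Reaction<String>) -> Reaction<()> {
        match reaction {
            // New input: store it under a fresh token.
            Reaction::Value(stream) => {
                self.connections.insert(self.next_token, stream);
                self.next_token += 1;
                Reaction::Continue
            }
            // Events for tokens we own are handled here...
            Reaction::Event(token) if self.connections.contains_key(&token) => Reaction::Continue,
            // ...anything else is passed on unchanged.
            Reaction::Event(token) => Reaction::Event(token),
            Reaction::Continue => Reaction::Continue,
        }
    }
}
```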
Let’s start with the timer:
As mentioned above, an event is a combination of a ready state (readable / writable) and a token. So the first thing we do is check if the event belongs to one of our connections or the timer.
If it belongs to the timer (which is a type alias for ReactiveSignalReceiver):
- Get the value out and immediately drop it by assigning it to _.
- Check if the publish payload is empty…
- …and if it’s not, drain it and publish it.
We then move on to the connections. There is a chance the event doesn’t belong to any of the connections either; in that case we simply return Reaction::Event(event) (or in this case event.into(), since From<Event> is implemented for Reaction<T>).
If the event belongs to one of the connections the first thing we do is call
react(event.into()) on the connection. As mentioned before this marks the
connection as readable / writable.
Since the connection doesn’t return anything of value we can simply ignore the
return value from the react call.
Now we can keep reading messages from the connection until it either blocks or errors out.
We’ll serialize every message and put it in the publish payload buffer. It might seem wasteful to first deserialize each message and then serialize it again; however, this ensures that we are actually publishing PubMessages and not some other type or, indeed, rubbish data.
For every message we receive we also put an AckMessage on the write buffer of the connection. We can use this later to write a publisher that counts the number of messages ack’ed by the server.
If the connection errors out we publish whatever is in the buffer and then remove the connection.
Once we have received all the messages we’ll get from the connection during this reaction, we check whether the publish payload has reached the threshold, and if so we publish the buffer.
Then we need to actually tell the connection to write all the ack messages we put in its write buffer. We do this by calling con.write() until it no longer has anything to write (or errors out, in which case we simply remove it).
Finally we return Continue to signal that we have nothing left to do.
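The connection branch described above can be condensed into a testable sketch. Everything here is hypothetical: MockConn replays queued recv results and counts acks instead of writing them, published stands in for broadcast.publish, and messages are appended as raw lines rather than being deserialized and re-serialized as PubMessages. The final write step is omitted.

```rust
use std::collections::{HashMap, VecDeque};

/// Newtype id, shaped like Mio's `Token`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(pub usize);

/// Hypothetical connection: `incoming` replays what `recv` would return,
/// `acks` counts the AckMessages that would go on the write buffer.
pub struct MockConn {
    pub incoming: VecDeque<Result<Vec<u8>, ()>>,
    pub acks: usize,
}

impl MockConn {
    /// `None` once the queue is empty, i.e. the read "would block".
    fn recv(&mut self) -> Option<Result<Vec<u8>, ()>> {
        self.incoming.pop_front()
    }
}

pub struct Publisher {
    pub connections: HashMap<Token, MockConn>,
    pub payload: Vec<u8>,
    pub threshold: usize,
    /// Stand-in for `broadcast.publish(...)`: every published batch.
    pub published: Vec<Vec<u8>>,
}

impl Publisher {
    /// Publish the buffered payload, if there is any.
    fn publish(&mut self) {
        if !self.payload.is_empty() {
            let batch = std::mem::take(&mut self.payload);
            self.published.push(batch);
        }
    }

    /// Handle an event for `token`; `false` means no connection owns it
    /// (the post passes such events on with `event.into()`).
    pub fn on_connection_event(&mut self, token: Token) -> bool {
        let con = match self.connections.get_mut(&token) {
            Some(con) => con,
            None => return false,
        };
        let mut broken = false;
        // Keep reading until the connection blocks (None) or errors out.
        while let Some(res) = con.recv() {
            match res {
                Ok(msg) => {
                    self.payload.extend_from_slice(&msg);
                    self.payload.push(b'\n');
                    con.acks += 1; // one ack per received message
                }
                Err(()) => {
                    broken = true;
                    break;
                }
            }
        }
        if broken {
            // Publish what we have, then forget the connection.
            self.publish();
            self.connections.remove(&token);
        } else if self.payload.len() >= self.threshold {
            self.publish();
        }
        true
    }
}
```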
This is all for this part. In the second part we’ll create the subscriber and the timer and finally put it all together.