Intro
This is the first of two posts where we will go over how to build a basic pub/sub server in Rust using Sonr.
By the end we’ll have a multi-threaded pub/sub server where you can subscribe to channels and publish plain-text messages to other subscribers.
When it comes to network programming it seems like a chat server is the equivalent of “Hello, world!”, but rather than making yet another chat I want to take a slightly different approach: a pub/sub server.
A direct link to the source for this can be found here, and every code listing with an icon in the top right corner has a link to the file on GitHub.
There are probably performance improvements that could be made to the code, but I have tried to keep it as simple as possible. On my machine (Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz, 16 GB RAM), running the send benchmark I get an average of 2,423,872 messages ack’ed per second.
Setup
Create a new Rust project:
$ cargo new --bin pubsub
and add the following dependencies to Cargo.toml:
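The full Cargo.toml is linked from the repository; a minimal dependency section could look something like this (the version numbers here are placeholders, not taken from the original):

[dependencies]
# versions below are illustrative placeholders
bytes = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sonr = "0.1"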
We’ll use the bytes crate for passing bytes around, serde and serde_json for message serialization/de-serialization, and finally sonr for networking.
The next step is to add the following files to the src directory.
src/
codec.rs
connection.rs
lib.rs
main.rs
messages.rs
publisher.rs
subscriber.rs
timer.rs
Open up src/lib.rs and add every file listed above as a module, except for lib and main.
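A sketch of what src/lib.rs ends up containing; the exact BUFFER_SIZE value is an assumption, the original may use a different number:

// src/lib.rs (sketch)
pub mod codec;
pub mod connection;
pub mod messages;
pub mod publisher;
pub mod subscriber;
pub mod timer;

// Default buffer size for connection read/write buffers and for
// subscriber message buffers. The value here is a placeholder.
pub const BUFFER_SIZE: usize = 1024;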
Note that we also added BUFFER_SIZE. This is the default buffer size we’ll use for each connection’s read/write buffer, and for each subscriber to buffer messages to be written.
We will edit the files in the order they are listed above, starting with codec.rs.
Codec
Since messages are read and written as bytes, we need a way to decode bytes into the concrete types defined in messages.rs, and to encode those types back into bytes.
Open up src/codec.rs in an editor and add the following code:
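The actual listing is on GitHub; a rough sketch of a newline-delimited JSON codec built on bytes and serde_json could look like the following. The LineCodec name and the method signatures are assumptions based on how the codec is used later in the post.

// src/codec.rs (sketch)
use bytes::{Bytes, BytesMut};
use serde::de::DeserializeOwned;
use serde::Serialize;

pub struct LineCodec;

impl LineCodec {
    // Try to decode one newline-terminated JSON message from the buffer,
    // removing the consumed bytes on success.
    pub fn decode<T: DeserializeOwned>(buffer: &mut BytesMut) -> Option<T> {
        let pos = buffer.iter().position(|b| *b == b'\n')?;
        let line = buffer.split_to(pos + 1);
        serde_json::from_slice(&line[..pos]).ok()
    }

    // Encode a message as JSON followed by a newline.
    pub fn encode<T: Serialize>(message: &T) -> Bytes {
        let mut json = serde_json::to_vec(message).expect("failed to serialize message");
        json.push(b'\n');
        Bytes::from(json)
    }
}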
This will allow us to decode bytes into messages, and encode messages into bytes.
Now that we can encode and decode messages we can move on to the connection.
Connection
We wrap every tcp connection for the subscribers and the publishers in a Connection to conveniently read and write messages.
The connection is a bit more complicated than the codec, so we will break it down into smaller pieces.
First we add all our use statements and define the connection struct.
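A sketch of the use statements and the struct; the sonr module paths and the LineCodec from the codec sketch above are assumptions:

// src/connection.rs (sketch)
use std::io::{ErrorKind, Read, Write};

use bytes::{BufMut, Bytes, BytesMut};
use serde::de::DeserializeOwned;
use sonr::net::tcp::ReactiveTcpStream; // assumed path
use sonr::reactor::Reaction;           // assumed path

use crate::codec::LineCodec;
use crate::BUFFER_SIZE;

pub struct Connection {
    stream: ReactiveTcpStream,
    read_buffer: BytesMut,
    write_buffer: BytesMut,
}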
The read_buffer is used to store data when reading from the tcp stream, the write_buffer is where all the data that will eventually be written is stored, and the stream is what we’ll use to read and write data over the network.
Next up we will add a new method to the Connection.
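A minimal version, using BUFFER_SIZE for both buffers:

impl Connection {
    pub fn new(stream: ReactiveTcpStream) -> Self {
        Self {
            stream,
            read_buffer: BytesMut::with_capacity(BUFFER_SIZE),
            write_buffer: BytesMut::with_capacity(BUFFER_SIZE),
        }
    }
}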
Then we need a method to read data from the stream into the read_buffer and finally attempt to decode this data into something more tangible, like a message.
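A sketch of what recv might look like. It assumes ReactiveTcpStream forwards Read to the inner TcpStream and exposes a readable() check; both are assumptions about the sonr API, as is the exact return type.

impl Connection {
    // Yield decoded messages until the stream blocks (None) or the
    // connection breaks (Some(Err(()))).
    pub fn recv<T: DeserializeOwned>(&mut self) -> Option<Result<T, ()>> {
        // Return a buffered message first if a complete one is already there.
        if let Some(message) = LineCodec::decode(&mut self.read_buffer) {
            return Some(Ok(message));
        }

        // The stream tracks its readable state via react(); nothing to do
        // until it has been marked readable (readable() is assumed here).
        if !self.stream.readable() {
            return None;
        }

        loop {
            // Make sure there is spare capacity to read into.
            self.read_buffer.reserve(BUFFER_SIZE);

            let n = unsafe {
                // bytes_mut() hands us the uninitialised spare capacity.
                match self.stream.read(self.read_buffer.bytes_mut()) {
                    Ok(0) => return Some(Err(())), // peer closed the connection
                    Ok(n) => n,
                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => return None,
                    Err(_) => return Some(Err(())), // any other error: drop the connection
                }
            };

            // The bytes are initialised now, so bump the length accordingly.
            unsafe {
                let len = self.read_buffer.len();
                self.read_buffer.set_len(len + n);
            }

            if let Some(message) = LineCodec::decode(&mut self.read_buffer) {
                return Some(Ok(message));
            }
        }
    }
}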
We read data from the stream into the read buffer. There are two unsafe blocks in this function, and this is the only unsafe code we will add: self.read_buffer.bytes_mut() will return a mutable slice we can read into, and self.read_buffer.set_len(n) sets the length of the buffer. Once data has been placed in the mutable slice, the length can be set to the number of bytes read plus the existing length.
If we read N bytes we can start processing messages using the line codec we defined earlier.
In the event of a WouldBlock error we don’t treat this as an actual error, but rather as the connection saying that it can no longer perform read operations until a readable event is triggered. More details on when such an event is received will follow when we create the publisher.
In the case of any other error we simply return Some(Err(())) and treat the connection as broken.
This means we can keep calling recv for as long as it returns Some; however, if it’s an error (any error) the connection should be dropped.
This makes it possible to write code like:

while let Some(res) = connection.recv() {
    if res.is_err() {
        // drop connection
        return;
    }
}
The next method is write. This will attempt to write the contents of the write_buffer to the stream.
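A sketch of write, under the same assumptions as recv (a writable() check on the stream and Write being forwarded to the inner TcpStream):

impl Connection {
    // Attempt to flush the write buffer. None means there is nothing to do
    // right now; Some(Err(())) means the connection should be dropped.
    pub fn write(&mut self) -> Option<Result<usize, ()>> {
        if !self.stream.writable() || self.write_buffer.is_empty() {
            return None;
        }

        match self.stream.write(&self.write_buffer) {
            Ok(n) => {
                // Split off and drop the written bytes, keeping the rest.
                let _written = self.write_buffer.split_to(n);
                Some(Ok(n))
            }
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => None,
            Err(_) => Some(Err(())),
        }
    }
}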
This method resembles recv to some degree: if the underlying stream is not writable we return None. The same applies if there is nothing in the write buffer.
Unlike reading, writing zero bytes does not imply that the connection was closed by the peer (trying to write to a closed socket would return an actual error).
For every successful write, the written part of the buffer can be removed by simply splitting the buffer and only keeping the unwritten data.
Finally, the two remaining methods to add are add_payload and react.
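A sketch of the last two methods; the Reaction type parameter passed through react is a guess, since all the rest of the post requires is that the reaction reaches the underlying stream:

impl Connection {
    // Queue bytes to be written the next time write() is called.
    pub fn add_payload(&mut self, payload: Bytes) {
        self.write_buffer.extend_from_slice(&payload);
    }

    // Forward the reaction to the underlying stream so it can update its
    // readable / writable state.
    pub fn react(&mut self, reaction: Reaction<()>) -> Reaction<()> {
        self.stream.react(reaction)
    }
}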
add_payload simply adds the bytes to the write buffer. This keeps us from having to make the write_buffer public.
react passes the Reaction on to the stream. This is a convenience method so we don’t have to make the stream public; however, it’s worth noting what react does under the hood:
Since stream is a ReactiveTcpStream, which is a type alias for Stream<TcpStream>, we can find out more in the documentation for sonr::net::stream::Stream.
The first paragraph in the docs states:
When a Stream reacts, the inner evented reactor is marked as either readable and / or writable depending on the Ready state of the Event.
Given this we now know that before recv or write can ever yield anything but None, we first need to call react with a Reaction, or to be more precise, with a Reaction::Event(event). More on this once we start building one of the reactors: the publisher.
Now that we have a connection, we need something we can send and receive: messages!
Messages
We need three kinds of messages:
- Subscribe: subscribe to a channel
- PubMessage: a message published by a publisher to all subscribers
- AckMessage: once a PubMessage has been received by the server, we send an “ack” back
Open src/messages.rs and add the following code.
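A sketch of the three message types using serde’s derive macros; the field names and types are assumptions, not taken from the original listing:

// src/messages.rs (sketch)
use serde::{Deserialize, Serialize};

// Sent by a subscriber to subscribe to a channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscribe {
    pub channel: String,
}

// Sent by a publisher; forwarded to every subscriber of the channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PubMessage {
    pub channel: String,
    pub payload: String,
}

// Sent back to the publisher for every PubMessage the server receives.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckMessage;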
There is not much to say about the messages as they are plain data types.
The last addition we’ll make in this part is the Publisher.
Publisher
This is a big one, and the first reactor we’ll write.
We start by adding all the use statements and defining the Publisher:
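A sketch of the use statements and the struct. The sonr paths, the Timer alias (defined in timer.rs in part two) and the publish_payload type are assumptions:

// src/publisher.rs (sketch)
use std::collections::HashMap;

use bytes::{Bytes, BytesMut};
use sonr::net::tcp::{ReactiveTcpStream, TcpStream}; // assumed paths
use sonr::reactor::{Reaction, Reactor};             // assumed paths
use sonr::sync::broadcast::Broadcast;               // assumed path
use sonr::{Event, Token};                           // assumed paths

use crate::codec::LineCodec;
use crate::connection::Connection;
use crate::messages::{AckMessage, PubMessage};
use crate::timer::Timer; // assumed alias around ReactiveSignalReceiver

pub struct Publisher {
    connections: HashMap<Token, Connection>,
    broadcast: Broadcast<Bytes>,
    publish_payload: BytesMut,
    buffer_threshold: usize,
    timer: Timer,
}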
The publisher keeps track of all the connections that can publish messages:
connections: HashMap<Token, Connection>
The Connection is not new here, however the Token is. A Token is what Mio (and subsequently Sonr) uses to track which resource is the recipient of an Event. A token is just a newtype wrapping a usize, so you can look at the token as a numerical id. Since an Event (more on what an event is when we get to the react method) is a combination of a token and a ready state, which is either readable and / or writable, we can use said Event to find which connection should be the recipient of the event in the form of a Reaction::Event(Event). Therefore it makes sense to use the token as the key for the HashMap tracking connections. This will become more apparent once we implement the react method.
The next field is the broadcast:
broadcast: Broadcast<Bytes>
The broadcast is rather simple: it has recipients and a method to publish data using channels (meaning data can be sent across thread boundaries). In this case the broadcast can publish Bytes. It’s important to note that any data published by a broadcast has to implement Clone, as the data has to be cloned to be sent to multiple recipients; given the ownership model in Rust, multiple consumers cannot receive and own the same data.
By buffering the messages we avoid “chatty” network traffic, which would hurt performance.
The buffer_threshold is used to define when we should publish messages to all subscribers: once the size of publish_payload is greater than or equal to the buffer_threshold, the publisher should publish the buffer.
This leads us to the final field: timer.
In the event of the payload never reaching the threshold, it would make for a poor experience if messages were never delivered simply because no new messages arrived to fill the buffer. To remedy this we add a timer which will cause the publisher to try to publish messages if there are any in the buffer.
Let’s add a new method to the publisher.
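A sketch of new; the exact parameters here are assumptions, the important part being that the timer (like any other reactor) is created on the same thread as the Publisher:

impl Publisher {
    pub fn new(broadcast: Broadcast<Bytes>, buffer_threshold: usize, timer: Timer) -> Self {
        Self {
            connections: HashMap::new(),
            broadcast,
            publish_payload: BytesMut::new(),
            buffer_threshold,
            timer,
        }
    }
}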
We’ll see more about the timer when we implement it in part two, but note that we have to create the ReactiveTimerNotifier in the same thread as the Publisher, as it’s not possible to send reactors across thread boundaries.
Now for the final (and quite large) bit of code: making the publisher into a reactor - a Sonr object which reacts to events and input. For more information see “an introduction to Sonr”.
First set the input and output types of the reactor:
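A sketch of the reactor skeleton; whether TcpStream is re-exported by sonr or comes straight from mio is an assumption:

impl Reactor for Publisher {
    // Tcp streams handed to us by the work-stealing queue.
    type Input = TcpStream;
    // The publisher produces nothing of interest.
    type Output = ();

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        // Filled in below.
        Reaction::Continue
    }
}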
Since we don’t intend to return anything from the publisher we can set the output to the unit type.
As for our input we will set that to TcpStream, since the publisher will eventually be connected to the output of a work-stealing queue that produces tcp streams.
(Flow: tcp listener -> queue -> deque -> publisher)
We can now start implementing the react method:
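A sketch of the react method handling the Value and Continue variants, delegating events to a hypothetical handle_event helper (shown further down). ReactiveTcpStream::new returning a Result and exposing token() are assumptions about the sonr API.

// The react method from the impl above, now filled in:
fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
    match reaction {
        // A new publisher connected: wrap the stream so it's registered with
        // the System, and track the connection by its token.
        Reaction::Value(stream) => {
            if let Ok(stream) = ReactiveTcpStream::new(stream) {
                self.connections.insert(stream.token(), Connection::new(stream));
            }
            Reaction::Continue
        }

        // Nothing more to do.
        Reaction::Continue => Reaction::Continue,

        // Readable / writable notifications for the timer or a connection.
        Reaction::Event(event) => self.handle_event(event),
    }
}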
The first variant we’ll deal with is Value. This means we’ll get a tcp stream, as declared by type Input = TcpStream.
The tcp stream itself is not a reactor, so we wrap it in a ReactiveTcpStream. This will register the stream with the System (more on this in part two), track the state of the tcp stream (whether it’s readable and / or writable), and finally notify the System in the event of a blocking call and automatically re-register for new events.
In other words: we can read from and write to the stream until it blocks and not have to worry about tracking the state of the stream.
We can also deal directly with the Continue variant now, as all this does is return Continue. The publisher never returns a Value, so Continue has little impact here; if a reactor can return more than one value, like a queue, then the Continue variant has a bigger impact.
Finally we can deal with the last variant, Event(Event), and reach the end of the first part.
This is a big one as it deals with both the timer and the connections as they both respond to events.
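A sketch of the event handling, written here as the hypothetical handle_event helper that the react sketch above delegates to. The token(), publish() and react() calls on the timer, broadcast and connections are assumptions about the sonr API; the overall flow follows the description below.

impl Publisher {
    fn handle_event(&mut self, event: Event) -> Reaction<()> {
        // The timer fired: publish the buffered payload if there is one.
        if event.token() == self.timer.token() {
            // Drain the signal; its value is of no interest.
            let _ = self.timer.react(event.into());
            if !self.publish_payload.is_empty() {
                let payload = self.publish_payload.take().freeze();
                self.broadcast.publish(payload);
            }
            return Reaction::Continue;
        }

        // Not one of our connections either: pass the event along.
        if !self.connections.contains_key(&event.token()) {
            return event.into();
        }

        let mut broken = false;
        {
            let connection = self.connections.get_mut(&event.token()).unwrap();
            // Mark the connection readable / writable; the return value
            // holds nothing of interest.
            let _ = connection.react(event.into());

            // Keep reading messages until the connection blocks or errors out.
            while let Some(result) = connection.recv::<PubMessage>() {
                match result {
                    Ok(message) => {
                        // Re-encode the message and buffer it for publishing...
                        self.publish_payload.extend_from_slice(&LineCodec::encode(&message));
                        // ...and queue an ack back to the publisher.
                        connection.add_payload(LineCodec::encode(&AckMessage));
                    }
                    Err(()) => {
                        broken = true;
                        break;
                    }
                }
            }

            // Publish once the buffer has grown past the threshold.
            if self.publish_payload.len() >= self.buffer_threshold {
                let payload = self.publish_payload.take().freeze();
                self.broadcast.publish(payload);
            }

            // Flush the queued acks.
            while let Some(result) = connection.write() {
                if result.is_err() {
                    broken = true;
                    break;
                }
            }
        }

        if broken {
            // Publish whatever is buffered and drop the broken connection.
            if !self.publish_payload.is_empty() {
                let payload = self.publish_payload.take().freeze();
                self.broadcast.publish(payload);
            }
            self.connections.remove(&event.token());
        }

        Reaction::Continue
    }
}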
Let’s start with the timer.
As mentioned above, an event is a combination of a ready state (readable / writable) and a token, so the first thing we do is check whether the event belongs to one of our connections or to the timer.
If it belongs to the timer (which is a type alias for ReactiveSignalReceiver):
- Get the value out and instantly drop it by assigning it to _.
- Check if the publish payload is empty…
- … and if it’s not, drain it and publish it.
We then move on to the connections. There is a chance the event doesn’t belong to one of the connections either; in that case we simply return Reaction::Event(event) (or in this case event.into(), as From<Event> is implemented for Reaction<T>).
If the event belongs to one of the connections, the first thing we do is call react(event.into()) on the connection. As mentioned before, this marks the connection as readable / writable. Since the connection doesn’t return anything of value we can simply ignore the return value from the react call.
Now we can keep reading messages from the connection until it either blocks or errors out.
We’ll serialize every message and put it in the publish payload buffer. It might seem wasteful to first deserialize each message and then serialize it again; however, this ensures that we are actually publishing PubMessages and not some other type, or indeed rubbish data.
For every message we receive we also put an AckMessage on the write buffer of the connection. We can use this later to write a publisher that counts the number of messages ack’ed by the server.
If the connection errors out we publish whatever is in the buffer and then remove the connection.
Once we have received all the messages we’ll get from the connection during the execution of the reaction, we check if the publish payload is bigger than the threshold, and if that is the case we publish the buffer.
Then we need to actually tell the connection to write all the ack messages we put in the write buffer. We do this by calling con.write() until it no longer has anything to write (or errors out, in which case we simply remove it).
Finally we return Continue to signal that we have nothing left to do.
This is all for this part. In the second part we’ll create the subscriber and the timer and finally put it all together.