Intro
This is part 2 of “Building a pub/sub server with Sonr”.
Part 1 is available here.
Subscriber
We will jump straight into building the subscriber. This is the biggest piece of code so far in this project.
The `connections` field is the same as on the publisher in part 1, and `channels` consists of a channel name and a list of tokens representing the connections subscribed to that channel.
Using this we can see which channel a message belongs to, find all the connection tokens in that channel, and add the message to each connection’s write buffer.
The `message_buffer` holds all incoming data until it is decoded into `PubMessage`s.
Finally we have `messages`: a `ReactiveSignalReceiver<Bytes>`.
At the other end, in the publisher, there is a signal sender: `Broadcast`.
Every time the broadcast publishes data it also triggers an event for the corresponding receiver.
A `ReactiveSignalReceiver<T>` is notified by an `Event` when it can try to receive `T`, at which point it should keep trying until it receives nothing.
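The "keep trying until it receives nothing" rule can be sketched with the standard library alone. This is not Sonr's API: a plain `std::sync::mpsc` receiver stands in for the signal receiver, and `drain` is a name I made up for illustration.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Receive everything that is currently queued, stopping at the first empty read.
/// (Stand-in for what happens after the receiver's event fires.)
pub fn drain<T>(receiver: &Receiver<T>) -> Vec<T> {
    let mut received = Vec::new();
    loop {
        match receiver.try_recv() {
            Ok(value) => received.push(value),
            // Nothing left right now: stop and wait for the next event.
            Err(TryRecvError::Empty) => break,
            // The sending side is gone, so there will never be more.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    received
}
```

Stopping after a single receive would risk leaving values queued with no further event to announce them, which is why the loop only ends on an empty read.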
We add a `new` method for convenience.
Next up is the `publish` method. Here we decode the message buffer in a loop until no messages are left in it. For each message we find the channel it belongs to and write it to the write buffer of every connection in that channel.
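To make the decode-and-fan-out loop concrete, here is a minimal sketch using only the standard library. The `Router` type and the `channel:payload\n` framing are assumptions for illustration; the real project uses the `PubMessage` codec from part 1 and Sonr's connection types.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the subscriber's routing state.
pub struct Router {
    /// Channel name -> tokens of the connections subscribed to it.
    pub channels: HashMap<String, Vec<usize>>,
    /// Connection token -> bytes waiting to be written to that connection.
    pub write_buffers: HashMap<usize, Vec<u8>>,
    /// Raw bytes received from the publisher, not yet decoded.
    pub message_buffer: Vec<u8>,
}

impl Router {
    /// Decode complete messages until none are left, copying each payload to
    /// every connection subscribed to the message's channel.
    pub fn publish(&mut self) {
        // Assumed framing: a message is complete once its newline has arrived.
        while let Some(end) = self.message_buffer.iter().position(|b| *b == b'\n') {
            let frame: Vec<u8> = self.message_buffer.drain(..=end).collect();
            let line = &frame[..frame.len() - 1];
            // Skip frames that do not look like `channel:payload`.
            let Some(split) = line.iter().position(|b| *b == b':') else { continue };
            let Ok(channel) = std::str::from_utf8(&line[..split]) else { continue };
            let payload = &line[split + 1..];
            // No subscribers for this channel: nothing to write.
            let Some(tokens) = self.channels.get(channel) else { continue };
            for token in tokens {
                if let Some(buffer) = self.write_buffers.get_mut(token) {
                    buffer.extend_from_slice(payload);
                }
            }
        }
    }
}
```

An incomplete trailing message simply stays in `message_buffer` until the rest of it arrives.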
In the event of an error the connection is not only removed from `connections` but also unsubscribed from all the channels.
Tracking channels and connections using only a hash map is not the most efficient way. It’s quick to look up all connections in a channel; however, when it comes to removing a connection we have to scan every single channel for that connection.
This would be more noticeable in a situation with many channels and several connections unsubscribing frequently.
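The trade-off can be sketched as follows. `unsubscribe_by_scan` mirrors the single hash map approach described above, while `Subscriptions` is a hypothetical alternative (not something this project implements) that keeps a second map from connection to channels so an unsubscribe only touches the channels that connection joined.

```rust
use std::collections::{HashMap, HashSet};

/// Remove `token` from every channel by visiting all of them.
pub fn unsubscribe_by_scan(channels: &mut HashMap<String, Vec<usize>>, token: usize) {
    for tokens in channels.values_mut() {
        tokens.retain(|t| *t != token);
    }
}

/// Hypothetical alternative that also remembers which channels each connection joined.
#[derive(Default)]
pub struct Subscriptions {
    by_channel: HashMap<String, HashSet<usize>>,
    by_connection: HashMap<usize, HashSet<String>>,
}

impl Subscriptions {
    pub fn subscribe(&mut self, channel: &str, token: usize) {
        self.by_channel.entry(channel.to_string()).or_default().insert(token);
        self.by_connection.entry(token).or_default().insert(channel.to_string());
    }

    /// Only touches the channels this connection actually subscribed to.
    pub fn unsubscribe(&mut self, token: usize) {
        let Some(channels) = self.by_connection.remove(&token) else { return };
        for channel in channels {
            if let Some(tokens) = self.by_channel.get_mut(&channel) {
                tokens.remove(&token);
            }
        }
    }

    /// Sorted list of the tokens subscribed to `channel`.
    pub fn subscribers(&self, channel: &str) -> Vec<usize> {
        let mut tokens: Vec<usize> = self
            .by_channel
            .get(channel)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        tokens.sort_unstable();
        tokens
    }
}
```

The second map costs extra memory and bookkeeping on every subscribe, so it only pays off when unsubscribes are frequent and there are many channels.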
Before we tackle the biggest method in this entire project we add the ability to unsubscribe connections and put data into the message buffer.
Finally we implement `Reactor` for the subscriber. Just like the publisher we have a `TcpStream` as the `Input`, and since we don’t intend to return a value from this reactor, as it’s the last reactor in the chain, we can set the `Output` to a unit.
Just like with the publisher we’ll break down the `react` method into two blocks, starting with the `Value` and `Continue` variants.
There is not much to be said here as it’s exactly the same as the publisher.
The final variant is `Event(Event)`. The first thing we do is check if the event belongs to the receiver. This is similar to the publisher, except in this instance we care about the output as it’s the message payload we want to write to the corresponding connections.
If the event does not belong to `messages` we check if it belongs to `connections`.
This is quite similar to the publisher, except rather than reading `PubMessage`s to be sent to subscribers, we read `Subscribe` messages to know which channel the subscriber wants to subscribe to.
We also need to unsubscribe the connection if an error occurs.
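The order of the checks in `react` can be sketched with a toy model. Only the variant names `Value`, `Event` and `Continue` come from the text above; everything else (the `usize` token in place of an `Event`, the `ToySubscriber` type, its fields, and passing unknown events on unchanged) is an assumption for illustration and not Sonr's actual API.

```rust
use std::collections::HashMap;

/// Toy stand-in for a reaction; only the variant names come from the article.
pub enum Reaction<T> {
    Value(T),
    Event(usize), // the real variant wraps an `Event`; a bare token is used here
    Continue,
}

/// Hypothetical subscriber state, reduced to what the dispatch needs.
pub struct ToySubscriber {
    pub messages_token: usize,
    pub connections: HashMap<usize, String>,
    pub next_token: usize,
    pub log: Vec<String>,
}

impl ToySubscriber {
    pub fn react(&mut self, reaction: Reaction<String>) -> Reaction<()> {
        match reaction {
            // A new connection arrived from the previous reactor: keep track of it.
            Reaction::Value(peer) => {
                self.connections.insert(self.next_token, peer);
                self.next_token += 1;
                Reaction::Continue
            }
            Reaction::Continue => Reaction::Continue,
            // First check whether the event belongs to the message receiver...
            Reaction::Event(token) if token == self.messages_token => {
                self.log.push("publish".into());
                Reaction::Continue
            }
            // ...then whether it belongs to one of the connections.
            Reaction::Event(token) if self.connections.contains_key(&token) => {
                self.log.push(format!("read subscribe from {token}"));
                Reaction::Continue
            }
            // Not ours: hand the event on unchanged (assumed behaviour).
            Reaction::Event(token) => Reaction::Event(token),
        }
    }
}
```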
The only thing remaining to add before we put all this together is the timer mentioned in part 1: the one that keeps publishing messages in case the buffer never fills up.
We only need one timer instance as it will deal with all publishers.
The timer is quite basic: one thread is spawned with a loop that sleeps for the given duration before notifying all the receivers.
We make two type aliases for the notifier, `ReactiveTimerNotifier` and `TimerNotifier`, to emphasise that these belong to the timer.
They are in fact nothing more than signal receivers, like the one used to send data from the `Publisher` to the `Subscriber`.
In this case we are not interested in the data itself, but rather in when we get the data, so we can publish anything unpublished in the buffer.
There are two methods to add here. The `receiver` method creates a bounded signal receiver, meaning it can hold a maximum of N values (in this case: 1) at a time. We will pass this receiver to the publisher when calling `Publisher::new`.
Finally there is `start`, which consumes `self` and spawns the thread. The thread keeps running until the program exits.
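A rough standard-library equivalent of such a timer is sketched below. It is not the project's code: `std::sync::mpsc::sync_channel(1)` stands in for Sonr's bounded signal receiver, and the `Timer` layout is an assumption.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::time::Duration;

/// Hypothetical timer: one thread that wakes every receiver at a fixed interval.
pub struct Timer {
    interval: Duration,
    notifiers: Vec<SyncSender<()>>,
}

impl Timer {
    pub fn new(interval: Duration) -> Self {
        Self { interval, notifiers: Vec::new() }
    }

    /// Hand out a receiver backed by a channel that holds at most one value.
    pub fn receiver(&mut self) -> Receiver<()> {
        let (sender, receiver) = sync_channel(1);
        self.notifiers.push(sender);
        receiver
    }

    /// Consume the timer and spawn the thread that does the notifying.
    pub fn start(self) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            thread::sleep(self.interval);
            for notifier in &self.notifiers {
                // A full channel means a tick is already pending, so the
                // result is ignored instead of blocking the timer thread.
                let _ = notifier.try_send(());
            }
        })
    }
}
```

A capacity of one is enough because the receiver only needs to know that at least one interval has passed, not how many.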
Now that the timer is complete there is only main.rs left!
Main: putting it all together
We’ll start by adding all the use statements at the top and a convenience function for creating a tcp listener.
To keep the code more compact I’ve used `sonr::prelude`. To find out what’s in the prelude, see `sonr::prelude`.
The `listener` function creates a tcp listener bound to a specific address.
We call `map` on the listener to change the output from `(TcpStream, SocketAddr)` to `TcpStream`.
Now we can lay out how the program will actually look.
For the publisher:
tcp listener -> connection queue -> threads -> connection dequeue -> publisher
The output of the tcp listener is the input of the connection queue.
Since the connection queue sends data over a channel, the `Input` of the queue is the `Output` of the dequeue.
We already know the input of the publisher: a `TcpStream`.
Thinking about the `Input` and `Output` of the reactors makes it easier to put together a program by chaining the relevant reactors together. It also makes it trivial to inject reactors anywhere in the chain as long as the input and output types match.
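The idea that two reactors can only be chained when the types line up can be shown with a small toy model. The `Stage` trait, `Chain`, `DropAddr` and `Collect` below are all invented for illustration and are much simpler than Sonr's `Reactor`; the point is only that the compiler rejects a chain whose output and input types disagree.

```rust
/// Toy stand-in for a reactor: something that turns an input into an output.
pub trait Stage {
    type Input;
    type Output;
    fn run(&mut self, input: Self::Input) -> Self::Output;
}

/// Two stages glued together; this only compiles when `B` accepts what `A` produces.
pub struct Chain<A, B>(pub A, pub B);

impl<A, B> Stage for Chain<A, B>
where
    A: Stage,
    B: Stage<Input = A::Output>,
{
    type Input = A::Input;
    type Output = B::Output;

    fn run(&mut self, input: Self::Input) -> Self::Output {
        let intermediate = self.0.run(input);
        self.1.run(intermediate)
    }
}

/// Hypothetical stage mimicking the `map` call: keep the stream, drop the address.
pub struct DropAddr;

impl Stage for DropAddr {
    type Input = (String, u16);
    type Output = String;
    fn run(&mut self, (stream, _addr): Self::Input) -> String {
        stream
    }
}

/// Hypothetical last stage: consumes the stream and returns unit.
#[derive(Default)]
pub struct Collect(pub Vec<String>);

impl Stage for Collect {
    type Input = String;
    type Output = ();
    fn run(&mut self, stream: String) {
        self.0.push(stream);
    }
}
```

With these definitions `Chain(DropAddr, Collect::default())` type-checks, whereas `Chain(Collect::default(), DropAddr)` does not, because `Collect` outputs `()` and `DropAddr` expects a tuple.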
The subscriber follows the same flow:
tcp listener -> connection queue -> threads -> connection dequeue -> subscriber
Given this it would be easy to replace the tcp listener with a unix domain socket: change the `listener` function to return a `ReactiveUdsListener` instead, change the `Input` of both the publisher and subscriber to a `UnixStream`, and finally change `Connection` to use a `ReactiveUdsStream` instead of a `ReactiveTcpStream`.
Finally here is the main function:
There is a lot to take in here. The first thing to do when using `Reactor`s is to initialise the `System`. Without the `System`, evented reactors cannot be created and would in fact panic. More about the `System` can be found in the introduction to Sonr in part 1.
We create both the broadcast and the timer in the main thread as we want to clone them for each thread.
We could create a new timer for each thread, but since the timer itself spawns a thread so that it does not block, we would end up with twice as many threads, which is both unnecessary and expensive.
If we created a broadcast for each thread then messages would not be published to every subscriber, only the subscriber in the same thread as the publisher, making the program incorrect.
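Why the broadcast has to be shared rather than created per thread can be illustrated with a toy version built on the standard library. `ToyBroadcast` is an invented name and only models the sharing; Sonr's `Broadcast` works with its own signal types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Toy broadcast: every clone shares the same list of subscribers.
#[derive(Clone, Default)]
pub struct ToyBroadcast {
    subscribers: Arc<Mutex<Vec<Sender<String>>>>,
}

impl ToyBroadcast {
    /// Register a new subscriber and return its receiving end.
    pub fn subscribe(&self) -> Receiver<String> {
        let (sender, receiver) = channel();
        self.subscribers.lock().unwrap().push(sender);
        receiver
    }

    /// Send a copy of the message to every subscriber registered on any clone.
    pub fn publish(&self, message: &str) {
        for subscriber in self.subscribers.lock().unwrap().iter() {
            // A subscriber that has gone away is simply skipped.
            let _ = subscriber.send(message.to_string());
        }
    }
}
```

A subscriber registered through a clone in another thread still receives what is published from the main thread, while a subscriber on a separately constructed `ToyBroadcast` receives nothing, which is the incorrect behaviour described above.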
Once we have created all the threads we need we can start the timer.
Finally we start each system by combining reactors with `and`.
This means that both reactors in `pub_run.and(sub_run)` will receive events.
Final notes
I ended up writing a quick benchmark to count the number of “ack”ed messages per second by the publisher. It can be found here.
There are probably cleverer things that could be done to improve performance with regard to passing the messages around. However, this is more an introduction to Sonr than an exercise in building a highly optimised pubsub solution.