This is part 2 of “Building a pub/sub server with Sonr”.
Part 1 is available here.
We will jump straight in building the publisher. This is the biggest piece of code so far in this project.
connections field is the same as on the publisher in part 1, and
channels consists of a
channel name and a list of tokens representing the connection subscribed to that
Using this we can see what channel a message belongs to and find all the connection tokens in that channel and add the message to the connection’s write buffer.
message_buffer holds all incoming messages before they are serialized into
Finally we have
At the other end in the publisher there is a signal sender:
Every time the broadcast publish data it also triggers an event for the
ReactiveSignalReceiver<T> is notified by an
Event when it can try to receive
at which point it should keep trying until it receives nothing.
We add a
new method for convenience.
Next up is the
publish method. Here we will continuously decode the message
buffer in a loop until we have no messages left in the buffer, find which
channel the message belongs to and write that message to every connection’s
write buffer in that channel.
In the event of an error the connection is not only removed from
but also unsubscribed from all the channels.
Tracking channels and connections using only a hash map is not the most efficient way. It’s quick to look up all connections in a channel, however when it comes to removing a connection from a channel we have to scan every single channel for that connection.
This would be more noticeable in a situation with many channels and several connections unsubscribing frequently.
Before we tackle the biggest method in this entire project we add the ability to unsubscribe connections and put data into the message buffer.
Finally we implement
Reactor for the subscriber. Just like the publisher we
TcpStream as the
Input and since we don’t intend to return a value
from this reactor as it’s the last reactor in the chain, we can set the
Output to a unit.
Just like with the publisher we’ll break down the
react method into two
blocks, starting with the
There is not much to be said here as it’s exactly the same as the publisher.
The final variance is the
Event(Event). The first thing we do is check if the
event belongs to the receiver. This is similar to the subscriber except in this
instance we care about the output as it’s the message payload we want to write to the
If the event does not belong to
messages we check if it belongs to
This is quite similar to the publisher, except rather than reading
to be sent to subscribers, we read
Subscribe messages to know which channel
the subscriber wants to subscribe to.
We also need to unsubscribe the connection if an error occur.
The only thing remaining to add before we put all this together is the timer mentioned in part 1. The one that keeps publish messages in case the buffer never fills up.
We only need one timer instance as it will deal with all publishers.
The timer is quite basic: one thread is spawned with a loop that sleeps for the duration given before notifying all the receivers.
We make two type aliases for the notifier to emphasise that these belong to the timer.
They are in fact nothing more than signal receivers, like the one used to send
data from the
Publisher to the
In this case we are not interested in the data it self, but rather when we get
the data so we can publish anything unpublished in the buffer.
There are two methods to add here. The
receiver method call will create a
bounded signal receiver, meaning it can hold a maximum of N values (in this
case: 1) at a time. We will pass this receiver to the publisher when calling
Finally there is
start which will consume
self and spawn the thread.
This will keep running until the program exits.
Now that the timer is complete there on only main.rs left!
Main: putting it all together
We’ll start by adding all the use statements on top and a convenience function for creating a tcp listener:
To keep the code more compact I’ve used
Sonr::prelude. To find out what’s in
listener function creates a tcp listener bound to a specific address.
map on the listener to change the output from
Now we can layout how the program will actually look:
For the publisher:
tcp listener connection queue threads connection dequeue publisher
The output of the tcp listener is the input of the connection queue.
Since the connection queue sends data over a channel the
Input of the queue is
Output of the dequeue.
We already know the input of the publisher: a
Thinking about the
Output of the reactors makes it easier to put
together a program by chaining the relevant reactors together. It also makes it
trivial to inject reactors anywhere in the chain as long as the input and output
The subscriber follows the same flow:
tcp listener connection queue threads connection dequeue subscriber
Given this it would be easy to replace the tcp listener with a unix domain
listener function to return a
ReactiveUdsListener instead. Change the
Input of both the
publisher and subscriber to a
UnixStream, and finally change
ReactiveUdsStream instead of a
Finally here is the main function:
There is a lot to take in here. The first thing when using
Reactors is to
System. Without the
System evented reactors can not be
created and would in fact panic. More about the
System can be found in the
introduction to Sonr in part 1.
We create both the broadcast and the timer in the main thread as we want to clone them for each thread.
We could create a new timer for each thread, but since the timer it self will create a thread to not block we would end up with twice as many threads, which would be unnecessary and considered an expensive operation.
If we created a broadcast for each thread then messages would not be published to every subscriber, only the subscriber in the same thread as the publisher, making the program incorrect.
Once we have created all the threads we need we can start the timer.
Finally we start each system by combining reactors with
This means that both
pub_run.and(sub_run) will receive events.
I ended up writing a quick benchmark to count the number of “ack”ed messages per second by the publisher. It can be found here.
There are probably cleverer things that could done to improve performance with regards to passing the messages around, however this is more an introduction to Sonr than an exercise is building a highly optimised pubsub solution.