Building a pub/sub server with Sonr (part 2) 11 Apr 2019

Intro

This is part 2 of “Building a pub/sub server with Sonr”.
Part 1 is available here.

Publisher

We will jump straight in building the publisher. This is the biggest piece of code so far in this project.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
use std::collections::HashMap;

use sonr::Token;
use sonr::reactor::{Reactor, Reaction};
use sonr::errors::Result as SonrResult;
use sonr::net::tcp::{TcpStream, ReactiveTcpStream};
use sonr::sync::signal::{SignalReceiver, ReactiveSignalReceiver};
use bytes::{Bytes, BytesMut, BufMut};

use crate::connection::Connection;
use crate::codec::LineCodec;
use crate::messages::{PubMessage, Subscribe};
use crate::BUFFER_SIZE;

pub struct Subscriber {
    connections: HashMap<Token, Connection>,
    messages: ReactiveSignalReceiver<Bytes>,
    channels: HashMap<String, Vec<Token>>,
    message_buffer: BytesMut,
}

The connections field is the same as on the publisher in part 1, and channels consists of a channel name and a list of tokens representing the connection subscribed to that channel.

Using this we can see what channel a message belongs to and find all the connection tokens in that channel and add the message to the connection’s write buffer.

Note that for simplicity checking that it’s actually possible to add payloads to the connection is left out. A scenario could arise where the underlying connection’s write buffer is never drained as a result of the remote socket never performing any reads, and the write buffer simply fills up.

The message_buffer holds all incoming messages before they are serialized into PubMessages.

Finally we have messages: a ReactiveSignalReceiver<Bytes>. At the other end in the publisher there is a signal sender: Broadcast. Every time the broadcast publish data it also triggers an event for the corresponding receiver.

A ReactiveSignalReceiver<T> is notified by an Event when it can try to receive T, at which point it should keep trying until it receives nothing.

We add a new method for convenience.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
impl Subscriber {
    pub fn new(messages: SignalReceiver<Bytes>) -> SonrResult<Self> {
        Ok(Self {
            connections: HashMap::new(),
            messages: ReactiveSignalReceiver::new(messages)?,
            channels: HashMap::new(),
            message_buffer: BytesMut::with_capacity(BUFFER_SIZE),
        })
    } 

    // ...
}

Next up is the publish method. Here we will continuously decode the message buffer in a loop until we have no messages left in the buffer, find which channel the message belongs to and write that message to every connection’s write buffer in that channel.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
impl Subscriber {
    // ... 

    fn publish(&mut self) {
        loop {
            let message = LineCodec::decode::<PubMessage>(&mut self.message_buffer);
            if message.is_none() { return }

            let message = message.unwrap();
            if let Some(connection_ids) = self.channels.get(&message.channel) {
                let connection_ids = connection_ids.clone();

                if let Ok(encoded_message) = LineCodec::encode(message) {
                    for cid in connection_ids {
                        if let Some(con) = self.connections.get_mut(&cid) {
                            con.add_payload(encoded_message.clone());

                            while let Some(wrt_res) = con.write() {
                                if wrt_res.is_err() {
                                    self.connections.remove(&cid);
                                    self.unsubscribe(cid);
                                    break;
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    // ...
}

In the event of an error the connection is not only removed from connections but also unsubscribed from all the channels.

Tracking channels and connections using only a hash map is not the most efficient way. It’s quick to look up all connections in a channel, however when it comes to removing a connection from a channel we have to scan every single channel for that connection.

This would be more noticeable in a situation with many channels and several connections unsubscribing frequently.

Before we tackle the biggest method in this entire project we add the ability to unsubscribe connections and put data into the message buffer.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
impl Subscriber {
    // ... 

    fn unsubscribe(&mut self, connection_id: Token) {
        for connection_ids in self.channels.values_mut() {
            while let Some(pos) = connection_ids.iter().position(|id| id == &connection_id) {
                connection_ids.remove(pos);
            }
        }
    }

    fn add_payload(&mut self, payload: Bytes) {
        if payload.len() > self.message_buffer.remaining_mut() {
            self.message_buffer.reserve(payload.len());
        }
        self.message_buffer.put_slice(&payload);
    }

    // ...
}

Finally we implement Reactor for the subscriber. Just like the publisher we have a TcpStream as the Input and since we don’t intend to return a value from this reactor as it’s the last reactor in the chain, we can set the Output to a unit.

src/subscriber.rs
1
2
3
4
5
6
7
impl Reactor for Subscriber {
    type Input = TcpStream;
    type Output = ();

    // ...

}

Just like with the publisher we’ll break down the react method into two blocks, starting with the Value and Continue variance:

There is not much to be said here as it’s exactly the same as the publisher.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
impl Reactor for Subscriber {
    // ...

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(stream) => {
                if let Ok(stream) = ReactiveTcpStream::new(stream) {
                    self.connections.insert(stream.token(), Connection::new(stream));
                }
                Continue
            }
            Event(event) => // ...
            Continue => Continue,
        }
    }
}

The final variance is the Event(Event). The first thing we do is check if the event belongs to the receiver. This is similar to the subscriber except in this instance we care about the output as it’s the message payload we want to write to the corresponding connections.

If the event does not belong to messages we check if it belongs to connections.

This is quite similar to the publisher, except rather than reading PubMessages to be sent to subscribers, we read Subscribe messages to know which channel the subscriber wants to subscribe to.

We also need to unsubscribe the connection if an error occur.

src/subscriber.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
impl Reactor for Subscriber {
    // ...
    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(stream) => // ...

            Event(event) => {
                // Incoming messages:
                if self.messages.token() == event.token() {
                    if let Value(messages) = self.messages.react(event.into()) {
                        self.add_payload(messages);

                        // Keep "reacting" until we no longer receive a message
                        while let Value(messages) = self.messages.react(Continue) {
                            self.add_payload(messages);
                        }

                        // and finally publish the messages
                        self.publish();
                    }
                    return Continue
                }

                // Connection event:
                if let Some(con) = self.connections.get_mut(&event.token()) {
                    con.react(event.into());

                    // Read all "subscribe" messages
                    while let Some(messages) = con.recv::<Subscribe>() {
                        match messages {
                            Ok(messages) => {
                                for message in messages {
                                    match self.channels.get_mut(&message.channel) {
                                        Some(_) => {
                                            if let Some(conneciton_ids) = self.channels.get_mut(&message.channel) {
                                                conneciton_ids.push(event.token());
                                            }
                                        }
                                        None => { self.channels.insert(message.channel, vec![event.token()]); }
                                    }
                                }
                            }
                            Err(_) => {
                                self.connections.remove(&event.token());
                                self.unsubscribe(event.token());
                                return Continue
                            }
                        }
                    }
                    Continue
                } else {
                    event.into()
                }
            }
            Continue => Continue,
        }
    }
}

The only thing remaining to add before we put all this together is the timer mentioned in part 1. The one that keeps publish messages in case the buffer never fills up.

We only need one timer instance as it will deal with all publishers.

The timer is quite basic: one thread is spawned with a loop that sleeps for the duration given before notifying all the receivers.

We make two type aliases for the notifier to emphasise that these belong to the timer.

  • ReactiveTimerNotifier
  • TimerNotifier

They are in fact nothing more than signal receivers, like the one used to send data from the Publisher to the Subscriber. In this case we are not interested in the data it self, but rather when we get the data so we can publish anything unpublished in the buffer.

src/timer.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
use sonr::sync::signal::{SignalSender, SignalReceiver, ReactiveSignalReceiver};
use std::thread;
use std::time::Duration;

pub type ReactiveTimerNotifier = ReactiveSignalReceiver<()>;
pub type TimerNotifier = SignalReceiver<()>;

pub struct Timer {
    senders: Vec<SignalSender<()>>,
    interval: Duration,
}

impl Timer {
    pub fn new(interval: Duration) -> Self {
        Self { 
            senders: Vec::new(),
            interval,
        }
    }

    // ...
}

There are two methods to add here. The receiver method call will create a bounded signal receiver, meaning it can hold a maximum of N values (in this case: 1) at a time. We will pass this receiver to the publisher when calling Publisher::new.

Finally there is start which will consume self and spawn the thread. This will keep running until the program exits.

src/timer.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
impl Timer {
    pub fn receiver(&mut self) -> SignalReceiver<()> {
        let receiver = SignalReceiver::bounded(1);
        self.senders.push(receiver.sender());
        receiver
    }

    pub fn start(self) {
        thread::spawn(move || {
            loop {
                thread::sleep(self.interval);
                self.senders.iter().for_each(|n| { let _ = n.send(()); } );
            }
        });
    }
}

Now that the timer is complete there on only main.rs left!

Main: putting it all together

We’ll start by adding all the use statements on top and a convenience function for creating a tcp listener:

To keep the code more compact I’ve used Sonr::prelude. To find out what’s in prelude: sonr::prelude.

src/main.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
use std::thread;
use std::time::Duration;
use sonr::prelude::*;
use sonr::errors::Result;
use sonr::net::tcp::{TcpStream, ReactiveTcpListener};
use sonr::sync::broadcast::Broadcast;
use sonr::sync::queue::{ReactiveQueue, ReactiveDeque};

use pubsub::publisher::Publisher;
use pubsub::subscriber::Subscriber;
use pubsub::timer::Timer;

fn listener(addr: &str) -> Result<impl Reactor<Output=TcpStream>> {
    let l = ReactiveTcpListener::bind(addr)?
        .map(|(s, _)| s);
    Ok(l)
}
 

The listener function creates a tcp listener bound to a specific address. We call map on the listener to change the output from (TcpStream, SocketAddr) to TcpStream.

Now we can layout how the program will actually look:

For the publisher:

tcp listener connection queue threads connection dequeue publisher

The output of the tcp listener is the input of the connection queue. Since the connection queue sends data over a channel the Input of the queue is the Output of the dequeue. We already know the input of the publisher: a TcpStream.

Thinking about the Input and Output of the reactors makes it easier to put together a program by chaining the relevant reactors together. It also makes it trivial to inject reactors anywhere in the chain as long as the input and output types match.

The subscriber follows the same flow:

tcp listener connection queue threads connection dequeue subscriber

Given this it would be easy to replace the tcp listener with a unix domain socket: change the listener function to return a ReactiveUdsListener instead. Change the Input of both the publisher and subscriber to a UnixStream, and finally change Connection to use a ReactiveUdsStream instead of a ReactiveTcpStream.

Finally here is the main function:

src/main.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
fn main() -> Result<()> {
    System::init()?;
    let thread_count = 8;
    let buffer_threshold = 256;
    let publish_timeout = Duration::from_millis(20);

    // Publisher
    let pub_listener = listener("127.0.0.1:8000")?;
    let broadcast = Broadcast::unbounded();
    let mut timer = Timer::new(publish_timeout);
    let mut pub_connection_queue = ReactiveQueue::unbounded(); 

    // Subscriber
    let sub_listener = listener("127.0.0.1:9000")?;
    let mut sub_connection_queue = ReactiveQueue::unbounded(); 

    for _ in 0..thread_count {
        let broadcast = broadcast.clone();
        let timer_notifier = timer.receiver();
        let pub_deque = pub_connection_queue.deque();
        let sub_deque = sub_connection_queue.deque();

        thread::spawn(move || -> Result<()> {
            System::init()?;

            let sub_connection_deque = ReactiveDeque::new(sub_deque)?;
            let subscriber = Subscriber::new(broadcast.subscriber())?;
            let sub_run = sub_connection_deque.chain(subscriber);

            let pub_connection_deque = ReactiveDeque::new(pub_deque)?;
            let publisher = Publisher::new(broadcast, buffer_threshold, timer_notifier)?;
            let pub_run = pub_connection_deque.chain(publisher);

            let run = pub_run.and(sub_run);

            System::start(run)?;

            Ok(())
        });
    }

    timer.start();

    let pub_run = pub_listener.chain(pub_connection_queue);
    let sub_run = sub_listener.chain(sub_connection_queue);

    System::start(pub_run.and(sub_run))?;

    Ok(())
}

There is a lot to take in here. The first thing when using Reactors is to initialise the System. Without the System evented reactors can not be created and would in fact panic. More about the System can be found in the introduction to Sonr in part 1.

We create both the broadcast and the timer in the main thread as we want to clone them for each thread.

We could create a new timer for each thread, but since the timer it self will create a thread to not block we would end up with twice as many threads, which would be unnecessary and considered an expensive operation.

If we created a broadcast for each thread then messages would not be published to every subscriber, only the subscriber in the same thread as the publisher, making the program incorrect.

Once we have created all the threads we need we can start the timer.

Finally we start each system by combining reactors with and. This means that both pub_run.and(sub_run) will receive events.

Final notes

I ended up writing a quick benchmark to count the number of “ack”ed messages per second by the publisher. It can be found here.

There are probably cleverer things that could done to improve performance with regards to passing the messages around, however this is more an introduction to Sonr than an exercise is building a highly optimised pubsub solution.