Simple Chat using Sonr-extras 24 Apr 2019

Intro

In my previous post I wrote about building a pubsub server instead of a chat, however this time I want to focus on simple connection handling using sonr_extras, and rather than introducing a new dependency to the otherwise large pubsub example we’ll start from scratch.

Source code for the project is available on github.

UPDATE: As of 29th of April 2019 sonr_extras was updated on Github. both Connection::recv and Connection::write return an std::io::Result now, as opposed to a Result<T, ()>. This is to expose better error messages.

This post has been updated along with the example to reflect that.

Setup

Create a new Rust project:

$ cargo new --bin chat

and add the following dependencies to Cargo.toml:

1
2
3
[dependencies]
sonr = { git = "https://github.com/hagsteel/sonr" }
sonr_extras = { git = "https://github.com/hagsteel/sonr-extras" }

The biggest work in this tutorial is going to be the connection handling, so we’ll jump right in:

Connection handling

connections.rs
1
2
3
4
5
6
use sonr::net::stream::StreamRef;
use sonr::reactor::{Reaction, Reactor};
use sonr_extras::Connection;
use sonr_extras::Connections as InnerConnections;
use sonr_extras::LineCodec;
use bytes::BytesMut;

There are two new structs in our use statements here: Connection and Connections.

Connection has a StreamRef, a Codec, a read buffer and finally a write buffer. It can recv bytes and write whatever is in the write buffer.

Because recv returns Option<Result<Bytes, ()>> it is possible to write code like this:

while let Some(res) = connection.recv() {
    Ok(bytes) => { /* do something with the bytes */ }
    Err(_) => { /* remove and clean up the connection */ }
}

This can be broken down as such: if None is returned there was nothing to read or the connection was not readable, if an Err is returned the connection is closed, and finally Ok(bytes) provides the available encoded payload as provided by the connection.

write behaves in a similar fashion:

let encoded_bytes = connection.encode(some_bytes);
connection.add_payload(some_bytes);

while let Some(res) = connection.write() {
    Ok(n_bytes_read) => { /* number of bytes read */ }
    Err(_) => { /* remove and clean up the connection */ }
}
Write will write the entire buffer, which may consist of several encoded messages so it’s not possible to encode the messages when writing. At the time of writing this post an encode method is available on the Connection to make it possible to encode the payload before adding it to the connection’s write buffer.

Knowing this we can now look at the Connections struct:

It stores T: StreamRef (meaning anything that can expose a Stream by implementing the StreamRef trait) inside a hash map.

Even though Connections is not really a Reactor it provides an inner_react method that returns a Reaction<&mut T>

It might seem odd at first that we have inner_react that takes a Reaction<T>, and returns a Reaction<&mut T>, but isn’t a reactor. The idea is to use this inside your own custom reactor:

type MyConnection = Connection<Stream<TcpStream, LineCodec>;

struct MyConnections {
    inner: Connection<MyConnection>
}

impl Reactor for MyConnections {
    type Input = MyConnection;
    type Output = ();

    fn react(&mut self, reaction: Reaction<MyConnection>) -> Reaction<()> {
        match self.inner.inner_react(reaction) {
            Value(connection) => {
                // Read and write from the connection
                // using connection.recv() / connection.write()
            }
            Event(ev) => Event(ev),
            Continue => Continue,
        }
    }
}

In the above example we get a mutable reference to a connection we can work with, and if no connection was returned we simply pass the event on to the next reactor.

Connections (implementation)

Now it’s time to implement our connection handler in the form of a reactor:

connections.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
pub type UserConnection<T> = Connection<T, LineCodec>;

pub struct Connections<T: StreamRef> {
    inner: InnerConnections<UserConnection<T>>,
}

impl<T: StreamRef> Connections<T> {
    pub fn new() -> Self {
        Self {
            inner: InnerConnections::new(),
        }
    }
}

The first thing we did here is declare a type alias wrapping a Connection<T, LineCodec>. Since we’ll be using LineCodec (at the time of writing it’s the only available codec in sonr-extras) through out out application we can fix the type.

Next we declare our struct that will hold all our connections: Connections<T: StreamRef>, and implement a new function for convenience.

Lastly we’ll implement the reactor trait so we can connect our Connections struct to the output of a tcp listener:

connections.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
impl<T: StreamRef> Reactor for Connections<T> {
    type Input = UserConnection<T>;
    type Output = ();

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<()> {
        use Reaction::*;

        match self.inner.inner_react(reaction) {
            Continue => Continue,
            Event(ev) => ev.into(),
            Value(user) => {
                // All payloads to send
                let mut payloads = Vec::new();

                // Read data from the connection
                while let Some(bytes_res) = user.recv() {
                    match bytes_res {
                        Ok(bytes) => {
                            // encode the bytes, which in this 
                            // instance just appends `\n`
                            let bytes_mut = BytesMut::from(bytes);
                            let bytes = user.encode(bytes_mut);

                            payloads.push(bytes)
                        }

                        // Connection broke while receiving data
                        Err(_) => {
                            let user_id = user.token();
                            self.inner.remove(user_id);
                            return Reaction::Continue;
                        }
                    }
                }

                // Track all connections that closes
                // during the write process
                let mut closed_connections = Vec::new();

                // Iterate over the payloads
                for bytes in payloads {
                    for (user_id, con) in self.inner.iter_mut() {
                        // Add the payload to each connection
                        con.add_payload(bytes.clone());

                        // ... and try to write the data
                        while let Some(res) = con.write() {
                            if res.is_err() {
                                closed_connections.push(*user_id);
                                break
                            }
                        }
                    }
                }

                // Remove closed connections
                for user_id in closed_connections {
                    self.inner.remove(user_id);
                }

                Reaction::Continue
            }
        }
    }
}
 

Putting it all together

Now that we have completed our reactor we can connect the input to a tcp listener’s output:

main.rs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use sonr::errors::Result;
use sonr::net::stream::Stream;
use sonr::prelude::*;
use sonr_extras::{tcp_listener, LineCodec};

mod connections;
use connections::{Connections, UserConnection};

const BUFFER_SIZE: usize = 1024;

fn main() -> Result<()> {
    System::init()?;

    let listener = tcp_listener("127.0.0.1:5555")?.map(|s| {
        let stream = Stream::new(s).unwrap();
        UserConnection::new(
            stream, 
            LineCodec::new(), 
            BUFFER_SIZE, 
            BUFFER_SIZE
        )
    });

    let connections = Connections::new();

    let run = listener.chain(connections);

    System::start(run)?;
    Ok(())
}
 

Final notes

We don’t have to be generic over T when it comes to our Connections<T: StreamRef> struct, but in doing so we can now change from a regular tcp listener to a unix domain socket by replaceing one line:

// Change this
// let listener = tcp_listener("127.0.0.1:5555")?.map(|s| {

// Into this
let listener = uds_listener("/tmp/chat.sock")?.map(|s| {

Just remember to add use sonr_extras::uds_listener to the use statements as well.

Drawing from the previous post where we used channels and broadcast it would be possible to make this work with both tcp and uds at the same time by broadcasting the messages.