An introduction to Sonr 29 Mar 2019

Sonr is built on top of Mio with the aim to make it easier to create networking applications.

The two main components of Sonr are the Reactor and the System.

A Reactor reacts to Events from the System, or the output of another reactor. This makes it possible (and intended) to chain two reactors. Such a chain is in it self a Reactor, and can be chained further.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
let tcp_listener = some_reactor();
let con_throttle = another_reactor();
let con_queue = and_another_one();

// First chain
let throttled_listener = tcp_listener.chain(con_throttle);

// Second chain
let run = throttled_listener.chain(con_queue);

System::start(run) 

Once the System receives an Event it is pushed down the chain(s) of reactors until a reactor matching the event id reacts, returning a Reaction that is passed to the next Reactor in the chain.

System is thread local and has to exist for each thread using Reactors. There can only be one instance of a System per thread, and this instance is created by calling System::init() (calling init twice in the same thread will panic).

Reactor

The reactor trait consists of two types and a method:

1
2
3
4
5
6
trait Reactor {
    type Input;
    type Output;

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output>;
}
The Reactor trait.

Chain

To chain one reactor with another the Output of the first has to be the same type as the Input of the second.

The following example shows the creation of a chain from two reactors.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use sonr::reactor::{Reaction, Reactor};

struct VecToString;

impl Reactor for VecToString {
    type Input = Vec<u8>;
    type Output = String; // Match the Input of `UppercaseString` reactor

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(bytes) => Value(String::from_utf8(bytes).unwrap()),
            Event(ev) => Event(ev),
            Continue => Continue,
        }
    }
}

struct UppercaseString;

impl Reactor for UppercaseString {
    type Input = String;
    type Output = String;

    fn react(&mut self, reaction: Reaction<Self::Input>) -> Reaction<Self::Output> {
        use Reaction::*;
        match reaction {
            Value(mut s) => {
                s.make_ascii_uppercase();
                Value(s)
            }
            Event(ev) => Event(ev),
            Continue => Continue,
        }
    }
}

fn main() {
    let input = "hello world".to_owned().into_bytes();

    let r1 = VecToString;
    let r2 = UppercaseString;
    let mut chain = r1.chain(r2);

    // Manually cause the chain to react
    chain.react(Reaction::Value(input));
}
Since none of the reactors in the above example are responding to Events there is no need for the System.

Since reactors in a chain will always push data forward and never return anything other than Reaction::Continue it is not possible to capture the output from line 46: chain.react(Raction::Value(input)); here.

To print the result in the above example it would be possible to create a third reactor that prints the given Input, however rather than creating yet another reactor the map function of a reactor accepts a closure that is called if the reactor returns Reaction::Value(val), with val as an argument.

Map

Updating the previous example to use map to print the output of UppercaseString:

1
2
3
4
5
6
7
8
fn main() {
    let input = "hello world".to_owned().into_bytes();

    let r1 = VecToString;
    let r2 = UppercaseString.map(|the_string| println!("{}", the_string));
    let mut chain = r1.chain(r2);
    chain.react(Reaction::Value(input));
}

The map function takes a closure as an argument, where the argument for the closure is the Output of the reactor. Since the output of UppercaseString is a String, the map function is called with a closure FnMut(UppercaseString::Output) -> T.

This is a convenient way to change the output of a reactor.

The ReactiveTcpListener outputs a tuple: (TcpStream, SocketAddr). The map function could be used here if the SocketAddr is not required:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
use sonr::prelude::*;
use sonr::net::tcp::ReactiveTcpListener;

fn main() {
    System::init();

    let listener = ReactiveTcpListener::bind("127.0.0.1:8000")
        .unwrap()
        .map(|(stream, _addr)| stream); // ignore the SocketAddr

    System::start(listener);
}
By returning only the stream instead of the tuple, the Reactor Output is changed to TcpStream from (TcpStream, SocketAddress)

And

Another method available on a Reactor is and. This is useful to run more than one reactor in parallel.

Since the output of a tcp listener is a stream and a socket address, it would not be possible to chain two tcp listeners together. It is also not possible to call System::start twice in the same thread.

To run two tcp listeners at the same time use and:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
use sonr::prelude::*;
use sonr::net::tcp::ReactiveTcpListener;
use sonr::errors::Result;

fn main() -> Result<()> {
    System::init();

    let listener_1 = ReactiveTcpListener::bind("127.0.0.1:8000")?;
    let listener_2 = ReactiveTcpListener::bind("127.0.0.1:9000")?;

    System::start(listener_1.and(listener_2));
    Ok(())
}

This means a Reaction::Event(event) from the System will be sent to the first listener and the second listener.

System

A Reactor such as ReactiveTcpListener or a ReactiveTcpStream depend on system events to notify them when they can read and/or write, or accept incoming connections.

Therefore the System::init call has to come before a ReactiveTcpStream or a ReactiveTcpListener is created.

Once System::start(reactor) is called, the System polls Events and passes the events the reactor.

If the event does not belong to the reactor it is passed down the chain until a reactor reacts to the event.

The System has a few responsibilities:

  • Polling system events and passing them to the reactors
  • Registering / Re registering Evented types
  • Managing tokens used when registering an Evented type

The System also registers EventedReactors

Shutting down a System

Calling init returns a SignalSender<SystemEvent>.

At the time of writing the SystemEvent only contains one variance: Stop.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
use sonr::prelude::*;
use sonr::reactor::producers::ReactiveGenerator;
use sonr::errors::Result;

fn main() -> Result<()> {
    let signal_sender = System::init()?;

    let gen = ReactiveGenerator::new(vec![1])?;
    let run = gen.map(|number| {
        eprintln!("received the number {:?}, sending System Stop event", number);

        // Without this line the system would run forever
        signal_sender.send(SystemEvent::Stop);
    });

    System::start(run)?;
    eprintln!("so long and thanks for all the fish");
    Ok(())
}