Intro
In my previous post I wrote about building a pubsub server instead of a chat. This time I want to focus on simple connection handling using sonr_extras, and rather than adding a new dependency to the already large pubsub example, we’ll start from scratch.
Source code for the project is available on GitHub.
UPDATE: As of 29 April 2019, sonr_extras has been updated on GitHub.
Both Connection::recv and Connection::write now return a
std::io::Result, as opposed to a Result<T, ()>.
This exposes better error messages.
This post has been updated along with the example to reflect that.
Setup
Create a new Rust project:
$ cargo new --bin chat
and add the following dependencies to Cargo.toml:
[Cargo.toml dependency listing missing from this copy; see the project source on GitHub.]
The biggest work in this tutorial is going to be the connection handling, so we’ll jump right in:
Connection handling
[Code listing missing from this copy: the use statements, including Connection and Connections.]
There are two new structs in our use statements here: Connection and Connections.
Connection has a StreamRef, a Codec, a read buffer and finally a write
buffer. It can recv bytes and write whatever is in the write buffer.
Because recv returns Option<std::io::Result<Bytes>> it is possible to write code
like this:
    while let Some(res) = connection.recv() {
        match res {
            Ok(bytes) => { /* do something with the bytes */ }
            Err(_) => { /* remove and clean up the connection */ break; }
        }
    }
This can be broken down as follows: if None is returned, there was nothing to
read or the connection was not readable. If an Err is returned, the connection
is closed. Finally, Ok(bytes) carries the available payload as
provided by the connection.
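To make the three cases concrete without pulling in sonr, here is a minimal sketch using only the standard library. MockConnection and drain are hypothetical names made up for this illustration (they are not part of sonr_extras), and Vec<u8> stands in for Bytes; only the shape of the return value, Option<std::io::Result<...>>, mirrors recv.

```rust
use std::collections::VecDeque;
use std::io;

/// Hypothetical stand-in for a connection (NOT the sonr_extras type).
/// Each queued entry is what one successful poll of `recv` would yield.
struct MockConnection {
    pending: VecDeque<io::Result<Vec<u8>>>,
}

impl MockConnection {
    /// `None` means there is nothing to read right now.
    fn recv(&mut self) -> Option<io::Result<Vec<u8>>> {
        self.pending.pop_front()
    }
}

/// Drains the connection. Returns the payloads received and whether the
/// connection should still be considered open afterwards.
fn drain(connection: &mut MockConnection) -> (Vec<Vec<u8>>, bool) {
    let mut payloads = Vec::new();
    while let Some(res) = connection.recv() {
        match res {
            Ok(bytes) => payloads.push(bytes),
            // An error means the connection is closed: stop reading.
            Err(_) => return (payloads, false),
        }
    }
    // `recv` returned `None`: nothing more to read for now.
    (payloads, true)
}
```

The caller gets back everything that was read before the loop ended, plus a flag telling it whether to keep the connection around or clean it up.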
write behaves in a similar fashion:
    let encoded_bytes = connection.encode(some_bytes);
    connection.add_payload(encoded_bytes);
    while let Some(res) = connection.write() {
        match res {
            Ok(n_bytes_written) => { /* number of bytes written */ }
            Err(_) => { /* remove and clean up the connection */ break; }
        }
    }
Note that an encode method is available
on the Connection, making it possible to encode the payload before adding it
to the connection’s write buffer.
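The encode, buffer, write sequence can be sketched with the standard library alone. Everything in this block is an assumption made for illustration: encode_line supposes that a line codec simply terminates the payload with a newline, and BufferedWriter is a hypothetical type, not the sonr_extras Connection. What it tries to show is why write is called in a loop: each call may flush only part of the write buffer.

```rust
use std::io::{self, Write};

/// Hypothetical line encoding: terminate the payload with `\n`.
fn encode_line(payload: &[u8]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(payload.len() + 1);
    encoded.extend_from_slice(payload);
    encoded.push(b'\n');
    encoded
}

/// Hypothetical write buffer sitting in front of any `Write` sink.
struct BufferedWriter<W: Write> {
    sink: W,
    write_buffer: Vec<u8>,
}

impl<W: Write> BufferedWriter<W> {
    fn new(sink: W) -> Self {
        Self { sink, write_buffer: Vec::new() }
    }

    /// Queues already encoded bytes for writing.
    fn add_payload(&mut self, encoded: &[u8]) {
        self.write_buffer.extend_from_slice(encoded);
    }

    /// One write attempt. `None` when the buffer is empty or the sink
    /// would block; `Some(Err(_))` when the sink is closed or failed.
    fn write(&mut self) -> Option<io::Result<usize>> {
        if self.write_buffer.is_empty() {
            return None;
        }
        match self.sink.write(&self.write_buffer) {
            // Zero bytes accepted: treat the sink as closed.
            Ok(0) => Some(Err(io::ErrorKind::WriteZero.into())),
            Ok(n) => {
                // Drop the bytes that were written, keep the rest queued.
                self.write_buffer.drain(..n);
                Some(Ok(n))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => None,
            Err(e) => Some(Err(e)),
        }
    }
}
```

With a Vec<u8> as the sink a single call flushes everything; with a sink that only accepts a few bytes at a time the loop keeps going until the buffer is empty or an error shows up.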
Knowing this we can now look at the Connections struct.
It stores T: StreamRef (meaning anything that can expose a Stream
by implementing the StreamRef trait) inside a hash map.
Connections is not really a Reactor; instead it
provides an inner_react method that returns a Reaction<&mut T>.
It might seem odd at first that inner_react takes a Reaction<T>
and returns a Reaction<&mut T> without Connections being a reactor.
The idea is to use this inside your own custom reactor:
    type MyConnection = Connection<Stream<TcpStream>, LineCodec>;

    struct MyConnections {
        inner: Connections<MyConnection>,
    }

    impl Reactor for MyConnections {
        type Input = MyConnection;
        type Output = ();

        fn react(&mut self, reaction: Reaction<MyConnection>) -> Reaction<()> {
            match self.inner.inner_react(reaction) {
                Value(connection) => {
                    // Read and write from the connection
                    // using connection.recv() / connection.write()
                    Continue
                }
                Event(ev) => Event(ev),
                Continue => Continue,
            }
        }
    }
In the above example we get a mutable reference to a connection we can work with, and if no connection was returned we simply pass the event on to the next reactor.
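For readers who want to see how a Reaction<T> can go in and a Reaction<&mut T> come out, here is a simplified model using only the standard library. It is a sketch under assumptions, not the sonr_extras implementation: this Reaction enum reduces an event to a plain usize id, and the id assignment in inner_react is invented for the example.

```rust
use std::collections::HashMap;

/// Simplified, hypothetical version of a reaction.
/// An event is reduced to the id of the connection that became ready.
#[derive(Debug)]
enum Reaction<T> {
    Value(T),
    Event(usize),
    Continue,
}

/// Owns connections in a hash map, keyed by an id.
struct Connections<T> {
    next_id: usize,
    connections: HashMap<usize, T>,
}

impl<T> Connections<T> {
    fn new() -> Self {
        Self { next_id: 0, connections: HashMap::new() }
    }

    /// A new connection (`Value`) is stored and handed back by reference.
    /// An `Event` is resolved to the stored connection it belongs to; if
    /// it belongs to none, it is passed on untouched.
    fn inner_react(&mut self, reaction: Reaction<T>) -> Reaction<&mut T> {
        match reaction {
            Reaction::Value(connection) => {
                let id = self.next_id;
                self.next_id += 1;
                Reaction::Value(self.connections.entry(id).or_insert(connection))
            }
            Reaction::Event(id) => match self.connections.get_mut(&id) {
                Some(connection) => Reaction::Value(connection),
                None => Reaction::Event(id),
            },
            Reaction::Continue => Reaction::Continue,
        }
    }
}
```

Ownership is the point here: the map keeps the connection, and the caller only ever borrows it for the duration of one react call.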
Connections (implementation)
Now it’s time to implement our connection handler in the form of a reactor:
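[Code listing missing from this copy: the type alias over Connection<T, LineCodec>, the Connections<T: StreamRef> struct and its new function.]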
The first thing we did here is declare a type alias wrapping a Connection<T,
LineCodec>.
Since we’ll be using LineCodec (at the time of writing it’s the only available
codec in sonr_extras) throughout our application we can fix the type.
Next we declare the struct that will hold all our connections, Connections<T:
StreamRef>, and implement a new function for convenience.
Lastly we’ll implement the Reactor trait so we can connect our Connections struct
to the output of a tcp listener:
[Code listing missing from this copy: the Reactor implementation for Connections.]
Putting it all together
Now that we have completed our reactor we can connect the input to a tcp listener’s output:
[Code listing missing from this copy: wiring the tcp listener’s output into the Connections reactor.]
Final notes
We don’t have to be generic over T when it comes to our Connections<T:
StreamRef> struct, but by doing so we can change from a regular tcp
listener to a unix domain socket by replacing one line:
// Change this
// let listener = tcp_listener("127.0.0.1:5555")?.map(|s| {
// Into this
let listener = uds_listener("/tmp/chat.sock")?.map(|s| {
Just remember to add use sonr_extras::uds_listener to the use statements as
well.
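The benefit of staying generic can be shown without sonr. In the sketch below, Echo is a hypothetical handler bounded on the standard Read + Write traits (standing in for the StreamRef bound used in this post), so the same code accepts a TcpStream, a UnixStream, or an in-memory buffer.

```rust
use std::io::{self, Read, Write};

/// Hypothetical handler that only requires `Read + Write`, so it does not
/// care which kind of stream it has been given.
struct Echo<S: Read + Write> {
    stream: S,
}

impl<S: Read + Write> Echo<S> {
    /// Reads once and writes the same bytes back, returning the count.
    fn echo_once(&mut self) -> io::Result<usize> {
        let mut buf = [0u8; 1024];
        let n = self.stream.read(&mut buf)?;
        self.stream.write_all(&buf[..n])?;
        Ok(n)
    }
}

/// Compile-time check only: both socket types satisfy the bound.
#[cfg(unix)]
fn _accepts_both(tcp: std::net::TcpStream, uds: std::os::unix::net::UnixStream) {
    let _ = Echo { stream: tcp };
    let _ = Echo { stream: uds };
}
```

Swapping the transport then only changes the line that creates the stream, which is the same effect as replacing tcp_listener with uds_listener above.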
Drawing from the previous post, where we used channels and broadcast, it would be possible to make this work with both tcp and uds at the same time by broadcasting the messages.