Intro
In my previous post I wrote about building a pubsub server instead of a chat, however this time I want to focus on simple connection handling using sonr_extras, and rather than introducing a new dependency to the otherwise large pubsub example we’ll start from scratch.
Source code for the project is available on github.
UPDATE: As of 29th of April 2019 sonr_extras
was updated on Github.
both Connection::recv
and Connection::write
return an
std::io::Result
now, as opposed to a Result<T, ()>
.
This is to expose better error messages.
This post has been updated along with the example to reflect that.
Setup
Create a new Rust project:
$ cargo new --bin chat
and add the following dependencies to Cargo.toml:
|
|
The biggest work in this tutorial is going to be the connection handling, so we’ll jump right in:
Connection handling
|
|
There are two new structs in our use statements here: Connection
and Connections
.
Connection
has a StreamRef
, a Codec
, a read buffer and finally a write
buffer. It can recv
bytes and write
whatever is in the write buffer.
Because recv
returns Option<Result<Bytes, ()>>
it is possible to write code
like this:
while let Some(res) = connection.recv() {
Ok(bytes) => { /* do something with the bytes */ }
Err(_) => { /* remove and clean up the connection */ }
}
This can be broken down as such: if None
is returned there was nothing to
read or the connection was not readable, if an Err
is returned the connection
is closed, and finally Ok(bytes)
provides the available encoded payload as
provided by the connection.
write
behaves in a similar fashion:
let encoded_bytes = connection.encode(some_bytes);
connection.add_payload(some_bytes);
while let Some(res) = connection.write() {
Ok(n_bytes_read) => { /* number of bytes read */ }
Err(_) => { /* remove and clean up the connection */ }
}
encode
method is available
on the Connection
to make it possible to encode the payload before adding it
to the connection’s write buffer.
Knowing this we can now look at the Connections
struct:
It stores T: StreamRef
(meaning anything that can expose a Stream
by implementing the StreamRef
trait) inside a hash map.
Connections
is not really a Reactor
it
provides an inner_react
method that returns a Reaction<&mut T>
It might seem odd at first that we have inner_react
that takes a Reaction<T>
,
and returns a Reaction<&mut T>
, but isn’t a reactor.
The idea is to use this inside your own custom reactor:
type MyConnection = Connection<Stream<TcpStream, LineCodec>;
struct MyConnections {
inner: Connection<MyConnection>
}
impl Reactor for MyConnections {
type Input = MyConnection;
type Output = ();
fn react(&mut self, reaction: Reaction<MyConnection>) -> Reaction<()> {
match self.inner.inner_react(reaction) {
Value(connection) => {
// Read and write from the connection
// using connection.recv() / connection.write()
}
Event(ev) => Event(ev),
Continue => Continue,
}
}
}
In the above example we get a mutable reference to a connection we can work with, and if no connection was returned we simply pass the event on to the next reactor.
Connections (implementation)
Now it’s time to implement our connection handler in the form of a reactor:
|
|
The first thing we did here is declare a type alias wrapping a Connection<T,
LineCodec>
.
Since we’ll be using LineCodec
(at the time of writing it’s the only available
codec in sonr-extras) through out out application we can fix the type.
Next we declare our struct that will hold all our connections: Connections<T:
StreamRef>
, and implement a new
function for convenience.
Lastly we’ll implement the reactor trait so we can connect our Connections
struct
to the output of a tcp listener:
|
|
Putting it all together
Now that we have completed our reactor we can connect the input to a tcp listener’s output:
|
|
Final notes
We don’t have to be generic over T when it comes to our Connections<T:
StreamRef>
struct, but in doing so we can now change from a regular tcp
listener to a unix domain socket by replaceing one line:
// Change this
// let listener = tcp_listener("127.0.0.1:5555")?.map(|s| {
// Into this
let listener = uds_listener("/tmp/chat.sock")?.map(|s| {
Just remember to add use sonr_extras::uds_listener
to the use statements as
well.
Drawing from the previous post where we used channels and broadcast it would be possible to make this work with both tcp and uds at the same time by broadcasting the messages.