Async, Futures, AMQP, pick three

A few weeks ago, we set out to develop an AMQP client library in Rust, and I'm happy to release it now! We will integrate it in more and more of our tools in the future.

Design: a futures based API and a low level API

One of our goals was to leverage tokio and futures to make an API that is easy to use, but also allowing for lower level implementations using directly an event loop with something like mio.

This was a bit challenging, but we ended up with two crates:

The resulting code can work with tokio-core's event reactor, or even futures-cpupool.

The libraries use, for the network frame format: nom, the Rust parser combinators library; cookie-factory, the experimental serialization library with the same approach as nom. It is a great example of employing nom inside a tokio transport, and integrating a complex protocol's state machine directly with tokio-io. We will release a tutorial on how to write such a protocol soon.

The libraries are also designed to be completely independent from the network stream: you can use a basic TCP stream, a TLS stream or a unix socket, and you won't be blocked by rust-openssl version conflicts between many libraries (which was a big issue for us).

Using the futures API: publishing a message

Every method returns a future, to let you chain them: the connect result will give a correct client once the complete AMQP handshake was performed, the channel will be available once the server has answered, etc. But the nature of AMQP makes parallel work on the same connection easy.

extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use std::default::Default;
use futures::Stream;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
use lapin::channel::{BasicPublishOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {

      // connect() returns a future of an AMQP Client
      // that resolves once the handshake is done
      lapin::client::Client::connect(
        stream,
        &ConnectionOptions{
          username: "guest",
          password: "guest",
          ..Default::default()
        }
      )
    }).and_then(|client| {

      // create_channel returns a future that is resolved
      // once the channel is successfully created
      client.create_channel()
    }).and_then(|channel| {
      let id = channel.id;
      info!("created channel with id: {}", id);

      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_publish(
          "hello",
          b"hello from tokio",
          &BasicPublishOptions::default(),
          BasicProperties::default().with_user_id("guest".to_string()).with_reply_to("foobar".to_string())
        )
      })
    })
  ).unwrap();
}

Every struct of the API, be it a client, channel or consumer, holds a synchronized reference to the underlying transport, so you could call it from any thread.

Using the futures API: creating a consumer

When you call the basic_consume method, it returns a future of a Consumer. It implements Stream, so this can reuse all the related combinators from the futures library.

extern crate futures;
extern crate tokio_core;
extern crate lapin_futures as lapin;

use futures::Stream;
use futures::future::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use lapin::client::ConnectionOptions;
use lapin::channel::{BasicConsumeOptions,QueueDeclareOptions};

fn main() {

  // create the reactor
  let mut core = Core::new().unwrap();
  let handle = core.handle();
  let addr = "127.0.0.1:5672".parse().unwrap();

  core.run(

    TcpStream::connect(&addr, &handle).and_then(|stream| {
      lapin::client::Client::connect(stream, &ConnectionOptions::default())
    }).and_then(|client| {

      client.create_channel()
    }).and_then(|channel| {

      let id = channel.id;
      info!("created channel with id: {}", id);

      let ch = channel.clone();
      channel.queue_declare("hello", &QueueDeclareOptions::default()).and_then(move |_| {
        info!("channel {} declared queue {}", id, "hello");

        channel.basic_consume("hello", "my_consumer", &BasicConsumeOptions::default())
      }).and_then(|stream| {
        info!("got consumer stream");

        stream.for_each(|message| {
          debug!("got message: {:?}", message);
          info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());

          ch.basic_ack(message.delivery_tag);
          Ok(())
        })
      })
    })
  ).unwrap();
}

Looking under the hood: lapin-async

The lapin-async library is meant for use with an event loop that will tell you when you can read or write on the underlying stream. As such, it does not own the network stream, nor the buffers used to read and write. You handle your IO, then pass the buffers to the protocol's state machine. It will update its state, tell you how much data it consumed, give you data to send to the network. And then you can query it for state changes.

There are various reasons for an architecture like this one:

  • a library that owns the IO stream usually does not play well with event loops
  • the developer might want to make their own optimizations with sockets and buffers
  • separating the IO makes the library easy to test: you can pass buffers (or even
    complete structs) to the state machine and verify the expected state easily

More generally, a protocol library should not dictate how the application handles its networking.

As an example of how it could run:

let mut stream = TcpStream::connect("127.0.0.1:5672").unwrap();
stream.set_nonblocking(true);

let capacity = 8192;
let mut send_buffer    = Buffer::with_capacity(capacity as usize);
let mut receive_buffer = Buffer::with_capacity(capacity as usize);

let mut conn: Connection = Connection::new();
assert_eq!(conn.connect().unwrap(), ConnectionState::Connecting(ConnectingState::SentProtocolHeader));
loop {
  match conn.run(&mut stream, &mut send_buffer, &mut receive_buffer) {
    Err(e) => panic!("could not connect: {:?}", e),
    Ok(ConnectionState::Connected) => break,
    Ok(state) => info!("now at state {:?}, continue", state),
  }
  thread::sleep(time::Duration::from_millis(100));
}
info!("CONNECTED");

the run method is a helper that will read from the network, parse frames, update internal state with the frames, write new frames to the network. We loop until the state switches to "connected". Most of the behaviour is on that model.

While the lapin-async library has most of the functionality, it is still a lot of manual work to manage, and you should prefer the futures based library.

A young library

This is an early release, and it is missing a lot of features, but the design makes them easy to implement.

Right now, the only authentication method is "plain", you can create and close channels, create queues (without options), and use the methods from the "basic" AMQP class. RabbitMQ's "publisher confirms" extension is also available.

It is mainly missing the "nack" extension, and the exchange and transaction handling methods.

More features will come in the following weeks, and if you want to contribute, you're very welcome 🙂

Blog

À lire également

SuperBOL: The COBOL revolution in the Cloud

COBOL, a programming language that is over 60 years old, continues to power a large proportion of the IT systems of the world's major companies, particularly in the financial and insurance sectors.
Features

Clever Cloud welcomes the first startups to the UP Programme

Clever Cloud is proud to announce the arrival of the first five startups selected to join its UP Programme, an initiative dedicated to supporting young technology companies in their growth phase.
Company

A minor update resulted in a cascade of errors: how it went wrong, what we’ve learnt

On Friday, August 2nd, 2024 Clever Cloud’s platform became very unstable, leading to downtime of varying duration and scope, for customers using services on the EU-FR-1 (PAR) region, and remote zones depending on the EU-FR-1 control plane (OVHcloud, Scaleway, and Oracle). Privates and on-premise zones weren’t impacted.
Company Engineering