Intro to Benthos

Oct 17, 2016 20:09

Benthos is a persistent data streaming service that acts as a bridge between a range of messaging protocols. Protocols include Kafka, ZeroMQ, AMQP (RabbitMQ), Scalability Protocols (Nanomsg), File, HTTP 1.1 and stdin/stdout. The persistent buffer is optional, and uses memory-mapped files as a low-latency solution.

For more information on getting Benthos and general usage, check out the GitHub page. If you intend to run the examples in this blog then you should grab a binary before continuing.

Benthos doesn’t replace larger, horizontally scaled message queues such as Kafka or RabbitMQ, but can bridge them to services which cannot natively connect. Here is an example platform, and where Benthos might fit in:

[Figure: platform diagram]

Imagine we had a service foo connected to a service bar through ZMQ. As the platform grew we decided to scale out most of our bar services via RabbitMQ, but foo was written in a language without an AMQP client lib. Instead of rewriting foo, we can just stick Benthos between the two without a buffer of its own (direct bridge).

Justification

I work on a platform that is rife with services. In order to meet high throughput and low latency constraints we often use ZeroMQ as a direct bridge between them. The three major issues we faced with this were:

  • Upgrades that broke backwards compatibility (naughty ZMQ 3.2)
  • Back pressure (during restarts, intermittent errors, etc)
  • Other protocols creeping into the platform, screwing up any future restructure

Benthos solves these by allowing you to glue services to new protocols without developer effort. You can then remove Benthos once it is no longer needed.

Architecture

Benthos looks good naked. The block diagram is so simple you could literally draw it in a png file, and host it on this blog:

[Figure: Benthos architecture block diagram]

The configuration file for Benthos also follows this simple three-part model of input, buffer and output. You can print a full list of input, buffer and output options, along with documentation, with:

benthos --list-inputs --list-buffers --list-outputs | less
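
If it helps to picture the three-part model in code, here is a minimal Python sketch. It is only a conceptual illustration, not how Benthos is implemented: the function name run_pipeline and the buffer_size parameter are made up for this example, and an in-memory queue stands in for the optional memory-mapped file buffer.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def run_pipeline(messages, buffer_size=2):
    """Move messages from an input, through a bounded buffer, to an output."""
    buffer = queue.Queue(maxsize=buffer_size)
    delivered = []

    def read_input():
        # Input stage: put() blocks while the buffer is full, so a slow
        # output applies back pressure to the input.
        for msg in messages:
            buffer.put(msg)
        buffer.put(_DONE)

    def write_output():
        # Output stage: drain the buffer in arrival order.
        while True:
            msg = buffer.get()
            if msg is _DONE:
                return
            delivered.append(msg)

    stages = [
        threading.Thread(target=read_input),
        threading.Thread(target=write_output),
    ]
    for stage in stages:
        stage.start()
    for stage in stages:
        stage.join()
    return delivered


print(run_pipeline(["a", "b", "c"]))  # prints ['a', 'b', 'c']
```

The bounded queue is the interesting design choice here: when the output stalls, the input blocks instead of dropping messages.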

Discoverable Configuration

Configuration sucks, both for developers and users, but Benthos has too many options for command-line flags alone to be viable. To make life easier, Benthos can print a fully populated configuration file with a single command. You can try it out with benthos --print-yaml or benthos --print-json.

To make the config options discoverable they are formatted such that any field with multiple options has those options listed as sibling fields. The sibling fields are objects containing any unique fields for that particular option.

For example, let’s take a look at a snippet of the “input” config:

input:
  type: http_server
  http_server:
    address: localhost:80
    path: /post
  zmq:
    addresses:
      - tcp://*:1333
  file:
    path: ./input.data
  stdin:

By reading that snippet you can hopefully infer that the options for “type” are “http_server”, “zmq”, “file”, and “stdin”.
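
The same inference can be done mechanically. The Python sketch below is a rough illustration under a few assumptions: the JSON string is a hand-written equivalent of the YAML snippet above (not captured Benthos output), the empty “stdin” section is written as an empty object, and type_options is a hypothetical helper name.

```python
import json

# Hand-written JSON equivalent of the YAML snippet above (values copied from it).
SNIPPET = """
{
  "input": {
    "type": "http_server",
    "http_server": {"address": "localhost:80", "path": "/post"},
    "zmq": {"addresses": ["tcp://*:1333"]},
    "file": {"path": "./input.data"},
    "stdin": {}
  }
}
"""


def type_options(section):
    """Return the sibling field names of "type", i.e. the available types."""
    return [name for name in section if name != "type"]


config = json.loads(SNIPPET)
print(type_options(config["input"]))  # prints ['http_server', 'zmq', 'file', 'stdin']
```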

Using Benthos: HTTP to stdout

As a quick demo let’s print HTTP POST requests to stdout. Create a file called “http_to_stdout.yaml” and write to it the following:

input:
  type: http_server
  http_server:
    address: localhost:8080
    path: /post

We can leave out the entire buffer and output sections as we are using the defaults anyway. Change the values for address and path at your leisure.

Run this config with:

benthos -c ./http_to_stdout.yaml

You’ll see some logs printed to stderr (these would normally go to stdout, but an exception is made when the output is stdout). Benthos is now listening for messages. You can test it by sending text with curl:

curl http://localhost:8080/post -d "hello world"

You should see it on the stdout pipe from Benthos. If at any point you suspect your config might be formatted incorrectly you can do a sanity check with:

benthos -c ./http_to_stdout.yaml --print-yaml | less

Which will parse your config and print the full result.

Multiple Inputs/Outputs

When you print a config you might notice some odd input and output options called “fan_in”, “fan_out” and “round_robin”. These are special inputs and outputs called broker types. They allow you to configure multiple inputs or outputs in a certain arrangement.

The simplest is the “fan_in” type, which simply reads from N inputs. To demo this, let’s extend the previous example in http_to_stdout.yaml so that it also reads from stdin.

Our config input type will now be “fan_in”. The only config field for this type is “inputs”, which is an array of input config objects, one for each input:

input:
  type: fan_in
  fan_in:
    inputs:
    - type: http_server
      http_server:
        address: localhost:8080
        path: /post
    - type: stdin

Here we have specified two inputs: “http_server” and “stdin”. Run it with some text piped to stdin:

echo "via stdin" | benthos -c ./http_to_stdout.yaml

And also send some text using curl like last time:

curl http://localhost:8080/post -d "via HTTP POST"

And you should see both “via stdin” and “via HTTP POST” from Benthos. Sorry if it didn’t work I’m not good with computers.
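
For the curious, the fan-in idea itself is tiny. The Python sketch below merges N message sources into one stream. It is a conceptual illustration only and makes no claim about how Benthos implements its broker; the function name fan_in and the use of plain lists as inputs are assumptions for the example.

```python
import queue
import threading


def fan_in(*inputs):
    """Merge several iterables of messages into one list of messages."""
    merged = queue.Queue()

    def pump(source):
        # Each input gets its own thread and forwards every message it reads.
        for msg in source:
            merged.put(msg)

    threads = [threading.Thread(target=pump, args=(src,)) for src in inputs]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All producers have finished, so the queue can be drained safely.
    results = []
    while not merged.empty():
        results.append(merged.get())
    return results


messages = fan_in(["via stdin"], ["via HTTP POST"])
print(sorted(messages))
```

Note that the order in which messages from different inputs interleave is not defined, which is why the example sorts them before printing.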

Hot Tip

With brokers you might find you have to copy/paste many config fields if they are similar. To save you some effort there is a special “ditto” type, which copies the previous config and overlays any changes.

For example, say we are reading from kafka partitions 0, 1 and 2:

input:
  type: fan_in
  fan_in:
    inputs:
    - type: kafka
      kafka:
        addresses:
          - someserver:9092
        topic: my_data_stream
        consumer_group: super_cool_consumer_group
        partition: 0
    - type: ditto
      kafka:
        partition: 1
    - type: ditto
      kafka:
        partition: 2

All three inputs above will be configured to connect to “someserver:9092” with the topic “my_data_stream” and the consumer group “super_cool_consumer_group”.
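
To make the “copy the previous config and overlay any changes” behaviour concrete, here is a small Python sketch. The exact merge rules Benthos applies are an assumption on my part (a recursive overlay where nested objects are merged and other values are replaced), and the helper names overlay and expand_ditto are hypothetical.

```python
import copy


def overlay(base, changes):
    """Return a deep copy of base with changes merged in recursively."""
    merged = copy.deepcopy(base)
    for key, value in changes.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = overlay(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def expand_ditto(inputs):
    """Replace each "ditto" entry with the previous entry plus its changes."""
    expanded = []
    for entry in inputs:
        if entry.get("type") == "ditto":
            if not expanded:
                raise ValueError("ditto needs a previous input to copy")
            changes = {k: v for k, v in entry.items() if k != "type"}
            entry = overlay(expanded[-1], changes)
        expanded.append(entry)
    return expanded


inputs = [
    {
        "type": "kafka",
        "kafka": {
            "addresses": ["someserver:9092"],
            "topic": "my_data_stream",
            "consumer_group": "super_cool_consumer_group",
            "partition": 0,
        },
    },
    {"type": "ditto", "kafka": {"partition": 1}},
    {"type": "ditto", "kafka": {"partition": 2}},
]

for cfg in expand_ditto(inputs):
    print(cfg["type"], cfg["kafka"]["topic"], cfg["kafka"]["partition"])
```

Each expanded entry ends up as a full kafka input that differs from its neighbours only in the partition number.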

More Useful: Kafka + HTTP POST to ZMQ

Let’s imagine that we had a service foo which reads messages from a ZMQ pull socket. We then decided to introduce Kafka before foo, and as a bridge we deployed Benthos with this config:

input:
  type: kafka
  kafka:
    addresses:
      - broker1:9092
      - broker2:9092
      - broker3:9092
    topic: foo_stream
    consumer_group: benthos_foo_1
    partition: 0
output:
  type: zmq4
  zmq4:
    addresses:
      - tcp://foo1:5556
    bind: false
    socket_type: PUSH

Now, after some time, we want to automatically probe the resilience of our system by injecting messages before foo. This should be easy to perform from scripts and avoid dirtying up our Kafka logs. A small change to our previous config lets us do this with Benthos and HTTP:

input:
  type: fan_in
  fan_in:
    inputs:
    - type: kafka
      kafka:
        addresses:
          - broker1:9092
          - broker2:9092
          - broker3:9092
        topic: foo_stream
        consumer_group: benthos_foo_1
        partition: 0
    - type: http_server
      http_server:
        address: benthos_foo_1:8080
        path: /inject
output:
  type: zmq4
  zmq4:
    addresses:
      - tcp://foo1:5556
    bind: false
    socket_type: PUSH

Without losing any of our previous functionality we can now inject messages into our foo with any tool that supports HTTP. For example, with curl:

curl http://benthos_foo_1:8080/inject -d "{mangled_json:+++}" || alert_ops
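
If your probing scripts are written in Python rather than shell, the same injection could look roughly like the sketch below. It uses only the standard library; the function name inject and the timeout value are made up for this example. One difference from the curl line is an assumption worth flagging: this version also treats HTTP error statuses as failures, whereas curl without --fail only exits non-zero when the request itself cannot be made.

```python
import urllib.error
import urllib.request


def inject(url, payload, timeout=5.0):
    """POST payload to url; return True if the server answered with 2xx."""
    request = urllib.request.Request(
        url, data=payload.encode("utf-8"), method="POST"
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError):
        # Connection failures, timeouts and HTTP error statuses all count
        # as a failed injection.
        return False
```

A script would then call inject("http://benthos_foo_1:8080/inject", "{mangled_json:+++}") and alert ops whenever it returns False.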

Conclusion

The perfect platform architecture doesn’t need Benthos. However:

  • Messaging protocols are fragmented
  • Client libraries aren’t always available for your stack
  • Deployments sometimes need to change faster than the dev process can handle
  • It’s fun to play with messaging streams

If you think Benthos might help you then grab a binary and start playing.