Cost-effective data streaming from RabbitMQ to AWS S3 using Apache Flume
Introduction
AWS is a great choice for businesses with volatile computing resource demands. It’s amazing how easily one can scale from tens of users to millions when a system is built entirely from serverless pieces. But it comes at a price: the more stable your resource demands become, the more you can save by migrating to solutions that do not come out of the box in the AWS environment. A good example is Amazon Kinesis Firehose as a sink to AWS S3. You’re simply billed for every 5 KB sent and Amazon takes care of the rest. An alternative, still hosted on AWS, is Apache Flume running on EC2 machines. It requires configuration, tuning and maintenance, but it can save a lot of money. I’ve spent some time poking around and prepared a demo.
What is Apache Flume?
Flume is a tool for collecting and moving large amounts of log data from different sources to a centralized data store. A single Flume instance, called an agent, consists of sources, channels and sinks.
A source consumes data from external providers. It can be a Kafka stream, an HTTP endpoint, a syslog and many more. A source that does not come out of the box can be implemented - that is the case for RabbitMQ support. I’m going to use gmr’s implementation.
Data acquired from a source is staged on the agent in a channel. A channel acts as a buffer for a sink. In our scenario, batches of messages fit in memory, so we’re going to use the Memory Channel.
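To back that claim up with a rough sketch (ignoring Flume’s per-event overhead): with the ~4 KB messages produced in the demo and the 5,000-event channel capacity configured later in flume.conf, a completely full channel holds only about 20 MB of payload.

# Rough estimate of the Memory Channel footprint; actual heap usage will be higher
# because of per-event overhead, but the order of magnitude is what matters.
EVENT_SIZE_BYTES = 4096    # ~4 KB JSON payload per message (see producer.py below)
CHANNEL_CAPACITY = 5000    # a1.channels.mem_channel.capacity in flume.conf below

worst_case_mb = EVENT_SIZE_BYTES * CHANNEL_CAPACITY / (1024 * 1024)
print('Worst-case channel payload: ~{:.0f} MB'.format(worst_case_mb))  # ~20 MB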
A sink takes events from a channel and efficiently moves batches of data into a data store (not necessarily a centralized one). The HDFS Sink comes out of the box and, thanks to the AWS SDK for Java, it can write directly to the Simple Storage Service.
A working example
We are now going to build a small, docker-compose-backed system with three major components:
- a producer, a Python application publishing 400 JSON messages per second, 4 KB each,
- a RabbitMQ instance,
- an Apache Flume agent that consumes the RMQ queue and stores the data on S3.
docker-compose.yml
version: "2.1"
services:
  rabbitmq:
    image: rabbitmq:3.6
  producer:
    build:
      context: producer
    links:
      - "rabbitmq"
    depends_on:
      - "rabbitmq"
  flume:
    build:
      context: flume
    links:
      - "rabbitmq"
    depends_on:
      - "rabbitmq"
producer.py
import time
import pika
import random
import string
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

EXCHANGE_NAME = 'json_exchange'
QUEUE_NAME = 'json_queue'
PUBLISH_FREQ = 400  # target number of messages per second
# A single ~4 KB JSON message with a random payload.
MSG_BODY = '{"payload": "' + ''.join(random.choice(string.ascii_letters) for _ in range(4096)) + '"}'


def setup_rmq():
    # Declare a fanout exchange and a queue, and bind them together.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='fanout')
    channel.queue_declare(queue=QUEUE_NAME)
    channel.queue_bind(exchange=EXCHANGE_NAME, queue=QUEUE_NAME)
    return channel


def send_message(channel):
    channel.basic_publish(exchange=EXCHANGE_NAME, routing_key='', body=MSG_BODY)


if __name__ == '__main__':
    rmq_channel = setup_rmq()
    while True:
        start_ts = time.time()
        send_message(rmq_channel)
        dt = time.time() - start_ts
        # Throttle to roughly PUBLISH_FREQ messages per second.
        if dt < 1. / PUBLISH_FREQ:
            time.sleep(1. / PUBLISH_FREQ - dt)
        else:
            logger.warning('Publishing took {}s'.format(dt))
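Before adding Flume to the mix, it can be useful to verify that messages actually reach the queue. The following is a small, hypothetical consumer sketch (not part of the demo repository) that drains a few messages from json_queue with pika; the host and queue names simply mirror the producer above.

# check_queue.py - a quick, hypothetical sanity check; not part of the demo itself.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()

# Pull a handful of messages off json_queue to confirm the producer is publishing.
for _ in range(5):
    method, properties, body = channel.basic_get(queue='json_queue')
    if method is None:
        print('Queue is empty')
        break
    print('Received {} bytes'.format(len(body)))
    channel.basic_ack(method.delivery_tag)

connection.close()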
flume/Dockerfile
Certainly, the Flume Dockerfile could be more elegant, but that is not the purpose of the demo. It is simply based on an existing image.
FROM probablyfine/flume:latest

ENV FLUME_AGENT_NAME a1
ENV HADOOP_VERSION=3.1.1
ENV HADOOP_HOME=/opt/lib/hadoop-$HADOOP_VERSION

# Fetch a Hadoop distribution to get the hadoop-aws module (the s3a filesystem).
RUN mkdir /opt/lib && \
    wget -q http://www.eu.apache.org/dist/hadoop/core/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz -O /opt/lib/hadoop-$HADOOP_VERSION.tar.gz
RUN tar xzf /opt/lib/hadoop-$HADOOP_VERSION.tar.gz -C /opt/lib
RUN rm /opt/lib/hadoop-$HADOOP_VERSION.tar.gz
RUN cp /opt/lib/hadoop-$HADOOP_VERSION/share/hadoop/tools/lib/hadoop-aws-$HADOOP_VERSION.jar /opt/flume/lib/

# Drop Flume's bundled httpclient and jackson jars so they don't clash with the AWS SDK added below.
RUN rm /opt/flume/lib/httpclient-4.2.1.jar
RUN rm /opt/flume/lib/jackson*

# RabbitMQ source plugin and the AWS SDK used by the s3a filesystem.
COPY rabbitmq-flume-plugin-standalone-1.0.3.jar /opt/flume/lib/
COPY aws-java-sdk-1.11.347.jar /opt/flume/lib/

ADD flume-env.sh /opt/flume-config/flume-env.sh
ADD flume.conf /opt/flume-config/flume.conf
flume/flume.conf
a1.sources = rmq
a1.channels = mem_channel
a1.sinks = s3_sink
a1.sources.rmq.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.rmq.host = rabbitmq
a1.sources.rmq.port = 5672
a1.sources.rmq.virtual-host = /
a1.sources.rmq.queue = json_queue
a1.channels.mem_channel.type = memory
a1.channels.mem_channel.capacity = 5000
a1.channels.mem_channel.transactionCapacity = 2500
a1.sinks.s3_sink.type = hdfs
a1.sinks.s3_sink.hdfs.path = s3a://S3_KEY:S3_SECRET@BUCKET-NAME/data/
a1.sinks.s3_sink.hdfs.fileType = DataStream
a1.sinks.s3_sink.hdfs.filePrefix = data
a1.sinks.s3_sink.hdfs.writeFormat = Text
a1.sinks.s3_sink.hdfs.rollCount = 0
a1.sinks.s3_sink.hdfs.rollSize = 16777216
a1.sinks.s3_sink.hdfs.batchSize = 1000
a1.sinks.s3_sink.hdfs.rollInterval = 0
a1.sources.rmq.channels = mem_channel
a1.sinks.s3_sink.channel = mem_channel
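Once the agent is up, an easy way to check that files are being rolled into the bucket is to list the data/ prefix. Below is an optional sketch using boto3, which is not part of the demo; BUCKET-NAME is the same placeholder as in hdfs.path above, and credentials are assumed to be available through the usual AWS mechanisms.

# list_s3_output.py - optional check that the HDFS sink is rolling files into S3.
import boto3

s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket='BUCKET-NAME', Prefix='data/')
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])

With a single producer pushing roughly 1.6 MB/s, the 16 MB rollSize should translate into a new object appearing every ten seconds or so.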
You can clone the project from GitHub and just run it with docker-compose up --build. Try scaling the number of messages produced with docker-compose scale producer=N.
Summary
I ran the whole system on a single t2.large EC2 instance and managed to scale it to three producers before the CPU Credit Balance started to drop. That’s about 1,200 4 KB messages per second for roughly $80 a month. By my calculation, the same volume through Kinesis Firehose would cost about $450. For a production system the benefit should be even more significant, since in the demo some of the instance’s resources were consumed by the producers and the broker themselves.
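For transparency, here is the back-of-the-envelope arithmetic behind the Firehose figure. The per-GB ingestion price is my assumption of the list price at the time of writing, so treat the result as an order-of-magnitude check rather than a quote.

# Rough Firehose cost estimate; the price per GB is an assumption, not an official quote.
MSGS_PER_SEC = 1200          # three producers x 400 messages per second
BILLED_RECORD_KB = 5         # Firehose rounds every record up to the nearest 5 KB
SECONDS_PER_MONTH = 30 * 24 * 3600
PRICE_PER_GB = 0.029         # assumed ingestion price in USD per GB

ingested_gb = MSGS_PER_SEC * BILLED_RECORD_KB * SECONDS_PER_MONTH / (1024 ** 2)
print('~{:,.0f} GB/month -> ~${:,.0f}/month'.format(ingested_gb, ingested_gb * PRICE_PER_GB))
# ~14,832 GB/month -> ~$430/month, in the same ballpark as the ~$450 quoted above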
It’s not all roses, though. Flume needs to be carefully configured and tuned, and it requires some kind of monitoring to be put in place. In addition, some of the components used in the demo might not be mature enough for large-scale production use.
And when it comes to vendor lock-in, it is simply not an issue when Flume is used.