2015-01-24

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that a Cassandra cluster can handle up to 100 000 requests per second. All of this had to be done on a budget, without spending much time on developing the whole application, and the setup had to be as close to the real thing as possible. We will go through the details soon. Here is just a basic overview of the experiment:

Amazon

Generating and handling load on this scale requires infrastructure that is usually not available within a personal budget, so I turned to Amazon EC2. I had been hearing about EC2 for quite some time, and it turned out to be really easy to use. Basically, all you have to do is set up a security group and store the "pem" file for that security group. If you haven't tried it yet, there is a free micro instance available for a whole year after registering. I won't go into the details of how to set up the security group; it's all described in the DataStax documentation. Note that the security definition there is a bit extensive: opening the port range 1024-65535 is sufficient for inter-group communication, and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll reference this file as "cassandra.pem".

Load

Generating load on that scale is not as easy as it might seem. After some searching I came to the conclusion that the best solution was to use Tsung. I set up the load-generating machines with the following snippet. Note that I placed the "cassandra.pem" file on the node from which I would run tsung, and read the node addresses from the AWS console. The rest of the setup goes pretty much like this:

        # do this only for the machine from which you'll initiate tsung
        scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

        # connect to every load machine and install erlang and tsung
        ssh -i cassandra.pem ec2-user@every_load_machine

        # repeat this on every node
        sudo yum install erlang

        wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
        tar -xvzf tsung-1.5.1.tar.gz
        cd tsung-1.5.1
        ./configure
        make
        sudo make install

        # you can close other load nodes now
        # go back to the first node and move cassandra.pem to id_rsa
        mv cassandra.pem .ssh/id_rsa

        # now make an ssh connection from first tsung node to every
        # load generating machine (to add the host key) so that
        # the first tsung node won't have any problem connecting to
        # other nodes and issuing erlang commands to them
        ssh ip-a-b-c-d
        exit

        # create the basic.xml file on the first tsung node
        vi basic.xml
    

The second part of the load-machine setup is editing the basic.xml file. To make things more interesting, we are going to send various kinds of messages with a timestamp. The list of users is predefined in a file called userlist.csv. Note that the password is the same for all users; you can adapt this to your own needs or remove the password completely:

        0000000001;pass
        0000000002;pass
        0000000003;pass
        ...
        ...
        ...
    
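If you need a long list, a throwaway sketch like the following can generate the file. The file name matches the tsung configuration below; the count of 100 000 users is just my assumption, tune it to your test:

        // throwaway generator for userlist.csv: zero-padded ids, shared password
        import java.io.PrintWriter;

        public class UserListGenerator {
            public static void main(String[] args) throws Exception {
                try (PrintWriter out = new PrintWriter("userlist.csv")) {
                    for (int i = 1; i <= 100000; i++) {
                        // matches the 0000000001;pass format shown above
                        out.printf("%010d;pass%n", i);
                    }
                }
            }
        }
    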

The tsung tool is well documented; the configuration I used is similar to this:

        <?xml version="1.0" encoding="utf-8"?>
        <!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
        <tsung loglevel="warning">

        <clients>
            <client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
        </clients>

        <servers>
            <server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
            <!-- enter the rest of the app servers here-->
        </servers>

        <load>
            <arrivalphase phase="1" duration="11" unit="minute">
                <users maxnumber="100" arrivalrate="100" unit="second"/>
            </arrivalphase>
        </load>

        <options>
            <option name="file_server" id='id' value="userlist.csv"/>
        </options>

        <sessions>
            <session probability="100" name="load_session" type="ts_http">
                <setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
                    <var name="username" />
                    <var name="pass" />
                </setdynvars>
                <setdynvars sourcetype="eval"
                            code="fun({Pid,DynVars}) -&gt;
                            {Mega, Sec, Micro} = os:timestamp(),
                            (Mega*1000000 + Sec)*1000 + round(Micro/1000)
                            end.
                            ">
                    <var name="millis" />
                </setdynvars>
                <for from="1" to="10000000" var="i">
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0"  method="GET"/>
                    </request>
                </for>
            </session>
        </sessions>
        </tsung>
    
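Once basic.xml and userlist.csv are in place on the first node, the run is started there with tsung -f basic.xml start; tsung connects to the other client machines listed in the configuration and drives them remotely, which is why the passwordless SSH setup above matters.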

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note: I added a c4 node because Amazon limited the number of instances I could boot.

App

I spent most of the development time on the app part. The component handling the requests is based on a Netty listener. In one of my previous posts I described how to use Netty to handle HTTP requests and acknowledge them with a HELLO message. Here I acknowledged them with OK.
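The change from that post is trivial; in the handler (reproduced later on this page), the acknowledgment payload just becomes:

        // respond with OK instead of HELLO
        private static final byte[] CONTENT = {'O', 'K'};
    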

The most complicated part was sending the messages to Cassandra as fast as possible. The fastest way to send them is executeAsync. Initially I had trouble with it and was losing messages. Some of the issues were due to concurrency, some due to a poor understanding of the DataStax driver.

Concurrency - basically, I tried to save on instantiating BoundStatement instances for the sake of overall speed. But a BoundStatement is not thread safe, and calling its bind method returns "this", so concurrent threads can overwrite each other's bound values. It took me some time to figure this out, because when used in a single-threaded loop this behavior is not dangerous. Anyway, thanks to a colleague I figured it out.

        // bind() returns "this", so a shared instance gets overwritten
        // by concurrent threads - always instantiate a new BoundStatement
        // in concurrent code; don't reuse one with multiple .bind() calls!

        BoundStatement bs = new BoundStatement(insertStatement);
    

Asynchronous execution - also a bit tricky. The executeAsync call returns a future. Initially I was just registering a callback on it with Futures.addCallback:

        // don't do this under heavy load with the result of executeAsync;
        // in Cassandra you will start to lose data

        Futures.addCallback(future, ...
    

After some trial and error I found a pattern where I didn't lose any data:

        // here we are going to keep the futures
        private ArrayBlockingQueue<ResultSetFuture> queue = 
            new ArrayBlockingQueue<>(10000);

        // in the handling code
        queue.add(session.executeAsync(bs));

        // every time another 1000 futures have piled up,
        // drain the queue and wait for them to complete
        if (queue.size() % 1000 == 0) {
            ResultSetFuture elem;
            do {
                elem = queue.poll();
                if (elem != null) {
                    elem.getUninterruptibly();
                }
            } while (elem != null);
        }

        // this will make your insertions around
        // 4x faster when compared to normal execute
    
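Putting the pieces together, here is a minimal sketch of the write path. The keyspace, table, and column names are made up for illustration; the real schema depends on your message format:

        // minimal write-path sketch; keyspace/table/columns are hypothetical
        import java.util.concurrent.ArrayBlockingQueue;

        import com.datastax.driver.core.BoundStatement;
        import com.datastax.driver.core.Cluster;
        import com.datastax.driver.core.PreparedStatement;
        import com.datastax.driver.core.ResultSetFuture;
        import com.datastax.driver.core.Session;

        public class MessageWriter {

            private final Session session;
            private final PreparedStatement insertStatement;

            // parked futures, drained in batches of 1000
            private final ArrayBlockingQueue<ResultSetFuture> queue =
                new ArrayBlockingQueue<>(10000);

            public MessageWriter(String contactPoint) {
                Cluster cluster = Cluster.builder()
                    .addContactPoint(contactPoint)
                    .build();
                session = cluster.connect("load_test");
                insertStatement = session.prepare(
                    "INSERT INTO messages (username, millis, body)"
                        + " VALUES (?, ?, ?)");
            }

            public void save(String username, long millis, String body) {
                // a fresh BoundStatement per call - it is not thread safe
                BoundStatement bs = new BoundStatement(insertStatement);
                bs.bind(username, millis, body);
                queue.add(session.executeAsync(bs));

                // every 1000th element, wait for the parked futures so
                // that unacknowledged writes can't pile up indefinitely
                if (queue.size() % 1000 == 0) {
                    ResultSetFuture elem;
                    while ((elem = queue.poll()) != null) {
                        elem.getUninterruptibly();
                    }
                }
            }
        }
    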

App setup

The instances come with OpenJDK installed. That doesn't guarantee the best performance, so I installed Oracle Java. In order not to lose time on firewall setup, I simply copied the "cassandra.pem" file to every node.

        # copy ".jar" and "cassandra.pem" file to a single app node
        # copy the two files from single node to other nodes
        # it's a lot faster than uploading to every node (at least on my connection)

        # set up the machine
        wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"
        
        tar -xvzf jdk-7u71-linux-x64.tar.gz

        sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1
        
        # pick the new java number in this step
        sudo update-alternatives --config java

        # check with this
        java -version
    

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note: I added c4 nodes because Amazon limited the number of instances I could boot. I had to request a limit increase through customer service, and since I couldn't predict how many instances of each type I would use, the load and app servers ended up with mixed instance types.

Cassandra

Setting up Cassandra was the easiest part of the whole undertaking. All I did was follow this guide by DataStax.

Resources

  • 7x c3.2xlarge
After hanging at 90 000 req/s for a while, I came to the conclusion that a replication factor of two might be too much for the resources I had available. I would probably have needed more Cassandra nodes, but since I couldn't get any more instances up I set the replication factor to 1. Note that this replication factor does not allow losing a node in the cluster without losing data. But the goal here is 100 000 req/s on a budget :)
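For reference, the change boils down to a single CQL statement. Here it is through the Java driver, with a hypothetical keyspace name; the same statement can be run from cqlsh:

        // drop the replication factor to 1; "load_test" is a made-up name
        session.execute(
            "ALTER KEYSPACE load_test WITH replication = "
                + "{'class': 'SimpleStrategy', 'replication_factor': 1}");
    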

Results

In the end it cost me around $30 to reach the 100k limit. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914
Checked number: 31 145 914
Average: 103 809 req/s

Don't be afraid to send me an email if you have any questions whatsoever ;)

2015-01-06

Netty 4 HTTP Hello World

Intro

Finding examples for Netty took me a lot of time. Writing even the smallest portion of code usually requires going through multiple sources, ranging from YouTube videos to the official Netty documentation. In this post I'll show you how to build a basic Netty HTTP hello world example. From here you probably won't have any trouble continuing and writing your own app.

Dependencies

This example has just one dependency:
http://mvnrepository.com/artifact/io.netty/netty-all/4.0.24.Final

Setting Netty up

Create a class with a name of your choosing; it doesn't really matter. This is a hello world example, so I suggest writing a main method to run it. We'll run the example on HTTP port 80:

    // imports needed: io.netty.bootstrap.ServerBootstrap,
    // io.netty.buffer.PooledByteBufAllocator, io.netty.channel.*,
    // io.netty.channel.nio.NioEventLoopGroup,
    // io.netty.channel.socket.SocketChannel,
    // io.netty.channel.socket.nio.NioServerSocketChannel,
    // io.netty.handler.codec.http.HttpRequestDecoder,
    // io.netty.handler.codec.http.HttpResponseEncoder

    public static void main(String[] args) 
        throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 200)
                .childOption(ChannelOption.ALLOCATOR,
                    PooledByteBufAllocator.DEFAULT)
                .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(
                                SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new HttpRequestDecoder());
                                p.addLast(new HttpResponseEncoder());
                                p.addLast(new MySuperHttpHandler());
                            }
                        });

            ChannelFuture future = bootstrap.bind(80).sync();

            future.channel().closeFuture().sync();

        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

            bossGroup.terminationFuture().sync();
            workerGroup.terminationFuture().sync();
        }

    }
    
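A quick note on the bootstrap options: SO_BACKLOG sets the queue length for incoming connections waiting to be accepted, and the pooled ByteBuf allocator reuses buffers instead of allocating a fresh one per message, which reduces GC pressure under load.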

Hello World HTTP Handler

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;

    public class MySuperHttpHandler extends 
            SimpleChannelInboundHandler<Object> {
            private static final byte[] CONTENT = 
                {'H', 'E', 'L', 'L', 'O'};

            @Override
            public void channelReadComplete(
                ChannelHandlerContext ctx) {

                ctx.flush();
            }

            @Override
            public void channelRead0(ChannelHandlerContext ctx,
                    Object msg) {

                if (msg instanceof HttpRequest) {
                    HttpRequest req = (HttpRequest) msg;

                    String reqUrl = req.getUri();

                    System.out.println(reqUrl);

                    // do something further with request here ...

                    // this is the response part
                    if (HttpHeaders.is100ContinueExpected(req)) {
                        ctx.write(new DefaultFullHttpResponse(
                            HttpVersion.HTTP_1_1, 
                            HttpResponseStatus.CONTINUE));
                    }

                    boolean keepAlive = HttpHeaders.isKeepAlive(req);
                    FullHttpResponse response = 
                        new DefaultFullHttpResponse(
                            HttpVersion.HTTP_1_1,
                            HttpResponseStatus.OK,
                            Unpooled.wrappedBuffer(CONTENT));

                    response.headers().set(
                        HttpHeaders.Names.CONTENT_TYPE, "text/plain");
                    response.headers().set(
                        HttpHeaders.Names.CONTENT_LENGTH,
                        response.content().readableBytes());

                    if (!keepAlive) {
                        ctx.write(response)
                            .addListener(ChannelFutureListener.CLOSE);
                    } else {
                        response.headers().set(
                            HttpHeaders.Names.CONNECTION,
                            HttpHeaders.Values.KEEP_ALIVE);

                        ctx.write(response);
                    }
                }
            }

            @Override
            public void exceptionCaught(
                ChannelHandlerContext ctx, Throwable cause) {
                ctx.close();
            }
    }
    
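Once the server is running (binding to port 80 usually requires root privileges; otherwise pick a port above 1024), point a browser or any HTTP client at http://localhost/ and the handler should print the request URI to stdout and answer with the HELLO payload.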

Setting up Cassandra Cluster in Virtual Machines

Intro

From time to time, having just one Cassandra instance installed on your machine is not enough, because you want to test certain behaviors when a Cassandra cluster is up and running. Having extra hardware on the side or processing time on Amazon is not always an option, so it's a good idea to set up a simple cluster on your own machine with instances in virtual machines. This post is going to show you how to do it with VirtualBox.

Getting VirtualBox Images

The reason why I chose VirtualBox is that there are a lot of free virtual images available. Most of the time you'll be installing Cassandra on a Linux machine; I decided to go with CentOS. Head over to http://virtualboxes.org/images/centos/ and download CentOS-6.6-x86_64-minimal. The default settings are fine for every machine. Create a couple of them and give them names so that you can differentiate between them (Node1, Node2, etc.).

Perhaps the best idea is to set up one node first and make copies of it afterwards. Do not forget to set the network adapter to bridged. The username and password for the virtual machines are probably "root/reverse", but check those options when downloading the VirtualBox image. To keep it short I'll just continue using the root user; in production that is an extremely bad practice.

Setup networking

When importing the .ova file, VirtualBox is going to ask you whether you want to reinitialize the MAC address; check that option. There is a certain amount of buggy behavior when it comes to networking, so to prevent those errors run the following command after logging in to the virtual machine (root/reverse):

        rm /etc/udev/rules.d/70-persistent-net.rules
    
When VirtualBox initializes the networking on the virtual machine, it puts a new MAC address into a file. There seems to be a bug where this MAC address is not transferred from that file to the virtual machine settings. Run the following command and copy the MAC address:
        cat /etc/sysconfig/network-scripts/ifcfg-eth0
    
Shut down the machine and set the MAC address under Settings > Network > Advanced > MAC Address.

Install Java

Just to make things a bit easier we're going to install wget:

        yum install wget
    
Now we are going to install java:
        $ cd /opt/
        $ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u72-b14/jdk-7u72-linux-x64.tar.gz"
        $ tar xzf jdk-7u72-linux-x64.tar.gz
        $ rm jdk-7u72-linux-x64.tar.gz

        $ cd /opt/jdk1.7.0_72/

        $ alternatives --install /usr/bin/java java /opt/jdk1.7.0_72/bin/java 2
        $ alternatives --config java

        $ alternatives --install /usr/bin/jar jar /opt/jdk1.7.0_72/bin/jar 2
        $ alternatives --install /usr/bin/javac javac /opt/jdk1.7.0_72/bin/javac 2
        $ alternatives --set jar /opt/jdk1.7.0_72/bin/jar
        $ alternatives --set javac /opt/jdk1.7.0_72/bin/javac

        $ vi /etc/profile.d/java.sh
        export JAVA_HOME=/opt/jdk1.7.0_72
        export JRE_HOME=/opt/jdk1.7.0_72/jre
        export PATH=$PATH:/opt/jdk1.7.0_72/bin:/opt/jdk1.7.0_72/jre/bin
    
Reboot, then log back in and check with echo $JAVA_HOME.

Install Cassandra

Cassandra is installed and run with the following commands:

        $ cd /opt/
        $ wget http://downloads.datastax.com/community/dsc-cassandra-2.1.2-bin.tar.gz
        $ tar xzf dsc-cassandra-2.1.2-bin.tar.gz
        $ rm dsc-cassandra-2.1.2-bin.tar.gz

        [check ip address with ifconfig]

        $ cd dsc-cassandra-2.1.2/conf

        $ vi cassandra.yaml
            rpc_address: ip address of the node
            broadcast_address: ip address of the node
            - seeds: ip_address of the first node

        $ cd ../bin
        $ ./cassandra
    
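Once the node is up, running bin/nodetool status from the Cassandra directory should list it with the UN flag (Up/Normal). For the other nodes to join, though, the firewall changes below are needed first.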

Firewall settings

The cluster will not work out of the box because of the firewall settings. You will need to open the following ports: 9042 (CQL native transport), 7000 (inter-node communication), 7001 (SSL inter-node communication), and 7199 (JMX):

        $ iptables -I INPUT -p tcp -m tcp --dport 9042 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7000 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7001 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7199 -j ACCEPT

        $ /etc/init.d/iptables save

        $ service iptables restart
    
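Note that iptables -I inserts each rule at the top of the INPUT chain, so the new rules take precedence over the default reject rule that ships with CentOS.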
Now make copies of this machine and update the cassandra.yaml file with the IP addresses of the new machines. Also check /var/log/cassandra/system.log to see whether the other nodes are joining in.