2015-12-13

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 1

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Part 1 - Overview

Before starting any project I like to make a few drawings, just to keep everything in perspective. My main motivation for this series is to get better acquainted with Apache Kafka. I just didn't have a chance to use it on the projects I work on in my day-to-day life, but it's the new technology everybody is buzzing about, so I wanted to give it a try. I also hadn't had the chance to write Spark Streaming applications, so why not kill two birds with one stone? Here is a 10,000-foot overview of the series:

Avoiding the tl;dr

Part of the motivation for splitting the series is to avoid the tl;dr effect ;) Now, let's get back to the overview. We'll break down the previous image box by box.

Using Spring Boot

We're basically just prototyping here, but to keep everything flexible and in the spirit of newer architectural paradigms like microservices, the post is split into 5 parts. The software is split accordingly, so we won't use any specific container for our applications; we'll just go with Spring Boot. In the posts we won't go over the basics much; you can always look them up in the official documentation.

Apache Kafka

This is the reason why I'm doing this in the first place. It's this new super cool messaging system that all the big players are using and I want to learn how to put it to everyday use.

Spark Streaming

For some time now I've been doing a lot of stuff with Apache Spark, but somehow I never got the chance to take a closer look at streaming.

Cassandra

Why not?

What is this series about?

It's a year when everybody is talking about voting ... literally everywhere :) so let's make a voting app. In essence it will be a basic word count over the stream, but let's give it some context while we're at it. We won't do anything complicated or particularly useful; the end result will simply be the total count of token occurrences in the stream. We'll also break a lot of best practices in data modeling etc. along the way.

The series is for people oriented toward learning something new. Experienced and battle-proven readers will probably find a ton of flaws in the concept, but again, most of them are deliberate. One thing I sometimes avoid in my posts is including complete source code; in my opinion a lot more sticks, and learners feel much more comfortable when faced with problems in practice, if they type things out themselves. So I'll just copy and paste the crucial code parts. One more assumption on my side is that readers will be using IntelliJ IDEA. Let's go to Part 2 and see how to set up Kafka.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 2

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Setting up Kafka

In this section we'll set up two Kafka brokers. We'll also need a ZooKeeper; if you are reading this, my guess is that you don't have one set up already, so we'll use the one bundled with Kafka. We won't cover everything here, so do read the official documentation for a more in-depth understanding.

Downloading

Download the latest Apache Kafka. In this tutorial we'll use the binary distribution. Pay attention to the Scala version if you intend to use Kafka with a specific version of Scala; we'll concentrate more on Java here, but this will matter in parts to come. In this section we'll use the tools that ship with the Kafka distribution to test everything out. Once again, download and extract the Apache Kafka distribution from the official pages.

Configuring brokers

Go into the directory where you downloaded and extracted your Kafka installation. There is a properties file template, and we are going to use properties files to start the brokers. Make two copies of the file:

        $ cd your_kafka_installation_dir
        $ cp config/server.properties config/server0.properties
        $ cp config/server.properties config/server1.properties
    
Now use your favorite editor to make changes to the broker configuration files. I'll just use vi; after all, it has been around for 40 years :)
        $ vi config/server0.properties
    
Now make changes to (or at least check) the following properties:
        broker.id=0
        listeners=PLAINTEXT://:9092
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-0
    
Make the changes for the second node too:
        $ vi config/server1.properties
    
        broker.id=1
        listeners=PLAINTEXT://:9093
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-1
    

Starting everything up

First you need to start ZooKeeper; Kafka uses it to store cluster metadata and consumer offsets. There are more advanced setups where you don't need the bundled one, but for someone just starting out it's much easier to use the ZooKeeper that ships with Kafka. I recommend opening one shell tab where you can keep all of the running processes. We didn't make any changes to the ZooKeeper properties; they are just fine for our example:

        $ bin/zookeeper-server-start.sh config/zookeeper.properties &
    
From the output you'll notice that ZooKeeper started on the default port, 2181. You can telnet to this port on localhost just to check that everything is running fine. Now we'll start the two Kafka brokers:
        $ bin/kafka-server-start.sh config/server0.properties &
        $ bin/kafka-server-start.sh config/server1.properties &
    

Creating a topic

Before producing and consuming messages we need to create a topic; for now you can think of it as a queue name. We need to pass a reference to ZooKeeper. We'll name the topic "votes"; it will have 2 partitions and a replication factor of 2. Please read the official documentation for further explanation. You'll see additional output coming from the broker logs because we are running the brokers in the background.

        $ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic votes --partitions 2 --replication-factor 2
    

Sending and receiving messages with bundled command line tools

Open two additional shell tabs and position yourself in the directory where you installed Kafka. We'll use one tab to produce messages; the second tab will consume the topic and simply print out whatever we type in the first tab. Now this might look a bit silly, but imagine you are actually using Kafka already!

In tab for producing messages run:

        $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic votes
    

In tab for consuming messages run:

        $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic votes
    

Next part

We covered a lot here, though writing from one console window to another could be achieved with a far simpler combination of shell commands. In Part 3 we'll make an app that writes to a topic. We'll also use the console consumer just to verify that our app is actually sending something to the topic.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application will essentially be a simple proxy that receives a JSON payload containing the value that will be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up; the easiest way to get started is with Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8

build.gradle dependencies

        compile('org.apache.kafka:kafka_2.11:0.9.0.0')
        compile('org.apache.zookeeper:zookeeper:3.4.7')
    

application.properties

        brokerList=localhost:9092
        sync=sync
        topic=votes
    

SpringBootKafkaProducer

This is the class where all the important stuff happens.

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);

        kafkaProps.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");
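        // acks=1: the partition leader confirms the write before the send is considered successful;
        // retries=1 retries a failed send once, and linger.ms=5 lets the producer batch records for up to 5 ms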

        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);

    }

    public void send(String value) throws ExecutionException, 
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();

    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
    

SpringBootKafkaExampleApplication

This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
    

AppBeans

Setup beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
    

Helper beans

A simple status to return to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
    
This will be the input to our app
package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
    

SpringBootKafkaController

This is the controller. After starting the app we should have an active endpoint available at http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }

}
    

Checking everything

There should still be an active console consumer running from the previous post, so we won't cover that again. After running SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try to send the following JSON to http://localhost:8080/vote

{
    "name": "Test"
}
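
If you prefer to test from code rather than Postman, here is a minimal sketch that posts the same JSON using the JDK's HttpURLConnection (the class name and the hard-coded payload are purely illustrative):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class VoteClient {

    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8080/vote");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // the same payload we would send from Postman
        byte[] body = "{\"name\": \"Test\"}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }

        // 200 means the controller accepted the vote and returned the "ok" status
        System.out.println("HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}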
    
If everything went fine you should see the name that you sent in this JSON appear in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with data in Cassandra, and push the results back to Cassandra.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 4

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Consuming Kafka data with Spark Streaming and Output to Cassandra

In this section we are going to use Spark Streaming to read the data coming in from Kafka. We'll combine it with the data already in Cassandra, do some computation, and put the results back into Cassandra. Best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into configuring Cassandra; there is plenty of documentation out there and it takes just minutes to set up.

Cassandra

Nothing fancy here, just the name of the entity being voted for and the number of votes:

CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);
    

Let's create a simple java project with gradle for stream processing

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating the project, check the SDK setting; it should be Java 8

Let's have a look at the dependencies

group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}
    

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data into cassandra

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

Spark streaming with kafka

And finally the code that accepts the tokens coming in, combines them with the counts already in Cassandra, and then writes the updated counts back to Cassandra. I didn't spend much time making the class configurable via external parameters, but for the example it's good enough:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host",
            "127.0.0.1");

        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkConf,
            Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
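        // direct stream: no receiver, Spark reads the Kafka partitions itself,
        // with one RDD partition per Kafka partition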

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2
                        <String, String>,
                        String>) Tuple2::_2);
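        // classic word count over each 10-second batch:
        // every token becomes (token, 1) and the ones are summed per token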

        JavaPairDStream<String, Integer> voteCount = lines
            .mapToPair(
                (PairFunction<String, String, Integer>) s ->
                        new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>)
                    (i1, i2) ->i1 + i2);

        sc = jssc.sparkContext();
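        // for every micro-batch: look up the current total for each token in Cassandra,
        // add the count from this batch and write the updated total back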

        voteCount.foreachRDD((v1, v2) -> {
            v1.foreach((x) -> {
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                    javaFunctions(sc)
                        .cassandraTable("voting", "votes")
                        .where("name = '" + x._1() + "'");
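                // note: concatenating the token into the WHERE clause is fine for a demo,
                // but a prepared/bound statement would be safer in real code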

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes = 
                        previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                List<Vote> votes = Arrays.asList(
                    new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                    .writerBuilder("voting", "votes", mapToRow(Vote.class))
                    .saveToCassandra();
            });

            return null;
        });

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
    

And that's it

You can check how the data changes by running SELECT statements against the votes table. In Part 5 we are going to make a simple Spring Boot project that sorts and displays the voting data.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 5

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Displaying Cassandra Data With Spring Boot

Now that we have our voting data in Cassandra, let's write a simple Spring Boot project that gathers all the data from Cassandra, sorts it, and displays it to the user.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: boot-cassandra-data-show
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: boot-cassandra-data-show
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Display Cassandra Data
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Template Engines - Mustache
  18. Next
  19. Project name: boot-cassandra-data-show
  20. The rest is just fine ...
  21. Finish
  22. After creating the project, check the SDK setting; it should be Java 8

Cassandra dependencies

compile('com.datastax.cassandra:cassandra-driver-core:2.1.9')
    

Vote class

We'll use this class to map rows from cassandra.

package com.example;

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

application.properties

server.port = 8090
contactPoint = 127.0.0.1
keyspace = voting
    

CassandraSessionManager

This bean is used to set up the connection to Cassandra.

package com.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class CassandraSessionManager {

    private Session session;
    private Cluster cluster;

    @Value("${contactPoint}")
    private String contactPoint;

    @Value("${keyspace}")
    private String keyspace;

    public CassandraSessionManager() {

    }

    public Session getSession() {
        return session;
    }

    @PostConstruct
    public void initIt() {
        cluster = Cluster.builder().addContactPoint(
            contactPoint).build();
        session = cluster.connect(keyspace);
    }

    @PreDestroy
    public void destroy() {
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

    

BootCassandraDataShowApplication

Automatically generated ...

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootCassandraDataShowApplication {

    public static void main(String[] args) {
        SpringApplication.run(
        BootCassandraDataShowApplication.class, args);
    }
}
    

AppBeans

Bean for holding configured objects.

package com.example;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public Session session() {
        return sessionManager().getSession();
    }

    @Bean
    public CassandraSessionManager sessionManager() {
        return new CassandraSessionManager();
    }
}

    

Web Controller

package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

@Configuration
@Controller
public class WelcomeController {

    @Autowired
    Session session;

    @RequestMapping("/")
    public String welcome(Map<String, Object> model) {
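        // fetch every row from the votes table; fine for a handful of candidates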

        final ResultSet rows = session.execute("SELECT * FROM votes");

        ArrayList<Vote> results = new ArrayList<>();

        for (Row row : rows.all()) {
            results.add(new Vote(
                row.getString("name"),
                row.getInt("votes")
            ));
        }

        Collections.sort(results, (a, b) ->
        b.getVotes().compareTo(a.getVotes()));

        model.put("results", results);

        return "welcome";
    }
}
    

Template to show the results

<!DOCTYPE html>
<html lang="en">
<body>

<h1>Voting results:</h1>
<br/>
{{#results}}
    <strong>{{this.name}}</strong> {{this.votes}} <br/>
{{/results}}

</body>
</html>
    

That's all folks

Now this app might not seem like a lot, but there's a Kafka cluster that receives messages coming in from a Spring Boot app that exposes a REST interface. Messages that come in from Kafka are then processed with Spark Streaming and sent to Cassandra. Another Spring Boot app sorts and displays the results to users. This small tutorial covers most of the cool Java/big data technologies in use nowadays. Special thanks to the readers that went through all five parts of this tutorial ;)

2015-11-30

Cassandra Time Series Bucketing

Intro

Bucketing is one of the most important techniques when working with time series data in Cassandra. This post has its roots in two very popular blog entries:

The posts are very well written and they pretty much describe all of the standard techniques when it comes to working with time series data in Cassandra. But to be honest there isn't all that much code in them. This is partly due to the fact that almost every project has its own specifics, and in my experience it often happens that even within a relatively small team there will be multiple implementations of how to bucket and access the time series data.

The Case for Bucketing

For some time now I've been in the world of IoT, and I find that explaining everything with the help of a simple temperature sensor is the best way to discuss the subject. The previously mentioned articles are also a good read; this section is sort of a warm-up. In most use cases we'll want to access temperature readings by some sensor id, and we know where the sensor is located. In the simplest case the sensor id becomes the wide row key in Cassandra and the readings are stored under it, kept sorted by time, etc. However, in some cases the temperature may be read very often, and this could cause the wide row to grow to a size that Cassandra cannot manage, so the data has to be split among multiple wide rows. The easiest way to make this split is to create multiple wide rows based on the measurement timestamp.

How big should my buckets be?

It may vary from project to project, but it mainly depends on two factors: how much data you are storing per measurement and how often measurements happen. For instance, if you are recording a reading once per day you probably don't even need bucketing. If you are recording it once per hour, the project you are working on probably won't last long enough for you to run into a problem. The same applies to seconds, but only for the most trivial case where you are storing a single reading. If you go into frequencies where something is happening at the millisecond level, you will most definitely need bucketing. The most complex project I've worked on up until now had time bucketing at the level of a single minute, meaning a new bucket every minute. But that project is not in the IoT world; in the IoT world I'm using partitions on a per-month basis.

10 000 feet Bucketing View

The main problem is how to calculate the bucket based on the measurement timestamp. Also keep in mind there might be differences between timezones; in a distributed system a very advisable practice is to save everything in UTC. If we decide that we need bucketing per day, it could be something as simple as the following:

    FastDateFormat dateFormat = FastDateFormat.getInstance(
        "yyyy-MM-dd", TimeZone.getTimeZone("UTC"));

    public String dateBucket(Date date) {
        return dateFormat.format(date);
    }
    
That's it; combine this with your sensor id and you get buckets on a per-day basis. Now the problem is how to retrieve the measurements from the buckets, especially if you have to fetch measurements across multiple buckets. We'll go over this in the next section.

Anything goes

Bear in mind that you should keep the buckets in your time series data easy to maintain. Also try to avoid having multiple implementations of the same thing in your code base. This section will not provide fully implemented examples but will stay more at the level of pseudo code.

When you are fetching data from the buckets, you will have two types of query. One fetches data from the bucket without any restriction on the measurement timestamp; the other starts from a certain position within the bucket. Then there is the question of ordering the retrieved data. I've worked in systems with all sorts of practices here; most of the time reversing was done with the help of a specific boolean flag, but in my opinion this should be avoided. It's best to stick to the from and to parameters and order the data according to them, i.e.:

        from:   01/01/2016
        to:     02/02/2016
        returns: ascending

        from:   02/02/2016
        to:     01/01/2016
        returns: descending
    
That way you don't have to break your head thinking about various flags passed through the layers of your code.

Here is a bit of pseudo code:

        // constructor of your iterator object

        startPartition = dateBucket(from);
        endPartition = dateBucket(to);

        lastFetchedToken = null;

        bucketMoveCount = 0;

        // pseudo CQL: one partition per bucket (the bucket column name here is illustrative)
        String statement = "SELECT * FROM readings WHERE bucket = ?";

        // from past experience, somehow the driver takes out data the fastest
        // if it fetches 3000 items at once, would be interesting to drill down
        // why is this so :)

        int fetchSize = 3000;

        if (from.isBefore(to)) {
            select = statement + " ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp > ? ORDER BY measurement_timestamp ASC LIMIT " + fetchSize;

            partitionDiff = -1f;
        } else {
            select = statement + " ORDER BY measurement_timestamp DESC LIMIT " + fetchSize;
            selectFromBoundary = statement + " AND measurement_timestamp < ? ORDER BY measurement_timestamp DESC LIMIT " + fetchSize;

            partitionDiff = 1f;
        }
    
The partition could move by hour, day, or minute; it all depends on how you decide to implement it. You will have to do some time-based calculations there, and I recommend using Joda-Time for them. Now that we've defined how the init of the iterator looks, it's time to do some iterating over it:
    public List<Row> getNextPage() {

        List<Row> resultOut = new ArrayList<>();

        boolean continueFromPreviousBucket = false;

        do {
            ResultSet resultSet =
                    lastFetchedToken == null ?
                            session.execute(new SimpleStatement(select, currentBucket)) :
                            session.execute(new SimpleStatement(selectFromBoundary, currentBucket, lastFetchedToken));

            List<Row> result = resultSet.all();

            if (result.size() == fetchSize) {
                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, fetchSize - resultOut.size()));
                } else {
                    resultOut = result;
                }

                lastFetchedToken = resultOut.get(resultOut.size() - 1).getUUID("measurement_timestamp");

            } else if (result.size() == 0) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

            } else if (result.size() < fetchSize) {
                currentBucket = calculateNextBucket();
                bucketMoveCount++;

                lastFetchedToken = null;

                if (continueFromPreviousBucket) {
                    resultOut.addAll(result.subList(0, Math.min(result.size(), fetchSize - resultOut.size())));
                } else {
                    resultOut = result;
                }

                continueFromPreviousBucket = true;
            }

            if (resultOut.size() == fetchSize
                    || bucketMoveCount >= MAX_MOVE_COUNT
                    || Math.signum(currentBucket.compareTo(endPartition)) != partitionDiff) {
                break;
            }

        } while (true);

        return resultOut;
    }
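
The pseudo code above calls calculateNextBucket(); as a minimal sketch, assuming day-level buckets formatted as yyyy-MM-dd in UTC and a flag that says whether we are iterating from older towards newer buckets, it could look like this (I'm using java.time here instead of the Joda-Time mentioned above, and the method signature is an assumption):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DayBuckets {

    private static final DateTimeFormatter DAY_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // ascending == true when iterating from the older bucket towards the newer one
    public static String calculateNextBucket(String currentBucket, boolean ascending) {
        LocalDate day = LocalDate.parse(currentBucket, DAY_FORMAT);
        LocalDate next = ascending ? day.plusDays(1) : day.minusDays(1);
        return next.format(DAY_FORMAT);
    }
}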
    

This is just a high-level overview of how to move among the buckets. The actual implementation will differ significantly from project to project. My hope for this post is that you give the problems I faced some thought before you run into them.

2015-11-07

Spring Data Cassandra vs. Native Driver

Intro

For some time now Spring Data for Cassandra has been getting more and more popular. My main concern with the framework is its performance characteristics compared to the native CQL driver. After all, with the driver everything is under your control and one can probably squeeze much more juice out of the cluster. OK, I admit it's not always about performance; if it were, we would all be writing software in C or assembler. But I still think it's good practice to be aware of the drawbacks.

To be honest, Spring Data Cassandra is relatively new to me. I did the performance comparison at the lowest level, without using repositories and the other high-level concepts that come with Spring Data Cassandra. My focus in this post is more on the generic machinery that decodes the data coming out of the driver. To make the comparison I'm going to use a simple Cassandra table (a skinny row), make query after query (5000 and 10000) against Cassandra, and decode the results each time. Once again, the focus in this post is not on the performance characteristics of higher-order functionality like paged queries etc. I just wanted a rule-of-thumb idea of what to expect from Spring Data Cassandra.

Setup

    -- simple skinny row
    CREATE TABLE activities (
        activity_id uuid,
        activity_model_id bigint,
        activity_state text,
        asset_id text,
        attrs map<text, text>,
        creation_time timestamp,
        customer_id text,
        end_time timestamp,
        last_modified_time timestamp,
        person_id text,
        poi_id text,
        start_time timestamp,
        PRIMARY KEY (activity_id)
    );

    
To eliminate all possible effects, I just used a single skinny row:
    activity_id 72b493f0-e59d-11e3-9bd6-0050568317c1
    activity_model_id 66
    activity_state DONE
    asset_id 8400848739855200000
    attrs {
        'businessDrive': '1:1',
        'customer': '4:test_test_test',
        'distance': '3:180', 
        'endLocation': '6:15.7437466839,15.9846853333,0.0000000000',
        'fromAddress': '4:XX1', 
        'locked': '1:0', 
        'reason': '4:Some reason 2', 
        'startLocation': 
        '6:15.7364385831,15.0071729736,0.0000000000', 
        'toAddress': '4:YY2'
        }
    creation_time 2014-05-27 14:50:14+0200
    customer_id 8400768435301400000
    end_time 2014-05-27 12:15:40+0200
    last_modified_time 2014-05-29 21:30:44+0200
    person_id 8401111750365200000
    poi_id null
    start_time 2014-05-27 12:13:05+0200
    
This row is fetched every time; to detect differences we'll see how long the iterations last. Network and cluster effects are also out of scope, so everything was tested on a locally running DataStax Cassandra Community (2.0.16) instance.

The code

To separate out all possible interfering effects I used two separate projects. I once had a situation where I used the old Thrift API together with the CQL driver, and it significantly affected performance and required additional configuration parameters. The main code snippets are located on gist; this is not the focus here, but if somebody is interested:

spring-data
native-drivers
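
To give a rough idea of what the native-driver loop looks like, here is a minimal sketch (the keyspace name, the selected columns and the hard-coded activity_id are assumptions on my part; the actual benchmark code is in the gists above):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class NativeDriverLoop {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("test");

        long start = System.currentTimeMillis();

        for (int i = 0; i < 5000; i++) {
            Row row = session.execute(
                "SELECT activity_state, asset_id, customer_id FROM activities"
                + " WHERE activity_id = 72b493f0-e59d-11e3-9bd6-0050568317c1").one();

            // decode the fields by hand, which is roughly the work the mapping layers do for us
            String state = row.getString("activity_state");
            String assetId = row.getString("asset_id");
            String customerId = row.getString("customer_id");
        }

        System.out.println("3 fields / 5000 items took "
            + (System.currentTimeMillis() - start) + " ms");

        cluster.close();
    }
}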

Results in milliseconds

    3 fields - 5000 items
        spring-data
        5381
        5282
        5385
        avg: 5339

        driver
        4426
        4280
        4469
        avg: 4390

        result: driver faster 21.6%

    3 fields - 10000 items
        spring-data
        8560
        8133
        8144
        avg: 8279

        driver
        6822
        6770
        6875
        avg: 6822
        
        result: driver faster 21.3%

    12 fields - 5000 items
        spring-data
        5911
        5920
        5928
        avg: 5920 - 10.88 % slower than with 3 fields!

        driver
        4687
        4669
        4606
        avg: 4654 - 6 % slower than with 3 fields

        result: driver faster 27%

Conclusions

Spring Data Cassandra may be very interesting if you want to learn something new. It might also speed up development when prototyping or doing something similar. I didn't test higher-order functionality like pagination etc.; this was just a rule-of-thumb test to see what to expect. Basically, the bigger the classes you have to decode, the bigger the deserialization cost. At least this is the effect I'm noticing in my basic tests.

Follow up with Object Mapping available in Cassandra driver 2.1

There was an interesting follow-up discussion on reddit. Following a proposal from reddit user v_krishna, another candidate was added to the comparison: the Object Mapping API.

Let's see the results:

    3 fields - 5000 items
        spring-data
        5438
        5453
        5576
        avg: 5489

        object-map
        5390
        5299
        5476
        avg: 5388

        driver
        4382
        4410
        4249
        avg: 4347

    conclusion
        - driver 26% faster than spring data
        - object map just under 2% faster than spring data

    3 fields - 10000 items
        spring-data
        8792
        8507
        8473
        avg: 8591

        object-map
        8435
        8494
        8365
        avg: 8431

        driver
        6632
        6760
        6646
        avg: 6679

    conclusion
        - driver faster 28.6% than spring data
        - object mapping just under 2% faster than spring data

    12 fields 5000 items
        spring-data
        6193
        5999
        5938
        avg: 6043

        object-map
        6062
        5936
        5911
        avg: 5970

        driver
        4910
        4955
        4596
        avg: 4820

    conclusion
        - driver 25% faster than spring data
        - object mapping 1.2% faster than spring data

To keep everything fair, note that there was some deviation between test runs compared to the previous test; here are the deviations:

comparison with first run:
    3 fields - 5000 items
        spring-data
        avg1: 5339
        avg2: 5489
        2.7% deviation

        driver
        avg1: 4390
        avg2: 4347
        1% deviation

    3 fields - 10000 items
        spring-data
        avg1: 8279
        avg2: 8591
        3.6% deviation

        driver
        avg1: 6822
        avg2: 6679
        2.1% deviation

    12 fields 5000 items
        spring-data
        avg1: 5920
        avg2: 6043
        2% deviation

        driver
        avg1: 4654
        avg2: 4820
        3.4% deviation

Object mapping from Spring Data seems to be just a bit slower than the object mapping available in the new driver. I can't wait to see a comparison of the two in future versions. Initially I was expecting the object mapping approaches to perform around 5-10% worse than the plain driver; it surprised me a bit that the difference was more on the level of 25%. So if you are planning on using object mapping capabilities, be aware that there is a performance penalty.

2015-05-07

Analysis of Cassandra powered Greenhouse with Apache Spark

Intro

In the previous post we went over the steps for gathering the data on the Raspberry Pi.

  1. Gather Data on Raspberry Pi with Cassandra and Arduino
  2. Arduino Greenhouse
In this post I'm going to go over the steps necessary to get the data into Cassandra and then process it with Apache Spark.

Cassandra queries

    -- we'll keep the data on just one node
    CREATE KEYSPACE home
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };
    
    -- create statement, bucketed by date
    CREATE TABLE greenhouse (
        source text,
        day text,
        time timestamp,
        temperaturein decimal,
        temperatureout decimal,
        temperaturecheck decimal,
        humidity decimal,
        light int,
        PRIMARY KEY ((source, day), time)
    )
    WITH CLUSTERING ORDER BY (time DESC);
    
    -- example insert, just to check everything out
    INSERT INTO greenhouse (
        source, day, time, temperaturein,
        temperatureout, temperaturecheck,
        humidity, light)
    VALUES ('G', '2015-04-04', dateof(now()), 0,
        0, 0, 0, 0);
    
    -- check if everything is inserted
    SELECT * FROM greenhouse WHERE source = 'G' AND day = '2015-04-19';
    

Analysis results

I wanted to keep the partitions relatively small because I didn't know how the Raspberry Pi was going to handle the data. Timeouts are possible if the rows get too big, so I went with partitioning the data by day. The analysis of April showed that the project paid off. Here are the results of the analysis:

Total data points (not much, but it's a home DIY solution after all)
172651

First record
Measurement{source='G', day='2015-04-04', time=Sat Apr 04 17:04:41 CEST 2015, temperaturein=11.77, temperatureout=10.43, temperaturecheck=15.0, humidity=46.0, light=57}

Last record
Measurement{source='G', day='2015-05-04', time=Mon May 04 09:37:35 CEST 2015, temperaturein=22.79, temperatureout=20.49, temperaturecheck=23.0, humidity=31.0, light=68}

Cold nights (below 2 °C outside)
2015-04-06
2015-04-07
2015-04-10
2015-04-16
2015-04-17
2015-04-18
2015-04-19
2015-04-20

Lowest In
Measurement{source='G', day='2015-04-06', time=Mon Apr 06 06:22:25 CEST 2015, temperaturein=2.28, temperatureout=2.39, temperaturecheck=4.0, humidity=41.0, light=8}

Highest In
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 14:52:26 CEST 2015, temperaturein=75.53, temperatureout=43.53, temperaturecheck=71.0, humidity=21.0, light=84}

Average In
19.45

Lowest Out
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 04:42:16 CEST 2015, temperaturein=4.48, temperatureout=-2.88, temperaturecheck=6.0, humidity=31.0, light=0}

Highest Out
Measurement{source='G', day='2015-04-22', time=Wed Apr 22 15:58:32 CEST 2015, temperaturein=57.69, temperatureout=45.07, temperaturecheck=56.0, humidity=24.0, light=71}

Average Out
14.71

Average Difference
4.75

Biggest Diff
Measurement{source='G', day='2015-04-20', time=Mon Apr 20 15:11:53 CEST 2015, temperaturein=69.93, temperatureout=28.36, temperaturecheck=62.0, humidity=21.0, light=83}

The code

  1. Spark analysis code
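
The full job is in the repository linked above; as a rough sketch of the idea, loading the whole greenhouse table with the spark-cassandra-connector Java API and computing, say, the average inside temperature could look like this (the chosen statistic and the local setup are assumptions, the real analysis computes more):

import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class GreenhouseAnalysis {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("GreenhouseAnalysis")
            .set("spark.cassandra.connection.host", "127.0.0.1");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // load every measurement from the bucketed greenhouse table
        JavaRDD<CassandraRow> rows = javaFunctions(sc)
            .cassandraTable("home", "greenhouse");

        long count = rows.count();
        double sumIn = rows
            .map(row -> row.getDecimal("temperaturein").doubleValue())
            .reduce(Double::sum);

        System.out.println("data points: " + count);
        System.out.println("average inside temperature: " + sumIn / count);

        sc.stop();
    }
}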

2015-04-23

Gather Data on Raspberry Pi with Cassandra and Arduino

Intro

In the previous post we went over the steps necessary to make a sensor for a small greenhouse for the balcony.

  1. Arduino Greenhouse
In this section we are going to concentrate on how to gather the data coming in from the greenhouse. The approach is applicable to any kind of telemetry data or something similar. The parts list is simpler than in the previous post, but as a "concentrator" node we are going to use a Raspberry Pi. Here are the parts:
  • Arduino Uno
  • USB cable
  • Raspberry PI
  • nRF24L01+
  • 7 Wires
To install Arduino libraries please consult the previous post. The wiring for the nRF24 is the same as in the previous post.

Persisting the data

To persist the data I opted for Apache Cassandra; it's a good fit even for a low-powered Raspberry Pi. Cassandra is a Java technology, so before installing Cassandra you have to install Java. It's all written up nicely in the following posts:

  1. Install Java
  2. Installing Cassandra

Overview of the process

The code

  1. Data Gathering in Arduino
  2. Python serial to Cassandra bridge
To be continued ...

Arduino Greenhouse

Intro

I built a small greenhouse for the balcony. I decided to arm it with an Arduino sensor just to check whether it's of any use. I found some interesting data and will describe the whole process in the following posts. The first post is about building the sensor. Let's start with a simple parts list:

  • Arduino Uno
  • USB cable
  • Power bank
  • 10K Ohm NTC 5mm Thermistor
  • DHT11 temperature and humidity sensor.
  • GL5528 Photo Resistor (we'll bind it with 1K Ohm Resistor)
  • nRF24L01+
  • 10K Ohm resistor
  • 1K Ohm resistor
  • 17 Wires

Installing the libraries

To send the data over the nRF we'll use a library. To install the library download the file https://github.com/maniacbug/RF24/archive/master.zip. Create a new sketch or open the example from gitHub and click on Sketch - Import Library - Add Library. Select the zip archive and the Arduino IDE should install the library. Repeat the same steps for the temperature and humidity sensor https://github.com/adafruit/DHT-sensor-library/archive/master.zip

Wiring

The example has two temperature sensors. One is for the outside and will be connected with longer wires. You'll have to compare the readings from both sensors in the same environment and see the offset in the readings. The humidity sensor can also read the temperature but it is not very precise and is relatively slow. We'll use this value simply as a check value.

The wiring of the nRF24L01+ is a bit complex so simply use this scheme to connect it:

The code

The example is available on GitHub under https://github.com/msval/arduino_greenhouse/tree/master/arduino_temp_sensor

The result

To be continued ...

2015-01-24

Cassandra Community Handling 100 000 req per second

Intro

Recently I got an assignment to prove that a Cassandra cluster can handle 100 000 requests per second. All of this had to be done on a budget and without spending much time on developing the whole application. The setup had to be as close to the real thing as possible. We will go through the details soon; here is just a basic overview of the experiment:

Amazon

Generating and handling load at this scale requires infrastructure that is usually not available on a personal budget, so I turned to Amazon EC2. I had heard about EC2 for quite some time, and it turned out to be really easy to use. Basically all you have to do is set up a security group and store the "pem" file for that security group. Really easy, and if anybody hasn't tried it yet, there is a free micro instance available for a whole year after registering. I won't go into the details of how to set up the security group; it's all described in the DataStax documentation. Note that the security definition there is a bit extensive: defining the port range 1024-65535 is sufficient for inter-group communication, and I didn't expose any ports to the public as described in the documentation. The second part is generating the key pair. In the rest of the document I'll refer to this file as "cassandra.pem".

Load

Generating load at that scale is not as easy as it might seem. After some searching I stumbled upon the following, and came to the conclusion that the best solution is to use Tsung. I set up the load-generating machines with the following snippet. Note that I placed the "cassandra.pem" file on the node from which I'd start running Tsung. Read the node addresses from the AWS console. The rest is pretty much here:

        # do this only for the machine from which you'll initiate tsung
        scp -i cassandra.pem cassandra.pem ec2-user@tsung_machine:~

        # connect to every load machine and install erlang and tsung
        ssh -i cassandra.pem ec2-user@every_load_machine

        # repeat this on every node
        sudo yum install erlang

        wget http://tsung.erlang-projects.org/dist/tsung-1.5.1.tar.gz
        tar -xvzf tsung-1.5.1.tar.gz
        cd tsung-1.5.1
        ./configure
        make
        sudo make install

        # you can close other load nodes now
        # go back to the first node. and move cassandra.pem to id_rsa
        mv cassandra.pem .ssh/id_rsa

        # now make an ssh connection from first tsung node to every
        # load generating machine (to add the host key) so that
        # the first tsung node won't have any problem connecting to
        # other nodes and issuing erlang commands to them
        ssh ip-a-b-c-d
        exit

        # create the basic.xml file on the first tsung node
        vi basic.xml
    

The second part of setting up the load-generating machines is to edit the basic.xml file. To make it more interesting, we are going to send various kinds of messages with a timestamp. The user list is predefined in a file, userlist.csv. Note that the password is the same for all the users; you can adapt this to your own needs or remove the password completely:

        0000000001;pass
        0000000002;pass
        0000000003;pass
        ...
        ...
        ...
    

The Tsung tool is well documented; the configuration I used is similar to this:

        <?xml version="1.0" encoding="utf-8"?>
        <!DOCTYPE tsung SYSTEM "/usr/share/tsung/tsung-1.0.dtd" []>
        <tsung loglevel="warning">

        <clients>
            <client host="ip-a-b-c-d0" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d1" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d2" cpu="8" maxusers="25"/>
            <client host="ip-a-b-c-d3" cpu="8" maxusers="25"/>
        </clients>

        <servers>
            <server host="app-servers-ip-addresses-internal" port="8080" type="tcp"/>
            <!-- enter the rest of the app servers here-->
        </servers>

        <load>
            <arrivalphase phase="1" duration="11" unit="minute">
                <users maxnumber="100" arrivalrate="100" unit="second"/>
            </arrivalphase>
        </load>

        <options>
            <option name="file_server" id='id' value="userlist.csv"/>
        </options>

        <sessions>
            <session probability="100" name="load_session" type="ts_http">
                <setdynvars sourcetype="file" fileid="id" delimiter=";" order="iter">
                    <var name="username" />
                    <var name="pass" />
                </setdynvars>
                <setdynvars sourcetype="eval"
                            code="fun({Pid,DynVars}) -&gt;
                            {Mega, Sec, Micro} = os:timestamp(),
                            (Mega*1000000 + Sec)*1000 + round(Micro/1000)
                            end.
                            ">
                    <var name="millis" />
                </setdynvars>
                <for from="1" to="10000000" var="i">
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%ABC41.7127837,42.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%DEF43.7127837,44.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%GHI45.7127837,46.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%JKL47.7127837,48.71278370000.0"  method="GET"/>
                    </request>
                    <request subst="true">
                        <http url="/m?c=%%_username%%%%_millis%%MNO49.7127837,50.71278370000.0"  method="GET"/>
                    </request>
                </for>
            </session>
        </sessions>
        </tsung>
    

Resources

  • 3x c3.xlarge
  • 1x c4.xlarge
Note that I added a c4 node because Amazon limited the number of instances I could boot.

App

I spent most of the development time on the app part. The basis for the component handling the requests was a Netty listener. In one of my previous posts I described how to use Netty to handle HTTP requests and acknowledge them with a HELLO message; here I acknowledged them with OK.

The most complicated part was sending the messages to Cassandra as fast as possible. The fastest way to send them is to use executeAsync. Initially I had trouble with it and was losing messages; some of the issues were due to concurrency, and some were due to a poor understanding of the DataStax driver.

Concurrency - Basically, I tried to save on instantiating BoundStatement instances for the sake of overall speed. But BoundStatement is not thread safe, and after calling the bind method it returns "this". It took me some time to figure this out, because when used in a loop this behavior is not dangerous. Anyway, thanks to a colleague I figured it out.

        // always instantiate new in concurrent code
        // don't reuse and make multiple calls with .bind()!

        BoundStatement bs = new BoundStatement(insertStatement);
    

Asynchronous execution - also a bit tricky. executeAsync returns a future, and initially I was just registering a callback on it with Futures.

        // don't do this under heavy load with the result of executeAsync
        // in Cassandra you will start to lose data

        Futures.addCallback(future, ...
    

After some trial and error I found a pattern where I didn't lose any data:

        // here we are going to keep the futures
        private ArrayBlockingQueue<ResultSetFuture> queue = 
            new ArrayBlockingQueue<>(10000);

        // in the handling code
        queue.add(session.executeAsync(bs));

        // when reaching 1000th element in the queue
        // start emptying it
        if (queue.size() % 1000 == 0) {
            ResultSetFuture elem;
            do {
                elem = queue.poll();
                if (elem != null) {
                    elem.getUninterruptibly();
                }
            } while (elem != null);
        }

        // this will make your insertions around
        // 4x faster when compared to normal execute
    
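
Wrapped up as a self-contained class, the same pattern might look like this - a sketch, not the exact code from the app; the class name and the drain threshold constant are assumptions:

        import java.util.concurrent.ArrayBlockingQueue;

        import com.datastax.driver.core.ResultSetFuture;

        // hypothetical helper: collect the futures returned by executeAsync
        // and block on them in batches, so writes are never silently dropped
        public class FutureDrain {

            private static final int DRAIN_EVERY = 1000;

            private final ArrayBlockingQueue<ResultSetFuture> queue =
                new ArrayBlockingQueue<>(10000);

            // call this with the result of session.executeAsync(bs)
            public void track(ResultSetFuture future) {
                queue.add(future);
                if (queue.size() % DRAIN_EVERY == 0) {
                    drain();
                }
            }

            private void drain() {
                ResultSetFuture elem;
                do {
                    elem = queue.poll();
                    if (elem != null) {
                        // waits until Cassandra acknowledges the write
                        elem.getUninterruptibly();
                    }
                } while (elem != null);
            }
        }
    

Combined with the hypothetical MeasurementWriter sketched above, the handling code then boils down to drain.track(writer.write(id, payload)).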

App setup

The instances come with OpenJDK installed. That doesn't guarantee the best performance, so I installed Oracle Java. In order not to lose time on firewall setup I simply copied the "cassandra.pem" file to every node.

        # copy the ".jar" and "cassandra.pem" file to a single app node
        # then copy the two files from that node to the other nodes
        # it's a lot faster than uploading to every node (at least on my connection)

        # setup the machine
        wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u71-b14/jdk-7u71-linux-x64.tar.gz"
        
        tar -xvzf jdk-7u71-linux-x64.tar.gz

        sudo update-alternatives --install "/usr/bin/java" "java" "/home/ec2-user/jdk1.7.0_71/jre/bin/java" 1
        
        # pick the new java number in this step
        sudo update-alternatives --config java

        # check with this
        java -version
    

Resources

  • 2x c4.xlarge
  • 2x c4.2xlarge
  • 4x c3.xlarge
Note: I've added c4 nodes because I was limited on Amazon in the number of instances I could boot. I also had to request a limit increase with customer service, but I couldn't predict how many instances of each type I would use, so the load and app servers are not all of the same instance type.

Cassandra

Setting up Cassandra is the easiest part of the whole undertaking. All I did was follow this guide by DataStax.

Resources

  • 7x c3.2xlarge
After hanging at 90 000 req/s for a while I came to the conclusion that a replication factor of two might be too much for the resources I had available. I would probably have needed to add more Cassandra nodes, but since I couldn't get any more instances up I set the replication factor to 1. Note that this replication factor does not allow losing nodes in the cluster without losing data. But the goal here is 100 000 req/s on a budget :)
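
For reference, the replication factor is a property of the keyspace, so dropping it to 1 is a single CQL statement. A minimal sketch using the Java driver - the keyspace name "measurements" and the contact point are made up, and keep in mind that Cassandra does not clean up existing replicas on its own after such a change:

        import com.datastax.driver.core.Cluster;
        import com.datastax.driver.core.Session;

        // hypothetical one-off: set replication factor 1 on a keyspace
        public class SetReplication {
            public static void main(String[] args) {
                Cluster cluster = Cluster.builder()
                    .addContactPoint("127.0.0.1")   // any node in the cluster
                    .build();
                try {
                    Session session = cluster.connect();
                    session.execute(
                        "ALTER KEYSPACE measurements WITH replication = "
                        + "{'class': 'SimpleStrategy', 'replication_factor': 1}");
                } finally {
                    cluster.close();   // also closes sessions created from it
                }
            }
        }
    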

Results

In the end it cost me around $30 to reach the 100k goal. I'm afraid to calculate how much this setup would cost on a monthly or yearly basis.

The successful run looked like this:

Total messages: 31 145 914
Checked number: 31 145 914
Average: 103 809 req/s

Don't be afraid to send me an email if you have any questions whatsoever ;)

2015-01-06

Netty 4 HTTP Hello World

Intro

Finding examples for Netty took me a lot of time. Writing even the smallest portion of code often requires going through multiple sources, ranging from YouTube videos to the official Netty documentation. In this post I'll show you how to build a basic Netty HTTP hello world example. You probably won't have any trouble continuing from here and writing your own app.

Dependencies

This example has just one dependency:
http://mvnrepository.com/artifact/io.netty/netty-all/4.0.24.Final.

Setting Netty up

Create a class with a name of your choosing; it doesn't really matter. Since this is a hello world example, I suggest writing a main method to run it. We'll run the example on HTTP port 80:

    public static void main(String[] args) 
        throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 200)
                .childOption(ChannelOption.ALLOCATOR,
                    PooledByteBufAllocator.DEFAULT)
                .childHandler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(
                                SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new HttpRequestDecoder());
                                p.addLast(new HttpResponseEncoder());
                                p.addLast(new MySuperHttpHandler());
                            }
                        });

            ChannelFuture future = bootstrap.bind(80).sync();

            future.channel().closeFuture().sync();

        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();

            bossGroup.terminationFuture().sync();
            workerGroup.terminationFuture().sync();
        }

    }
    
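
The snippet above assumes the usual Netty 4.0.x imports from the netty-all dependency; for completeness, this is roughly what the top of that class looks like:

        import io.netty.bootstrap.ServerBootstrap;
        import io.netty.buffer.PooledByteBufAllocator;
        import io.netty.channel.ChannelFuture;
        import io.netty.channel.ChannelInitializer;
        import io.netty.channel.ChannelOption;
        import io.netty.channel.ChannelPipeline;
        import io.netty.channel.EventLoopGroup;
        import io.netty.channel.nio.NioEventLoopGroup;
        import io.netty.channel.socket.SocketChannel;
        import io.netty.channel.socket.nio.NioServerSocketChannel;
        import io.netty.handler.codec.http.HttpRequestDecoder;
        import io.netty.handler.codec.http.HttpResponseEncoder;
    

Also note that binding to port 80 usually requires root privileges on Linux, so while developing you may prefer something like 8080.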

Hello World HTTP Handler

    public class MySuperHttpHandler extends 
            SimpleChannelInboundHandler<Object> {
            private static final byte[] CONTENT = 
                {'H', 'E', 'L', 'L', 'O'};

            @Override
            public void channelReadComplete(
                ChannelHandlerContext ctx) {

                ctx.flush();
            }

            @Override
            public void channelRead0(ChannelHandlerContext ctx,
                    Object msg) {

                if (msg instanceof HttpRequest) {
                    HttpRequest req = (HttpRequest) msg;

                    String reqUrl = req.getUri();

                    System.out.println(reqUrl);

                    // do something further with request here ...

                    // this is the response part
                    if (HttpHeaders.is100ContinueExpected(req)) {
                        ctx.write(new DefaultFullHttpResponse(
                            HttpVersion.HTTP_1_1, 
                            HttpResponseStatus.CONTINUE));
                    }

                    boolean keepAlive = HttpHeaders.isKeepAlive(req);
                    FullHttpResponse response = 
                        new DefaultFullHttpResponse(
                            HttpVersion.HTTP_1_1,
                            HttpResponseStatus.OK,
                            Unpooled.wrappedBuffer(CONTENT));

                    response.headers().set(
                        HttpHeaders.Names.CONTENT_TYPE, "text/plain");
                    response.headers().set(
                        HttpHeaders.Names.CONTENT_LENGTH,
                        response.content().readableBytes());

                    if (!keepAlive) {
                        ctx.write(response)
                            .addListener(ChannelFutureListener.CLOSE);
                    } else {
                        response.headers().set(
                            HttpHeaders.Names.CONNECTION,
                            HttpHeaders.Values.KEEP_ALIVE);

                        ctx.write(response);
                    }
                }
            }

            @Override
            public void exceptionCaught(
                ChannelHandlerContext ctx, Throwable cause) {
                ctx.close();
            }
    }
    
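
And the imports assumed for the handler, again with Netty 4.0.x package names:

        import io.netty.buffer.Unpooled;
        import io.netty.channel.ChannelFutureListener;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.SimpleChannelInboundHandler;
        import io.netty.handler.codec.http.DefaultFullHttpResponse;
        import io.netty.handler.codec.http.FullHttpResponse;
        import io.netty.handler.codec.http.HttpHeaders;
        import io.netty.handler.codec.http.HttpRequest;
        import io.netty.handler.codec.http.HttpResponseStatus;
        import io.netty.handler.codec.http.HttpVersion;
    

Once both classes compile, run the main method and open http://localhost/ in a browser (or hit it with curl) - you should get HELLO back.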

Setting up Cassandra Cluster in Virtual Machines

Intro

From time to time having just one Cassandra instance installed on your machine is not enough, because you want to test certain behaviors when a Cassandra cluster is up and running. Having spare hardware on the side or processing time on Amazon is not always an option, so it's a good idea to set up a simple cluster on your own machine with instances running in virtual machines. This post is going to show you how to do it with VirtualBox.

Getting VirtualBox Images

The reason I chose VirtualBox is that there are a lot of free virtual images available. Most of the time you'll be installing Cassandra on a Linux machine, and I decided to go with CentOS. Head over to http://virtualboxes.org/images/centos/ and download CentOS-6.6-x86_64-minimal. The default settings are fine for every machine. Create a couple of them and give them names so that you can differentiate between them (Node1, Node2, etc.).

Perhaps the best idea is to set up one node first and then make copies afterwards. Do not forget to set the network to a bridged adapter. The username and password for the virtual machines are probably set to "root/reverse", but check those options when downloading the VirtualBox image. To keep it short I'll just continue using the root user; in production that's an extremely bad practice.

Setup networking

When importing the .ova file VirtualBox is going to ask you if you want to reinitialize the MAC address. Check that option. There is a certain amount of buggy behavior when it comes to networking, so to prevent those errors run the following command after logging in to the virtual machine (root/reverse):

        rm /etc/udev/rules.d/70-persistent-net.rules
    

When VirtualBox initializes the networking on the virtual machine it puts a new MAC address into a file. There seems to be a bug where this MAC address is not transferred from that file to the virtual machine settings. Run the following command and copy the MAC address:
        cat /etc/sysconfig/network-scripts/ifcfg-eth0
    

Shut down the machine and set the MAC address under Settings > Network > Advanced > MAC Address.

Install Java

Just to make things a bit easier we're going to install wget:

        yum install wget
    

Now we are going to install Java:
        $ cd /opt/
        $ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/7u72-b14/jdk-7u72-linux-x64.tar.gz"
        $ tar xzf jdk-7u72-linux-x64.tar.gz
        $ rm jdk-7u72-linux-x64.tar.gz

        $ cd /opt/jdk1.7.0_72/

        $ alternatives --install /usr/bin/java java /opt/jdk1.7.0_72/bin/java 2
        $ alternatives --config java

        $ alternatives --install /usr/bin/jar jar /opt/jdk1.7.0_72/bin/jar 2
        $ alternatives --install /usr/bin/javac javac /opt/jdk1.7.0_72/bin/javac 2
        $ alternatives --set jar /opt/jdk1.7.0_72/bin/jar
        $ alternatives --set javac /opt/jdk1.7.0_72/bin/javac

        $ vi /etc/profile.d/java.sh
        export JAVA_HOME=/opt/jdk1.7.0_72
        export JRE_HOME=/opt/jdk1.7.0_72/jre
        export PATH=$PATH:/opt/jdk1.7.0_72/bin:/opt/jdk1.7.0_72/jre/bin
    

Reboot the machine and check the setup with echo $JAVA_HOME.

Install Cassandra

Cassandra is installed and run with the following commands:

        $ cd /opt/
        $ wget http://downloads.datastax.com/community/dsc-cassandra-2.1.2-bin.tar.gz
        $ tar xzf dsc-cassandra-2.1.2-bin.tar.gz
        $ rm dsc-cassandra-2.1.2-bin.tar.gz

        [check ip address with ifconfig]

        $ cd conf

        $ vi cassandra.yaml
            rpc_address: ip address of the node
            broadcast_address: ip address of the node
            - seeds: ip_address of the first node

        $ cd ../bin
        $ ./cassandra
    

Firewall settings

The cluster will not work out of the box because of the firewall settings. To get everything talking you will need to open the following ports:

        $ iptables -I INPUT -p tcp -m tcp --dport 9042 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7000 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7001 -j ACCEPT
        $ iptables -I INPUT -p tcp -m tcp --dport 7199 -j ACCEPT

        $ /etc/init.d/iptables save

        $ service iptables restart
    

Now make copies of this machine and update the cassandra.yaml file with the IP addresses of the new machines. Also check /var/log/cassandra/system.log to see whether the other nodes are joining in.
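
If you would rather verify the cluster from client code than from the logs, here is a minimal sketch using the DataStax Java driver; the contact point IP is a placeholder for one of your nodes:

        import com.datastax.driver.core.Cluster;
        import com.datastax.driver.core.Host;

        // hypothetical check: connect to one node and list every host
        // the driver can see - all cluster members should show up as up
        public class ClusterCheck {
            public static void main(String[] args) {
                Cluster cluster = Cluster.builder()
                    .addContactPoint("192.168.1.101")   // IP of any of your nodes
                    .build();
                try {
                    for (Host host : cluster.getMetadata().getAllHosts()) {
                        System.out.println(host.getAddress() + " up: " + host.isUp());
                    }
                } finally {
                    cluster.close();
                }
            }
        }
    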