2015-12-13

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 1

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Part 1 - Overview

Before starting any project I like to make a few drawings, just to keep everything in perspective. My main motivation for this series is to get better acquainted with Apache Kafka. I just didn't have a chance to use it on the projects that I work on in my day-to-day life, but it's this new technology everybody is buzzing about, so I wanted to give it a try. I also didn't get a chance to write Spark Streaming applications, so why not kill two birds with one stone? Here is a 10,000-foot overview of the series:

Avoiding the tl;dr

Part of the motivation for splitting the series up is to avoid the tl;dr effect ;) Now, let's get back to the overview. We'll break the previous image down box by box.

Using Spring Boot

We're basically just prototyping here, but to keep everything flexible and in the spirit of newer architectural paradigms like microservices, the post is split into 5 parts. The software is split up as well; we won't use any specific container for our applications, we'll just go with Spring Boot. In these posts we won't go much beyond the basics; you can always look things up in the official documentation.

Apache Kafka

This is the reason why I'm doing this in the first place. It's this new super cool messaging system that all the big players are using and I want to learn how to put it to everyday use.

Spark Streaming

For some time now I've been doing a lot of work with Apache Spark, but somehow I never got the chance to look into streaming more closely.

Cassandra

Why not?

What is this series about?

It's a year when everybody is talking about voting ... literally everywhere :) so let's make a voting app. In essence it will be a basic word count over the stream, but let's give it some context while we're at it. We won't do anything complicated or particularly useful; the end result will simply be the total count of each token's occurrences in the stream. We'll also break a lot of best practices in data modeling etc. along the way.

This series is for people oriented toward learning something new. I guess experienced and battle-proven readers will find a ton of flaws in the concept, but again, most of them are deliberate. One thing I sometimes avoid in my posts is including the complete source code; my opinion is that a lot more sticks, and learners feel much more comfortable when faced with problems in practice, if they type things in themselves. So I'll just copy-paste the crucial code parts. One more assumption on my side is that the readers will be using IntelliJ IDEA. Let's go to Part 2 and see how to set up Kafka.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 2

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Setting up Kafka

In this section we'll set up two Kafka brokers. We'll also need ZooKeeper; if you are reading this, my guess is that you don't have one set up already, so we'll use the one bundled with Kafka. We won't cover everything here, so do read the official documentation for a more in-depth understanding.

Downloading

Download the latest Apache Kafka. In this tutorial we'll use the binary distribution. Pay attention to the Scala version if you intend to use Kafka with a specific version of Scala; in this tutorial we'll concentrate on Java, but this will become more important in the parts to come. In this section we'll use the tools that ship with the Kafka distribution to test everything out. Once again, download and extract the Apache Kafka distribution from the official pages.

Configuring brokers

Go into the directory where you downloaded and extracted your Kafka installation. There is a properties file template and we are going to use properties files to start the brokers. Make two copies of the file:

        $ cd your_kafka_installation_dir
        $ cp config/server.properties config/server0.properties
        $ cp config/server.properties config/server1.properties
    
Now use your favorite editor to make changes to the broker configuration files. I'll just use vi, after all it has been around for 40 years :)
        $ vi config/server0.properties
    
Now make changes to the following properties (or check that they are already set). broker.id must be unique for every broker, listeners defines the port the broker listens on, num.partitions is the default partition count for newly created topics, and log.dirs is where the broker stores its commit log:
        broker.id=0
        listeners=PLAINTEXT://:9092
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-0
    
Make the changes for the second node too:
        $ vi config/server1.properties
    
        broker.id=1
        listeners=PLAINTEXT://:9093
        num.partitions=2
        log.dirs=/var/tmp/kafka-logs-1
    

Starting everything up

First you need to start ZooKeeper; it will be used to store the offsets for topics. There are more advanced setups where you don't need it, but for someone just starting out it's much easier to use the ZooKeeper bundled with the downloaded Kafka. I recommend opening one shell tab where you can keep all of the running processes. We didn't make any changes to the ZooKeeper properties, they are just fine for our example:

        $ bin/zookeeper-server-start.sh config/zookeeper.properties &
    
From the output you'll notice that it started ZooKeeper on the default port 2181. You can telnet to this port on localhost just to check that everything is running fine. Now we'll start two Kafka brokers:
        $ bin/kafka-server-start.sh config/server0.properties &
        $ bin/kafka-server-start.sh config/server1.properties &
    

Creating a topic

Before producing and consuming messages we need to create a topic; for now you can think of it as a queue name. We need to give it a reference to ZooKeeper. We'll name the topic "votes"; it will have 2 partitions and a replication factor of 2. Please read the official documentation for further explanation. You'll see additional output coming from the broker logs because we are running everything in the background.

        $ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic votes --partitions 2 --replication-factor 2
    

Sending and receiving messages with bundled command line tools

Open two additional shell tabs and position yourself in the directory where you installed Kafka. We'll use one tab to produce messages. The second tab will consume the topic and simply print out the stuff that we type in the first tab. Now this might seem a bit silly, but imagine you are actually using Kafka already!

In tab for producing messages run:

        $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic votes
    

In tab for consuming messages run:

        $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic votes
    

Next part

We covered a lot here, but writing from one console window to another can be achieved with a far simpler combination of shell commands. In Part 3 we'll make an app that writes to a topic. We'll also use the console reader just to verify that our app is actually sending something to the topic.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application will essentially be a simple proxy: it will receive a JSON payload containing the key that's going to be sent to the Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is by using Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project check the SDK setting, it should be Java 8

build.gradle dependencies

        compile('org.apache.kafka:kafka_2.11:0.9.0.0')
        compile('org.apache.zookeeper:zookeeper:3.4.7')
    

application.properties

        brokerList=localhost:9092
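        # "sync" makes send() block until the broker acknowledges the record; any other value uses the async callback path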
        sync=sync
        topic=votes
    

SpringBootKafkaProducer

This is the class where all the important stuff happens:

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);

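        // String keys and values; acks=1 means the partition leader alone
        // acknowledges each write, retries=1 retries a failed send once, and
        // linger.ms=5 lets the producer batch records for up to 5 ms.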
        kafkaProps.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("acks", "1");

        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);

    }

    public void send(String value) throws ExecutionException, 
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();

    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
    
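One detail the class above skips is closing the producer when the application shuts down. A small sketch of such an addition, assuming the same javax.annotation lifecycle annotations already used for @PostConstruct (the cleanUp name is arbitrary):

    @PreDestroy
    public void cleanUp() {
        // Flush buffered records and release the producer's resources
        // when the Spring context shuts down.
        if (producer != null) {
            producer.close();
        }
    }
    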

SpringBootKafkaExampleApplication

This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
    

AppBeans

Set up the beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
    

Helper beans

The Status to return to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
    
This will be the input to our app:
package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
    

SpringBootKafkaController

This is the controller; after starting the app we should have an active endpoint available at http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }

}
    

Checking everything

There should still be an active console reader from the previous post, so we won't cover that here. After running SpringBootKafkaExampleApplication, simply open a REST client application like Postman and try to send the following JSON to http://localhost:8080/vote

{
    "name": "Test"
}
    
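If you'd rather stay in Java than use Postman, a minimal sketch of a client could look like the class below. It assumes the application is running locally on port 8080 and reuses the Vote and Status classes from above; the VoteClient name is just for illustration.

package com.example;

import org.springframework.web.client.RestTemplate;

public class VoteClient {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();

        // POST {"name": "Test"} to the /vote endpoint and print the returned status.
        Status status = restTemplate.postForObject(
                "http://localhost:8080/vote", new Vote("Test"), Status.class);

        System.out.println(status.getStatus());
    }
}
    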
If everything went fine you should see the name that you sent in this JSON show up in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with the data in Cassandra and push the results back to Cassandra.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 4

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Consuming Kafka data with Spark Streaming and Output to Cassandra

In this section we are going to use Spark Streaming to read the data coming in from Kafka. We'll also combine it with the data already in Cassandra, do some computation with it, and put the results back into Cassandra. The best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into configuring Cassandra to run; there is plenty of documentation out there and it takes just minutes to set up.

Cassandra

Nothing fancy here, just the name of the entity being voted for and the number of votes:

CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);
    

Let's create a simple Java project with Gradle for stream processing

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating the project check the SDK setting, it should be Java 8

Let's have a look at the dependencies

group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}
    

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data in Cassandra:

import java.io.Serializable;

public class Vote implements Serializable {
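    // Field names mirror the columns of the voting.votes table, which is what
    // mapToRow(Vote.class) relies on when the stream writes back to Cassandra.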
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

Spark Streaming with Kafka

And finally, the code that accepts the tokens coming in, combines them with the data in Cassandra and then writes the totals back to Cassandra. I didn't spend much time on making the class configurable through external parameters, but for the example it's good enough:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host",
            "127.0.0.1");

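        // Micro-batch interval: incoming Kafka data is collected and processed
        // in batches of 10 seconds.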
        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkConf,
            Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

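        // Direct (receiver-less) Kafka stream: Spark reads straight from the
        // brokers in metadata.broker.list, one RDD partition per Kafka partition.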
        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2
                        <String, String>,
                        String>) Tuple2::_2);

        JavaPairDStream<String, Integer> voteCount = lines
            .mapToPair(
                (PairFunction<String, String, Integer>) s ->
                        new Tuple2<>(s, 1)).reduceByKey(
                (Function2<Integer, Integer, Integer>)
                    (i1, i2) ->i1 + i2);

        sc = jssc.sparkContext();

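        // For every micro-batch: read the current total for each token from
        // Cassandra, add the fresh counts and write the new totals back.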
        voteCount.foreachRDD((v1, v2) -> {
            v1.foreach((x) -> {
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                    javaFunctions(sc)
                        .cassandraTable("voting", "votes")
                        .where("name = '" + x._1() + "'");

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes = 
                        previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                List<Vote> votes = Arrays.asList(
                    new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                    .writerBuilder("voting", "votes", mapToRow(Vote.class))
                    .saveToCassandra();
            });

            return null;
        });

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
    
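As a side note, the per-batch counting done by mapToPair and reduceByKey above can be sanity-checked in isolation with plain Java collections; the class below is purely illustrative and not part of the project:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CountCheck {

    public static void main(String[] args) {
        // A fixed "micro-batch" of tokens, standing in for the lines read from Kafka.
        List<String> batch = Arrays.asList("alice", "bob", "alice");

        // The same token -> count aggregation the streaming job performs per batch.
        Map<String, Long> counts = batch.stream()
                .collect(Collectors.groupingBy(token -> token, Collectors.counting()));

        System.out.println(counts); // e.g. {bob=1, alice=2}
    }
}
    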

And that's it

You can check how the data changes by running select statements against the voting table. In Part 5 we are going to make a simple Spring Boot project that displays and sorts the voting data.

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 5

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Displaying Cassandra Data With Spring Boot

Now that we have our voting data in Cassandra, let's write a simple Spring Boot project that gathers all the data from Cassandra, sorts it and displays it to the user.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: boot-cassandra-data-show
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: boot-cassandra-data-show
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Display Cassandra Data
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Template Engines - Mustache
  18. Next
  19. Project name: boot-cassandra-data-show
  20. The rest is just fine ...
  21. Finish
  22. After creating the project check the SDK setting, it should be Java 8

Cassandra dependencies

compile('com.datastax.cassandra:cassandra-driver-core:2.1.9')
    

Vote class

We'll use this class to map rows from Cassandra.

package com.example;

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
    

application.properties

server.port = 8090
contactPoint = 127.0.0.1
keyspace = voting
    

CassandraSessionManager

This bean is used to set up the connection to Cassandra.

package com.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Configuration
public class CassandraSessionManager {

    private Session session;
    private Cluster cluster;

    @Value("${contactPoint}")
    private String contactPoint;

    @Value("${keyspace}")
    private String keyspace;

    public CassandraSessionManager() {

    }

    public Session getSession() {
        return session;
    }

    @PostConstruct
    public void initIt() {
        cluster = Cluster.builder().addContactPoint(
            contactPoint).build();
        session = cluster.connect(keyspace);
    }

    @PreDestroy
    public void destroy() {
        if (session != null) {
            session.close();
        }
        if (cluster != null) {
            cluster.close();
        }
    }
}

    

BootCassandraDataShowApplication

Automatically generated ...

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootCassandraDataShowApplication {

    public static void main(String[] args) {
        SpringApplication.run(
        BootCassandraDataShowApplication.class, args);
    }
}
    

AppBeans

Beans for holding the configured objects.

package com.example;

import com.datastax.driver.core.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public Session session() {
        return sessionManager().getSession();
    }

    @Bean
    public CassandraSessionManager sessionManager() {
        return new CassandraSessionManager();
    }
}

    

Web Controller

package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

@Configuration
@Controller
public class WelcomeController {

    @Autowired
    Session session;

    @RequestMapping("/")
    public String welcome(Map<String, Object> model) {

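        // Read every row from the votes table, map the rows to Vote objects
        // and sort them in memory by vote count, highest first.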
        final ResultSet rows = session.execute("SELECT * FROM votes");

        ArrayList<Vote> results = new ArrayList<>();

        for (Row row : rows.all()) {
            results.add(new Vote(
                row.getString("name"),
                row.getInt("votes")
            ));
        }

        Collections.sort(results, (a, b) ->
        b.getVotes().compareTo(a.getVotes()));

        model.put("results", results);

        return "welcome";
    }
}
    

Template to show the results

<!DOCTYPE html>
<html lang="en">
<body>

<h1>Voting results:</h1>
<br/>
{{#results}}
    <strong>{{this.name}}</strong> {{this.votes}} <br/>
{{/results}}

</body>
</html>
    

That's all folks

Now this app might not seem like a lot, but there's a Kafka cluster that receives messages coming in from a Spring Boot app that exposes a REST interface. The messages that come in from Kafka are then processed with Spark Streaming and sent to Cassandra. There is another Spring Boot app that sorts and displays the results to the users. This small tutorial covers most of the cool Java/big data technologies of the moment. Special thanks to the readers that went through all five parts of this tutorial ;)