2015-12-13

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 4

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Consuming Kafka data with Spark Streaming and Output to Cassandra

In this part we are going to use Spark Streaming to read the data coming in from Kafka. We'll combine it with the data already in Cassandra, do some computation on it, and write the results back to Cassandra. The best practice would be to have a Spark cluster running, but for the sake of simplicity we are going to launch a local Spark context from a Java application and do the processing there. We won't go into configuring Cassandra to run; there is plenty of documentation out there and it takes just minutes to set up.

Cassandra

Nothing fancy here, just the name of the entity being voted for and its number of votes:

CREATE KEYSPACE voting
    WITH REPLICATION = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
    };

USE voting;

CREATE TABLE votes (name text PRIMARY KEY, votes int);
    

Let's create a simple Java project with Gradle for stream processing:

  1. File, New Project, Gradle
  2. Project SDK: Java 8
  3. Java
  4. Next
  5. GroupId: spark-kafka-streaming-example
  6. ArtifactId: spark-kafka-streaming-example
  7. Version: 1.0-SNAPSHOT
  8. Next
  9. Use default gradle wrapper
  10. Next
  11. Project name: spark-kafka-streaming-example
  12. The rest is just fine ...
  13. Finish
  14. After creating the project, check the SDK setting; it should be Java 8

Let's have a look at the dependencies

group 'spark-kafka-streaming-example'
version '1.0-SNAPSHOT'

apply plugin: 'java'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.apache.spark:spark-core_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming_2.10:1.5.2')
    compile('org.apache.spark:spark-streaming-kafka_2.10:1.5.2')
    compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
    compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')

    testCompile group: 'junit', name: 'junit', version: '4.11'
}
    

Simple Voting Class to go with Cassandra Table

We'll use this class for storing data in Cassandra:

import java.io.Serializable;

public class Vote implements Serializable {
    private String name;
    private Integer votes;

    public Vote(String name, Integer votes) {
        this.name = name;
        this.votes = votes;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getVotes() {
        return votes;
    }

    public void setVotes(Integer votes) {
        this.votes = votes;
    }
}
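
The `Vote` class above is a plain JavaBean, and the `mapToRow(Vote.class)` call used later relies on that: to the best of my understanding, the connector's Java API matches bean properties to column names, so the getters line up with the `name` and `votes` columns. The sketch below uses only the JDK's `java.beans.Introspector` to list the properties such a bean exposes. The `BeanProperties` class and the trimmed-down nested `Vote` copy are assumptions made for illustration, and the connector's actual mapping code may differ in details.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, for illustration only; not part of the project.
final class BeanProperties {

    // Trimmed-down copy of the Vote bean from above.
    public static class Vote {
        private String name;
        private Integer votes;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getVotes() { return votes; }
        public void setVotes(Integer votes) { this.votes = votes; }
    }

    // Returns the JavaBean property names of a class, sorted alphabetically.
    static Set<String> propertyNames(Class<?> type) {
        try {
            // Stop at Object so the built-in "class" property is not reported.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
                names.add(descriptor.getName());
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not inspect " + type, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(propertyNames(Vote.class)); // prints [name, votes]
    }
}
```

The two property names are the same as the column names in the `votes` table, which is what lets the bean be written without any explicit column mapping.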
    

Spark Streaming with Kafka

And finally, the code that accepts the votes coming in, combines them with the data already in Cassandra, and writes the result back to Cassandra. I didn't spend much time on making the class configurable through external parameters, but for the example it's good enough:

import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStreamingExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {

        String brokers = "localhost:9092,localhost:9093";
        String topics = "votes";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("SparkStreamingExample");
        sparkConf.set("spark.cassandra.connection.host",
            "127.0.0.1");

        JavaStreamingContext jssc = new JavaStreamingContext(
            sparkConf,
            Durations.seconds(10));

        HashSet<String> topicsSet = new HashSet<>(
                Arrays.asList(topics.split(",")));
        HashMap<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

        JavaDStream<String> lines =
                messages.map(
                        (Function<Tuple2
                        <String, String>,
                        String>) Tuple2::_2);

        JavaPairDStream<String, Integer> voteCount = lines
            .mapToPair(
                (PairFunction<String, String, Integer>) s ->
                        new Tuple2<>(s, 1))
            .reduceByKey(
                (Function2<Integer, Integer, Integer>)
                    (i1, i2) -> i1 + i2);

        sc = jssc.sparkContext();

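        // foreachRDD runs on the driver once per batch. The SparkContext can
        // only be used on the driver, so the (small) per-batch result is
        // collected first and then processed in a plain loop.
        voteCount.foreachRDD((v1, v2) -> {
            v1.collect().forEach((x) -> {
                // Look up the total stored so far for this name. The bind
                // parameter avoids building CQL by string concatenation.
                CassandraTableScanJavaRDD<CassandraRow> previousVotes =
                    javaFunctions(sc)
                        .cassandraTable("voting", "votes")
                        .where("name = ?", x._1());

                Integer oldVotes = 0;
                if (previousVotes.count() > 0) {
                    oldVotes =
                        previousVotes.first().getInt("votes");
                }

                Integer newVotes = oldVotes + x._2();

                // Write the updated total back. name is the primary key,
                // so this overwrites the existing row.
                List<Vote> votes = Arrays.asList(
                    new Vote(x._1(), newVotes));
                JavaRDD<Vote> rdd = sc.parallelize(votes);

                javaFunctions(rdd)
                    .writerBuilder("voting", "votes", mapToRow(Vote.class))
                    .saveToCassandra();
            });

            return null;
        });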

        voteCount.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
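
Stripped of Spark, Kafka and Cassandra, the per-batch logic in the job above comes down to two steps: count how many times each name appears in the batch, then add those counts to the totals stored so far. The sketch below shows the same idea with plain Java collections. The `VoteTally` class, its method names, and the in-memory `Map` standing in for the `votes` table are all assumptions made for illustration; none of them are part of the project.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the project: mirrors the streaming job's
// per-batch logic with plain collections instead of Spark and Cassandra.
final class VoteTally {

    // Counts occurrences of each name in one batch
    // (the mapToPair + reduceByKey step).
    static Map<String, Integer> countBatch(List<String> names) {
        Map<String, Integer> counts = new HashMap<>();
        for (String name : names) {
            counts.merge(name, 1, Integer::sum);
        }
        return counts;
    }

    // Adds the batch counts to the stored totals (the read, add, write-back
    // step). The map stands in for the voting.votes table.
    static void applyBatch(Map<String, Integer> stored, Map<String, Integer> batch) {
        batch.forEach((name, count) -> stored.merge(name, count, Integer::sum));
    }

    public static void main(String[] args) {
        Map<String, Integer> stored = new HashMap<>();
        stored.put("alice", 5);

        applyBatch(stored, countBatch(Arrays.asList("alice", "bob", "alice")));

        // prints alice=7, bob=1
        System.out.println("alice=" + stored.get("alice") + ", bob=" + stored.get("bob"));
    }
}
```

A name that is not stored yet simply starts from the batch count, which corresponds to the `oldVotes = 0` default in the streaming job.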
    

And that's it

You can check how the data changes by running select statements against the votes table. In Part 5 we are going to make a simple Spring Boot project that displays and sorts the voting data.

3 comments:

JOHN PETER said...

Running into issue.

Exception in thread "main" java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.

JavaPairInputDStream messages =
KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);


16/01/14 10:21:46 INFO VerifiableProperties: Property zookeeper.connect is overridden to
Exception in thread "main" java.lang.VerifyError: class com.fasterxml.jackson.module.scala.ser.ScalaIteratorSerializer overrides final method withResolved.(Lcom/fasterxml/jackson/databind/BeanProperty;Lcom/fasterxml/jackson/databind/jsontype/TypeSerializer;Lcom/fasterxml/jackson/databind/JsonSerializer;)Lcom/fasterxml/jackson/databind/ser/std/AsArraySerializerBase;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.fasterxml.jackson.module.scala.ser.IteratorSerializerModule$class.$init$(IteratorSerializerModule.scala:70)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.(DefaultScalaModule.scala:35)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.(DefaultScalaModule.scala)
at org.apache.spark.rdd.RDDOperationScope$.(RDDOperationScope.scala:78)
at org.apache.spark.rdd.RDDOperationScope$.(RDDOperationScope.scala)
at org.apache.spark.streaming.dstream.InputDStream.(InputDStream.scala:78)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.(DirectKafkaInputDStream.scala:56)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8$$anonfun$apply$4.apply(KafkaUtils.scala:419)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8$$anonfun$apply$4.apply(KafkaUtils.scala:410)
at scala.util.Either$RightProjection.map(Either.scala:536)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8.apply(KafkaUtils.scala:410)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$8.apply(KafkaUtils.scala:409)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.ls.in2.kafka.SparkStreamingExample.main(SparkStreamingExample.java:55)

Marko Švaljek said...

I haven't run into anything similar. I googled around a bit, and it looks like you might be having the problem described here:

http://stackoverflow.com/questions/33815396/spark-com-fasterxml-jackson-module-error

But then again, it works for me :(

MLL said...

I found your example useful even over 1 yr later. Thank you.
I am using the current version of Cassandra, 3.10, which causes a runtime exception. Without changing your code or updating the org.apache.spark dependencies, I found that I had to update:
compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3')
compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-M3')
to:
compile('com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-RC1')
compile('com.datastax.spark:spark-cassandra-connector-java_2.10:1.5.0-RC1')
Interestingly, newer versions, such as 1.5.0+, also cause a runtime exception. I hope to update all dependencies to the latest versions once I understand Spark better.