2015-12-13

Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3

Series

This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application is essentially a simple proxy: it receives a JSON document and forwards the value it contains to a Kafka topic. Pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is by using Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8

build.gradle dependencies

        compile('org.apache.kafka:kafka_2.11:0.9.0.0')
        compile('org.apache.zookeeper:zookeeper:3.4.7')
    

application.properties

        brokerList=localhost:9092
        sync=sync
        topic=votes
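
As a rough illustration of how these three values are used later on, here is a minimal sketch that relies only on the JDK. The class name ProducerConfigSketch and its methods are hypothetical and are not part of the project; in the real application Spring reads the file and injects the values through @Value. The sketch assumes the same rule as the producer's send method: any value of sync other than "sync" (ignoring case) selects the asynchronous path.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of the example project.
public class ProducerConfigSketch {

    // Parses key=value lines the same way a .properties file is parsed.
    static Properties loadAppProperties(String text) {
        Properties app = new Properties();
        try {
            app.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return app;
    }

    // Maps the application's own key (brokerList) to the Kafka client key
    // (bootstrap.servers) and adds the String serializers used in this post.
    static Properties toKafkaProperties(Properties app) {
        Properties kafka = new Properties();
        kafka.put("bootstrap.servers", app.getProperty("brokerList"));
        kafka.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        kafka.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return kafka;
    }

    // Anything other than "sync" (case-insensitive) means asynchronous sending.
    static boolean isSync(Properties app) {
        return "sync".equalsIgnoreCase(app.getProperty("sync"));
    }

    public static void main(String[] args) {
        Properties app = loadAppProperties(
            "brokerList=localhost:9092\nsync=sync\ntopic=votes\n");
        System.out.println(toKafkaProperties(app));
        System.out.println("sync mode: " + isSync(app));
    }
}
```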
    

SpringBootKafkaProducer

This is the class where all the important work happens.

package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Configuration
public class SpringBootKafkaProducer {

    @Value("${brokerList}")
    private String brokerList;

    @Value("${sync}")
    private String sync;

    @Value("${topic}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    @PostConstruct
    public void initIt() {
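        Properties kafkaProps = new Properties();

        // Brokers used for the initial connection to the cluster
        kafkaProps.put("bootstrap.servers", brokerList);

        // Keys and values are sent as plain strings
        kafkaProps.put("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the partition leader to acknowledge each record
        kafkaProps.put("acks", "1");

        // Retry a failed send once; wait up to 5 ms to batch records
        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);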

    }

    public void send(String value) throws ExecutionException, 
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record).get();

    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
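
The difference between the two send paths can be tried out without a broker. The following is only a sketch: SendModesSketch and fakeSend are hypothetical names, and a CompletableFuture stands in for the Future that producer.send returns. It uses join() instead of get() so that no checked exceptions have to be declared; otherwise the blocking behaviour is the same idea as in sendSync above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the producer, for illustration only.
public class SendModesSketch {

    // Pretends to send a value; completes on another thread with a fake ack.
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> "acked:" + value);
    }

    // Synchronous style: the caller blocks until the result is available.
    // join() behaves like get() but throws unchecked exceptions.
    static String sendSync(String value) {
        return fakeSend(value).join();
    }

    // Asynchronous style: returns immediately; the callback runs once the
    // send has finished, receiving either a result or an error.
    static CompletableFuture<String> sendAsync(
            String value, BiConsumer<String, Throwable> callback) {
        return fakeSend(value).whenComplete(callback);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("Test"));

        sendAsync("Test", (ack, error) -> {
            if (error != null) {
                error.printStackTrace();
            } else {
                System.out.println(ack);
            }
        }).join(); // wait here only so the demo does not exit too early
    }
}
```

With the synchronous path the HTTP request does not return until the send has finished, so failures surface as exceptions in the controller. With the asynchronous path the request returns right away and failures are only visible in the callback.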
    

SpringBootKafkaExampleApplication

This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
    

AppBeans

Set up the beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
    

Helper beans

The status returned to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
    
This will be the input to our app.

package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
    

SpringBootKafkaController

This is the controller. After starting the app we should have an active endpoint available under http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }

}
    

Checking everything

The console consumer from the previous post should still be running, so we won't cover it again here. After starting SpringBootKafkaExampleApplication, open a REST client such as Postman and send the following JSON to http://localhost:8080/vote

{
    "name": "Test"
}
    
If everything went fine, you should see the name that you sent in this JSON in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with data in Cassandra and push it back to Cassandra.
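
If you would rather send the request from code than from a REST client, something along these lines should work. It is only a sketch: VoteClientSketch and its methods are hypothetical names, it uses the JDK's HttpURLConnection, and it assumes the application is running locally on port 8080. The JSON escaping is deliberately minimal (quotes and backslashes only).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client, for illustration only; not part of the example project.
public class VoteClientSketch {

    // Builds the request body; escapes only backslashes and double quotes.
    static String toJson(String name) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\": \"" + escaped + "\"}";
    }

    // Posts the vote and returns the HTTP status code of the response.
    static int postVote(String endpoint, String name) throws IOException {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        // The controller reads the body with @RequestBody, so declare it as JSON
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(toJson(name).getBytes(StandardCharsets.UTF_8));
        }
        int code = conn.getResponseCode();
        conn.disconnect();
        return code;
    }

    public static void main(String[] args) throws IOException {
        // Requires the Spring Boot application to be running locally
        int code = postVote("http://localhost:8080/vote", "Test");
        System.out.println("HTTP status: " + code);
    }
}
```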

10 comments:

Lukasz said...

What's the purpose of @Configuration annotation on SpringBootKafkaController class?

Marko Švaljek said...

I probably missed it during refactoring ...

Teik Hooi Beh said...

By any chance you have a github repo on the java codes?

Marko Švaljek said...

I know it's kind of stupid from my side but I do want people to try it out themselves. If you are really interested send me an e-mail on msvaljek@gmail.com

Marko Švaljek said...

I guess there will be more people interested ... here are the sources:

https://drive.google.com/open?id=0Bz9kDTTW0oRgWXdoTGFtM1dLelE

Ayman Elgharabawy said...

password?

Marko Švaljek said...

password: hello

Arash Taheri said...

thank you Marko, it works very well. Do you have a consumer too?

Siahmed Halim said...

Thank You Marko