Spark: Spark Streaming and Kafka Integration


steps:

 1) start zookeeper server
 2) start Kafka brokers [ one or more ]
 3) create a topic
 4) start console producer [ to write messages into the topic ]
 5) start console consumer [ to test whether messages are streamed ]
 6) create a spark streaming context,
    which streams from the kafka topic
 7) perform transformations or aggregations
 8) output operation : direct the results into another kafka topic
------------------------------------------
the following code was tested with spark 1.6.0 and kafka 0.10.2.0

kafka and spark streaming

bin/zookeeper-server-start.sh  config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
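
the scala code below assumes an interactive spark-shell session (it reuses the shell's sc) with the kafka integration on the classpath. one way to launch such a session, assuming the Scala 2.10 build of the spark-streaming-kafka artifact that matches spark 1.6.0:

bin/spark-shell --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0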

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka.KafkaUtils
// 1. create a receiver-based stream from the kafka topic;
//    each element is a (key, message) tuple, so x._2 below is the message value
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)

// flatMap flattens the arrays of words into a single stream of words
val words = lines.flatMap(x => x.split(" "))
val pair = words.map(x => (x, 1))
val wc = pair.reduceByKey(_ + _)

wc.print()
// use the code below to write the results into a kafka topic
ssc.start
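
when this runs as a standalone application rather than in the spark-shell, ssc.start is usually followed by awaitTermination so the driver keeps running; a minimal sketch:

ssc.start()
ssc.awaitTermination()   // block until the streaming context is stopped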

------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1

// writing into kafka topic.

import org.apache.kafka.clients.producer.ProducerConfig
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord


wc.foreachRDD(rdd =>
      rdd.foreachPartition { partition =>
                // create one producer per partition (not per record)
                // to avoid repeated connection setup
                val props = new HashMap[String, Object]()
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                        "localhost:9092")
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
                val producer = new KafkaProducer[String, String](props)

                partition.foreach { case (w, cnt) =>
                  val x = w + "\t" + cnt
                  println(x)
                  val message = new ProducerRecord[String, String]("results1", null, x)
                  producer.send(message)
                }
                producer.close()
      })

-- register the above output operation before calling ssc.start.
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning

-------------------
 val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))

1. KafkaUtils.createStream() needs 4 arguments:
    1st ---> streaming context
    2nd ---> zookeeper connection details
    3rd ---> consumer group id
    4th ---> topics
 
spark streaming can read from multiple topics.
    topics are passed as key-value pairs of a Map object:

key   ---> topic name
value ---> number of consumer threads

 to read from multiple topics,
 the 4th argument should be as follows.
    Map("t1"->2,"t2"->4,"t3"->1)

-------------------------

   the given number of consumer threads will be applied to each partition of the kafka topic.

   ex: the topic has 3 partitions,
       consumer threads are 5,
   so the total number of threads = 3 * 5 = 15.

but these 15 threads are not all executed in parallel.

in short, only the 5 threads for one partition will be consuming data in parallel.

to make all (15) parallel, create one input stream per partition:

val numparts = 3
val kstreams = (1 to numparts).map { x =>
  // each call creates a separate receiver with 5 consumer threads
  KafkaUtils.createStream(ssc, "localhost:2181",
    "spark-streaming-consumer-group", Map("spark-topic" -> 5))
}
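
to actually process all the receivers together, the individual streams can be unioned into a single DStream; this union step is not in the original snippet, just a common follow-up:

// combine the per-receiver streams so the downstream transformations
// see one DStream fed by all receivers in parallel
val unified = ssc.union(kstreams)
val unifiedLines = unified.map(x => x._2.toUpperCase)
unifiedLines.print()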