Spark: Spark Streaming and Kafka Integration
steps:
1) start the ZooKeeper server
2) start Kafka brokers [ one or more ]
3) create a topic
4) start a console producer [ to write messages into the topic ]
5) start a console consumer [ to test whether messages are streamed ]
6) create a Spark streaming context, which streams from the Kafka topic
7) perform transformations or aggregations
8) output operation: direct the results into another Kafka topic
------------------------------------------
following code was tested with Spark 1.6.0 and Kafka 0.10.2.0
Kafka and Spark Streaming:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-topic --from-beginning
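import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
// 5-second batches, built on the spark-shell's SparkContext `sc`
val ssc = new StreamingContext(sc, Seconds(5))
//1. (see note 1 below on the createStream arguments)
// receiver-based stream of (key, message) pairs read from spark-topic
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
val lines = kafkaStream.map(x => x._2.toUpperCase)   // keep only the message, upper-cased
val words = lines.flatMap(x => x.split(" "))         // flatMap: one element per word
val pair = words.map(x => (x,1))
val wc = pair.reduceByKey(_+_)                       // word counts for each batch
wc.print()
// to also write the results into a Kafka topic, register the producer code below before ssc.start()
ssc.start()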
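To see what one batch produces, here is a minimal sketch that applies the same steps to an ordinary Scala collection instead of a DStream, so it runs without Spark or Kafka; the sample lines are made up. It also illustrates why the split step should be a flatMap rather than a map: with map, each key is a whole Array[String], and arrays compare by reference, so identical lines are never merged.

```scala
// Hypothetical sample: message values received during one 5-second batch
val batch = Seq("hello spark", "hello kafka", "hello spark")

// The same steps as the DStream pipeline, on a plain Seq
val upper = batch.map(_.toUpperCase)      // like kafkaStream.map(x => x._2.toUpperCase)
val words = upper.flatMap(_.split(" "))   // flatMap: one element per word
val pairs = words.map(w => (w, 1))        // (word, 1)
// equivalent of reduceByKey(_ + _): group the pairs by word and add up the 1s
val counts = pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }

counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }

// With map instead of flatMap every element is a whole Array[String]; arrays
// compare by reference, so the two identical "HELLO SPARK" lines stay separate keys
val arrayGroups = upper.map(_.split(" ")).map(a => (a, 1)).groupBy(_._1).size
println(s"distinct keys when splitting with map: $arrayGroups")
```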
------------------------------
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic results1
// writing the word counts into the results1 Kafka topic
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
wc.foreachRDD(rdd =>
  rdd.foreachPartition { partition =>
    // KafkaProducer is not serializable, so build it here on the executor:
    // one producer per partition, reused for every record in that partition
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { case (w, cnt) =>
      val x = w + "\t" + cnt   // record value: word<TAB>count
      println(x)
      producer.send(new ProducerRecord[String, String]("results1", null, x))
    }
    producer.close()           // blocks until the pending sends complete
  })
-- register the above code before calling ssc.start(); Spark Streaming does not accept new output operations once the context has started.
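Why the producer belongs inside foreachPartition: a KafkaProducer cannot be created on the driver and shipped in the closure (it is not serializable), and creating one per record starts a new producer, with its own network thread and connections, for every word. The sketch below uses a hypothetical `FakeProducer` standing in for KafkaProducer so it runs without a broker; it shows the per-partition pattern and counts how many producers get opened.

```scala
import scala.collection.mutable.ArrayBuffer

var producersCreated = 0                    // how many producers were opened
val delivered = ArrayBuffer.empty[String]   // what would have reached results1

// Hypothetical stand-in for KafkaProducer[String, String]
class FakeProducer {
  producersCreated += 1
  def send(value: String): Unit = delivered += value
  def close(): Unit = ()
}

// Hypothetical RDD contents: two partitions of (word, count) pairs
val partitions: Seq[Seq[(String, Int)]] =
  Seq(Seq("HELLO" -> 3, "SPARK" -> 2), Seq("KAFKA" -> 1))

// Per-partition pattern: open one producer, send every record, then close it
partitions.foreach { partition =>
  val producer = new FakeProducer
  partition.foreach { case (w, cnt) => producer.send(w + "\t" + cnt) }
  producer.close()
}

// 2 producers for 3 records; one producer per record would have opened 3
println(s"producers: $producersCreated, records: ${delivered.size}")
```

The number of producers grows with the number of partitions, not with the number of records, which is what keeps the per-batch cost bounded.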
--------------------------------------------
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic results1 --from-beginning
-------------------
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181","spark-streaming-consumer-group", Map("spark-topic" -> 5))
1. KafkaUtils.createStream() takes 4 arguments
(an optional 5th, the storage level, defaults to MEMORY_AND_DISK_SER_2):
1st ---> streaming context
2nd ---> ZooKeeper quorum (host:port)
3rd ---> consumer group id
4th ---> topics.
Spark Streaming can read from multiple topics.
the topics are given as a Map:
key ---> topic name
value ---> number of consumer threads.
to read from multiple topics,
the 4th argument should look like this:
Map("t1"->2,"t2"->4,"t3"->1)
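When many topics share one thread count, the map can be built from a comma-separated list instead of written out by hand. A minimal sketch in plain Scala (runs without Spark); `topicMap` is a hypothetical helper, similar in spirit to the topic-map line in Spark's bundled KafkaWordCount example, and its result is what would be passed as the 4th argument.

```scala
// Builds the 4th argument of KafkaUtils.createStream (topic -> consumer threads)
// from a comma-separated topic list, with the same thread count for every topic
def topicMap(topics: String, numThreads: Int): Map[String, Int] =
  topics.split(",").map(_.trim).filter(_.nonEmpty).map(t => (t, numThreads)).toMap

// Different thread counts per topic: write the Map out directly
val mixed = Map("t1" -> 2, "t2" -> 4, "t3" -> 1)

println(topicMap("t1, t2,t3", 2))
println(s"threads requested in this one receiver: ${mixed.values.sum}")
```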
-------------------------
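each value in the map is the number of consumer threads for that topic inside one receiver (one createStream call).
ex: topic has 3 partitions,
consumer threads are 5.
all 5 threads are in the same consumer group, and Kafka gives each partition to at most one thread of a group, so only 3 threads receive data and the other 2 stay idle.
all of these threads also run inside a single receiver on one executor, so adding threads does not spread the receiving across the cluster.
to receive from all 3 partitions in parallel, create one stream per partition (each stream gets its own receiver) and union them: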
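val numparts = 3   // assumes spark-topic was created with --partitions 3
// one receiver-based stream per partition, all in the same consumer group;
// 1 consumer thread per stream, so the 3 partitions can spread over the 3 receivers
val kstreams = (1 to numparts).map { _ =>
  KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("spark-topic" -> 1))
}
// merge the receivers' streams into a single DStream for the transformations
val unifiedStream = ssc.union(kstreams)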
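The thread arithmetic can be checked with a small model. This is a simplified sketch, assuming Kafka's range assignment for a single topic (partitions in numeric order, consumer threads in sorted order, with the threads of one receiver sorting next to each other); `rangeAssign` and `busyReceivers` are hypothetical helpers, not Kafka or Spark code. It counts how many receivers actually end up with partitions.

```scala
// Simplified model (an assumption, not Kafka's own code) of range assignment in
// one consumer group: each thread gets partitions / threads partitions, and the
// first (partitions % threads) threads get one extra. A partition goes to at most one thread.
def rangeAssign(partitions: Int, threads: Int): Seq[Seq[Int]] = {
  val perThread = partitions / threads
  val extra = partitions % threads
  (0 until threads).map { i =>
    val start = i * perThread + math.min(i, extra)
    start until (start + perThread + (if (i < extra) 1 else 0))
  }
}

// Threads of one receiver sort next to each other, so group them by receiver
// and count how many receivers get at least one partition
def busyReceivers(partitions: Int, receivers: Int, threadsPerReceiver: Int): Int =
  rangeAssign(partitions, receivers * threadsPerReceiver)
    .grouped(threadsPerReceiver)
    .count(_.exists(_.nonEmpty))

// topic with 3 partitions
println(busyReceivers(3, receivers = 1, threadsPerReceiver = 5)) // single stream, 5 threads
println(busyReceivers(3, receivers = 3, threadsPerReceiver = 5)) // 3 streams x 5 threads
println(busyReceivers(3, receivers = 3, threadsPerReceiver = 1)) // 3 streams x 1 thread
```

Under this model, 3 streams with 5 threads each still put all 3 partitions on the first receiver, while 3 streams with 1 thread each give every receiver one partition.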