spark_streaming_examples
Create Spark Streaming Context:
==========================================
scala:
---------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
java:
---------------
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
// Create a local StreamingContext with two working threads and a batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start();             // Start the computation
jssc.awaitTermination();  // Wait for the computation to terminate
python:
---------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
==================================================================
nc -lk 9999
$SPARK_HOME/bin/run-example streaming.NetworkWordCount localhost 9999
$SPARK_HOME/bin/run-example streaming.JavaNetworkWordCount localhost 9999
$SPARK_HOME/bin/spark-submit $SPARK_HOME/examples/src/main/python/streaming/network_wordcount.py localhost 9999
==================================================================
$SPARK_HOME/bin/run-example streaming.TwitterPopularTags FlRx3d0n8duIQ0UvGeGtTA DS7TTbxhmQ7oCUlDntpQQRqQllFFOiyNoOMEDD0lA 1643982224-xTfNpLrARoWKxRh9KtFqc7aoB8KAAHkCcfC5vDk PqkbuBqF3AVskgx1OKgXKOZzV7EMWRmRG0p8hvLQYKs
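The four arguments above are the Twitter OAuth consumer key, consumer secret, access token, and access token secret. A minimal sketch of the same idea with the spark-streaming-twitter integration (credentials shown as placeholders, window length illustrative):
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
// twitter4j reads OAuth credentials from these system properties
System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")
val conf = new SparkConf().setMaster("local[2]").setAppName("PopularTagsSketch")
val ssc = new StreamingContext(conf, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val hashTags = tweets.flatMap(_.getText.split(" ")).filter(_.startsWith("#"))
// Count hashtags over the last 60 seconds and print the top 10 every batch
val topCounts = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (tag, count) => (count, tag) }
  .transform(_.sortByKey(false))
topCounts.foreachRDD { rdd =>
  rdd.take(10).foreach { case (count, tag) => println(s"$tag: $count") }
}
ssc.start()
ssc.awaitTermination()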
==================================================================
$FLUME_HOME/bin/flume-ng agent -n spark-flume --conf $FLUME_HOME/conf -f $FLUME_HOME/test/spark-flume.conf -Dflume.root.logger=DEBUG,console
telnet localhost 3000
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.FlumeEventCount localhost 1234
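FlumeEventCount simply counts the events pushed by the Flume sink in each batch; a minimal sketch with the spark-streaming-flume integration (the host/port must match the Avro sink configured in spark-flume.conf):
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeEventCountSketch")
val ssc = new StreamingContext(conf, Seconds(2))
// Receiver-based stream: Spark runs an Avro server that the Flume sink pushes events to
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 1234)
flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()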
==================================================================
$SPARK_HOME/bin/run-example streaming.HdfsWordCount file:/home/orienit/spark/input/test
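HdfsWordCount watches the given directory and word-counts any files newly created in it; a minimal sketch of the same logic with textFileStream:
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("HdfsWordCountSketch")
val ssc = new StreamingContext(conf, Seconds(2))
// Only files added to the directory after the stream starts are picked up
val lines = ssc.textFileStream("file:/home/orienit/spark/input/test")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()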
==================================================================
$SPARK_HOME/bin/run-example streaming.QueueStream
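QueueStream feeds a DStream from an in-memory queue of RDDs, which is handy for testing without any external source; a minimal sketch:
scala (sketch):
---------------
import scala.collection.mutable
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamSketch")
val ssc = new StreamingContext(conf, Seconds(1))
// Each batch interval pulls one RDD off this queue
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
ssc.start()
for (_ <- 1 to 5) {
  rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 2) }
  Thread.sleep(1000)
}
ssc.stop()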
==================================================================
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream activeUserCount localhost 44444
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream popularUsersSeen localhost 44444
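PageViewGenerator emits synthetic page-view lines on port 44444 and PageViewStream computes the requested metric over them. A hedged sketch of an activeUserCount-style metric, assuming tab-separated lines with the user ID in the last field (the field layout here is an assumption, not the exact PageView format):
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("ActiveUserCountSketch")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 44444)
// Assumed layout: tab-separated fields with the user ID last (illustration only)
val userIds = lines.map(_.split("\t").last)
// Distinct users seen in a sliding 15-second window, reported every second
userIds.window(Seconds(15), Seconds(1))
  .transform(_.distinct())
  .count()
  .map(n => "Active users in the last 15 seconds: " + n)
  .print()
ssc.start()
ssc.awaitTermination()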
==================================================================
Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 my-consumer-group topic1,topic2 1
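The arguments are the ZooKeeper quorum, consumer group, comma-separated topics, and threads per topic, i.e. the old receiver-based Kafka integration. A minimal sketch of that API from spark-streaming-kafka (newer Spark versions use the direct approach instead):
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCountSketch")
val ssc = new StreamingContext(conf, Seconds(2))
// topic -> number of receiver threads for that topic
val topicMap = Map("topic1" -> 1, "topic2" -> 1)
val messages = KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group", topicMap)
// The stream yields (key, message) pairs; word-count the message values
val words = messages.map(_._2).flatMap(_.split(" "))
words.map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()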
==================================================================
$SPARK_HOME/bin/run-example streaming.StatefulNetworkWordCount localhost 9999
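StatefulNetworkWordCount keeps a running word count across batches rather than per batch. A minimal sketch of the idea with updateStateByKey (the checkpoint directory name is illustrative; checkpointing is required for stateful operations):
scala (sketch):
---------------
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCountSketch")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint")  // required for updateStateByKey
// Merge this batch's counts for a word into its running total
val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
  (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0))
val lines = ssc.socketTextStream("localhost", 9999)
val runningCounts = lines.flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey[Int](updateFunc)
runningCounts.print()
ssc.start()
ssc.awaitTermination()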