SPARK_DAY1_PRACTICE
Spark:
---------------------
Apache Spark™ is a fast and general engine for large-scale data processing.
Spark Libraries on Spark-Core:
--------------------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX
Spark Supports 4 Programming Languages:
------------------------------------------
Scala, Java, Python, R
How to Start the Spark in Command Line:
--------------------------------------------
Scala => $SPARK_HOME/bin/spark-shell
Python => $SPARK_HOME/bin/pyspark
R => $SPARK_HOME/bin/sparkR
Spark-2.x:
--------------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'
Spark-1.x:
--------------------------------------------
Spark context available as 'sc'
Spark SQLContext available as 'sqlContext'
Note:
------------
`Spark Context` is the `Entry Point` for any `Spark Operations`.
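In the shells above, `sc` and `spark` are already created for you. In a standalone application you build them yourself; a minimal sketch using the Spark 2.x API (the app name and master setting are placeholders, not from these notes):
import org.apache.spark.sql.SparkSession

object Day1App {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession (Spark 2.x); in Spark 1.x you would build a SparkContext from a SparkConf instead
    val spark = SparkSession.builder()
      .appName("Day1App")
      .master("local[*]")            // for local testing; on a cluster the master comes from spark-submit
      .getOrCreate()

    val sc = spark.sparkContext      // the entry point for RDD operations

    sc.parallelize(1 to 5).collect().foreach(println)

    spark.stop()
  }
}
---------------------------------------------------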
Resilient Distributed DataSets (RDD):
--------------------------------------------
RDD Features:
----------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference
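A tiny illustration of features 2 and 3 above (lazy evaluation and caching); a sketch assuming the shell's `sc`, not from the original notes:
val nums = sc.parallelize(1 to 1000000)
val squared = nums.map(x => x * x)   // nothing runs yet: map is a lazy transformation
squared.cache()                      // mark the RDD as cacheable; still nothing runs
squared.count()                      // first action: computes the partitions and caches them
squared.count()                      // second action: answered from the cache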
RDD Operations:
-----------------
1. Transformations
Convert `RDD` into `RDD`
ex: old_rdd ==> new_rdd
2. Actions
Convert `RDD` into `Result`
ex: rdd ==> result
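A concrete pair of examples, reusing the naming above (a sketch, assuming the shell's `sc`):
val old_rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
val new_rdd = old_rdd.map(_ * 10)    // transformation: RDD ==> RDD (lazy)
val result  = new_rdd.count()        // action: RDD ==> result (6, returned to the driver)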
How to Create an RDD?
-----------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Set, Seq, ...)
2. from data sets (text, csv, tsv, json, ...)
How to create an RDD from `collections`?
---------------------------------------
val list = List(1,2,3,4,5,6)
val rdd = sc.parallelize(<collection>, <number of partitions>)
val rdd = sc.parallelize(list, 6)
Create an RDD with the default number of partitions:
--------------------------------------
val rdd = sc.parallelize(list)
scala> val list = List(1,2,3,4,5,6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.getNumPartitions
res0: Int = 4
Create an RDD with 2 Partitions:
--------------------------------------
val rdd = sc.parallelize(list, 2)
scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> rdd.getNumPartitions
res1: Int = 2
---------------------------------------------------
scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd.collect.foreach(println)
1
2
3
4
5
6
---------------------------------------------------
scala> rdd.foreach(println)
1
2
3
4
5
6
scala> rdd.foreach(println)
4
1
2
3
5
6
---------------------------------------------------
Note:
------------
`collect` gathers the data from `all the partitions` to the driver.
`collect` preserves the `order of the data` (unlike `rdd.foreach`, whose output order depends on which partition finishes first, as seen above).
Avoid `collect` in `Real-Time Use Cases` on large data sets: pulling everything to the driver can end up with MEMORY (out-of-memory) issues.
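Safer alternatives on large RDDs (an addition to the notes, using standard RDD actions):
rdd.take(3)               // brings only the first 3 elements to the driver
rdd.takeSample(false, 3)  // a random sample of 3 elements, without replacement
rdd.count()               // aggregates on the executors, returns a single number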
---------------------------------------------------
All of these are equivalent ways to map each element of an RDD `x` to a pair:
x.map(data => (data, data))
or
x.map(y => (y, y))
or
x.map(z => (z, z))
or
def f(p : Int) : (Int, Int) = {
(p,p)
}
x.map(f)
---------------------------------------------------
Importance of `mapPartitionsWithIndex`
---------------------------------------------------
scala> rdd.mapPartitionsWithIndex
<console>:29: error: missing argument list for method mapPartitionsWithIndex in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `mapPartitionsWithIndex _` or `mapPartitionsWithIndex(_,_)(_)` instead of `mapPartitionsWithIndex`.
rdd.mapPartitionsWithIndex
^
scala> rdd.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
rdd.mapPartitionsWithIndex()
^
---------------------------------------------------
def mapPartitionsWithIndex(f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean = false)
so we have to supply a function of the shape
def f(index: Int, it: Iterator[Int]) : Iterator[U]
(`preservesPartitioning` defaults to false, so passing just `f` is enough)
---------------------------------------------------
def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
it.toList.map(data => s"index: $index, data: $data").iterator
}
---------------------------------------------------
rdd.mapPartitionsWithIndex(myFunc1)
rdd.mapPartitionsWithIndex(myFunc1).collect
rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
---------------------------------------------------
scala> def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc1: (index: Int, it: Iterator[Int])Iterator[String]
---------------------------------------------------
scala> rdd.mapPartitionsWithIndex(myFunc1)
res28: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:31
scala> rdd.mapPartitionsWithIndex(myFunc1).collect
res29: Array[String] = Array(index: 0, data: 1, index: 0, data: 2, index: 0, data: 3, index: 1, data: 4, index: 1, data: 5, index: 1, data: 6)
scala> rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6
---------------------------------------------------
val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
val rdd1 = sc.parallelize(names, 2)
// the call below won't compile: myFunc1 expects Iterator[Int], but rdd1 contains Strings
rdd1.mapPartitionsWithIndex(myFunc1)
---------------------------------------------------
scala> val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
names: List[String] = List(kalyan, anil, raj, sunil, rajesh, dev)
scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at
---------------------------------------------------
scala> rdd1.mapPartitionsWithIndex(myFunc1)
found : (Int, Iterator[Int]) => Iterator[String]
required: (Int, Iterator[String]) => Iterator[?]
Error occurred in an application involving default arguments.
rdd1.mapPartitionsWithIndex(myFunc1)
^
---------------------------------------------------
def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
it.toList.map(data => s"index: $index, data: $data").iterator
}
rdd1.mapPartitionsWithIndex(myFunc2)
---------------------------------------------------
scala> def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc2: (index: Int, it: Iterator[String])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(myFunc2)
res32: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at mapPartitionsWithIndex at
scala> rdd1.mapPartitionsWithIndex(myFunc2).collect
res33: Array[String] = Array(index: 0, data: kalyan, index: 0, data: anil, index: 0, data: raj, index: 1, data: sunil, index: 1, data: rajesh, index: 1, data: dev)
scala> rdd1.mapPartitionsWithIndex(myFunc2).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev
---------------------------------------------------
How to create a `Generalized Function`:
------------------------------------------
def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
it.toList.map(data => s"index: $index, data: $data").iterator
}
rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
---------------------------------------------------
scala> def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc: [T](index: Int, it: Iterator[T])Iterator[String]
scala> rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6
scala> rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev
---------------------------------------------------
scala> rdd.aggregate
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
rdd.aggregate
^
scala> rdd.aggregate()
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
rdd.aggregate()
^
scala> rdd.aggregate()()
Unspecified value parameters seqOp, combOp.
rdd.aggregate()()
^
---------------------------------------------------
def aggregate[U](zeroValue: U)(seqOp: (U, Int) => U, combOp: (U, U) => U): U
---------------------------------------------------
val zeroValue: U = ...                      // the initial (zero) value for the result
def seqOp(res: U, data: Int) : U = {...}    // folds the data of one partition into a partial result
def combOp(res1: U, res2: U) : U = {...}    // merges the partial results of two partitions
Note: `U` is the result (return) type we choose for the aggregation.
---------------------------------------------------
Implementation of `aggregate`
---------------------------------------------------
Example 1: Sum the RDD (U = Int)
---------------------------------------------
Note: replace `U` with `Int` in all the places
val zeroValue = 0
def seqOp(res: Int, data: Int) : Int = {
res + data
}
def combOp(res1: Int, res2: Int) : Int = {
res1 + res2
}
rdd.aggregate(zeroValue)(seqOp,combOp)
---------------------------------------------------
scala> val zeroValue = 0
zeroValue: Int = 0
scala> def seqOp(res: Int, data: Int) : Int = {
| res + data
| }
seqOp: (res: Int, data: Int)Int
scala> def combOp(res1: Int, res2: Int) : Int = {
| res1 + res2
| }
combOp: (res1: Int, res2: Int)Int
scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res40: Int = 21
---------------------------------------------------
Example 2: Multiply the RDD (U = Int)
---------------------------------------------
Note: replace `U` with `Int` in all the places
val zeroValue = 1
def seqOp(res: Int, data: Int) : Int = {
res * data
}
def combOp(res1: Int, res2: Int) : Int = {
res1 * res2
}
rdd.aggregate(zeroValue)(seqOp,combOp)
---------------------------------------------------
scala> val zeroValue = 1
zeroValue: Int = 1
scala> def seqOp(res: Int, data: Int) : Int = {
| res * data
| }
seqOp: (res: Int, data: Int)Int
scala> def combOp(res1: Int, res2: Int) : Int = {
| res1 * res2
| }
combOp: (res1: Int, res2: Int)Int
scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res41: Int = 720
---------------------------------------------------
Exercise: Find the `average` of 1 to 10 using the `aggregate` function.
---------------------------------------------------
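One possible solution sketch (not part of the original notes): carry a running (sum, count) pair as the aggregate result, then divide at the end. `rdd10` is just an illustrative name.
val rdd10 = sc.parallelize(1 to 10)
val zeroValue = (0, 0)                              // (running sum, running count)
def seqOp(res: (Int, Int), data: Int) = (res._1 + data, res._2 + 1)
def combOp(res1: (Int, Int), res2: (Int, Int)) = (res1._1 + res2._1, res1._2 + res2._2)
val (total, count) = rdd10.aggregate(zeroValue)(seqOp, combOp)
val avg = total.toDouble / count                    // 5.5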
---------------------------------------------------
---------------------------------------------------
scala> rdd.reduce
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `reduce _` or `reduce(_)` instead of `reduce`.
rdd.reduce
^
scala> rdd.reduce()
Unspecified value parameter f.
rdd.reduce()
^
---------------------------------------------------
`reduce` expects a function of the shape: def f(res: Int, data: Int) : Int
---------------------------------------------------
def sum(res: Int, data: Int) : Int = {res + data}
def mul(res: Int, data: Int) : Int = {res * data}
---------------------------------------------------
scala> def sum(res: Int, data: Int) : Int = {res + data}
sum: (res: Int, data: Int)Int
scala> def mul(res: Int, data: Int) : Int = {res * data}
mul: (res: Int, data: Int)Int
scala> rdd.reduce(sum)
res44: Int = 21
scala> rdd.reduce(mul)
res45: Int = 720
---------------------------------------------------
Compare `aggregate` and `reduce`:
-----------------------------------------
aggregate(zeroValue)(seqOp, combOp)
aggregate(initialValue)(seqOp, seqOp)
<==>
reduce(seqOp)
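For example, with the `sum` function defined above, both of these return 21 (the RDD still holding 1 to 6):
rdd.reduce(sum)               // 21
rdd.aggregate(0)(sum, sum)    // 21: identity zero value, same function as seqOp and combOp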
---------------------------------------------------
Transformations in Spark:
----------------------------
scala> rdd.collect.foreach(println)
1
2
3
4
5
6
scala> rdd.map(x => x + 1)
res49: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at
scala> rdd.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7
scala> rdd.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36
scala> rdd.map(x => x * x * x).collect.foreach(println)
1
8
27
64
125
216
---------------------------------------------------
scala> rdd.filter(x => x > 3)
res53: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at filter at
scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(x => x % 2 == 1).collect.foreach(println)
1
3
5
scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6
---------------------------------------------------
scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6
scala> rdd.filter(_ % 2 == 0).collect.foreach(println)
2
4
6
scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(x => println(x))
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println(_))
4
5
6
---------------------------------------------------
val data = sc.textFile("hdfs:///user/tsipl1059/lakshmi/spark_sample.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs:///user/tsipl1059/lakshmi/spark_sample.txt MapPartitionsRDD[1] at textFile at
data.collect()
res0: Array[String] = Array(this is my first spark smaple data., Load data into RDD using scala language., "testing the data ", count the number of lines in a file)
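Following the hint inside the sample file itself, counting its lines is a single action; given the four elements collected above, this should return 4:
data.count()    // number of lines in the file (4 for the sample shown above)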
---------------------------------------------------
---------------------------------------------------
val r2 = sc.textFile("/user/tsipl1115/input.txt")  =====> reads a file stored in HDFS into an RDD
---------------------------------------------------
val r1 = sc.textFile("/home/tsipl1115/input.txt")  =====> reads a file from the local file system into an RDD (see the note below)
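A note on the two paths above (an assumption about the deployment, not stated in the notes): on a cluster, a path without a scheme is resolved against the default file system (typically HDFS), so local files are usually given an explicit file:// scheme and must be readable at the same path on every worker node:
val r1 = sc.textFile("file:///home/tsipl1115/input.txt")   // local file system, explicit scheme
val r2 = sc.textFile("hdfs:///user/tsipl1115/input.txt")   // HDFS, explicit scheme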
---------------------------------------------------