Spark : Union and Distinct

 Unions in Spark.
val l1 = List(10,20,30,40,50)
val l2 = List(100,200,300,400,500)
val r1 = sc.parallelize(l1)
val r2 = sc.parallelize(l2)
val r = r1.union(r2)
scala> r.collect.foreach(println)
10
20
30
40
50
100
200
300
400
500
scala> r.count
res1: Long = 10

Spark's union keeps duplicates (it behaves like SQL's UNION ALL, not UNION).
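For example, if the two RDDs share an element, union keeps both copies; a minimal sketch (the RDDs a and b here are illustrative, not from the transcript above):

val a = sc.parallelize(List(10, 20, 30))
val b = sc.parallelize(List(30, 40))
a.union(b).collect            // Array(10, 20, 30, 30, 40) -- the duplicate 30 is kept
a.union(b).distinct.collect   // 30 appears only once (order may vary after the shuffle)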
Merging can also be done using the ++ operator.
scala> val r3 = r1 ++ r2
r3: org.apache.spark.rdd.RDD[Int] = UnionRDD[3] at $plus$plus at :35
scala> r3.collect
res4: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500)
scala>
merging more than two sets.
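The definition of the rx RDD used below did not survive in the transcript; judging from the results that follow, it was presumably created along these lines (assumed definition):

val rx = sc.parallelize(List(15, 25, 35))   // assumed: matches the 15, 25, 35 seen in the output below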
scala> val rr = r1.union(r2).union(rx)
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at :37
scala> rr.count
res5: Long = 13
scala> rr.collect
res6: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>// or
scala> val rr = r1 ++ r2 ++ rx
rr: org.apache.spark.rdd.RDD[Int] = UnionRDD[8] at $plus$plus at :37
scala> rr.collect
res7: Array[Int] = Array(10, 20, 30, 40, 50, 100, 200, 300, 400, 500, 15, 25, 35)
scala>
--- eliminate duplicates.
scala> val x = List(10,20,30,40,10,10,20)
x: List[Int] = List(10, 20, 30, 40, 10, 10, 20)
scala> x.distinct
res8: List[Int] = List(10, 20, 30, 40)
scala> val y = sc.parallelize(x)
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :29
scala> r1.collect
res14: Array[Int] = Array(10, 20, 30, 40, 50)
scala> y.collect
res15: Array[Int] = Array(10, 20, 30, 40, 10, 10, 20)
scala> val nodupes = (r1 ++ y).distinct
nodupes: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at distinct at :35
scala> nodupes.collect
res16: Array[Int] = Array(30, 50, 40, 20, 10)
scala>
---------------------------------------
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
[cloudera@quickstart ~]$ hadoop fs -cat spLab/emp2
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
[cloudera@quickstart ~]$
scala> val branch1 = sc.textFile("/user/cloudera/spLab/emp")
branch1: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp MapPartitionsRDD[15] at textFile at :27
scala> val branch2 = sc.textFile("/user/cloudera/spLab/emp2")
branch2: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/emp2 MapPartitionsRDD[17] at textFile at :27
scala> val emp = branch1.union(branch2)
emp: org.apache.spark.rdd.RDD[String] = UnionRDD[18] at union at :31
scala> emp.collect.foreach(println)
101,aaaa,40000,m,11
102,bbbbbb,50000,f,12
103,cccc,50000,m,12
104,dd,90000,f,13
105,ee,10000,m,12
106,dkd,40000,m,12
107,sdkfj,80000,f,13
108,iiii,50000,m,11
201,Ravi,80000,m,12
202,Varun,90000,m,11
203,Varuna,100000,f,13
204,Vanila,50000,f,12
205,Mani,30000,m,14
206,Manisha,30000,f,14
--------------------------------
 distinct:
  eliminates duplicates
  based on an entire-row match.
 limitation: cannot eliminate duplicates based on a match of only some column(s).
   solution for this:
     group the rows and iterate each CompactBuffer.
   [ later we will see; a rough sketch follows below ]
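A rough sketch of that idea, assuming we want to keep only one row per employee id (the first column); the names keyed and firstPerKey are illustrative, not from the original:

val keyed = emp.map { line =>
  val w = line.split(",")
  (w(0), line)                                   // key on the column(s) of interest, here the employee id
}
val firstPerKey = keyed
  .groupByKey()                                  // each value is a CompactBuffer of rows sharing the key
  .map { case (_, rows) => rows.head }           // iterate the buffer; keep one row per key
firstPerKey.collect.foreach(println)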
Grouping aggregation on the merged set:
scala> val pair = emp.map{ x =>
     |       val w = x.split(",")
     |       val dno = w(4).toInt
     |       val sal = w(2).toInt
     |      (dno, sal)
     | }
pair: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[19] at map at :35
scala> val eres = pair.reduceByKey(_+_)
eres: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[20] at reduceByKey at :37
scala> eres.collect.foreach(println)
(14,60000)
(12,280000)
(13,270000)
(11,180000)
scala>
-- in this output we don't have separate totals for branch1 and branch2.
 the solution for this: COGROUP.
    [NEXT DOCUMENT ]
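As a teaser, a minimal sketch of the cogroup idea (the helper toPair and the value names are illustrative; details follow in the next document):

def toPair(line: String) = {
  val w = line.split(",")
  (w(4).toInt, w(2).toInt)                              // (dno, sal)
}
val b1Totals = branch1.map(toPair).reduceByKey(_ + _)   // per-dept totals for branch1
val b2Totals = branch2.map(toPair).reduceByKey(_ + _)   // per-dept totals for branch2
// cogroup keeps each branch's total side by side for every dno
b1Totals.cogroup(b2Totals).collect.foreach(println)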