Spark : Joins 2

Denormalizing datasets using Joins[cloudera@quickstart ~]$ cat > childrenc101,p101,Ravi,34 c102,p101,Rani,24c103,p102,Mani,20c104,p103,Giri,22c105,p102,Vani,22[cloudera@quickstart ~]$ cat > parentsp101,madhu,madhavi,hydp102,Sathya,Veni,Delp103,Varma,Varuna,hyd[cloudera@quickstart ~]$ hadoop fs -copyFromLocal children spLab[cloudera@quickstart ~]$ hadoop fs -copyFromLocal parents spLab[cloudera@quickstart ~]$val children = sc.textFile("/user/cloudera/spLab/children")val parents = sc.textFile("/user/cloudera/spLab/parents")val chPair = children.map{ x =>        val w = x.split(",")        val pid = w(1)val chInfo =Array(w(0), w(2), w(3)).                      mkString(",")       (pid, chInfo)   }chPair.collect.foreach(println)(p101,c101,Ravi,34)(p101,c102,Rani,24)(p102,c103,Mani,20)(p103,c104,Giri,22)(p102,c105,Vani,22)val PPair = parents.map{ x =>           val w = x.split(",")           val pid = w(0) val pInfo = Array(w(1),w(2),w(3)).mkString(",")     (pid, pInfo)   }PPair.collect.foreach(println)          PPair.collect.foreach(println)(p101,madhu,madhavi,hyd)(p102,Sathya,Veni,Del)(p103,Varma,Varuna,hyd)val family = chPair.join(PPair)family.collect.foreach(println)(p101,(c101,Ravi,34,madhu,madhavi,hyd))(p101,(c102,Rani,24,madhu,madhavi,hyd))(p102,(c103,Mani,20,Sathya,Veni,Del))(p102,(c105,Vani,22,Sathya,Veni,Del))(p103,(c104,Giri,22,Varma,Varuna,hyd))val profiles = family.map{ x =>         val cinfo = x._2._1         val pinfo = x._2._2      val info = cinfo +","+ pinfo       info }profiles.collect.foreach(println)c101,Ravi,34,madhu,madhavi,hydc102,Rani,24,madhu,madhavi,hydc103,Mani,20,Sathya,Veni,Delc105,Vani,22,Sathya,Veni,Delc104,Giri,22,Varma,Varuna,hydprofiles.saveAsTextFile("/user/cloudera/spLab/profiles")[cloudera@quickstart ~]$ hadoop fs -ls spLab/profilesFound 2 items-rw-r--r--   1 cloudera cloudera          0 2017-05-08 21:02 spLab/profiles/_SUCCESS-rw-r--r--   1 cloudera cloudera        150 2017-05-08 21:02 spLab/profiles/part-00000[cloudera@quickstart ~]$ hadoop fs -cat spLab/profiles/part-00000c101,Ravi,34,madhu,madhavi,hydc102,Rani,24,madhu,madhavi,hydc103,Mani,20,Sathya,Veni,Delc105,Vani,22,Sathya,Veni,Delc104,Giri,22,Varma,Varuna,hyd[cloudera@quickstart ~]$

Denormalizing datasets using Joins
[cloudera@quickstart ~]$ cat > children
c101,p101,Ravi,34
c102,p101,Rani,24
c103,p102,Mani,20
c104,p103,Giri,22
c105,p102,Vani,22
[cloudera@quickstart ~]$ cat > parents
p101,madhu,madhavi,hyd
p102,Sathya,Veni,Del
p103,Varma,Varuna,hyd
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal children spLab
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal parents spLab
[cloudera@quickstart ~]$
val children = sc.textFile("/user/cloudera/spLab/children")
val parents = sc.textFile("/user/cloudera/spLab/parents")
val chPair = children.map{ x =>
        val w = x.split(",")
        val pid = w(1)
val chInfo =Array(w(0), w(2), w(3)).
                      mkString(",")
       (pid, chInfo)
   }
chPair.collect.foreach(println)
(p101,c101,Ravi,34)
(p101,c102,Rani,24)
(p102,c103,Mani,20)
(p103,c104,Giri,22)
(p102,c105,Vani,22)
val PPair = parents.map{ x =>
           val w = x.split(",")
           val pid = w(0)
 val pInfo = Array(w(1),w(2),w(3)).mkString(",")
     (pid, pInfo)
   }
PPair.collect.foreach(println)
         
PPair.collect.foreach(println)
(p101,madhu,madhavi,hyd)
(p102,Sathya,Veni,Del)
(p103,Varma,Varuna,hyd)
val family = chPair.join(PPair)
family.collect.foreach(println)
(p101,(c101,Ravi,34,madhu,madhavi,hyd))
(p101,(c102,Rani,24,madhu,madhavi,hyd))
(p102,(c103,Mani,20,Sathya,Veni,Del))
(p102,(c105,Vani,22,Sathya,Veni,Del))
(p103,(c104,Giri,22,Varma,Varuna,hyd))
val profiles = family.map{ x =>
         val cinfo = x._2._1
         val pinfo = x._2._2
      val info = cinfo +","+ pinfo
       info
 }
profiles.collect.foreach(println)
c101,Ravi,34,madhu,madhavi,hyd
c102,Rani,24,madhu,madhavi,hyd
c103,Mani,20,Sathya,Veni,Del
c105,Vani,22,Sathya,Veni,Del
c104,Giri,22,Varma,Varuna,hyd


profiles.saveAsTextFile("/user/cloudera/spLab/profiles")
[cloudera@quickstart ~]$ hadoop fs -ls spLab/profiles
Found 2 items
-rw-r--r--   1 cloudera cloudera          0 2017-05-08 21:02 spLab/profiles/_SUCCESS
-rw-r--r--   1 cloudera cloudera        150 2017-05-08 21:02 spLab/profiles/part-00000
[cloudera@quickstart ~]$ hadoop fs -cat spLab/profiles/part-00000
c101,Ravi,34,madhu,madhavi,hyd
c102,Rani,24,madhu,madhavi,hyd
c103,Mani,20,Sathya,Veni,Del
c105,Vani,22,Sathya,Veni,Del
c104,Giri,22,Varma,Varuna,hyd
[cloudera@quickstart ~]$