// Denormalizing datasets using Joins
//
// Lab transcript: two small comma-separated datasets are created locally,
// copied to HDFS, loaded as text RDDs, and re-keyed by parent id so that
// they can be joined.
//
// ---- Shell: create the input files and copy them to HDFS ----
// NOTE(review): the copies below target the relative HDFS path `spLab`;
// this presumably is an existing directory under /user/cloudera (the Scala
// code reads /user/cloudera/spLab/...) -- confirm before running.
//
// [cloudera@quickstart ~]$ cat > children
// c101,p101,Ravi,34
// c102,p101,Rani,24
// c103,p102,Mani,20
// c104,p103,Giri,22
// c105,p102,Vani,22
// [cloudera@quickstart ~]$ cat > parents
// p101,madhu,madhavi,hyd
// p102,Sathya,Veni,Del
// p103,Varma,Varuna,hyd
// [cloudera@quickstart ~]$ hadoop fs -copyFromLocal children spLab
// [cloudera@quickstart ~]$ hadoop fs -copyFromLocal parents spLab
// [cloudera@quickstart ~]$
//
// ---- Scala ----
// NOTE(review): `sc` is not defined in this file; presumably it is the
// SparkContext predefined by spark-shell (the launch command is not shown
// in the original transcript).

val children = sc.textFile("/user/cloudera/spLab/children")
val parents = sc.textFile("/user/cloudera/spLab/parents")

// Key every child row by its parent id (2nd column). The value is the
// 1st, 3rd and 4th columns joined back together with commas.
val chPair = children.map { line =>
  val cols = line.split(",")
  (cols(1), s"${cols(0)},${cols(2)},${cols(3)}")
}

chPair.collect.foreach(println)
// (p101,c101,Ravi,34)
// (p101,c102,Rani,24)
// (p102,c103,Mani,20)
// (p103,c104,Giri,22)
// (p102,c105,Vani,22)

// Key every parent row by its own id (1st column). The value is the
// remaining three columns joined back together with commas.
val PPair = parents.map { line =>
  val cols = line.split(",")
  (cols(0), s"${cols(1)},${cols(2)},${cols(3)}")
}

PPair.collect.foreach(println)
// Output of PPair.collect.foreach(println):
// (p101,madhu,madhavi,hyd)
// (p102,Sathya,Veni,Del)
// (p103,Varma,Varuna,hyd)

// Join the child pairs with the parent pairs on the shared parent-id key.
// Each result element is (pid, (childInfo, parentInfo)); because both
// values are plain strings, the printed tuple shows them separated only by
// a comma.
// NOTE(review): the order of the rows printed after the join is not
// guaranteed; the listing below is the order recorded in the original run.
val family = chPair.join(PPair)

family.collect.foreach(println)
// (p101,(c101,Ravi,34,madhu,madhavi,hyd))
// (p101,(c102,Rani,24,madhu,madhavi,hyd))
// (p102,(c103,Mani,20,Sathya,Veni,Del))
// (p102,(c105,Vani,22,Sathya,Veni,Del))
// (p103,(c104,Giri,22,Varma,Varuna,hyd))

// Flatten each joined record into one denormalized comma-separated line:
// the join key is dropped and the child info is followed by the parent info.
val profiles = family.map { case (_, (childInfo, parentInfo)) =>
  s"$childInfo,$parentInfo"
}

profiles.collect.foreach(println)
// c101,Ravi,34,madhu,madhavi,hyd
// c102,Rani,24,madhu,madhavi,hyd
// c103,Mani,20,Sathya,Veni,Del
// c105,Vani,22,Sathya,Veni,Del
// c104,Giri,22,Varma,Varuna,hyd