Spark : Handling CSV files .. Removing Headers

scala> val l = List(10,20,30,40,50,56,67) scala> val r2 = r.collect.reverse.take(3) r2: Array[Int] = Array(67, 56, 50) scala> val r2 = sc.parallelize(r.collect.reverse.take(3)) r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :31 ------------------------------- hadling CSV files [ first is header ] [cloudera@quickstart ~]$ gedit prods [cloudera@quickstart ~]$ hadoop fs -copyFromLocal prods spLab scala> val raw  = sc.textFile("/user/cloudera/spLab/prods") raw: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/prods MapPartitionsRDD[11] at textFile at :27 scala> raw.collect.foreach(println) "pid","name","price" p1,Tv,50000 p2,Lap,70000 p3,Ipod,8000 p4,Mobile,9000 scala> raw.count res18: Long = 5 to eleminate first element, slice is used .  scala> l res19: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67) scala> l.slice(2,5) res20: List[Int] = List(30, 40, 50) scala> l.slice(1,l.size) res21: List[Int] = List(20, 30, 40, 50, 50, 56, 67) way1: scala> raw.collect res29: Array[String] = Array("pid","name","price", p1,Tv,50000, p2,Lap,70000, p3,Ipod,8000, p4,Mobile,9000) scala> val data = sc.parallelize(raw.collect.slice(1,raw.collect.size)) data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at :29 scala> data.collect.foreach(println) p1,Tv,50000 p2,Lap,70000 p3,Ipod,8000 p4,Mobile,9000 scala>  here slice is not available with rdd.  so  , data to be collected into local , then  slice has to applied.  if rdd volume is bigger, client can not collect it. flow will be failed. Way2: ------  val data = raw.filter(x =>      !line.contains("pid"))  data.persist --adv: no need to collect data into client[local] --disadv : to eleminate 1 row, scanning all rows. -----------------------------------------

scala> val l = List(10,20,30,40,50,56,67)

scala> val r2 = r.collect.reverse.take(3)
r2: Array[Int] = Array(67, 56, 50)

scala> val r2 = sc.parallelize(r.collect.reverse.take(3))
r2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :31


-------------------------------
hadling CSV files [ first is header ]

[cloudera@quickstart ~]$ gedit prods
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal prods spLab

scala> val raw  = sc.textFile("/user/cloudera/spLab/prods")
raw: org.apache.spark.rdd.RDD[String] = /user/cloudera/spLab/prods MapPartitionsRDD[11] at textFile at :27

scala> raw.collect.foreach(println)
"pid","name","price"
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000

scala> raw.count
res18: Long = 5



to eleminate first element, slice is used .

 scala> l
res19: List[Int] = List(10, 20, 30, 40, 50, 50, 56, 67)

scala> l.slice(2,5)
res20: List[Int] = List(30, 40, 50)

scala> l.slice(1,l.size)
res21: List[Int] = List(20, 30, 40, 50, 50, 56, 67)

way1:

scala> raw.collect
res29: Array[String] = Array("pid","name","price", p1,Tv,50000, p2,Lap,70000, p3,Ipod,8000, p4,Mobile,9000)

scala> val data = sc.parallelize(raw.collect.slice(1,raw.collect.size))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at :29

scala> data.collect.foreach(println)
p1,Tv,50000
p2,Lap,70000
p3,Ipod,8000
p4,Mobile,9000

scala>

 here slice is not available with rdd.
 so  , data to be collected into local , then  slice has to applied.
 if rdd volume is bigger, client can not collect it. flow will be failed.

Way2:
------

 val data = raw.filter(x =>
     !line.contains("pid"))

 data.persist
--adv: no need to collect data into client[local]

--disadv : to eleminate 1 row, scanning all rows.

-----------------------------------------