
POC #: Analyse social bookmarking sites to find insights

 Industry: Social Media


Data: The data set is gathered from social bookmarking sites, which let users bookmark, review, rate and search links on any topic. It is in XML format and contains the categories that describe each link together with the ratings attached to it.

Problem Statement: Analyse the data in the Hadoop ecosystem to:

1. Fetch the data into the Hadoop Distributed File System and analyse it with MapReduce, Pig and Hive to find the top-rated links based on user comments, likes, etc.

2. Using MapReduce, convert the semi-structured XML data into a structured format.

3. Push the MapReduce output to HDFS and feed it into Pig, which splits the data into two parts: category data and ratings data.

4. Write a Hive query to analyse the data further and push the output into a relational database (RDBMS) using Sqoop.


POC Coding Details

Input Data (XML Files)
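
A minimal illustrative record, assuming the element names used by the mapper below (the enclosing catalog element and all values are hypothetical):

<catalog>
    <book>
        <id>BK001</id>
        <author>John Doe</author>
        <title>Hadoop in Practice</title>
        <genre>Computer</genre>
        <price>39.95</price>
        <publish_date>2015-10-01</publish_date>
        <descriptions>Practical recipes for the Hadoop ecosystem.</descriptions>
        <review>Good hands-on coverage.</review>
        <rate>8.5</rate>
        <comments>120</comments>
    </book>
</catalog>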



MapReduce code to convert the XML files into a flat, comma-separated file.


MyMapper.java

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Each input value is one <book>...</book> block produced by XMLInputFormat.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes("utf-8"));
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);
            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("book");
            for (int temp = 0; temp < nList.getLength(); temp++) {
                Node nNode = nList.item(temp);
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;
                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String author = eElement.getElementsByTagName("author").item(0).getTextContent();
                    String title = eElement.getElementsByTagName("title").item(0).getTextContent();
                    String genre = eElement.getElementsByTagName("genre").item(0).getTextContent();
                    String price = eElement.getElementsByTagName("price").item(0).getTextContent();
                    String publish_date = eElement.getElementsByTagName("publish_date").item(0).getTextContent();
                    String descriptions = eElement.getElementsByTagName("descriptions").item(0).getTextContent();
                    String review = eElement.getElementsByTagName("review").item(0).getTextContent();
                    String rate = eElement.getElementsByTagName("rate").item(0).getTextContent();
                    String comments = eElement.getElementsByTagName("comments").item(0).getTextContent();

                    // Emit one comma-separated line per book; no reduce phase is needed.
                    context.write(new Text(id + "," + author + "," + title + "," + genre + "," + price + ","
                            + publish_date + "," + descriptions + "," + review + "," + rate + "," + comments),
                            NullWritable.get());
                }
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
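
Given a record like the hypothetical one shown under "Input Data (XML Files)" above, the mapper emits one comma-separated line per book, for example:

BK001,John Doe,Hadoop in Practice,Computer,39.95,2015-10-01,Practical recipes for the Hadoop ecosystem.,Good hands-on coverage.,8.5,120

This simple flat-file layout assumes the free-text fields (descriptions, review, comments) contain no commas.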


XMLDriver.java


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    /** Bhavesh - driver for processing XML files using Hadoop MapReduce
     * @param args input path and output path
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        try {
            Configuration conf = new Configuration();
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Record delimiters used by XMLInputFormat; each <book>...</book> block becomes one map input value.
            conf.set("START_TAG_KEY", "<book>");
            conf.set("END_TAG_KEY", "</book>");

            Job job = new Job(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);

            // Map-only job: the mapper already writes the final comma-separated lines.
            job.setNumReduceTasks(0);

            job.setInputFormatClass(XMLInputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            job.waitForCompletion(true);

        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
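
The build steps are not shown in the original post; a minimal sketch, assuming the three classes are compiled against the installed Hadoop client libraries and packaged as XMLProcessing.jar (the jar name used by the shell script below):

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes XMLInputFormat.java MyMapper.java XMLDriver.java
jar -cvf XMLProcessing.jar -C classes .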


XMLInputFormat.java


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/* Bhavesh - XMLInputFormat class for reading an XML file record by record */
public class XMLInputFormat extends TextInputFormat {

    // Configuration keys set by the driver; their values are the literal record delimiters.
    public static final String START_TAG_KEY = "START_TAG_KEY";
    public static final String END_TAG_KEY = "END_TAG_KEY";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;

            // Read the record delimiters set by the driver, defaulting to <book> ... </book>.
            Configuration conf = tac.getConfiguration();
            startTag = conf.get(START_TAG_KEY, "<book>").getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY, "</book>").getBytes("utf-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                // Scan forward to the next start tag, then copy everything up to the matching end tag.
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        // Read bytes until the given tag is matched; optionally copy the bytes into the record buffer.
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();

                // End of file: no further record in this split.
                if (b == -1)
                    return false;

                // While inside a record, keep the bytes that make up its body.
                if (withinBlock)
                    buffer.write(b);

                // Track how much of the target tag has been matched so far.
                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else {
                    i = 0;
                }

                // Outside a record, stop once the split boundary has been passed.
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }
    }
}



PIG Script


bookmarkanalysis.pig

Input_File = LOAD '/BookMarkOutput/' USING PigStorage(',') AS
    (book_id:chararray, author:chararray, title:chararray, genre:chararray, price:float,
     publish_date:chararray, descriptions:chararray, review:chararray, rate:float, comments:chararray);

orderedfile1 = ORDER Input_File BY genre;

SPLIT orderedfile1 INTO computer_file IF genre == 'Computer',
                        database_file IF genre == 'Database';

STORE computer_file INTO '/BookMarkOutput/Type_Computer/';
STORE database_file INTO '/BookMarkOutput/Type_Database/';

orderedfile2 = ORDER Input_File BY rate DESC;

SPLIT orderedfile2 INTO rate5plus  IF rate >= 5 AND rate <= 10,
                        rate5minus IF rate >= 1 AND rate < 5;

STORE rate5plus  INTO '/BookMarkOutput/Rating5+/';
STORE rate5minus INTO '/BookMarkOutput/Rating5-/';
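
Optionally, the split output can be spot-checked on HDFS before the Hive stage, for example:

hadoop fs -cat /BookMarkOutput/Type_Computer/part-* | head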



Shell Script

###############################################################################
############################  COMPLETE SCRIPT  ###############################
### HEADER - PROGRAM NAME -                                                 ###
### AUTHOR - BHAVESH BHADRICHA                                              ###
### DATE - 12/NOV/2015                                                      ###
### VERSION - 1.0                                                           ###
### DESCRIPTION - Data: information gathered from social bookmarking       ###
### sites, which let users bookmark, review, rate and search links on any  ###
### topic. The data is in XML format and contains the links/posts (URLs),  ###
### the categories defining them and the ratings linked with them.         ###
### Problem Statement: Analyse the data in the Hadoop ecosystem to:        ###
### 1. Fetch the data into HDFS and analyse it with MapReduce, Pig and     ###
###    Hive to find the top rated books based on user comments, likes etc. ###
### 2. Using MapReduce, convert the semi-structured XML data into a        ###
###    structured format and categorise the user rating as positive or     ###
###    negative for each of the thousand links.                            ###
### 3. Push the output to HDFS and feed it into Pig, which splits the      ###
###    data into two parts: category data and ratings data.                ###
### 4. Write a Hive query to analyse the data further and push the output  ###
###    into a relational database (RDBMS) using Sqoop.                     ###
###############################################################################

##################################

###DEFINING THE LOCAL VARIABLES###

##################################

DATE=$(date +"%Y%m%d_%H%M%S")

LOGFILE="/home/bhavesh/Bookmark_POC/LOG/"$DATE".log"

##################################################################################

############## Converting XML to Flatfile using Mapreduce ########################

##################################################################################

echo "Mapreduce Program starts here"


echo "Converting XML to Flatfile using Mapreduce" >> $LOGFILE


hadoop fs -rmr /BookMarkOutput


hadoop jar /home/bhavesh/Bookmark_POC/Mapreduce/XMLProcessing.jar XMLDriver /BookMarkInput/* /BookMarkOutput


if [ $? -eq 0 ]; then

    echo "Successfully finished MapReduce processing" >> $LOGFILE

else

    echo "XMLProcessing MapReduce failed; please check the log" >> $LOGFILE

fi


########################## PIG Processing ########################################

#### PIG, which splits the data into two parts: Category data and Ratings data ###

##################################################################################


echo "Pig Script starts here"


echo "PIG Script,which splits the data into two parts: Category data and Ratings data" >> $LOGFILE


hadoop fs -rmr /BookMarkOutput/Type_Computer

hadoop fs -rmr /BookMarkOutput/Type_Database

hadoop fs -rmr /BookMarkOutput/Rating5+

hadoop fs -rmr /BookMarkOutput/Rating5-


pig /home/bhavesh/Bookmark_POC/PIG/bookmarkanalysis.pig


if [ $? -eq 0 ]; then

    echo "Successfully finished PIG processing" >> $LOGFILE

else

    echo "PIG processing failed; please check the log" >> $LOGFILE

fi



############################ HIVE Processing #######################################

###### HIVE will load the Category data and Rating Data into Hive Tables  ##########

####################################################################################


echo "HIVE Script starts here"


echo "HIVE which LOAD the data into two parts: Category data Tables and Ratings data Table " >> $LOGFILE


hive -e 'drop table if exists ComputerBooks';

hive -e 'drop table if exists DatabaseBooks';

hive -e 'drop table if exists Highest_Rating';

hive -e 'drop table if exists Lowest_Rating';


hive -e "create external table ComputerBooks

(Bookid string,

author string,

title string,

genre string,

price float,

publish_date string,

descriptions string,

review string,

rate float,

comments int)

row format delimited

fields terminated by ','

lines terminated by '\n'

stored as textfile location '/BookMarkOutput/hive/Computerbooks'";


hive -e "create external table DatabaseBooks

(Bookid string,

author string,

title string,

genre string,

price float,

publish_date string,

descriptions string,

review string,

rate float,

comments int)

row format delimited

fields terminated by ','

lines terminated by '\n'

stored as textfile location '/BookMarkOutput/hive/Databasebooks'";


hive -e "create external table Highest_Rating

(Bookid string,

author string,

title string,

genre string,

price float,

publish_date string,

descriptions string,

review string,

rate float,

comments int)

row format delimited

fields terminated by ','

lines terminated by '\n'

stored as textfile location '/BookMarkOutput/hive/HighestRating'";


hive -e "create external table Lowest_Rating

(Bookid string,

author string,

title string,

genre string,

price float,

publish_date string,

descriptions string,

review string,

rate float,

comments int)

row format delimited

fields terminated by ','

lines terminated by '\n'

stored as textfile location '/BookMarkOutput/hive/LowestRating'";


hive -e "load data inpath '/BookMarkOutput/Type_Computer/part-r-00000' overwrite into table ComputerBooks";

hive -e "load data inpath '/BookMarkOutput/Type_Database/part-r-00000' overwrite into table DatabaseBooks";

hive -e "load data inpath '/BookMarkOutput/Rating5+/part-r-00000' overwrite into table Highest_Rating";

hive -e "load data inpath '/BookMarkOutput/Rating5-/part-r-00000' overwrite into table Lowest_Rating";


############################ SQOOP Processing #######################################

###### Pushing the HIVE Table data into RDBMS Tables via SQOOP ######################

#####################################################################################


# The Hive tables above are stored as comma-delimited text, so the export uses ',' as the field terminator.

sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table ComputerBooks --export-dir /BookMarkOutput/hive/Computerbooks/part-r-00000 --input-fields-terminated-by ',';


sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table Databasebooks --export-dir /BookMarkOutput/hive/Databasebooks/part-r-00000 --input-fields-terminated-by ',';


sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table HighestRating --export-dir /BookMarkOutput/hive/HighestRating/part-r-00000 --input-fields-terminated-by ',';


sqoop export --connect jdbc:mysql://localhost/mysql --username root --password root --table LowestRating --export-dir /BookMarkOutput/hive/LowestRating/part-r-00000 --input-fields-terminated-by ',';


####################################################################################
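
After the script completes, the exported rows can be spot-checked in MySQL (database, credentials and table names as used in the Sqoop commands above), for example:

mysql -u root -proot mysql -e "SELECT * FROM ComputerBooks LIMIT 5"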


Execution of Shell Script


MapReduce Output (XML Converted to Comma Separated file)


PIG Script Execution


The Pig script generates 4 output files



HIVE Execution



HIVE Output






Sqoop Execution


Sqoop Output in RDBMS Tables