
HADOOP (PROOF OF CONCEPT) SENSEXLOG EXCEL DATA BY MAHESH CHANDRA MAHARANA

INDUSTRY: SENSEX LOG


DATA INPUT FORMAT :- .xls (my input data is in the Excel 97-2003 format)


Kindly check my blog post on reading any kind of Excel sheet, and use the Excel input format, record reader and Excel parser given there. The link to the post is below:

https://hadoop-poc-mahesh.blogspot.in/2017/01/hadoop-excel-input-format-to-read-any.html

I created 3000 records on my own, like the example record below:-


ATTRIBUTES are as follows:-


      1)      SENSEX_ID 

      2)      SENSEX_NAME      

      3)      TYPE_OF_TRADING         

      4)      SENSEX_LOCATION

      5)      OPENING_BAL

      6)      CLOSING_BAL

      7)      FLUCTUATION_RATE


EXAMPLE:



100001            BSE     SIP      NEW_YORK             23486              24876              28


DOWNLOAD MY INPUT FILE FROM BELOW LINK:

https://drive.google.com/file/d/0BzYUKIo7aWL_cHN3Q2lIVHB0NTg/view?usp=sharing




PROBLEM STATEMENT: -


     1)      Take the complete EXCEL Input data on HDFS.


     2)      Develop a Map Reduce Use Case to get the below filtered results from the HDFS Input data (.xls EXCEL data).



2.1)            If TYPEOFTRADING is -->'SIP'.



2.1.1)      If OPEN_BALANCE > 25000 & FLUCTUATION_RATE > 10 --> store "HighDemandMarket".



2.1.2)      If CLOSING_BALANCE < 22000 & FLUCTUATION_RATE IS BETWEEN 20 - 30  --> store "OnGoingMarketStretegy".



2.2)            If TYPEOFTRADING is -->'SHORTTERM'.



2.2.1)       If OPEN_BALANCE < 5000 --> store "WealthyProducts".



2.2.2)      If SensexLoc --> "NewYork OR California"  --> "ReliableProducts"

    ELSE



2.3)            store in "OtherProducts".


NOTE: Only 5 output files have to be generated, with the file names mentioned above.

Example: "WealthyProducts-r-00000"

(A compact Java sketch of routing rules 2.1 to 2.3 appears right after this problem statement.)

     3)      Develop a PIG Script to filter the Map Reduce Output in the below fashion.


3.1)            Provide the Unique data

3.2)            Sort the Unique data based on SENSEX_ID.


     4.)    EXPORT the same PIG Output from HDFS to MySQL using SQOOP.


     5.)    Store the same PIG Output in a HIVE External Table
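
To make routing rules 2.1 to 2.3 easier to follow, here is a compact sketch of the same logic as a plain Java method. This is only an illustration; the method name and parameters are mine, and the real implementation is the PocReducer class shown later in this post.

// Sketch only: a direct translation of routing rules 2.1 to 2.3 (the real logic lives in PocReducer below).
static String routeRecord(String tradeType, String location, int openBal, int closeBal, int flucRate) {
    if (tradeType.equalsIgnoreCase("SIP")) {
        if (openBal > 25000 && flucRate > 10) return "HighDemandMarket";                          // rule 2.1.1
        if (closeBal < 22000 && flucRate >= 20 && flucRate <= 30) return "OnGoingMarketStretegy"; // rule 2.1.2
        return "OtherProducts";                                                                   // rule 2.3
    }
    if (tradeType.equalsIgnoreCase("SHORTTERM")) {
        if (openBal < 5000) return "WealthyProducts";                                             // rule 2.2.1
        if (location.toUpperCase().contains("NEW_YORK")
                || location.toUpperCase().contains("CALIFORNIA")) return "ReliableProducts";      // rule 2.2.2
        return "OtherProducts";                                                                   // rule 2.3
    }
    return "OtherProducts";                                                                       // rule 2.3
}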


NOTE:- For this POC I have used a custom input format to read Excel files with the help of an external jar, so the corresponding jar files need to be added to the build path during coding and to the lib directory of Hadoop for successful execution. You can use the poi-ooxml jar for reading .xlsx files (the Excel 2007-onwards format).
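
For reference only (this POC itself sticks to the .xls/HSSF path), a parser for .xlsx files would swap HSSFWorkbook for XSSFWorkbook from the poi-ooxml jar, while the row and cell iteration stays exactly the same as in the ExcelParser class shown later in this post. A minimal sketch, assuming poi-ooxml is on the classpath (the class name here is illustrative):

package com.poc;

import java.io.IOException;
import java.io.InputStream;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

// Illustrative class (not part of this POC): same idea as ExcelParser, but for .xlsx via poi-ooxml.
public class XlsxParserSketch {

    public String parseExcelData(InputStream is) throws IOException {
        Workbook workbook = new XSSFWorkbook(is); // XSSF reads .xlsx; HSSF (used in this POC) reads .xls
        Sheet sheet = workbook.getSheetAt(0);
        StringBuilder current = new StringBuilder();
        // ... iterate rows and cells exactly as in the ExcelParser class shown later, appending to current ...
        workbook.close();
        return current.toString();
    }
}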

Below are the steps to make it work... 

1. Download and install Apache Ant from the link below.

http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz


2. To install it, extract the archive with the following command in a terminal:

tar -xzvf apache-ant-1.9.8-bin.tar.gz

3. Update bashrc:-

nano ~/.bashrc

Add the below two lines (${ant_dir} stands for the directory where you extracted Ant):-

export ANT_HOME=${ant_dir}

export PATH=${PATH}:${ANT_HOME}/bin

Now source bashrc with the command:
source ~/.bashrc

4. Then restart the system. (Very important for the changes to take effect.)

5. Download the required jar files from the below links:

https://github.com/sreejithpillai/ExcelRecordReaderMapReduce/blob/master/target/ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT-jar-with-dependencies.jar

https://github.com/sreejithpillai/ExcelRecordReaderMapReduce/blob/master/target/ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT.jar

Add both jar files to the build path during Eclipse compilation, and place only the SNAPSHOT jar in the Hadoop lib directory.

6. If it still does not work, try adding the jar location to the CLASSPATH:

export CLASSPATH=.:$CLASSPATH

Hope it will work now.

POC Processing Details



MAP REDUCE PROCESS IN DETAILS:-




1.     TO TAKE XLS INPUT DATA ON HDFS


hadoop fs -mkdir /Pocinput

hadoop fs -put SENSEX.xls /Pocinput

jar xvf POC.jar


2.     MAP REDUCE CODES:-


EXCEL INPUT DRIVER 
(MAIN DRIVER CLASS)

package com.poc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;


public class PocDriver {

            static public int count = 0;

            public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

                        Configuration conf = new Configuration();

                        GenericOptionsParser parser = new GenericOptionsParser(conf, args);

                        args = parser.getRemainingArgs();

                        Job job = new Job(conf, "Sensex_Log");

                        job.setJarByClass(PocDriver.class);

                        job.setOutputKeyClass(Text.class);

                        job.setOutputValueClass(Text.class);

                        job.setInputFormatClass(ExcelInputFormat.class);

                        job.setOutputFormatClass(TextOutputFormat.class);
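                        // LazyOutputFormat (below) creates the default part-r-xxxxx file only if something is
                        // actually written to it, so only the five MultipleOutputs named files appear in the output.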

                        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

                        FileInputFormat.addInputPath(job, new Path(args[0]));

                        FileOutputFormat.setOutputPath(job, new Path(args[1]));

                        job.setMapperClass(PocMapper.class);

                        job.setReducerClass(PocReducer.class);

                        MultipleOutputs.addNamedOutput(job, "HighDemandMarket", TextOutputFormat.class, IntWritable.class, Text.class);

                        MultipleOutputs.addNamedOutput(job, "OnGoingMarketStretegy", TextOutputFormat.class, IntWritable.class,Text.class);

                        MultipleOutputs.addNamedOutput(job, "WealthyProducts", TextOutputFormat.class, IntWritable.class, Text.class);

                        MultipleOutputs.addNamedOutput(job, "ReliableProducts", TextOutputFormat.class, IntWritable.class, Text.class);

                        MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);

                        System.exit(job.waitForCompletion(true) ? 0 : 1);

            }

}


EXCEL INPUT FORMAT 
(CUSTOM INPUT FORMAT TO READ EXCEL FILES)



package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

            @Override

            public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)

                                    throws IOException, InterruptedException {

                        return new ExcelRecordReader();

            }

}
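
One detail worth noting (it is not in the code above): a binary .xls workbook cannot be parsed from the middle of the file, so it is safer to keep each Excel file in a single split by overriding isSplitable() inside ExcelInputFormat, roughly as sketched below (it needs the org.apache.hadoop.mapreduce.JobContext and org.apache.hadoop.fs.Path imports):

            @Override
            protected boolean isSplitable(JobContext context, Path filename) {
                        // A binary Excel workbook cannot be parsed from an arbitrary offset, so read each file whole.
                        return false;
            }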



EXCEL RECORD READER 
(TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)

package com.poc;

import java.io.IOException;

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import com.sreejithpillai.excel.parser.ExcelParser;


public class ExcelRecordReader extends RecordReader<LongWritable, Text> {

            private LongWritable key;

            private Text value;

            private InputStream is;

            private String[] strArrayofLines;

            @Override

            public void initialize(InputSplit genericSplit, TaskAttemptContext context)

                                    throws IOException, InterruptedException {

                        FileSplit split = (FileSplit) genericSplit;

                        Configuration job = context.getConfiguration();

                        final Path file = split.getPath();

                        FileSystem fs = file.getFileSystem(job);

                        FSDataInputStream fileIn = fs.open(split.getPath());

                        is = fileIn;

                        String line = new ExcelParser().parseExcelData(is);

                        this.strArrayofLines = line.split("\n");

            }

            @Override

            public boolean nextKeyValue() throws IOException, InterruptedException {
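                        // The whole sheet was parsed up front in initialize(); here we just walk the array of
                        // row strings, using the row index as the key and the row text as the value.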

                        if (key == null) {

                                    key = new LongWritable(0);

                                    value = new Text(strArrayofLines[0]);

                        } else {

                                    if (key.get() < (this.strArrayofLines.length - 1)) {

                                                long pos = (int) key.get();

                                                key.set(pos + 1);

                                                value.set(this.strArrayofLines[(int) (pos + 1)]);

                                                pos++;

                                    } else {

                                                return false;

                                    }

                        }

                        if (key == null || value == null) {

                                    return false;

                        } else {

                                    return true;

                        }

            }

            @Override

            public LongWritable getCurrentKey() throws IOException, InterruptedException {

                        return key;

            }

            @Override

            public Text getCurrentValue() throws IOException, InterruptedException {

                        return value;

            }

            @Override

            public float getProgress() throws IOException, InterruptedException {

                        return 0;

            }

            @Override

            public void close() throws IOException {

                        if (is != null) {

                                    is.close();

                        }

            }

}



EXCEL PARSER 
(TO PARSE EXCEL SHEET)



package com.poc;

import java.io.IOException;

import java.io.InputStream;

import java.util.Iterator;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.poi.hssf.usermodel.HSSFSheet;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;

import org.apache.poi.ss.usermodel.Cell;

import org.apache.poi.ss.usermodel.Row;


public class ExcelParser {

            private static final Log LOG = LogFactory.getLog(ExcelParser.class);

            private StringBuilder currentString = null;

            private long bytesRead = 0;

            public String parseExcelData(InputStream is) {

                        try {

                                    HSSFWorkbook workbook = new HSSFWorkbook(is);

                                    HSSFSheet sheet = workbook.getSheetAt(0);

                                    Iterator<Row> rowIterator = sheet.iterator();

                                    currentString = new StringBuilder();

                                    while (rowIterator.hasNext()) {

                                                Row row = rowIterator.next();

                                                Iterator<Cell> cellIterator = row.cellIterator();

                                                while (cellIterator.hasNext()) {

                                                            Cell cell = cellIterator.next();

                                                            switch (cell.getCellType()) {

                                                            case Cell.CELL_TYPE_BOOLEAN:

                                                                        bytesRead++;

                                                                        currentString.append(cell.getBooleanCellValue() + "\t");

                                                                        break;

                                                            case Cell.CELL_TYPE_NUMERIC:

                                                                        bytesRead++;

                                                                        currentString.append(cell.getNumericCellValue() + "\t");

                                                                        break;

                                                            case Cell.CELL_TYPE_STRING:

                                                                        bytesRead++;

                                                                        currentString.append(cell.getStringCellValue() + "\t");

                                                                        break;

                                                            }

                                                }

                                                currentString.append("\n");

                                    }

                                    is.close();

                        } catch (IOException e) {

                                    LOG.error("IO Exception : File not found " + e);

                        }

                        return currentString.toString();

            }

            public long getBytesRead() {

                        return bytesRead;

            }

}
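
A small compatibility note: the CELL_TYPE_* integer constants used in the switch above come from the older POI 3.x API that this code compiles against. If you build against POI 4.x or newer, getCellType() returns the CellType enum instead, so the switch in parseExcelData() would look roughly like this (a sketch under that assumption, with the bytesRead counter omitted; it is not part of the original POC):

    switch (cell.getCellType()) {
    case BOOLEAN:
        currentString.append(cell.getBooleanCellValue()).append("\t");
        break;
    case NUMERIC:
        currentString.append(cell.getNumericCellValue()).append("\t");
        break;
    case STRING:
        currentString.append(cell.getStringCellValue()).append("\t");
        break;
    default:
        break;
    }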


EXCEL MAPPER
(HAVING MAPPER LOGIC)


package com.poc;


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;


public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {

            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
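                        // Skip the header row, collapse all whitespace to single tabs, and emit every record
                        // under one constant (empty) key so the reducer receives the entire dataset in one group.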

                        try {

                                    if (value.toString().contains("SENSEX_NAME") && value.toString().contains("TYPE_OF_TRADING"))

                                                return;

                                    else {

                                                String[] str = value.toString().split(" ");

                                                String data = "";

                                                for (int i = 0; i < str.length; i++) {

                                                            if (str[i] != null && !str[i].trim().isEmpty()) {

                                                                        data += (str[i] + " ");

                                                            }

                                                }

                                                String dr1 = data.trim().replaceAll("\\s+", "\t");

                                                String[] str1 = dr1.toString().split("\t");

                                                int id = (int)Double.parseDouble(str1[0]);

                                                int flucrate = (int)Double.parseDouble(str1[6]);

                                                int openbal = (int)Double.parseDouble(str1[4]);

                                                int closebal = (int)Double.parseDouble(str1[5]);

                                                String dr = Integer.toString(id)+"\t"+str1[1]+"\t"+str1[2]+"\t"+str1[3]+"\t"+Integer.toString(openbal)+"\t"+Integer.toString(closebal)+"\t"+Integer.toString(flucrate);

                                                context.write(new Text(""), new Text(dr));

                                    }

                        } catch (Exception e) {

                                    e.printStackTrace();

                        }

            }

}


EXCEL REDUCER
(HAVING REDUCER LOGIC)

package com.poc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;


public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {

            MultipleOutputs<IntWritable, Text> mos;

            @Override

            public void setup(Context context) {

                        mos = new MultipleOutputs<IntWritable, Text>(context);

            }

            @Override

            public void reduce(Text k1, Iterable<Text> k2, Context context) throws IOException, InterruptedException {
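                        // Every record arrives under the single empty key emitted by the mapper; apply the
                        // step-2 rules and send each record to the matching named output file.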

                        for (Text value : k2) {

                                    String sr = value.toString();

                                    String sr1 = sr.trim().replaceAll("\\s+", "\t");

                                    String[] str1 = sr1.split("\t");

                                    int id = (int)Double.parseDouble(str1[0]);

                                    String s = "NEW_YORK";

                                    String s1 = "CALIFORNIA";

                                    int flucrate = (int)Double.parseDouble(str1[6]);

                                    int openbal = (int)Double.parseDouble(str1[4]);

                                    int closebal = (int)Double.parseDouble(str1[5]);

                                    String dr = Integer.toString(id)+"\t"+str1[1]+"\t"+str1[2]+"\t"+str1[3]+"\t"+Integer.toString(openbal)+"\t"+Integer.toString(closebal)+"\t"+Integer.toString(flucrate);

                                    if (str1[2].equalsIgnoreCase("SIP")) {

                                                if (openbal > 25000 && flucrate > 10) {

                                                            mos.write("HighDemandMarket", null, new Text(dr), "/SensexLog/HighDemandMarket");

                                                } else if (closebal < 22000 && flucrate >= 20 && flucrate <= 30) { // rule 2.1.2

                                                            mos.write("OnGoingMarketStretegy", null, new Text(dr), "/SensexLog/OnGoingMarketStretegy");

                                                } else {

                                                            mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");

                                                }

                                    } else if (str1[2].equalsIgnoreCase("SHORTTERM")) {

                                                if (openbal < 5000) { // rule 2.2.1: OPEN_BALANCE < 5000

                                                            mos.write("WealthyProducts", null, new Text(dr), "/SensexLog/WealthyProducts");

                                                } else if (str1[3].toLowerCase().contains(s.toLowerCase()) || str1[3].toLowerCase().contains(s1.toLowerCase())) {

                                                            mos.write("ReliableProducts", null, new Text(dr), "/SensexLog/ReliableProducts");

                                                } else {

                                                            mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");

                                                }

                                    } else {

                                                mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");

                                    }

                        }

            }

            @Override

            protected void cleanup(Context context) throws IOException, InterruptedException {

                        mos.close();

            }

}


EXECUTING THE MAP REDUCE CODE


hadoop jar SensexLog.jar com.poc.PocDriver /Pocinput/SENSEX.xls /Sensex



Go to Firefox and open the NameNode page at the following URL:


http://localhost:50070 and browse the file system, then click on the SensexLog directory to check the files created.






3.     PIG SCRIPT



To execute a Pig script file from any directory, type the following command in the terminal:-
1. For LFS (local) files:
pig -x local <script>.pig

2. For HDFS files:
pig <script>.pig

In this case, since our data is in HDFS, the following command should be used:


pig PigScript1.pig

PigScript1.pig


A = LOAD '/SensexLog/' USING PigStorage('\t') AS (id:int, name:chararray, trade:chararray, loc:chararray, openbal:int, closebal:int, flucrate:int);

B = DISTINCT A;

DUMP B; 




PigScript2.pig


A = LOAD '/SensexLog/' USING PigStorage('\t') AS (id:int, name:chararray, trade:chararray, loc:chararray, openbal:int, closebal:int, flucrate:int);

B = DISTINCT A;

C = ORDER B BY id;


STORE C INTO '/SensexPOC';







4.     EXPORT the PIG Output from HDFS to MySQL using SQOOP


sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists SENSEX;";



sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use SENSEX;";


sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to 'localhost'@'%';";


sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to ''@'localhost';";




sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "create table sensexlogpoc(id int, name varchar(50), trade varchar(50), loc varchar(200), openbal int, closebal int, flucrate int);";



sqoop export --connect jdbc:mysql://localhost/SENSEX --table sensexlogpoc --export-dir /SensexPOC --fields-terminated-by '\t';





5.     STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE


Go to the Hive shell using the command:

hive


show databases;


create database SensexPOC;


use SensexPOC;



create external table sensexpoc(id int, Name string, trading string, loc string, openbal int, closebal int, flucrate int)

row format delimited

fields terminated by '\t'

stored as textfile location '/SensexPOC';



select * from sensexpoc;



Hope you all understood the procedures... 
Please do notify me for any corrections...
Kindly leave a comment for any queries/clarification...
(Detailed Description of each phase to be added soon).
ALL D BEST...