DATA INPUT FORMAT :- .xls (my input data is in Excel 97-2003 format)

Sample record (columns: ID, SENSEX_NAME, TYPEOFTRADING, SensexLoc, OPEN_BALANCE, CLOSING_BALANCE, FLTUATION_RATE):

100001 BSE SIP NEW_YORK 23486 24876 28
1) Take the complete EXCEL input data on HDFS.
2) Develop a MapReduce use case to get the below filtered results from the HDFS input data (.xls EXCEL data).
2.1) If TYPEOFTRADING is 'SIP':
2.1.1) If OPEN_BALANCE > 25000 & FLTUATION_RATE > 10 --> store in "HighDemandMarket".
2.1.2) If CLOSING_BALANCE < 22000 & FLTUATION_RATE between 20 and 30 --> store in "OnGoingMarketStretegy".
2.2) If TYPEOFTRADING is 'SHORTTERM':
2.2.1) If OPEN_BALANCE < 5000 --> store in "WealthyProducts".
2.2.2) If SensexLoc is "NewYork" OR "California" --> store in "ReliableProducts".
2.3) Everything else --> store in "OtherProducts" (see the routing sketch after this list).
NOTE: With the mentioned file names, only 5 output files have to be generated.
Example: "WealthyProducts-r-00000"
3) Develop a PIG script to filter the MapReduce output in the below fashion.
3.1) Provide the unique data.
3.2) Sort the unique data based on ID.
4) EXPORT the same PIG output from HDFS to MySQL using SQOOP.
5) Store the same PIG output in a HIVE external table.
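
Before going into the full MapReduce code, the routing rules above can be captured in a few lines of plain Java. This is only a sketch for illustration (the class and method names MarketRouter / routeRecord are made up and are not part of the POC jar):

import java.util.Locale;

public class MarketRouter {

    // Returns the output file name a record belongs to, per rules 2.1 - 2.3 above.
    public static String routeRecord(String trade, String loc, int openbal, int closebal, int flucrate) {
        if (trade.equalsIgnoreCase("SIP")) {
            if (openbal > 25000 && flucrate > 10) return "HighDemandMarket";
            if (closebal < 22000 && flucrate >= 20 && flucrate <= 30) return "OnGoingMarketStretegy";
        } else if (trade.equalsIgnoreCase("SHORTTERM")) {
            if (openbal < 5000) return "WealthyProducts";
            String l = loc.toUpperCase(Locale.ROOT);
            if (l.contains("NEW_YORK") || l.contains("CALIFORNIA")) return "ReliableProducts";
        }
        return "OtherProducts";
    }

    public static void main(String[] args) {
        // Sample record from the input sheet: SIP, openbal 23486, closebal 24876, flucrate 28
        System.out.println(routeRecord("SIP", "NEW_YORK", 23486, 24876, 28)); // prints OtherProducts
    }
}

The reducer shown later in this post implements the same branching, just with MultipleOutputs writes instead of return values.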
NOTE:- For this POC I have used a custom input format to read EXCEL files via an external jar, so the corresponding jar files have to be added during coding and to the lib directory of Hadoop for successful execution. You can use the poi-ooxml jar for reading .xlsx files (the 2010-onwards Excel format).
Below are the steps to make it work...
1. Download and install Ant from the link below:
http://muug.ca/mirror/apache-dist//ant/binaries/apache-ant-1.9.8-bin.tar.gz
2. To install it, give the following command in a terminal:
tar -xzvf apache-ant-1.9.8-bin.tar.gz
3. Update bashrc:
nano ~/.bashrc
Add the below two lines (${ant_dir} is the directory Ant was extracted to):
export ANT_HOME=${ant_dir}
export PATH=${PATH}:${ANT_HOME}/bin
Now source bashrc with the command:
source ~/.bashrc
4. Then restart the system. (Very important for the change to take effect.)
5. Download the required jar files from the link below:
Place both jar files on the classpath during Eclipse compilation, and copy only the SNAPSHOT.jar to the Hadoop lib directory.
6. If it is still not working, try adding the jars to CLASSPATH:
export CLASSPATH=.:$CLASSPATH
Hope it will work now.
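
Before wiring the jars into Hadoop, it may help to confirm locally that POI can open the workbook at all. A minimal sketch, assuming the downloaded poi jar is on the classpath and the sample file SENSEX.xls is in the working directory (the class name LocalExcelCheck is made up):

import java.io.FileInputStream;
import java.util.Iterator;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class LocalExcelCheck {
    public static void main(String[] args) throws Exception {
        // Open the 97-2003 (.xls) workbook from the local file system
        FileInputStream fis = new FileInputStream("SENSEX.xls");
        HSSFWorkbook workbook = new HSSFWorkbook(fis);
        HSSFSheet sheet = workbook.getSheetAt(0);
        Iterator<Row> rows = sheet.iterator();
        while (rows.hasNext()) {
            Row row = rows.next();
            Iterator<Cell> cells = row.cellIterator();
            StringBuilder line = new StringBuilder();
            while (cells.hasNext()) {
                // Cell.toString() renders numeric, string and boolean cells alike
                line.append(cells.next().toString()).append("\t");
            }
            System.out.println(line.toString().trim());
        }
        fis.close();
    }
}

If this prints the rows of the sheet, the jar setup is fine and the same parsing will work inside the MapReduce job.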
MAP REDUCE PROCESS IN DETAILS:-

1. TO TAKE XLS INPUT DATA ON HDFS

hadoop fs -mkdir /Pocinput
hadoop fs -put SENSEX.xls /Pocinput
jar xvf POC.jar
EXCEL INPUT DRIVER (MAIN DRIVER CLASS)

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PocDriver {

    static public int count = 0;

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "Sensex_Log");
        job.setJarByClass(PocDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Custom input format that reads .xls files; LazyOutputFormat suppresses empty default part files
        job.setInputFormatClass(ExcelInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(PocMapper.class);
        job.setReducerClass(PocReducer.class);

        // One named output per filtered result file
        MultipleOutputs.addNamedOutput(job, "HighDemandMarket", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "OnGoingMarketStretegy", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "WealthyProducts", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "ReliableProducts", TextOutputFormat.class, IntWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "OtherProducts", TextOutputFormat.class, IntWritable.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
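
Optionally the same driver can be written with Hadoop's ToolRunner, which parses -D options from the command line automatically. A hedged sketch of that variant (ToolPocDriver is a made-up name; the named outputs would be registered exactly as in PocDriver above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolPocDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "Sensex_Log");
        job.setJarByClass(ToolPocDriver.class);
        job.setInputFormatClass(ExcelInputFormat.class);
        job.setMapperClass(PocMapper.class);
        job.setReducerClass(PocReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Named outputs and LazyOutputFormat would be registered here exactly as in PocDriver
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ToolPocDriver(), args));
    }
}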
EXCEL INPUT FORMAT (CUSTOM INPUT FORMAT TO READ EXCEL FILES)

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }
}
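
Since the record reader parses the entire workbook in one go, each .xls file should go to a single mapper. A possible safeguard (my own sketch, not part of the original jar) is to override isSplitable() so the framework never splits the binary file:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Variant of ExcelInputFormat that forces one mapper per .xls file,
// since a binary workbook cannot be cut at arbitrary byte offsets (sketch only).
public class NonSplittableExcelInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new ExcelRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}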
EXCEL RECORD READER (TO READ EXCEL FILE AND SEND AS KEY, VALUE FORMAT)

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import com.sreejithpillai.excel.parser.ExcelParser;

public class ExcelRecordReader extends RecordReader<LongWritable, Text> {

    private InputStream is;
    private LongWritable key;
    private Text value;
    private String[] strArrayofLines;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        is = fileIn;
        // Parse the whole workbook into one string, one spreadsheet row per line
        String line = new ExcelParser().parseExcelData(is);
        this.strArrayofLines = line.split("\n");
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            // First call: emit the first row of the sheet
            key = new LongWritable(0);
            value = new Text(strArrayofLines[0]);
        } else {
            if (key.get() < (this.strArrayofLines.length - 1)) {
                // Advance to the next row
                long pos = (int) key.get();
                value.set(this.strArrayofLines[(int) (pos + 1)]);
                key.set(pos + 1);
            } else {
                return false;
            }
        }
        if (key == null || value == null) {
            return false;
        }
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if (is != null) {
            is.close();
        }
    }
}
EXCEL PARSER (TO PARSE EXCEL SHEET)

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;

public class ExcelParser {

    private static final Log LOG = LogFactory.getLog(ExcelParser.class);
    private StringBuilder currentString = null;
    private long bytesRead = 0;

    public String parseExcelData(InputStream is) {
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            HSSFSheet sheet = workbook.getSheetAt(0);
            Iterator<Row> rowIterator = sheet.iterator();
            currentString = new StringBuilder();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Iterator<Cell> cellIterator = row.cellIterator();
                while (cellIterator.hasNext()) {
                    Cell cell = cellIterator.next();
                    // Append every cell value followed by a tab, whatever its type
                    switch (cell.getCellType()) {
                    case Cell.CELL_TYPE_BOOLEAN:
                        currentString.append(cell.getBooleanCellValue() + "\t");
                        break;
                    case Cell.CELL_TYPE_NUMERIC:
                        currentString.append(cell.getNumericCellValue() + "\t");
                        break;
                    case Cell.CELL_TYPE_STRING:
                        currentString.append(cell.getStringCellValue() + "\t");
                        break;
                    }
                }
                // One spreadsheet row per line
                currentString.append("\n");
            }
        } catch (IOException e) {
            LOG.error("IO Exception : File not found " + e);
        }
        return currentString.toString();
    }

    public long getBytesRead() {
        return bytesRead;
    }
}
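
As noted at the top, the poi-ooxml jar can be used for the 2010-onwards .xlsx format. A hedged sketch of the same parser written against XSSFWorkbook (ExcelXlsxParser is a made-up name, and the poi-ooxml jar plus its dependencies must be on the classpath):

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

// Sketch of an .xlsx parser; same tab-separated output shape as ExcelParser above.
public class ExcelXlsxParser {

    public String parseExcelData(InputStream is) {
        StringBuilder currentString = new StringBuilder();
        try {
            XSSFWorkbook workbook = new XSSFWorkbook(is);
            XSSFSheet sheet = workbook.getSheetAt(0);
            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                Iterator<Cell> cellIterator = row.cellIterator();
                while (cellIterator.hasNext()) {
                    // Cell.toString() renders boolean, numeric and string cells alike
                    currentString.append(cellIterator.next().toString()).append("\t");
                }
                currentString.append("\n");
            }
        } catch (IOException e) {
            throw new RuntimeException("Could not read .xlsx stream", e);
        }
        return currentString.toString();
    }
}

Swapping this in would also require an .xlsx-aware record reader, but the key/value shape sent to the mapper stays the same.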
EXCEL MAPPER (HAVING MAPPER LOGIC)

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PocMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip the header row of the spreadsheet
        if (value.toString().contains("SENSEX_NAME")
                && value.toString().contains("TYPE_OF_TRADING")) {
            return;
        }
        // Rebuild the record, dropping empty fragments
        String data = "";
        String[] str = value.toString().split(" ");
        for (int i = 0; i < str.length; i++) {
            if (str[i] != null && !str[i].trim().isEmpty()) {
                data = data + " " + str[i];
            }
        }
        // Collapse all whitespace into single tabs so the columns are cleanly separated
        String dr1 = data.trim().replaceAll("\\s+", "\t");
        String[] str1 = dr1.split("\t");
        int id = (int) Double.parseDouble(str1[0]);
        int flucrate = (int) Double.parseDouble(str1[6]);
        int openbal = (int) Double.parseDouble(str1[4]);
        int closebal = (int) Double.parseDouble(str1[5]);
        String dr = Integer.toString(id) + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t"
                + Integer.toString(openbal) + "\t" + Integer.toString(closebal) + "\t"
                + Integer.toString(flucrate);
        // Write everything under a single empty key so all records reach one reduce call
        context.write(new Text(""), new Text(dr));
    }
}
EXCEL REDUCER (HAVING REDUCER LOGIC)

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PocReducer extends Reducer<Text, Text, IntWritable, Text> {

    MultipleOutputs<IntWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<IntWritable, Text>(context);
    }

    @Override
    public void reduce(Text k1, Iterable<Text> k2, Context context)
            throws IOException, InterruptedException {
        while (k2.iterator().hasNext()) {
            String sr = k2.iterator().next().toString();
            String sr1 = sr.trim().replaceAll("\\s+", "\t");
            String[] str1 = sr1.split("\t");
            int id = (int) Double.parseDouble(str1[0]);
            // Locations used for the ReliableProducts check (NEW_YORK matches the sample data)
            String s = "NEW_YORK";
            String s1 = "CALIFORNIA";
            int flucrate = (int) Double.parseDouble(str1[6]);
            int openbal = (int) Double.parseDouble(str1[4]);
            int closebal = (int) Double.parseDouble(str1[5]);
            String dr = Integer.toString(id) + "\t" + str1[1] + "\t" + str1[2] + "\t" + str1[3] + "\t"
                    + Integer.toString(openbal) + "\t" + Integer.toString(closebal) + "\t"
                    + Integer.toString(flucrate);
            if (str1[2].equalsIgnoreCase("SIP")) {
                if (openbal > 25000 && flucrate > 10) {
                    mos.write("HighDemandMarket", null, new Text(dr), "/SensexLog/HighDemandMarket");
                } else if (closebal < 22000 && flucrate >= 20 && flucrate <= 30) {
                    mos.write("OnGoingMarketStretegy", null, new Text(dr), "/SensexLog/OnGoingMarketStretegy");
                } else {
                    mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
                }
            } else if (str1[2].equalsIgnoreCase("SHORTTERM")) {
                if (openbal < 5000) {
                    mos.write("WealthyProducts", null, new Text(dr), "/SensexLog/WealthyProducts");
                } else if (str1[3].toLowerCase().contains(s.toLowerCase())
                        || str1[3].toLowerCase().contains(s1.toLowerCase())) {
                    mos.write("ReliableProducts", null, new Text(dr), "/SensexLog/ReliableProducts");
                } else {
                    mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
                }
            } else {
                mos.write("OtherProducts", null, new Text(dr), "/SensexLog/OtherProducts");
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
EXECUTING THE MAP REDUCE CODE

hadoop jar SensexLog.jar com.poc.PocDriver /Input/SENSEX.xls /Sensex

Go to Firefox, open the NameNode page at http://localhost:50070, browse the file system, and click on the /SensexLog directory to check the files created.
To execute a Pig script file from any directory in the terminal, type the following command:
1. For LFS files:
pig -x local
2. For HDFS files:
pig
In this case, since our data is in HDFS, the following command should be used:
pig PigScript.pig

PigScript.pig (provides the unique data and sorts it on the id column):

A = LOAD '/SensexLog/' USING PigStorage('\t') AS (id:int, name:chararray, trade:chararray, loc:chararray, openbal:int, closebal:int, flucrate:int);
B = DISTINCT A;
C = ORDER B BY id;
STORE C INTO '/SensexPOC';
4. EXPORT the PIG Output from HDFS to MySQL using SQOOP

sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "create database if not exists SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/ --username root --password root --query "use SENSEX;"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to 'localhost'@'%';"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "grant all privileges on SENSEX.* to ''@'localhost';"
sqoop eval --connect jdbc:mysql://localhost/SENSEX --username root --password root --query "create table sensexlogpoc(id int, name varchar(50), trade varchar(50), loc varchar(200), openbal int, closebal int, flucrate int);"
sqoop export --connect jdbc:mysql://localhost/SENSEX --username root --password root --table sensexlogpoc --export-dir /SensexPOC --fields-terminated-by '\t'
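
After the export, the row count in MySQL can be cross-checked against the Pig output. A minimal JDBC sketch, assuming the mysql-connector jar is on the classpath and the same root/root credentials used above (SensexExportCheck is a made-up name):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SensexExportCheck {
    public static void main(String[] args) throws Exception {
        // Same connection details used in the sqoop commands above
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost/SENSEX", "root", "root");
        Statement st = con.createStatement();
        ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM sensexlogpoc");
        if (rs.next()) {
            System.out.println("Rows exported: " + rs.getInt(1));
        }
        rs.close();
        st.close();
        con.close();
    }
}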
5. STORE THE PIG OUTPUT IN A HIVE EXTERNAL TABLE

Go to the hive shell using the command: hive

create database SensexPOC;
use SensexPOC;
create external table sensexpoc(id int, name string, trading string, loc string, openbal int, closebal int, flucrate int)
row format delimited
fields terminated by '\t'
stored as textfile location '/SensexPOC';
select * from sensexpoc;
Hope you all understood the procedures...
Please do notify me of any corrections...
Kindly leave a comment for any queries/clarifications...
(A detailed description of each phase is to be added soon.)
ALL THE BEST...