POC #: Sensex Log Data Processing (PDF File Processing in MapReduce)
Industry: Financial
Data:
Input format: .PDF (our input data is in PDF format).
I created 3,000 records of my own in the layout below.
Input dataset attributes:
1. SENSEXID
2. SENSEXNAME
3. TYPEOFTRADING
4. SENSEXLOC
5. OPEN_BALANCE
6. CLOSING_BAL
7. FLTUATION_RATE
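For illustration, one record in this layout might look like the following (the values are invented for this example):

1001,RELIANCE,SIP,Mumbai,30000,45000,12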
Problem Statement: Analyse the data in the Hadoop ecosystem to:

1. Load the complete PDF input data onto HDFS.

2. Develop a MapReduce use case to get the below filtered results from the HDFS input data (implemented in the SensexTradeMapper sketch in the coding section):

If TYPEOFTRADING is 'SIP':
- OPEN_BALANCE > 25000 and FLTUATION_RATE > 10 --> store in "HighDemandMarket"
- CLOSING_BAL … --> store in "OnGoingMarketStretegy"

If TYPEOFTRADING is 'SHORTTERM':
- OPEN_BALANCE < 5000 --> store in "WealthyProducts"
- SENSEXLOC is "NewYork" or "Mumbai" --> store in "ReliableProducts"

else store in "OtherProducts"

NOTE: Only the five output files named above have to be generated.

3. Develop a Pig script to filter the MapReduce output:
- Provide the unique data.
- Sort the unique data by SENSEXID.

4. Export the same Pig output from HDFS to MySQL using Sqoop.

5. Store the same Pig output in a Hive external table.
POC Coding Details
MapReduce Code
PdfInputDriver.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PdfInputDriver {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // GenericOptionsParser is a utility to parse command-line arguments
        // generic to the Hadoop framework.
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        // Returns an array of Strings containing only application-specific arguments.
        args = parser.getRemainingArgs();

        Job job = new Job(conf, "PdfSensexDetails");
        job.setJarByClass(PdfInputDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Custom InputFormat class that reads PDF files.
        job.setInputFormatClass(PdfInputFormat.class);
        // LazyOutputFormat suppresses the empty default part-* files;
        // records are written only through MultipleOutputs.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
                IntWritable.class, Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SensexTradeMapper.class);
        job.setReducerClass(SensexTradeReducer.class);

        System.out.println(job.waitForCompletion(true));
    }
}
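Assuming the classes are packaged into a jar (the jar name and the input path below are hypothetical; the output path matches the one the Pig script reads later), a typical invocation looks like:

hadoop jar sensex-poc.jar com.bhavesh.poc.sensex.PdfInputDriver \
    /hdfs/bhavesh/SENSEX/INPUT /hdfs/bhavesh/SENSEX/OUTPUT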
PdfInputFormat.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class PdfInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new PdfRecordReader();
    }
}
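One caveat worth adding, which the listing above does not include: a PDF is a binary format that cannot be parsed from an arbitrary byte offset, so the input format should keep each file in a single split. Overriding isSplitable inside PdfInputFormat does that (this needs imports for org.apache.hadoop.mapreduce.JobContext and org.apache.hadoop.fs.Path):

    // PDFs are binary; a split starting mid-file cannot be parsed,
    // so hand each whole file to exactly one mapper.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }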
PdfRecordReader.java
package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.
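The listing breaks off at this point in this copy of the post. For orientation, a PDF RecordReader of this kind is typically built on Apache PDFBox: the whole file is read from HDFS, its text is extracted, and each line of text is served as one record to the mapper. The sketch below is a reconstruction under that assumption (PDFBox 2.x, org.apache.pdfbox.text.PDFTextStripper), not the author's original code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

public class PdfRecordReader extends RecordReader<LongWritable, Text> {

    private String[] lines;                        // extracted text, one record per line
    private int pointer = 0;                       // index of the next line to serve
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Read the whole PDF from HDFS and extract its text with PDFBox.
        try (FSDataInputStream in = fs.open(path);
             PDDocument document = PDDocument.load(in)) {
            lines = new PDFTextStripper().getText(document).split("\n");
        }
    }

    @Override
    public boolean nextKeyValue() {
        if (lines == null || pointer >= lines.length) {
            return false;
        }
        key.set(pointer);
        value.set(lines[pointer]);
        pointer++;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() {
        return (lines == null) ? 0f : (float) pointer / lines.length;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release; the stream and document are closed in initialize().
    }
}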
SensexTradeReducer.java (fragment; the surviving listing resumes mid-class, and the "-r-00000" file names in the Pig script below confirm these records are written from the reduce side)

            //context.write(new IntWritable(ssxid), new Text(ssxname + "," + TradeType + "," + Location + "," + Open_bal + "," + Close_bal + "," + Fluct_rate));
            // Write each record to the output file named after its category key.
            multipleOutputs.write(new IntWritable(ssxid), new Text(str1),
                    generateFileName(key));
        }
    }

    // The category key (e.g. "HighDemandMarket") becomes the base output file name.
    String generateFileName(Text key) {
        return key.toString();
    }

    @Override
    public void setup(Context context) {
        multipleOutputs = new MultipleOutputs(context);
    }

    @Override
    public void cleanup(final Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
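SensexTradeMapper.java also does not survive in this copy. The following is a sketch consistent with the driver and the reducer fragment above, not the original code: the mapper parses each record (field order per the attribute table), applies the rules from the problem statement, and emits (category, record) so the reducer can fan each record out to the file named after its category. The comma delimiter and the CLOSING_BAL threshold are assumptions; the latter is truncated in the problem statement, so the value below is only a placeholder.

package com.bhavesh.poc.sensex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SensexTradeMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Record layout per the attribute table:
        // SENSEXID,SENSEXNAME,TYPEOFTRADING,SENSEXLOC,OPEN_BALANCE,CLOSING_BAL,FLTUATION_RATE
        String[] f = value.toString().split(",");
        if (f.length < 7) {
            return; // skip blank or garbled lines from PDF extraction
        }

        String tradeType = f[2].trim();
        String location  = f[3].trim();
        long openBal, closeBal;
        int fluctRate;
        try {
            openBal   = Long.parseLong(f[4].trim());
            closeBal  = Long.parseLong(f[5].trim());
            fluctRate = Integer.parseInt(f[6].trim());
        } catch (NumberFormatException e) {
            return; // skip header or non-numeric lines
        }

        String category;
        if (tradeType.equalsIgnoreCase("SIP")) {
            if (openBal > 25000 && fluctRate > 10) {
                category = "HighDemandMarket";
            } else if (closeBal > 25000) { // placeholder: threshold truncated in the source
                category = "OnGoingMarketStretegy";
            } else {
                category = "OtherProducts";
            }
        } else if (tradeType.equalsIgnoreCase("SHORTTERM")) {
            if (openBal < 5000) {
                category = "WealthyProducts";
            } else if (location.equalsIgnoreCase("NewYork")
                    || location.equalsIgnoreCase("Mumbai")) {
                category = "ReliableProducts";
            } else {
                category = "OtherProducts";
            }
        } else {
            category = "OtherProducts";
        }
        // Key = category; the reducer uses it as the output file name.
        context.write(new Text(category), value);
    }
}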
PIG Script
SENSEX.pig
A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/HighDemandMarket-r-00000' USING PigStorage('\t')
    AS (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disHM = DISTINCT A;
orHM = ORDER disHM BY Sid;
STORE orHM INTO '/hdfs/bhavesh/SENSEX/HM' USING PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/ReliableProducts-r-00000' USING PigStorage('\t')
    AS (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disRP = DISTINCT A;
orRP = ORDER disRP BY Sid;
STORE orRP INTO '/hdfs/bhavesh/SENSEX/RP' USING PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OtherProducts-r-00000' USING PigStorage('\t')
    AS (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disOP = DISTINCT A;
orOP = ORDER disOP BY Sid;
STORE orOP INTO '/hdfs/bhavesh/SENSEX/OP' USING PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/WealthyProducts-r-00000' USING PigStorage('\t')
    AS (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disWP = DISTINCT A;
orWP = ORDER disWP BY Sid;
STORE orWP INTO '/hdfs/bhavesh/SENSEX/WP' USING PigStorage(',');

A = LOAD '/hdfs/bhavesh/SENSEX/OUTPUT/OnGoingMarketStretegy-r-00000' USING PigStorage('\t')
    AS (Sid:int, Sname:chararray, Ttrading:chararray, Sloc:chararray, OBal:int, CBal:int, Frate:int);
disOMS = DISTINCT A;
orOMS = ORDER disOMS BY Sid;
STORE orOMS INTO '/hdfs/bhavesh/SENSEX/OMS' USING PigStorage(',');
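Steps 4 and 5 (the Sqoop export and the Hive external table) have no surviving listing in this copy. A Sqoop export along the following lines pushes one of the Pig outputs (the HighDemandMarket set stored under /hdfs/bhavesh/SENSEX/HM) into MySQL; the database name, table name, and credentials are assumptions, and the MySQL table must already exist with seven matching columns:

sqoop export \
  --connect jdbc:mysql://localhost/sensexdb \
  --username root -P \
  --table high_demand_market \
  --export-dir /hdfs/bhavesh/SENSEX/HM \
  --input-fields-terminated-by ','

Similarly, a Hive external table can be laid directly over the same comma-delimited Pig output; the table and column names below are assumptions, and the column types follow the Pig schema:

CREATE EXTERNAL TABLE sensex_hm (
  sid INT,
  sname STRING,
  ttrading STRING,
  sloc STRING,
  obal INT,
  cbal INT,
  frate INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/hdfs/bhavesh/SENSEX/HM';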