HADOOP (PROOF OF CONCEPTS) WEATHER REPORT ANALYSIS
Hello Friends,

Welcome back. This post walks through the Weather Report POC, which a friend gave me and asked me to complete. While researching it I came across a very good website that I can't wait to share with you all. For this POC I have adapted and combined both sources to make the concept easier to understand.

Problem Statement:
1. The system receives temperatures of various cities, captured at regular intervals of time on each day, in an input file.
2. Weather information for all cities for a week is supplied to the system in a single input file.
3. The system processes the input data file and generates a report with the maximum and minimum temperatures for each day.
4. The system generates a separate output report for each city.
Ex: California-r-00000
Newjersy-r-00000
Newyork-r-00000
5. Develop a PIG script to filter the MapReduce output in the following fashion:
- Provide the unique data
- Sort the unique data based on RETAIL_ID in DESC order
6. Export the same PIG output from HDFS to MySQL using SQOOP.
7. Store the same PIG output in a HIVE external table.

Input File Format:- .txt

The input file and problem statement for this POC were shared with me by Mr. Amol Wani. The file contains temperature readings with timestamps for multiple cities.

Schema of record set:-
CA_25-Jan-2014 00:12:345 15.7 01:19:345 23.1 02:34:542 12.3 03:12:187 16 04:00:093 -14 05:12:345 35.7 06:19:345 23.1 07:34:542 12.3 08:12:187 16 09:00:093 -7 10:12:345 15.7 11:19:345 23.1 12:34:542 -22.3 13:12:187 16 14:00:093 -7 15:12:345 15.7 16:19:345 23.1 19:34:542 12.3 20:12:187 16 22:00:093 -7

CA is the city code (here it stands for California), followed by the date. After that, each pair of values represents a time and a temperature.

DOWNLOAD MY INPUT FILE FROM BELOW LINK:
https://drive.google.com/file/d/0BzYUKIo7aWL_eV9OYmZjcTM1ejA/view?usp=sharing

1. TO TAKE INPUT DATA ON HDFS
hadoop fs -mkdir /InputData
hadoop fs -put weather_report.txt /InputData
jar xvf WeatherReportPoc.jar

2. MAP REDUCE CODES:-

WEATHER REPORT PROCESSOR (DRIVER CLASS)

package com.poc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WeatherReportProcessor {

    // One named output per city; each name becomes the prefix of that city's report file.
    public static String caOutputName = "California";
    public static String nyOutputName = "Newyork";
    public static String njOutputName = "Newjersy";
    public static String ausOutputName = "Austin";
    public static String bosOutputName = "Boston";
    public static String balOutputName = "Baltimore";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Weather Report");
        job.setJarByClass(WeatherReportProcessor.class);
        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FloatWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        MultipleOutputs.addNamedOutput(job, caOutputName, TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, nyOutputName, TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, njOutputName, TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, bosOutputName, TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, ausOutputName, TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, balOutputName, TextOutputFormat.class, Text.class, Text.class);
        // job.setNumReduceTasks(0);
        // args[0] is the HDFS input path, args[1] is the HDFS output directory.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

WEATHER MAPPER (HAVING MAPPER LOGIC)

package com.poc;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text dayReport, Context context) throws IOException, InterruptedException {
        // Each input line is one tab-separated day report for one city.
        StringTokenizer st2 = new StringTokenizer(dayReport.toString(), "\t");
        int counter = 0;
        String cityDateString = "";
        String maxTempTime = "";
        String minTempTime = "";
        String

3. EXECUTING THE MAP REDUCE CODE
Explanation:-
In the map method, we parse each input line and maintain a counter to extract the date and each time and temperature value. For a given input line, we first extract the date (counter == 0), then alternately extract the time (counter % 2 == 1), since times sit at odd positions (1, 3, 5, ...), and the temperature otherwise. Each temperature is compared with the current max and min, which are updated accordingly. Once the while loop terminates for a given input line, we write maxTempTime and minTempTime with the date.
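The counter logic described above can be tried outside Hadoop. The following is only a minimal sketch, not the actual mapper: the class name WeatherLineSketch, the DaySummary holder and the summarize method are hypothetical names made up for illustration, and the sample line assumes the fields are tab-separated.

```java
import java.util.StringTokenizer;

/** Standalone sketch of the per-line parsing (hypothetical names, no Hadoop types). */
final class WeatherLineSketch {

    /** Hypothetical holder for the result of one input line. */
    static final class DaySummary {
        final String cityDate;
        final float maxTemp;
        final String maxTempTime;
        final float minTemp;
        final String minTempTime;

        DaySummary(String cityDate, float maxTemp, String maxTempTime,
                   float minTemp, String minTempTime) {
            this.cityDate = cityDate;
            this.maxTemp = maxTemp;
            this.maxTempTime = maxTempTime;
            this.minTemp = minTemp;
            this.minTempTime = minTempTime;
        }
    }

    /** Walks the tokens of one line: position 0 is city+date, odd positions are times, the rest temperatures. */
    static DaySummary summarize(String line) {
        StringTokenizer tokens = new StringTokenizer(line, "\t"); // assumes tab-separated fields
        int counter = 0;
        String cityDate = "";
        String currentTime = "";
        String maxTempTime = "";
        String minTempTime = "";
        float maxTemp = Float.NEGATIVE_INFINITY;
        float minTemp = Float.POSITIVE_INFINITY;

        while (tokens.hasMoreTokens()) {
            String token = tokens.nextToken();
            if (counter == 0) {
                cityDate = token;                 // e.g. CA_25-Jan-2014
            } else if (counter % 2 == 1) {
                currentTime = token;              // odd position: time of the next reading
            } else {
                float temp = Float.parseFloat(token); // even position: temperature
                if (temp > maxTemp) {
                    maxTemp = temp;
                    maxTempTime = currentTime;
                }
                if (temp < minTemp) {
                    minTemp = temp;
                    minTempTime = currentTime;
                }
            }
            counter++;
        }
        return new DaySummary(cityDate, maxTemp, maxTempTime, minTemp, minTempTime);
    }

    public static void main(String[] args) {
        // Shortened sample built from the record shown in the schema section.
        String line = "CA_25-Jan-2014\t00:12:345\t15.7\t01:19:345\t23.1\t04:00:093\t-14\t05:12:345\t35.7";
        DaySummary s = summarize(line);
        System.out.println(s.cityDate + " max=" + s.maxTemp + " at " + s.maxTempTime
                + ", min=" + s.minTemp + " at " + s.minTempTime);
    }
}
```

In the real job the same values would be written to the context instead of being returned, but keeping the parsing in a plain method like this makes it easy to check against a sample line before running on the cluster.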
A = LOAD '/WeatherOutput/' USING PigStorage ('\t') AS (date:chararray, mintemp:float, maxtemp:float);
(The output shows that Pig reads all the files under the directory and, because we used the DISTINCT command, removes the duplicate entries.)
NOTE:- Don't use the DISTINCT command if you want to export all entries.
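To see what "unique data, sorted in DESC order" does to the rows, here is a small plain-Java sketch of the same idea. It is not Pig and not part of the POC: the class DistinctSortSketch and its method are hypothetical, the rows follow the (date, mintemp, maxtemp) layout from the LOAD statement, and sorting on the first column is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Plain-Java illustration of "distinct, then sort descending" (hypothetical names). */
final class DistinctSortSketch {

    /** Drops exact duplicate rows, then sorts by the first tab-separated column, descending. */
    static List<String> distinctSortedDesc(List<String> rows) {
        return rows.stream()
                .distinct() // same effect as DISTINCT: identical rows collapse to one
                .sorted(Comparator.comparing((String row) -> row.split("\t", 2)[0]).reversed())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up rows in (date, mintemp, maxtemp) layout; the first and last are identical.
        List<String> rows = Arrays.asList(
                "25-Jan-2014\t-22.3\t35.7",
                "26-Jan-2014\t-5.0\t30.1",
                "25-Jan-2014\t-22.3\t35.7");
        for (String row : distinctSortedDesc(rows)) {
            System.out.println(row);
        }
    }
}
```

Only rows that match in every column are collapsed, which is why skipping the distinct step keeps all entries for the export.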
sqoop export --connect jdbc:mysql://localhost/WEATHER --table weatherpoc --export-dir /WEATHERPOC --fields-terminated-by '\t';
create external table weatherpoc(Name string, mintemp float, maxtemp float)