It's been a while since I wrote a Hadoop program, so I thought I'd get back into action.
I wrote a program for the following problem:
Inner product of two sparse vectors: A vector is a list of values. Given two vectors, $X = [x_1, x_2, \ldots]$ and $Y = [y_1, y_2, \ldots]$, their inner product is $Z = x_1 y_1 + x_2 y_2 + \cdots$. When X and Y are long but have many elements with zero value, they’re usually given in a sparse representation:
1,0.46
9,0.21
17,0.92
…
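For comparison, if each line of that sparse representation is read as an `index,value` pair for one non-zero element (my reading of the example, not something the exercise spells out), the inner product only needs the indices present in both vectors. A minimal sketch in plain Python, outside Hadoop; `parse_sparse` and `sparse_inner_product` are hypothetical helper names and the second vector is made-up data:

```python
def parse_sparse(lines):
    """Parse 'index,value' lines into a dict {index: value}; blank lines are skipped."""
    vector = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        index, value = line.split(",")
        vector[int(index)] = float(value)
    return vector


def sparse_inner_product(x, y):
    """Sum x[i] * y[i] over the indices present in both vectors.

    An index missing from either dict is an implicit zero, so it adds nothing.
    """
    # iterate over the smaller vector and look each index up in the larger one
    if len(x) > len(y):
        x, y = y, x
    return sum(value * y[i] for i, value in x.items() if i in y)


if __name__ == "__main__":
    x = parse_sparse(["1,0.46", "9,0.21", "17,0.92"])
    y = parse_sparse(["1,2.0", "17,1.0", "40,3.0"])  # hypothetical second vector
    print(sparse_inner_product(x, y))
```

Only indices 1 and 17 appear in both vectors here, so only those two products are summed.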
Given the problem statement, this is pretty straightforward:
- read the input; each line holds one pair of values, x and y
- since the input is sparse, many pairs will be 0,0; just skip them, since they do not change the output
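```python
#! /usr/bin/env python
"""
Assumption: all inputs in the matrix are integers.
If they contain decimals, change the multiplication to
__get_float_multiplication and emit DoubleValueSum instead of LongValueSum.
"""
import sys

key = '1'


def __get_integer_multiplication(x1, x2):
    return int(x1) * int(x2)


def __get_float_multiplication(x1, x2):
    return float(x1) * float(x2)


for line in sys.stdin:
    # strip the trailing newline first, otherwise "0,0\n" splits into ['0', '0\n']
    line = line.strip()
    if not line:
        continue
    fields = line.split(',')
    # a 0,0 pair adds nothing to the inner product, so skip it
    if fields[0] == "0" and fields[1] == "0":
        continue
    # every product goes to the same key; the aggregate reducer sums them up
    print('LongValueSum:' + key + '\t' + str(__get_integer_multiplication(fields[0], fields[1])))
```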
Let's run this. The input file looks like this:
```
$ less ~/Downloads/tmp/innerproduct
1,2
3,4
0,0
5,6
```
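To sanity-check the mapper without a cluster, the sketch below imitates locally what a streaming job with `-reducer aggregate` does: each input line goes through the same mapping logic, and then the values are summed per key, as the aggregate reducer does for `LongValueSum:` records. The function names are hypothetical and this is a local simulation, not the Hadoop API. For the sample input above, the expected inner product is 1*2 + 3*4 + 0*0 + 5*6 = 44.

```python
from collections import defaultdict


def map_line(line, key="1"):
    """Same logic as the mapper above: return 'LongValueSum:<key>\t<x*y>' or None."""
    line = line.strip()
    if not line:
        return None
    x, y = line.split(",")
    if x == "0" and y == "0":
        return None  # zero pairs contribute nothing
    return "LongValueSum:" + key + "\t" + str(int(x) * int(y))


def aggregate_long_value_sum(records):
    """Mimic the aggregate reducer for LongValueSum records: sum the values per key."""
    totals = defaultdict(int)
    for record in records:
        spec_and_key, value = record.split("\t")
        # the aggregator type comes before the first ':'; the rest is the real key
        _, key = spec_and_key.split(":", 1)
        totals[key] += int(value)
    return dict(totals)


if __name__ == "__main__":
    sample = ["1,2\n", "3,4\n", "0,0\n", "5,6\n"]
    mapped = [r for r in (map_line(line) for line in sample) if r is not None]
    print(aggregate_long_value_sum(mapped))
```

Because every record shares the key `1`, the whole sum lands in a single reducer group, which is what makes the result one number.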
- get the timestamp from logs
- build the key so that every timestamp between HOUR:00 and HOUR:59 maps to HOUR:00
- call the aggregate package to sum up the values for you
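A minimal streaming-mapper sketch for those steps follows. The post does not show the log format, so I assume (hypothetically) that each line starts with a date and time like `2012-08-01 05:42:13` and that the goal is to count log lines per hour; `hour_key` and `map_log_line` are illustrative names. Emitting `LongValueSum:<hour>\t1` lets the aggregate package do the summing.

```python
from datetime import datetime


def hour_key(timestamp):
    """Map 'YYYY-MM-DD HH:MM:SS' to 'YYYY-MM-DD HH:00'.

    Every timestamp from HH:00:00 to HH:59:59 therefore shares one key.
    Raises ValueError if the timestamp does not match the assumed format.
    """
    ts = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    return ts.strftime("%Y-%m-%d %H:00")


def map_log_line(line):
    """Return an aggregate-package record for one log line, or None if it is malformed."""
    parts = line.split()
    if len(parts) < 2:
        return None
    # assumed layout: the first two whitespace-separated fields are the date and the time
    try:
        key = hour_key(parts[0] + " " + parts[1])
    except ValueError:
        return None
    # one record per log line; the aggregate reducer adds up the 1s for each hour
    return "LongValueSum:" + key + "\t1"
```

In an actual streaming job, the script would loop over `sys.stdin` and print every non-None record, the same way the inner-product mapper does. The aggregate reducer splits the type off at the first `:`, so the colon inside `HH:00` stays part of the key.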
- read each record
- get the relevant column
- emit every element under one common key, so that a single reducer gets all the data
- in the reducer, keep only the unique elements and sort them
- return only as many values as requested by the K parameter on the command line
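The reducer-side steps (deduplicate, sort descending, keep K) fit in a few lines of Python. This is a sketch that mirrors what the `TreeSet` and `descendingIterator()` do in the Java reducer below; `top_k_unique` is a hypothetical helper, not part of Hadoop.

```python
import heapq


def top_k_unique(values, k):
    """Return the k largest distinct values, largest first."""
    # set() drops duplicates, like the TreeSet; nlargest returns them in descending order
    return heapq.nlargest(k, set(values))
```

`heapq.nlargest` keeps only a heap of size k instead of sorting every distinct value, although with a single common key all values still pass through one reducer either way.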
```java
package com.hadoop.programs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * User: hhimanshu
 * Date: 8/1/12
 * Time: 5:42 AM
 *
 * @author Harit Himanshu
 */
public class TopKRecord extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, LongWritable> {
        // every record is emitted under this one key, so a single reducer sees all values
        private final Text commonKey = new Text("topKRecords");
        private final LongWritable claims = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            // column 8 is the claims column; skip short rows, empty values and the quoted header
            if (fields.length > 8 && fields[8].length() > 0 && !fields[8].startsWith("\"")) {
                claims.set(Long.parseLong(fields[8]));
                context.write(commonKey, claims);
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // a TreeSet drops duplicates and keeps the values sorted in ascending order
            TreeSet<Long> uniqueSorted = new TreeSet<Long>();
            for (LongWritable value : values) {
                uniqueSorted.add(value.get());
            }

            // K is passed from the driver through the job configuration
            Configuration conf = context.getConfiguration();
            int maxRecords = Integer.parseInt(conf.get("NumberOfRecords"));

            // walk the set from largest to smallest and keep at most K values
            StringBuilder sb = new StringBuilder();
            Iterator<Long> reverseIterator = uniqueSorted.descendingIterator();
            int count = 0;
            while (reverseIterator.hasNext() && count < maxRecords) {
                sb.append(reverseIterator.next()).append(",");
                count++;
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: TopKRecord <input path> <output path> <K>");
            return 2;
        }
        // getConf() keeps any generic options (-D ...) already parsed by ToolRunner
        Configuration conf = getConf();
        conf.set("NumberOfRecords", args[2]);

        Job job = Job.getInstance(conf, "TopKRecord");
        job.setJarByClass(TopKRecord.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setNumReduceTasks(0); // to just run the Mapper
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TopKRecord(), args);
        System.exit(ret);
    }
}
```
This came out of my learning, where I had to pass a variable to the reducer at runtime so that it outputs only as many records as I pass as a parameter.
I looked it up and found that there are two ways: one for the old API (org.apache.hadoop.mapred) and one for the new API (org.apache.hadoop.mapreduce).
Assuming you are using the latest Hadoop distribution, and therefore the new API, here is how you would do it:
```java
// while setting up the job
Configuration conf = new Configuration();
// I chose to get the param from the command line; you can set it to any value
conf.set("NumberOfRecords", args[2]);
// set the parameter before creating the Job, because the Job takes a copy of conf
Job job = Job.getInstance(conf);

// in your map() or reduce()
Configuration conf = context.getConfiguration();
int maxRecords = Integer.parseInt(conf.get("NumberOfRecords"));
```
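If you use Hadoop streaming with Python instead of Java, the script has no `Configuration` object. One option is streaming's `-cmdenv NAME=VALUE` option, which exports an environment variable to the mapper and reducer processes. A minimal sketch, assuming the job was launched with `-cmdenv NUMBER_OF_RECORDS=5` (a hypothetical variable name) and falling back to a default of 10 (also an assumption) when it is not set:

```python
import os


def max_records(default=10):
    """Read K from the hypothetical NUMBER_OF_RECORDS environment variable."""
    raw = os.environ.get("NUMBER_OF_RECORDS")
    if raw is None or not raw.strip():
        return default
    return int(raw)
```

A streaming reducer would call `max_records()` once at startup and then stop emitting values after that many, just like the `maxRecords` check in the Java reducer.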
This is my first post in an attempt to learn Hadoop using Java.
I am using the book “Hadoop in Action” to learn. The first program builds a Citation Histogram (described in full in the book).
It has two parts:
Part 1: building the MapReduce program that counts how many times each patent is cited
Part 2: building the MapReduce program that counts those counts (how many patents have each citation count) and plotting them
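The two stages can be simulated locally to see what each one produces. This sketch assumes the input is the patent citation data used in the book, with one `CITING,CITED` pair per line and a quoted header line; the function names and the toy data are hypothetical, and in the real program each stage is a separate MapReduce job.

```python
from collections import Counter


def count_citations(lines):
    """Part 1: for every cited patent, count how many times it is cited."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        # skip blank lines and the quoted header line ("CITING","CITED")
        if not line or line.startswith('"'):
            continue
        _citing, cited = line.split(",")
        counts[cited] += 1
    return counts


def citation_histogram(citation_counts):
    """Part 2: for every citation count, count how many patents have it."""
    return Counter(citation_counts.values())


if __name__ == "__main__":
    data = ['"CITING","CITED"', "1,100", "2,100", "3,100",
            "4,200", "5,300", "6,300", "7,400"]
    histogram = citation_histogram(count_citations(data))
    for citations, patents in sorted(histogram.items()):
        print(citations, patents)
```

On the toy data, patents 200 and 400 are each cited once, 300 twice and 100 three times, so the histogram has two patents at count 1 and one patent each at counts 2 and 3.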
and then you can read the output in R to plot the graph:
```r
# CitationHistogram
mydata = read.csv(file="~/Downloads/hadoop/output/CitationHistogramCount")
mydata
plot(mydata, xlab="Number of Citations", ylab="Number of Patents", log="y")
```
and you can see in the resulting plot that the citation histogram follows a power law.