Hadoop: Find Inner Product of Two Sparse Vectors

It's been a while since I wrote a Hadoop program, so I thought I would get back into action. I wrote a program for the following problem

Inner product of two sparse vectors: A vector is a list of values. Given two vectors, X = [x1, x2, …] and Y = [y1, y2, …], their inner product is Z = x1*y1 + x2*y2 + … . When X and Y are long but have many elements with zero value, they’re usually given in a sparse representation:

1,0.46
9,0.21
17,0.92
…

Hadoop in Action Chapter 4 Problem 3

Given the problem statement, this is a pretty straightforward thing to do.

- read the input
- since the input is sparse, many pairs contain a zero; skip them, because they contribute nothing to the sum
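As a local illustration of these two steps, here is a minimal Python sketch. It assumes the index,value sparse format from the problem statement (not the one-pair-per-line format the mapper below reads). The names `parse_sparse` and `inner_product` and the sample numbers are made up for this example.

```python
def parse_sparse(lines):
    """Turn 'index,value' lines into a dict {index: value}, dropping zeros."""
    vector = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        index, value = line.split(',')
        value = float(value)
        if value != 0.0:
            vector[int(index)] = value
    return vector


def inner_product(x, y):
    """Sum x[i] * y[i] over the indices present in both sparse vectors."""
    # Iterate over the smaller dict; an index missing on either side counts as zero.
    if len(x) > len(y):
        x, y = y, x
    return sum(value * y[index] for index, value in x.items() if index in y)


# Hypothetical sample data, not taken from the book
x = parse_sparse(["1,0.5", "9,2.0", "17,4.0"])
y = parse_sparse(["1,2.0", "17,0.25", "30,8.0"])
print(inner_product(x, y))   # 2.0
```

Only indices 1 and 17 appear in both vectors, so only those two products are ever computed.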

The program is simple to write

InnerProductOfTwoVectors
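#!/usr/bin/env python

"""
Assumption: all inputs in the vectors are integers.
If they contain decimals, change the multiplication to
__get_float_multiplication (and the prefix to DoubleValueSum)
"""

import sys

# a single shared key, so the aggregate reducer sums every product together
key = '1'

def __get_integer_multiplication(x1, x2):
  return int(x1) * int(x2)

def __get_float_multiplication(x1, x2):
  return float(x1) * float(x2)

for line in sys.stdin:
  # strip the trailing newline, otherwise the second field never equals "0"
  fields = line.strip().split(',')
  if len(fields) < 2:
      continue  # skip blank or malformed lines
  if fields[0] == "0" or fields[1] == "0":
      continue  # a zero factor contributes nothing to the sum
  print('LongValueSum:' + key + '\t' + str(__get_integer_multiplication(fields[0], fields[1])))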

Let's run this.

$ less ~/Downloads/tmp/innerproduct
1,2
3,4
0,0
5,6

Think of each line as an (x, y) pair: one element of X and the matching element of Y.

hadoop dfs -put innerproduct input/innerproduct
$ hadoop jar /usr/local/Cellar/hadoop/1.0.3/libexec/contrib/streaming/hadoop-streaming-1.0.3.jar -D mapred.reduce.tasks=1 -mapper 'mapper.py' -file mapper.py -input input -output output -reducer aggregate

This runs pretty quickly

2012-10-02 13:02:52.781 java[50196:1203] Unable to load realm info from SCDynamicStore
packageJobJar: [mapper.py, /Users/hhimanshu/app/hadoop/tmp/hadoop-unjar3930199009535869988/] [] /var/folders/v4/tt6p3k950f1_m9p29d0mx5f80000gp/T/streamjob4922862718386718220.jar tmpDir=null
12/10/02 13:02:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/10/02 13:02:53 WARN snappy.LoadSnappy: Snappy native library not loaded
12/10/02 13:02:53 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/02 13:02:53 INFO streaming.StreamJob: getLocalDirs(): [/Users/hhimanshu/app/hadoop/tmp/mapred/local]
12/10/02 13:02:53 INFO streaming.StreamJob: Running job: job_201210021236_0008
12/10/02 13:02:53 INFO streaming.StreamJob: To kill this job, run:
12/10/02 13:02:53 INFO streaming.StreamJob: /usr/local/Cellar/hadoop/1.0.3/libexec/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201210021236_0008
12/10/02 13:02:53 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210021236_0008
12/10/02 13:02:54 INFO streaming.StreamJob:  map 0%  reduce 0%
12/10/02 13:03:06 INFO streaming.StreamJob:  map 100%  reduce 0%
12/10/02 13:03:18 INFO streaming.StreamJob:  map 100%  reduce 100%
12/10/02 13:03:24 INFO streaming.StreamJob: Job complete: job_201210021236_0008
12/10/02 13:03:24 INFO streaming.StreamJob: Output: output

and the output

$ hadoop dfs -cat output/part-00000
2012-10-02 15:05:46.009 java[77714:1203] Unable to load realm info from SCDynamicStore
1 44

Let's confirm that this is the right output

1*2 + 3*4 + 0*0 + 5*6
2 + 12 + 0 + 30
14 + 30
44

So we have implemented this correctly. Now you can run this on any number of machines, as long as the input format is the same as the one shown above.
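The same check can be done without a cluster. The sketch below is a rough local stand-in, not Hadoop itself: `map_line` follows the mapper's logic (skipping any pair with a zero factor), and `long_value_sum` is a hypothetical helper that imitates what the aggregate reducer does with `LongValueSum:` records.

```python
from collections import defaultdict


def map_line(line, key='1'):
    """Return the mapper's output record for one 'x,y' line, or None if skipped."""
    fields = line.strip().split(',')
    if len(fields) < 2 or fields[0] == '0' or fields[1] == '0':
        return None
    return 'LongValueSum:%s\t%d' % (key, int(fields[0]) * int(fields[1]))


def long_value_sum(records):
    """Imitate the aggregate reducer: add up LongValueSum values per key."""
    totals = defaultdict(int)
    for record in records:
        name, value = record.split('\t')
        kind, key = name.split(':', 1)
        if kind == 'LongValueSum':
            totals[key] += int(value)
    return dict(totals)


lines = ['1,2', '3,4', '0,0', '5,6']   # same data as the innerproduct file
records = [r for r in (map_line(l) for l in lines) if r is not None]
print(long_value_sum(records))          # {'1': 44}
```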

Packaging Module : Build Jar With Dependencies Included

I have been working lately on a project that will migrate all the documents from the datacenter to Amazon Web Services S3.

As part of this project, I had to build a small upload app that puts these documents in Amazon S3.

The requirement was to build my app as a jar with all of its dependencies included.

I googled around and found that the following code in your pom.xml will do the trick

Find the Hourly Traffic From Server Logs Using Hadoop

This was relatively easier than I thought, or I understood it completely wrong ;)

The question goes like this

Web traffic measurement—Take a web server log file and write a Streaming program with the Aggregate package to find the hourly traffic to that site.

Hadoop in Action Chapter 4 Problem 2

This program uses hadoop-streaming together with the Aggregate package

The idea is simple

- get the timestamp from logs
- build the key so that everything between HOUR:00 and HOUR:59 maps to HOUR:00
- call the aggregate package to sum up the values for you

Then write a mapper as

Web Traffic Measurement source code
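#!/usr/bin/env python

import sys

for line in sys.stdin:
    fields = line.split('\t')
    # the third field holds the timestamp; splitting on ':' keeps "date hour"
    time_split = fields[2].split(':')
    key = time_split[0] + ':00'
    # one LongValueSum record per request; the aggregate reducer adds them up
    print('LongValueSum:' + key + '\t' + '1')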

and you need to run this using streaming.jar

hadoop jar /usr/local/Cellar/hadoop/1.0.3/libexec/contrib/streaming/hadoop-streaming-1.0.3.jar -input input -output output -file mapper.py -mapper 'mapper.py' -reducer aggregate

get the output

$ hadoop dfs -get output .

and your data after aggregation will look like

2006-05-29 14:00  2335
2006-05-29 15:00  2169
2006-05-29 16:00  2231
2006-05-29 17:00  2624
2006-05-29 18:00  2935
2006-05-29 19:00  2952
2006-05-29 20:00  2918
2006-05-29 21:00  3054
2006-05-29 22:00  3071
2006-05-29 23:00  2369
2006-05-30 00:00  1752
2006-05-30 01:00  1071
2006-05-30 02:00  736
2006-05-30 03:00  428
2006-05-30 04:00  269
2006-05-30 05:00  267
2006-05-30 06:00  395
2006-05-30 07:00  694
2006-05-30 08:00  1027
2006-05-30 09:00  1595
2006-05-30 10:00  1668
2006-05-30 11:00  2291
2006-05-30 12:00  2052
2006-05-30 13:00  2311
2006-05-30 14:00  2250
2006-05-30 15:00  2215
2006-05-30 16:00  2454
2006-05-30 17:00  3002
2006-05-30 18:00  2744
2006-05-30 19:00  2894
2006-05-30 20:00  3072
2006-05-30 21:00  3065
2006-05-30 22:00  3031
# .. more output stripped
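
To sanity-check the key logic without a cluster, something like the following Python sketch can be used. It assumes, as the mapper does, that the timestamp is the third tab-separated field and looks like `2006-05-29 14:03:10`. The helper names and the sample log lines are made up for illustration.

```python
from collections import Counter


def hour_key(timestamp):
    """Map 'YYYY-MM-DD HH:MM:SS' to 'YYYY-MM-DD HH:00'."""
    return timestamp.split(':')[0] + ':00'


def hourly_traffic(lines):
    """Count log lines per hour (timestamp assumed to be the third tab-separated field)."""
    counts = Counter()
    for line in lines:
        fields = line.rstrip('\n').split('\t')
        if len(fields) > 2:
            counts[hour_key(fields[2])] += 1
    return counts


# Made-up log lines, for illustration only
sample = [
    'host-a\t-\t2006-05-29 14:03:10\tGET /',
    'host-b\t-\t2006-05-29 14:59:59\tGET /about',
    'host-a\t-\t2006-05-29 15:00:00\tGET /',
]
for key, count in sorted(hourly_traffic(sample).items()):
    print(key + '\t' + str(count))
```

The `Counter` here plays the role that the aggregate reducer plays in the real job.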

Finding TopKRecords Using Hadoop

This is my first attempt to write a Map Reduce program myself :)

The input data - apat63_99.zip

The idea I had is the following:

- read each record
- get the relevant column
- group the elements by common key, so that one reducer will get all the data
- get the unique elements in the list received by reducer and sort it
- return only the number of values requested as K parameter on command line

The entire source code is

Finding Top K Records source code
package com.hadoop.programs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * User: hhimanshu
 * Date: 8/1/12
 * Time: 5:42 AM
 *
 * @author Harit Himanshu
 */
public class TopKRecord extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, LongWritable> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit every claims value under one constant key, so that a single
            // reducer sees all of them.
            String[] fields = value.toString().split(",");
            Text outKey = new Text("topKRecords");
            LongWritable claims = new LongWritable();

            // Skip short rows, empty values and the quoted header row
            if (fields.length > 8 && fields[8].length() > 0 && !fields[8].startsWith("\"")) {
                claims.set(Long.parseLong(fields[8]));
                context.write(outKey, claims);
            }
        }
    }

    public static class Reduce extends Reducer<Text, LongWritable, Text, Text> {

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // Collect the distinct values in sorted order
            TreeSet<Long> uniqueSorted = new TreeSet<Long>();
            for (LongWritable value : values) {
                uniqueSorted.add(value.get());
            }

            // Walk from largest to smallest and keep at most K values,
            // where K comes from the job configuration
            StringBuilder sb = new StringBuilder();
            Iterator<Long> reverseIterator = uniqueSorted.descendingIterator();
            Configuration conf = context.getConfiguration();
            int maxRecords = Integer.parseInt(conf.get("NumberOfRecords"));
            int count = 0;
            while (reverseIterator.hasNext() && count < maxRecords) {
                sb.append(reverseIterator.next()).append(",");
                count++;
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    public int run(String args[]) throws Exception {
        Configuration conf = getConf(); // reuse the configuration prepared by ToolRunner
        conf.set("NumberOfRecords", args[2]);

        Job job = new Job(conf);
        job.setJarByClass(TopKRecord.class);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setJobName("TopKRecord");

//        job.setNumReduceTasks(0); // to just run Mapper
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String args[]) throws Exception {
        int ret = ToolRunner.run(new TopKRecord(), args);
        System.exit(ret);
    }
}

For running the program, do the following


$ hadoop jar Hadoop-programs.jar com/hadoop/programs/TopKRecord in out 4
$ cat out/part-00000
topKRecords     868,706,642,472,

$ hadoop jar Hadoop-programs.jar com/hadoop/programs/TopKRecord in out 10
$ cat out/part-00000
topKRecords     868,706,642,472,394,393,348,346,320,313,

Please feel free to point out better ways of doing this; I would be happy to learn.
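
One possible variation, sketched here in Python rather than Java: instead of keeping every distinct value fully sorted, pick the K largest with a heap. `top_k_unique` is a hypothetical helper and the sample values are made up. Note that the set still holds every distinct value, so this saves on sorting work, not on memory.

```python
import heapq


def top_k_unique(values, k):
    """Return the k largest distinct values, largest first."""
    return heapq.nlargest(k, set(values))


# Made-up claims values, for illustration only
claims = [12, 868, 7, 706, 868, 642, 472, 12, 394]
print(top_k_unique(claims, 4))   # [868, 706, 642, 472]
```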

Passing Parameters to Mappers and Reducers in New Hadoop API

This came out of my learning, when I had to pass a variable at runtime to the reducer so that it emits only the number of records I pass as a parameter.

I looked it up and found that there are two ways: using the old API and using the new API.

Assuming that you are on the latest Hadoop distribution and therefore using the new API, here is how you would do it

// while setting up the job
Configuration conf = new Configuration();
conf.set("NumberOfRecords", args[2]); // I chose to get the param from the command line, you can set any value
Job job = new Job(conf);

// in your map() or reduce() method
Configuration conf = context.getConfiguration();
int maxRecords = Integer.parseInt(conf.get("NumberOfRecords"));

and this is all you need!
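
For a Streaming job (as opposed to the Java API above), a parameter passed with `-D` is, as far as I know, exposed to the script as an environment variable, with characters other than letters and digits replaced by underscores. The Python sketch below assumes that behaviour. `get_job_param` is a hypothetical helper, and `NumberOfRecords` is simply the name used above.

```python
import os
import re


def get_job_param(name, default=None):
    """Look up a job configuration value the way a Streaming script would see it."""
    # Assumption: Streaming exports job conf entries as environment variables,
    # replacing every non-alphanumeric character in the name with '_'.
    env_name = re.sub(r'[^a-zA-Z0-9]', '_', name)
    return os.environ.get(env_name, default)


# Simulate what the framework would set, for a local run
os.environ['NumberOfRecords'] = '4'
max_records = int(get_job_param('NumberOfRecords', '10'))
print(max_records)   # 4
```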

Basic Template for Most of Your Map Reduce Programs

It is true that you never really have to write a Map Reduce program from scratch. This is what I learned while reading Hadoop in Action

I thought it would be nice to write a basic Map Reduce skeleton that I, and almost anyone who wants to write a Map Reduce program, can reuse.

Here it is

Let me know if you see any issues with that or you would like to share anything that would be useful.

Extracting Specific Fields From Your Mongo Collection

This morning I needed to export the list of all names in my collection to a file.

I looked over the MongoDB documentation and found the mongoexport utility, which makes life easier

Using it was pretty simple. Consider my schema as

# database - mydatabase, collection - mycollection
{
  name: <something>,
  date: <some_date>,
  value: <some_value>,
  # .... and so on
}

Suppose you need to take out all the ‘name’ and ‘value’ from collection in a file. You will do the following

$ mongoexport -d mydatabase -c mycollection -f name,value --csv

That's it!!

CitationHistogram - First Hadoop Program

This is my first post in an attempt to learn Hadoop using Java. I am using the “Hadoop in Action” book to learn. The first program builds a Citation Histogram (described in full in the book). It has two parts:

Part - 1 : Building the map reduce program to count the number of citations “cited”

Part - 2 : Building the map reduce program to count the counts and plot them

I have used R for plotting the graph.

For part - 1, you need to uncomment the part

//      job.set("key.value.separator.in.input.line", ",");

because input dataset has “,” as input separator.

The output of part - 1 becomes the input for part - 2 and the input separator is tab(\t). Our InputFormat class is

job.setInputFormat(KeyValueTextInputFormat.class);

and the default input separator is tab(\t) for this class.

So we see that the same program is reused twice.
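
To make the two passes concrete, here is a small local sketch in Python. It reflects my reading of the two parts (count citations per cited patent, then count how many patents share each count) and is not the Java code from the book. The function names and the sample pairs are made up.

```python
from collections import Counter


def count_citations(pairs):
    """Pass 1: number of times each patent is cited, from (citing, cited) pairs."""
    return Counter(cited for _citing, cited in pairs)


def count_the_counts(citation_counts):
    """Pass 2: number of patents that received each citation count."""
    return Counter(citation_counts.values())


# Made-up (citing, cited) pairs, for illustration only
pairs = [('A', 'X'), ('B', 'X'), ('C', 'Y'), ('D', 'Z'), ('E', 'X')]
per_patent = count_citations(pairs)
histogram = count_the_counts(per_patent)
print(sorted(histogram.items()))   # [(1, 2), (3, 1)]
```

Here two patents were cited once and one patent was cited three times, which is the shape of data the histogram plot expects.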

Now the way you run this is as follows:

 - Build jar for the class CitationHistogram (you can do this by using your IDE, I am using IntelliJ IDEA)

 - copy the input to hadoop file system
hadoop dfs -mkdir input
hadoop dfs -copyFromLocal <path_to_acite_75_99.txt> input/
hadoop jar Hadoop-programs.jar com/hadoop/patent/CitationHistogram input output
# Note: make sure the output directory doesn't exist on the hadoop filesystem

Once your program completes, you can run the following to get the data out from HDFS

hadoop dfs -cat output/part-00000 > ~/Downloads/hadoop/output/CitationHistogramCount

and then you can read this in R to plot the graph

# CitationHistogram
mydata = read.csv(file="~/Downloads/hadoop/output/CitationHistogramCount")
mydata
plot(mydata, xlab="Number of Citations", ylab="Number of Patents", log="y")

and you can see that the citation histogram follows a power law

First Post

def sayHello():
    print("Hello World")

This is my first post