/*
* This code was written by Gordon Linoff of Data Miners, Inc. (http://www.data-miners.com) on 29 Nov 2009.
*
* The purpose of the code is to assign row numbers to data stored in HDFS files.
*
* The method uses two passes of the map/reduce framework. The first pass accomplishes two things:
* (1) It stores the data in a sequence file using the partition number and row number within
* the partition as keys.
* (2) It calculates the number of rows in each partition.
*
* This information is then used to obtain an offset for each partition.
*
* The second pass adds the partition's offset to the within-partition row number to obtain the final row number for each row.
*/
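/*
 * Worked example (illustrative numbers): if the first pass produces three partitions
 * with 100, 250, and 80 rows, their offsets are 0, 100, and 350 respectively, so the
 * 5th row of partition 2 (key "2;5") receives the final row number 350 + 5 = 355.
 */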
package rownumbertwopass;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.map.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
public class RowNumberTwoPass {
// The following variables are used to pass parameters between the main
// program and the Map/Reduce classes.
static final String PARAMETER_saverecordsdir = "rownumbertwopass.saverecordsdir"; // directory for storing records with new key
static final String PARAMETER_cumsum_numvals = "rownumbertwopass.cumsum.numvals"; // number of partitions in the first pass
static final String PARAMETER_cumsum_nthvalue = "rownumbertwopass.cumsum."; // prefix for the per-partition offset entries
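// For example (illustrative values), with two first-pass partitions of 1,000 and
// 1,500 rows, the driver would store:
// rownumbertwopass.cumsum.numvals = 2
// rownumbertwopass.cumsum.0 = "0\t1000\t0" (partition id, row count, offset)
// rownumbertwopass.cumsum.1 = "1\t1500\t1000"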
// This Map function does two things. First, it writes the data to a new
// sequence file, creating a key from the partition id and the row number
// within the partition. Second, it emits (partition id, row number) pairs
// so that the reducer can count the rows in each partition.
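// For instance (illustrative values), the 17th record seen by map partition 3 is
// appended to the sequence file under the key "3;17", and the pair (3, 17) is
// emitted for the row-counting reducer.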
public static class NewKeyOutputMap
extends Mapper<LongWritable, Text, IntWritable, LongWritable> {
static SequenceFile.Writer sfw;
static IntWritable partitionid = new IntWritable(0);
static Text outkey = new Text("");
static long localrownum = 0;
static LongWritable localrownumvalue = new LongWritable(0);
// This setup() method runs before any map calls. It extracts
// information from the configuration and opens the sequence file.
public void setup(Context context) {
String saverecordsdir;
partitionid.set(context.getConfiguration().getInt("mapred.task.partition", 0));
saverecordsdir = context.getConfiguration().get(PARAMETER_saverecordsdir);
// Strip a trailing slash; substring() returns a new string, so assign it back.
if (saverecordsdir.endsWith("/")) {
saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
}
try {
FileSystem fs = FileSystem.get(context.getConfiguration());
sfw = SequenceFile.createWriter(fs, context.getConfiguration(),
new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
Text.class, Text.class);
} catch (Exception e) {
e.printStackTrace();
}
} // setup
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
localrownumvalue.set(++localrownum);
context.write(partitionid, localrownumvalue);
outkey.set(partitionid.toString()+";" + localrownum);
sfw.append(outkey, value);
} // map()
public void cleanup (Context context) throws IOException {
sfw.close();
}
} // class NewKeyOutputMap
// This reduce counts the number of records in each partition by
// taking the maximum of the row numbers. This reduce function is
// used both as a combiner and reducer.
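// For example (illustrative values), if partition 2 has 500 rows, the reducer
// receives (2, [1, 2, ..., 500]) (possibly pre-aggregated by the combiner)
// and writes (2, 500), the partition's row count.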
public static class KeySummaryReduce extends
Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
public void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
LongWritable maxval = new LongWritable(Long.MIN_VALUE);
for (LongWritable value : values) {
long val = value.get();
if (maxval.get() < val) {
maxval.set(val);
}
}
context.write(key, maxval);
}
} // KeySummaryReduce()
// This map function adds the appropriate offset to the row number and
// outputs the results in the results directory.
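// For example (illustrative values), a record whose sequence-file key is "1;17",
// with a stored offset of 1000 for partition 1, is written with the final row
// number 1017.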
public static class CalcRowNumberMap extends Mapper<Text, Text, LongWritable, Text> {
HashMap<String, Long> offsets = new HashMap<String, Long>();
static LongWritable outkey = new LongWritable(0);
public void setup(Context context) {
int numvals = context.getConfiguration().getInt(PARAMETER_cumsum_numvals, 0);
offsets.clear();
for (int i = 0; i < numvals; i++) {
String val = context.getConfiguration().get(PARAMETER_cumsum_nthvalue + i);
String[] parts = val.split("\t");
offsets.put(parts[0], Long.parseLong(parts[2]));
}
} // setup()
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = key.toString().split(";");
long rownum = Long.parseLong(parts[1]) + offsets.get(parts[0]);
outkey.set(rownum);
context.write(outkey, value);
} // map()
} // class CalcRowNumberMap
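// Example invocation (the jar name here is illustrative; use whatever jar your build produces):
// hadoop jar rownumbertwopass.jar rownumbertwopass.RowNumberTwoPass
// Input is read from the HDFS directory "input"; the numbered rows are written
// under "rownumbertwopass/results".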
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
FileSystem fs;
String workdir = "rownumbertwopass";
fs = FileSystem.get(conf);
fs.delete(new Path(workdir), true);
String keysummaryoutput = workdir + "/pass1_keysummary";
String recordsoutput = workdir + "/pass1_records";
conf.set(PARAMETER_saverecordsdir, recordsoutput);
Job newkeyjob = new Job(conf, "Create keys from partition id and rownumber and calculate the size of each partition");
newkeyjob.setJarByClass(rownumbertwopass.RowNumberTwoPass.class);
// Set up the output types, input format, and paths for the first pass.
newkeyjob.setOutputKeyClass(IntWritable.class);
newkeyjob.setOutputValueClass(LongWritable.class);
newkeyjob.setMapOutputKeyClass(IntWritable.class);
newkeyjob.setMapOutputValueClass(LongWritable.class);
newkeyjob.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(newkeyjob, new Path("input"));
FileOutputFormat.setOutputPath(newkeyjob, new Path(keysummaryoutput));
newkeyjob.setMapperClass(RowNumberTwoPass.NewKeyOutputMap.class);
newkeyjob.setCombinerClass(RowNumberTwoPass.KeySummaryReduce.class);
newkeyjob.setReducerClass(RowNumberTwoPass.KeySummaryReduce.class);
newkeyjob.waitForCompletion(true);
// Job copies its Configuration, so the per-partition offsets read below must be
// set on finaljob.getConfiguration() rather than on conf.
Job finaljob = new Job(conf, "Finalize the row numbers");
finaljob.setJarByClass(rownumbertwopass.RowNumberTwoPass.class);
// Now read the results in the key summary file
FileStatus[] files = fs.globStatus(new Path(keysummaryoutput+ "/p*"));
int numvals = 0;
long cumsum = 0;
for (FileStatus fstat : files) {
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fstat.getPath())));
String line;
while ((line = reader.readLine()) != null) {
// Each line is "<partition id>\t<row count>"; append the running offset before advancing it.
finaljob.getConfiguration().set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
String[] vals = line.split("\t");
cumsum += Long.parseLong(vals[1]);
}
reader.close();
}
finaljob.getConfiguration().setInt(PARAMETER_cumsum_numvals, numvals);
// This code block just prints out the offsets
//for (int i = 0; i < numvals; i++) {
// System.out.println("cumsum[" + i + "] = "
// + finaljob.getConfiguration().get(PARAMETER_cumsum_nthvalue + i));
//}
String finalconfoutput = workdir + "/results";
finaljob.setOutputKeyClass(LongWritable.class);
finaljob.setOutputValueClass(Text.class);
finaljob.setMapOutputKeyClass(LongWritable.class);
finaljob.setMapOutputValueClass(Text.class);
finaljob.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(finaljob, new Path(recordsoutput));
FileOutputFormat.setOutputPath(finaljob, new Path(finalconfoutput));
finaljob.setMapperClass(RowNumberTwoPass.CalcRowNumberMap.class);
finaljob.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
} // main
} // class RowNumberTwoPass