/*
 * This code was written by Gordon Linoff of Data Miners, Inc. (http://www.data-miners.com) on 25 Nov 2009.
*
* The purpose of the code is to assign row numbers to data stored in HDFS files.
*
* The method uses two passes of the map/reduce framework. The first pass accomplishes two things:
* (1) It stores the data in a sequence file using the partition number and row number within
* the partition as keys.
* (2) It calculates the number of rows in each partition.
*
 * The driver then computes a cumulative sum of the partition sizes to obtain
 * an offset for each partition.
 *
 * The second pass adds the partition's offset to the within-partition row
 * number to obtain the final row number for each row.
*/
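/*
 * Worked example (partition sizes are illustrative): if the first pass yields
 * three partitions with 100, 250, and 80 rows, the offsets are 0, 100, and
 * 350, so row 7 within partition 2 receives the final row number 357.
 */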
package rownumbertwopass;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class RowNumberTwoPass {
// The following variables are used to pass parameters between the main
// program and the Map/Reduce classes.
static final String PARAMETER_saverecordsdir = "rownumbertwopass.saverecordsdir"; // directory for storing records with new key
static final String PARAMETER_cumsum_numvals = "rownumbertwopass.cumsum.numvals"; // number of partitions in the first pass
static final String PARAMETER_cumsum_nthvalue = "rownumbertwopass.cumsum."; // prefix for the per-partition offset parameters (index appended)
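// After the first pass, the driver stores one entry per partition in the job
// configuration (values below are illustrative):
//   rownumbertwopass.cumsum.numvals = 3
//   rownumbertwopass.cumsum.0 = "0\t100\t0"    (partition id, row count, offset)
//   rownumbertwopass.cumsum.1 = "1\t250\t100"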
// This Mapper does two things: (1) it writes each record to a side sequence
// file, keyed by the partition id and the row number within the partition;
// (2) it emits (partition id, row number) pairs so that the reducer can
// count the rows in each partition.
public static class NewKeyOutputMap extends MapReduceBase implements
Mapper<LongWritable, Text, IntWritable, LongWritable> {
static SequenceFile.Writer sfw;
static IntWritable partitionid = new IntWritable(0);
static Text outkey = new Text("");
static long localrownum = 0;
static LongWritable localrownumvalue = new LongWritable(0);
// This configure() sets up the map task. It extracts
// information from the configuration and opens the sequence file.
public void configure(JobConf conf) {
partitionid.set(conf.getInt("mapred.task.partition", 0));
String saverecordsdir = conf.get(PARAMETER_saverecordsdir);
if (saverecordsdir.endsWith("/")) {
// substring() returns a new string, so the result must be assigned back
saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
}
try {
FileSystem fs = FileSystem.get(conf);
sfw = SequenceFile.createWriter(fs, conf,
new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
Text.class, Text.class);
} catch (Exception e) {
e.printStackTrace();
}
} // configure
public void map(LongWritable key, Text value,
OutputCollector<IntWritable, LongWritable> output, Reporter reporter)
throws IOException {
// Emit (partition id, row number); the reducer keeps the maximum,
// which is the number of rows in the partition.
localrownumvalue.set(++localrownum);
output.collect(partitionid, localrownumvalue);
// Save the record itself under the composite key "<partition id>;<row number>".
outkey.set(partitionid.toString() + ";" + localrownum);
sfw.append(outkey, value);
} // map()
// Close the side sequence file when the task finishes so that buffered
// records are flushed to HDFS; without this, records can be lost.
public void close() throws IOException {
if (sfw != null) {
sfw.close();
}
} // close()
} // class NewKeyOutputMap
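// For example (contents illustrative), the side file records00002 holds
// entries with keys "2;1", "2;2", ... and the original input line as the value.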
// This reduce counts the number of records in each partition by
// taking the maximum of the row numbers. This reduce function is
// used both as a combiner and reducer.
public static class KeySummaryReduce extends MapReduceBase implements
Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
public void reduce(IntWritable key, Iterator<LongWritable> values,
OutputCollector<IntWritable, LongWritable> output, Reporter reporter)
throws IOException {
LongWritable maxval = new LongWritable(Long.MIN_VALUE);
while (values.hasNext()) {
long val = values.next().get();
if (maxval.get() < val) {
maxval.set(val);
}
}
output.collect(key, maxval);
}
} // KeySummaryReduce()
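// With the default TextOutputFormat, each reducer output line has the form
// "<partition id>\t<row count>", which is exactly what the driver parses
// below when computing the cumulative offsets.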
// This map function adds the appropriate offset to the row number and
// outputs the results in the results directory.
public static class CalcRowNumberMap extends MapReduceBase implements
Mapper<Text, Text, LongWritable, Text> {
HashMap<String, Long> offsets = new HashMap<String, Long>();
static LongWritable outkey = new LongWritable(0);
public void configure(JobConf conf) {
int numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
offsets.clear();
for (int i = 0; i < numvals; i++) {
// Each value has the form "<partition id>\t<row count>\t<offset>".
String val = conf.get(PARAMETER_cumsum_nthvalue + i);
String[] parts = val.split("\t");
offsets.put(parts[0], Long.parseLong(parts[2]));
}
} // configure
public void map(Text key, Text value,
OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
// The key has the form "<partition id>;<row number within partition>".
String[] parts = key.toString().split(";");
long rownum = Long.parseLong(parts[1]) + offsets.get(parts[0]);
outkey.set(rownum);
output.collect(outkey, value);
} // map()
} // class CalcRowNumberMap
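/*
 * The driver runs the two passes in sequence. A typical invocation (the jar
 * name is illustrative) would be:
 *   hadoop jar rownumbertwopass.jar rownumbertwopass.RowNumberTwoPass
 * with the input text files already loaded into the HDFS "input" directory.
 */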
public static void main(String[] args) {
try {
JobClient client = new JobClient();
FileSystem fs;
JobConf newkeyconf = new JobConf(rownumbertwopass.RowNumberTwoPass.class);
newkeyconf.setJobName("Create keys from partition id and row number; compute the size of each partition");
// Before running the job, delete the output files
String workdir = "rownumbertwopass";
fs = FileSystem.get(newkeyconf);
fs.delete(new Path(workdir), true);
String keysummaryoutput = workdir + "/pass1_keysummary";
String recordsoutput = workdir + "/pass1_records";
newkeyconf.set(PARAMETER_saverecordsdir, recordsoutput);
newkeyconf.setOutputKeyClass(IntWritable.class);
newkeyconf.setOutputValueClass(LongWritable.class);
newkeyconf.setMapOutputKeyClass(IntWritable.class);
newkeyconf.setMapOutputValueClass(LongWritable.class);
newkeyconf.setInputFormat(TextInputFormat.class);
FileInputFormat.setInputPaths(newkeyconf, new Path("input"));
FileOutputFormat.setOutputPath(newkeyconf, new Path(keysummaryoutput));
newkeyconf.setMapperClass(RowNumberTwoPass.NewKeyOutputMap.class);
newkeyconf.setCombinerClass(RowNumberTwoPass.KeySummaryReduce.class);
newkeyconf.setReducerClass(RowNumberTwoPass.KeySummaryReduce.class);
client.setConf(newkeyconf);
try {
JobClient.runJob(newkeyconf);
} catch (Exception e) {
e.printStackTrace();
}
JobConf finalconf = new JobConf(rownumbertwopass.RowNumberTwoPass.class);
// Read the per-partition counts from the first pass's "part-*" output files
// and compute the running sum of the partition sizes.
FileStatus[] files = fs.globStatus(new Path(keysummaryoutput + "/p*"));
int numvals = 0;
long cumsum = 0;
for (FileStatus fstat : files) {
BufferedReader reader = new BufferedReader(
new InputStreamReader(fs.open(fstat.getPath())));
String line;
while ((line = reader.readLine()) != null) {
// Store "<partition id>\t<row count>\t<offset>" in the configuration.
finalconf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
String[] vals = line.split("\t");
cumsum += Long.parseLong(vals[1]);
}
reader.close();
}
finalconf.setInt(PARAMETER_cumsum_numvals, numvals);
// Uncomment this block to print the computed offsets for debugging:
//for (int i = 0; i < numvals; i++) {
// System.out.println("cumsum[" + i + "] = "
// + finalconf.get(PARAMETER_cumsum_nthvalue + i));
//}
finalconf.setJobName("Finalize the row numbers");
String finalconfoutput = workdir + "/results";
finalconf.setOutputKeyClass(LongWritable.class);
finalconf.setOutputValueClass(Text.class);
finalconf.setMapOutputKeyClass(LongWritable.class);
finalconf.setMapOutputValueClass(Text.class);
finalconf.setInputFormat(SequenceFileInputFormat.class);
FileInputFormat.setInputPaths(finalconf, new Path(recordsoutput));
FileOutputFormat.setOutputPath(finalconf, new Path(finalconfoutput));
finalconf.setMapperClass(RowNumberTwoPass.CalcRowNumberMap.class);
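// No reducer is set, so the default IdentityReducer runs and the shuffle
// sorts the records by their final row number.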
client.setConf(finalconf);
try {
JobClient.runJob(finalconf);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
} // main
} // class RowNumberTwoPass