/*
 * This code was written by Gordon Linoff of Data Miners, Inc.
 * (http://www.data-miners.com) on 25 Nov 2009.
 *
 * The purpose of the code is to assign row numbers to data stored in HDFS files.
 *
 * The method uses two passes of the map/reduce framework. The first pass
 * accomplishes two things:
 *   (1) It stores the data in a sequence file, using the partition number and
 *       the row number within the partition as the key.
 *   (2) It calculates the number of rows in each partition.
 *
 * This information is then used to obtain an offset for each partition.
 *
 * The final pass adds the partition offset to the row number within the
 * partition to obtain the final row number for each row.
 */
package rownumbertwopass;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class RowNumberTwoPass {

    // The following variables are used to pass parameters between the main
    // program and the Map/Reduce classes.
    static final String PARAMETER_saverecordsdir =
        "rownumbertwopass.saverecordsdir";   // directory for storing records with the new key
    static final String PARAMETER_cumsum_numvals =
        "rownumbertwopass.cumsum.numvals";   // number of partitions in the first pass
    static final String PARAMETER_cumsum_nthvalue =
        "rownumbertwopass.cumsum.";          // prefix for the per-partition offsets

    // This Map function does two things. First, it writes the data to a new
    // sequence file, creating a key from the partition id and the row number
    // within the partition. Second, it emits (partition id, row number) pairs
    // so the reducer can calculate the number of rows in each partition.
    public static class NewKeyOutputMap extends MapReduceBase
            implements Mapper<LongWritable, Text, IntWritable, LongWritable> {

        static SequenceFile.Writer sfw;
        static IntWritable partitionid = new IntWritable(0);
        static Text outkey = new Text("");
        static long localrownum = 0;
        static LongWritable localrownumvalue = new LongWritable(0);

        // This configure() sets up the map job. It extracts information from
        // the configuration and opens the sequence file.
        public void configure(JobConf conf) {
            partitionid.set(conf.getInt("mapred.task.partition", 0));

            String saverecordsdir = conf.get(PARAMETER_saverecordsdir);
            if (saverecordsdir.endsWith("/")) {
                saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
            }

            try {
                FileSystem fs = FileSystem.get(conf);
                sfw = SequenceFile.createWriter(fs, conf,
                        new Path(saverecordsdir + "/" + String.format("records%05d", partitionid.get())),
                        Text.class, Text.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // configure()

        public void map(LongWritable key, Text value,
                        OutputCollector<IntWritable, LongWritable> output,
                        Reporter reporter) throws IOException {
            localrownumvalue.set(++localrownum);
            output.collect(partitionid, localrownumvalue);
            outkey.set(partitionid.toString() + ";" + localrownum);
            sfw.append(outkey, value);
        } // map()

        // Close the sequence file so that all buffered records are flushed.
        public void close() throws IOException {
            sfw.close();
        } // close()
    } // class NewKeyOutputMap
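    // To illustrate the first pass with an example (illustrative values, not
    // part of the original code): if partition 2 contains the rows "a", "b"
    // and "c", the mapper appends ("2;1", "a"), ("2;2", "b") and ("2;3", "c")
    // to the sequence file and emits (2, 1), (2, 2) and (2, 3) to the
    // framework. KeySummaryReduce below keeps only the maximum row number,
    // so the first pass's reduce output contains the line "2\t3", meaning
    // that partition 2 holds three rows.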
    // This reduce counts the number of records in each partition by taking
    // the maximum of the row numbers. This reduce function is used both as a
    // combiner and as the reducer.
    public static class KeySummaryReduce extends MapReduceBase
            implements Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {

        public void reduce(IntWritable key, Iterator<LongWritable> values,
                           OutputCollector<IntWritable, LongWritable> output,
                           Reporter reporter) throws IOException {
            LongWritable maxval = new LongWritable(Long.MIN_VALUE);
            while (values.hasNext()) {
                long val = values.next().get();
                if (maxval.get() < val) {
                    maxval.set(val);
                }
            }
            output.collect(key, maxval);
        } // reduce()
    } // class KeySummaryReduce

    // This map function adds the appropriate partition offset to the row
    // number and outputs the result in the results directory.
    public static class CalcRowNumberMap extends MapReduceBase
            implements Mapper<Text, Text, LongWritable, Text> {

        HashMap<String, Long> offsets = new HashMap<String, Long>();
        static LongWritable outkey = new LongWritable(0);

        public void configure(JobConf conf) {
            int numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
            offsets.clear();
            for (int i = 0; i < numvals; i++) {
                // Each stored value has the form "<partition id>\t<row count>\t<offset>".
                String val = conf.get(PARAMETER_cumsum_nthvalue + i);
                String[] parts = val.split("\t");
                offsets.put(parts[0], Long.parseLong(parts[2]));
            }
        } // configure()

        public void map(Text key, Text value,
                        OutputCollector<LongWritable, Text> output,
                        Reporter reporter) throws IOException {
            // The key has the form "<partition id>;<row number within the partition>".
            String[] parts = key.toString().split(";");
            long rownum = Long.parseLong(parts[1]) + offsets.get(parts[0]);
            outkey.set(rownum);
            output.collect(outkey, value);
        } // map()
    } // class CalcRowNumberMap

    public static void main(String[] args) {
        try {
            JobClient client = new JobClient();
            FileSystem fs;

            JobConf newkeyconf = new JobConf(rownumbertwopass.RowNumberTwoPass.class);
            newkeyconf.setJobName("Create keys from partition id and row number and calculate the size of each partition");

            // Before running the job, delete the output files.
            String workdir = "rownumbertwopass";
            fs = FileSystem.get(newkeyconf);
            fs.delete(new Path(workdir), true);

            String keysummaryoutput = workdir + "/pass1_keysummary";
            String recordsoutput = workdir + "/pass1_records";

            newkeyconf.set(PARAMETER_saverecordsdir, recordsoutput);
            newkeyconf.setOutputKeyClass(IntWritable.class);
            newkeyconf.setOutputValueClass(LongWritable.class);
            newkeyconf.setMapOutputKeyClass(IntWritable.class);
            newkeyconf.setMapOutputValueClass(LongWritable.class);
            newkeyconf.setInputFormat(TextInputFormat.class);
            FileInputFormat.setInputPaths(newkeyconf, new Path("input"));
            FileOutputFormat.setOutputPath(newkeyconf, new Path(keysummaryoutput));
            newkeyconf.setMapperClass(RowNumberTwoPass.NewKeyOutputMap.class);
            newkeyconf.setCombinerClass(RowNumberTwoPass.KeySummaryReduce.class);
            newkeyconf.setReducerClass(RowNumberTwoPass.KeySummaryReduce.class);
            client.setConf(newkeyconf);
            try {
                JobClient.runJob(newkeyconf);
            } catch (Exception e) {
                e.printStackTrace();
            }

            JobConf finalconf = new JobConf(rownumbertwopass.RowNumberTwoPass.class);

            // Now read the results in the key summary files, accumulating the
            // row counts to get the cumulative offset for each partition.
            FileStatus[] files = fs.globStatus(new Path(keysummaryoutput + "/p*"));
            int numvals = 0;
            long cumsum = 0;
            for (FileStatus fstat : files) {
                BufferedReader reader =
                    new BufferedReader(new InputStreamReader(fs.open(fstat.getPath())));
                String line;
                while ((line = reader.readLine()) != null) {
                    finalconf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
                    String[] vals = line.split("\t");
                    cumsum += Long.parseLong(vals[1]);
                }
                reader.close();
            }
            finalconf.setInt(PARAMETER_cumsum_numvals, numvals);

            // This code block just prints out the offsets.
            //for (int i = 0; i < numvals; i++) {
            //    System.out.println("cumsum[" + i + "] = "
            //        + finalconf.get(PARAMETER_cumsum_nthvalue + i));
            //}
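            // A worked example of the offset calculation above (illustrative
            // values, not from the original code): if the key summary files
            // contain the lines "0\t100", "1\t250" and "2\t75", the loop sets
            //     rownumbertwopass.cumsum.0 = "0\t100\t0"
            //     rownumbertwopass.cumsum.1 = "1\t250\t100"
            //     rownumbertwopass.cumsum.2 = "2\t75\t350"
            // so CalcRowNumberMap adds 350 to the row numbers in partition 2,
            // producing final row numbers 351 through 425.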
            finalconf.setJobName("Finalize the row numbers");

            String finalconfoutput = workdir + "/results";
            finalconf.setOutputKeyClass(LongWritable.class);
            finalconf.setOutputValueClass(Text.class);
            finalconf.setMapOutputKeyClass(LongWritable.class);
            finalconf.setMapOutputValueClass(Text.class);
            finalconf.setInputFormat(SequenceFileInputFormat.class);
            FileInputFormat.setInputPaths(finalconf, new Path(recordsoutput));
            FileOutputFormat.setOutputPath(finalconf, new Path(finalconfoutput));
            finalconf.setMapperClass(RowNumberTwoPass.CalcRowNumberMap.class);
            client.setConf(finalconf);
            try {
                JobClient.runJob(finalconf);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } // main()
} // class RowNumberTwoPass
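// Usage sketch (an assumed invocation, not part of the original code; the jar
// name "rownumbertwopass.jar" is hypothetical): after packaging the class into
// a jar, both passes run from a single standard hadoop jar command:
//
//     hadoop jar rownumbertwopass.jar rownumbertwopass.RowNumberTwoPass
//
// The job reads text files from the HDFS directory "input" and writes the
// numbered rows to "rownumbertwopass/results".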