package normalize;

/**
 * @author Gordon S. Linoff (gordon@data-miners.com)
 * December 2009
 */

/* This set of Hadoop jobs normalizes the data in a given set of columns
 * by doing the following:
 *
 * (1) It extracts all values in the columns being normalized.
 *
 * (2) It matches the extracted list to the master list, keeping only the column values
 *     that do not match.
 *     Note: this step is not combined with (1) so this step only uses data formats
 *     created by this process.
 *
 * (3) It reads the master key file again to calculate the maximum key value for each column.
 *     a) reads the maximum values back into the conf environment variables
 *
 * (4) It assigns a new key value to each unmatched column value
 *     a) first, by assigning a row number to the unmatched column values
 *     b) and then by adding the column master key offset back in (from 3a)
 *
 * (5) It merges the new keys with the existing keys.
 *     The goal here is probably to split the file into multiple pieces,
 *     each about the same size.
 *
 * (6) It processes the original file and the new master to insert the new keys
 *     for each column value.
 */

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ArrayList;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Normalize {

    static final String PARAMETER_saverecordsdir = "normalize.saverecordsdir";
    static final String PARAMETER_cumsum_numvals = "normalize.cumsum.numvals";   // number of partitions in the first pass
    static final String PARAMETER_cumsum_nthvalue = "normalize.cumsum.";         // offset for each partition
    static final String PARAMETER_maxvals_numvals = "normalize.maxvals.numvals";
    static final String PARAMETER_maxvals_nthvalue = "normalize.maxvals.";
    static final String PARAMETER_usecolumns_nthvalue = "normalize.usecolumns.";
    static final String PARAMETER_usecolumns_numvals = "normalize.usecolumns.numvals";
    static final String PARAMETER_original_dataformatfile = "normalize.dataformatfilename";

    // Step 1: Extract the column names and values
    // Reads the data records
    // Map:     Reads a generic record and creates outputs for all columns being normalized
    //          Output key:  <column name>:<column value>
    //          Value:       1 as a number
    // Combine: Adds the values for column-value pairs to count the number
    //          Output key:  <column name>:<column value>
    //          Value:       count
    // Reduce:  Adds the values for column-value pairs to count the number
    //          Output key:  <column name>
    //          Value:       GenericRecord with <value>, <count>

    public static class ColumnValueMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
        static Text keytext = new Text("");
        static final LongWritable valone = new LongWritable(1);
        static String [] usecolumns;
        static int numcolumns = 0;
        static boolean output_column_names = false;

        public void setup (Context context) {
            Configuration conf = context.getConfiguration();
            numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
            if (numcolumns > 0) {
                usecolumns = new String[numcolumns];
                for (int i = 0; i < numcolumns; i++) {
                    usecolumns[i] = conf.get(PARAMETER_usecolumns_nthvalue+i);
                }
            }
            if (conf.getInt("mapred.task.partition", -1) == 0) {
                output_column_names = true;
            }
        } // setup()

        public void map(Text key, GenericRecord value,
                        Context context) throws IOException, InterruptedException {
            if (output_column_names) {
                // The first map task saves the original column names so the final
                // step can reconstruct the data format.
                Configuration conf = context.getConfiguration();
                String fname = conf.get(PARAMETER_original_dataformatfile);
                GenericRecord genrec = new GenericRecord(value.getMetadata());
                genrec.set(value.getMetadata().getColumnNames());
                FileSystem fs = FileSystem.get(conf);
                FSDataOutputStream outs = fs.create(new Path(fname), true);
                outs.writeUTF(genrec.toString());
                outs.close();
                output_column_names = false;
            }
            if (numcolumns == 0) {
                for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
                    keytext.set(kv.getKey()+":"+value.get(kv.getValue()));
                    context.write(keytext, valone);
                }
            }
            else for (int i = 0; i < numcolumns; i++) {
                try {
                    keytext.set(usecolumns[i]+":"+value.get(usecolumns[i]));
                    context.write(keytext, valone);
                } catch (Exception e) {
                }
            }
        } // map()
    } // class ColumnValueMap

    public static class ColumnValueCombine extends Reducer<Text, LongWritable, Text, LongWritable> {
        static LongWritable longval = new LongWritable();

        // Note: a combiner must override reduce(); the original declared this
        // method as combine(), which the framework never calls.
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            Long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            longval.set(count);
            context.write(key, longval);
        }
    } // ColumnValueCombine()

    public static class ColumnValueReduce extends Reducer<Text, LongWritable, Text, GenericRecord> {
        private static GenericRecordMetadata metadata;
        private static GenericRecord genrec;
        private static Text keytext = new Text();
        private static boolean firsttime = false;

        public void setup(Context context) {
            String [] colnames = new String[] {"value", "count"};
            metadata = new GenericRecordMetadata("summary", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames);
            keytext.set("column");
            firsttime = true;
        } // setup()

        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (firsttime) {
                context.write(keytext, genrec);
                firsttime = false;
            }
            Long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            genrec.init();
            String [] parts = key.toString().split(":");
            genrec.set(0, parts[1]);
            genrec.set(1, count.toString());
            keytext.set(parts[0]);
            context.write(keytext, genrec);
        }
    } // ColumnValueReduce()

    // Step 2: Filter to keep only column name values that don't match
    // Reads the output from Step 1 and the existing master file
    // Map:    Identifies the master records and the column value records
    //         Output key:  <column name>:<column value>
    //         Value:       GenericRecord <count>, <ismaster>, <key>
    // Reduce: Determines whether or not there is a master record for a given
    //         <column name>:<column value> combination.
    //         Only outputs records when there is no matching master record
    //         Output key:  <column name>
    //         Value:       GenericRecord with "value", "count", "hadmaster", "id"

    static GenericRecordMetadata getNonmatchingMetadata() {
        String [] colnames = new String[] {"count", "ismaster", "key"};
        return new GenericRecordMetadata("normalize.nonmatching", colnames);
    } // getNonmatchingMetadata()

    public static class GetNonmatchingMap extends Mapper<Text, GenericRecord, Text, GenericRecord> {
        static boolean ismaster = false;
        static GenericRecord genrec;
        static GenericRecordMetadata metadata;
        static Text keytext = new Text("");

        public void setup(Context context) {
            FileSplit fs = (FileSplit) context.getInputSplit();
            String path = fs.getPath().toString();
            if (path.indexOf("/master") >= 0) {
                ismaster = true;
            }
            else ismaster = false;
            metadata = getNonmatchingMetadata();
            genrec = new GenericRecord(metadata);
        } // setup()

        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            genrec.init();
            if (ismaster) {
                genrec.set("count", value.get("count"));
                genrec.set("ismaster", "1");
                genrec.set("key", value.get("key"));
                keytext.set(value.get("column")+":"+value.get("value"));
            }
            else {
                genrec.set("count", value.get("count"));
                genrec.set("ismaster", "0");
                genrec.set("key", "");
                keytext.set(value.get("column")+":"+value.get("value"));
            }
            context.write(keytext, genrec);
        } // map()
    } // class GetNonmatchingMap()

    public static class GetNonmatchingReduce extends Reducer<Text, GenericRecord, Text, GenericRecord> {
        static Text keytext = new Text("");
        static GenericRecord genrec;
        static GenericRecordMetadata metadata;
        static GenericRecordMetadata inmetadata;
        static boolean firsttime = false;

        public void setup (Context context) {
            String [] colnames = new String[] {"value", "count", "hadmaster", "id"};
            metadata = new GenericRecordMetadata("normalize.master", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames);
            firsttime = true;
            keytext.set("column");
            inmetadata = getNonmatchingMetadata();
        } // setup()

        public void reduce(Text key, Iterable<GenericRecord> values, Context context)
                throws IOException, InterruptedException {
            if (firsttime) {
                context.write(keytext, genrec);
                firsttime = false;
            }
            boolean hasmaster = false;
            long count = 0;
            long id = -1;
            for (GenericRecord val : values) {
                val._setMetadata(inmetadata);
                count += Long.parseLong(val.get("count"));
                if (val.get("ismaster").compareTo("1") == 0) {
                    hasmaster = true;
                    id = Long.parseLong(val.get("key"));
                }
            }
            if (!hasmaster) {
                String [] parts = key.toString().split(":");
                keytext.set(parts[0]);
                genrec.init();
                genrec.set("value", parts[1]);
                genrec.set("count", Long.toString(count));
                genrec.set("hadmaster", Boolean.toString(hasmaster));
                genrec.set("id", Long.toString(id));
                context.write(keytext, genrec);
            }
        } // reduce
    } // class GetNonmatchingReduce

    // Step 3: Calculates the maximum id value for each column
    // This is straightforward map-reduce code

    public static class CalcMaxIdMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
        static Text keytext = new Text("");
        static LongWritable lw = new LongWritable();

        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            keytext.set(value.get("column"));
            lw.set(Long.parseLong(value.get("key")));
            context.write(keytext, lw);
        } // map()
    } // class CalcMaxIdMap()

    public static class CalcMaxIdReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long maxval = Long.MIN_VALUE;
            for (LongWritable val : values) {
                long l = val.get();
                if (l > maxval) {
                    maxval = l;
                }
            }
            context.write(key, new LongWritable(maxval));
        } // reduce
    } // class CalcMaxIdReduce

    // This is the map job for the first pass of calculating the ids for the unmatched records.
    // Overall, this works by calculating a row number for each row, and then adding in the offset
    // calculated in the previous step.
    // This map reads in the data and does two things:
    //   a) saving the original data using partition id and local row number as key
    //   b) calculating the total for each partition

    // Step 4: Assigns a new key value to the unmatched column value
    // This is basically by appending the row number onto each unmatched
    // <column name>:<column value> pair, and then adding the maximum id
    // for the column onto that value.
    // Calculating row numbers is a two step process. The first step
    // appends partition number and local row number to the rows.
    // The reduce calculates the maximum row number for each partition,
    // which is added to the local row number in the second phase.
    // Map:    Appends a new key to each unmatched <column name>:<column value> pair.
    //         This key has the partition id and local row number. The results are
    //         actually saved in a sequence file.
    //         The map just outputs the maximum local row number value, in the
    //         cleanup() routine
    //         Output key:  <partition id>:<column name>
    //         Value:       local row number
    // Reduce: Calculates the maximum row number for each partition and column

    public static class RowNumberPhase1Map extends Mapper<Text, GenericRecord, Text, LongWritable> {
        static SequenceFile.Writer sfw;
        static IntWritable partitionid = new IntWritable(0);
        static Text outkey = new Text("");
        static HashMap<String, Long> hm = new HashMap<String, Long>();

        // This setup() extracts information from the configuration
        // and opens the sequence file.
        public void setup (Context context) {
            String saverecordsdir;
            partitionid.set(context.getConfiguration().getInt("mapred.task.partition", 0));
            saverecordsdir = new String(context.getConfiguration().get(PARAMETER_saverecordsdir));
            if (saverecordsdir.endsWith("/")) {
                // Strings are immutable, so the trimmed value must be assigned back.
                saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
            }
            try {
                FileSystem fs = FileSystem.get(context.getConfiguration());
                sfw = SequenceFile.createWriter(fs, context.getConfiguration(),
                        new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
                        Text.class, Text.class);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // setup

        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            String colname = value.get("column");
            Long rownum = (hm.containsKey(colname) ? hm.get(colname) : 1);
            hm.put(colname, rownum+1);
            outkey.set(partitionid.toString()+":" + colname + ";" + rownum);
            Text valtext = new Text(value.toString());
            sfw.append(outkey, valtext);
        } // map()

        public void cleanup (Context context) throws IOException, InterruptedException {
            Text keytext = new Text("");
            for (Map.Entry<String, Long> val : hm.entrySet()) {
                keytext.set(partitionid.toString()+":"+val.getKey());
                context.write(keytext, new LongWritable(val.getValue()));
            }
            sfw.close();
        }
    } // class RowNumberPhase1Map()

    // This reduce counts the number of records in each partition by
    // taking the maximum of the row numbers.
    public static class RowNumberPhase1Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        // Note: with the new mapreduce API, reduce() must take an Iterable
        // (not an Iterator) or it will not override the framework's method.
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            LongWritable maxval = new LongWritable(Long.MIN_VALUE);
            for (LongWritable value : values) {
                long val = value.get();
                if (maxval.get() < val) {
                    maxval.set(val);
                }
            }
            context.write(key, maxval);
        }
    } // RowNumberPhase1Reduce()

    // Step 4: This is phase 2 of calculating the id. It adds the partition- and column-specific
    // offset back to the local row number.
    // The particular offset is calculated in the master program and passed in
    // using parameters.
    public static class RowNumberPhase2Map extends Mapper<Text, Text, Text, Text> {
        HashMap<String, Long> offsets = new HashMap<String, Long>();
        static Text keytext = new Text("");
        static GenericRecordMetadata metadata;
        static GenericRecord genrec;

        public void setup(Context context) {
            // First, get all the columns
            Configuration conf = context.getConfiguration();
            int numvals = 0;
            numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
            offsets.clear();
            for (int i = 0; i < numvals; i++) {
                String val = conf.get(PARAMETER_cumsum_nthvalue + i);
                String[] parts = val.split("\t");
                offsets.put(parts[0], Long.parseLong(parts[2]));
            }
            String [] colnames = new String[]{"column", "value", "count"};
            metadata = new GenericRecordMetadata("masterkey", colnames);
            genrec = new GenericRecord(metadata);
            genrec.set(colnames);
            // We do *not* put the column names at the top of the file, since there may be more than
            // one map task.
        } // setup()

        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = key.toString().split(";");
            long newid = Long.parseLong(parts[1]) + offsets.get(parts[0]);
            keytext.set(Long.toString(newid));
            String [] valpieces = value.toString().split("\t");
            genrec.init();
            genrec.set("column", parts[0].split(":")[1]);
            genrec.set("value", valpieces[1]);
            genrec.set("count", valpieces[2]);
            context.write(keytext, new Text(genrec.toString()));
        } // map()
    } // class RowNumberPhase2Map

    public static class RowNumberPhase2Reduce extends Reducer<Text, Text, Text, Text> {
        public void setup(Context context) {
            try {
                context.write(new Text("id"), new Text("column\tvalue\tcount"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // setup()

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    } // RowNumberPhase2Reduce()

    // Step 5: Merges new column value ids into the master file
    public static class MergeMap extends Mapper<Text, GenericRecord, Text, Text> {
        static Text keytext = new Text("");
        static Text valtext = new Text("");

        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            String val = value.toString();
            String [] pieces = val.split("\t");
            keytext.set(pieces[0]);
            StringBuffer sb = new StringBuffer();
            for (int i = 1; i < pieces.length; i++) {
                if (i > 1) {
                    sb.append("\t");
                }
                sb.append(pieces[i]);
            }
            valtext.set(sb.toString());
            context.write(keytext, valtext);
        } // map()
    } // class MergeMap

    public static class MergeReduce extends Reducer<Text, Text, Text, Text> {
        public void setup(Context context) {
            try {
                context.write(new Text("id"), new Text("column\tvalue\tcount"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // setup()

        // As above, reduce() must take an Iterable to override the new-API method.
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    } // MergeReduce()

    // This map denormalizes all columns, with the key being "column:value":
    //   * ismaster flag
    //   * for nonmasters, the partitionid/rownumber (to identify the records again) (or empty for the master key)
    //   * for nonmasters, the column number
    //   * for masters, the key
    public static class MatchColumnsMap extends Mapper<Text, GenericRecord, Text, Text> {
        static Text keytext = new Text("");
        static Text valtext = new Text("");
        static String partitionid;
        static long rownumber = 0;
        static boolean ismaster = false;
        static HashMap<String, Integer> usecolumns = new HashMap<String, Integer>();
        static int numcolumns = 0;

        public void setup (Context context) {
            FileSplit fs = (FileSplit) context.getInputSplit();
            String path = fs.getPath().toString();
            if ((path.indexOf("/master") >= 0) || (path.indexOf("/newmaster") >= 0)) {
                ismaster = true;
            }
            else ismaster = false;
            Configuration conf = context.getConfiguration();
            partitionid = Integer.toString(conf.getInt("mapred.task.partition", 0));
            rownumber = 0;
            numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
            if (numcolumns > 0) {
                for (int i = 0; i < numcolumns; i++) {
                    usecolumns.put(conf.get(PARAMETER_usecolumns_nthvalue+i), i);
                }
            }
        } // setup()

        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
                if (ismaster) {
                    keytext.set(value.get("column")+":"+value.get("value"));
                    valtext.set("master\t" + value.get("id"));
                    context.write(keytext, valtext);
                }
                else {
                    String colname = kv.getKey();
                    boolean expectmaster = ((numcolumns == 0) || usecolumns.containsKey(colname));
                    keytext.set(colname+":"+value.get(kv.getValue()));
                    valtext.set((expectmaster ? "expect" : "nomaster") + "\t" +
                                partitionid+":"+Long.toString(rownumber) + "\t" +
                                kv.getValue().toString());
                    context.write(keytext, valtext);
                }
            }
            if (!ismaster) rownumber++;
        } // map()
    } // class MatchColumnsMap

    public static class MatchColumnsReduce extends Reducer<Text, Text, Text, Text> {
        static Text keytext = new Text("");
        static Text valtext = new Text("");

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            boolean foundmaster = false;
            ArrayList<String> unmatchedvals = new ArrayList<String>();
            String idstr = "";
            String colval = key.toString().split(":")[1];
            for (Text valt : values) {
                String val = valt.toString();
                if (val.startsWith("nomaster")) {
                    String [] pieces = val.split("\t");
                    keytext.set(pieces[1]);
                    valtext.set(pieces[2] + "\t" + colval);
                    context.write(keytext, valtext);
                }
                else if (val.startsWith("master")) {
                    String [] pieces = val.split("\t");
                    idstr = pieces[1];
                    foundmaster = true;
                }
                else if (foundmaster) {
                    String [] pieces = val.split("\t");
                    keytext.set(pieces[1]);
                    valtext.set(pieces[2] + "\t" + idstr);
                    context.write(keytext, valtext);
                }
                else {
                    unmatchedvals.add(new String(val));
                }
            }
            if (foundmaster) {
                for (String val : unmatchedvals) {
                    String [] pieces = val.split("\t");
                    keytext.set(pieces[1]);
                    valtext.set(pieces[2] + "\t" + idstr);
                    context.write(keytext, valtext);
                }
            }
            else {
                for (String val : unmatchedvals) {
                    String [] pieces = val.split("\t");
                    keytext.set(pieces[1]);
                    valtext.set(pieces[2] + "\t" + colval);
                    context.write(keytext, valtext);
                }
            }
        }
    } // class MatchColumnsReduce

    public static class PutTogetherMap extends Mapper<LongWritable, Text, Text, Text> {
        static Text keytext = new Text("");
        static Text valtext = new Text("");

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String [] pieces = value.toString().split("\t");
            keytext.set(pieces[0]);
            valtext.set(pieces[1] + "\t" + pieces[2]);
            context.write(keytext, valtext);
        } // map()
    } // class PutTogetherMap

    public static class PutTogetherReduce extends Reducer<Text, Text, Text, Text> {
        static Text keytext = new Text("");
        static Text valtext = new Text("");

        public void setup (Context context) {
            try {
                Configuration conf = context.getConfiguration();
                String fname = conf.get(PARAMETER_original_dataformatfile);
                FileSystem fs = FileSystem.get(conf);
                FSDataInputStream ins = fs.open(new Path(fname));
                String line = ins.readUTF();
                String [] pieces = line.split("\t");
                String outval = "";
                for (int i = 1; i < pieces.length; i++) {
                    outval = (i == 1 ? "" : outval + "\t") + pieces[i];
                }
                keytext.set(pieces[0]);
                valtext.set(outval);
                context.write(keytext, valtext);
                ins.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // setup()

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String [] parts = new String[1000];
            int maxpart = 0;
            for (Text valt : values) {
                String [] pieces = valt.toString().split("\t");
                int pos = Integer.parseInt(pieces[0]);
                if (pos > maxpart) {
                    maxpart = pos;
                }
                if (pos == 0) {
                    keytext.set(pieces[1]);
                }
                else parts[pos] = new String((pos > 0 ? "\t" : "") + pieces[1]);
            }
            String outstr = "";
            for (int i = 1; i <= maxpart; i++) {
                outstr = outstr + parts[i];
            }
            valtext.set(outstr);
            context.write(keytext, valtext);
        }
    } // class PutTogetherReduce

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            FileSystem fs;
            FileStatus[] files;
            int numvals;

            String uniquecolumnvaluespath = "normalize/tmp/newcolumnvalues";
            String nonmatchingcolumnvaluespath = "normalize/tmp/nonmatching";
            String matchingcolumnvaluespath = "normalize/tmp/matching";
            String saverecordsdir = "normalize/tmp/saverecords";
            String keysummaryoutput = "normalize/tmp/keysummaryoutput";
            String unmatchedwithidspath = "normalize/tmp/unmatchedwithids";
            String newidmasterpath = "normalize/tmp/newmaster";
            String formatchingtmppath = "normalize/tmp/formatching";
            String originaldataformatpath = "normalize/tmp/columnnames.txt";
            String lookupmatchespath = args[2];
            String finaloutputpath = args[1];

            // args[3] has the list of columns
            if (args.length > 3) {
                String [] colpieces = args[3].split(",");
                for (int i = 0; i < colpieces.length; i++) {
                    conf.set(PARAMETER_usecolumns_nthvalue + i, colpieces[i]);
                }
                conf.setInt(PARAMETER_usecolumns_numvals, colpieces.length);
            }
            conf.set(PARAMETER_original_dataformatfile, originaldataformatpath);

            // Before running the job, delete the output files
            fs = FileSystem.get(conf);
            fs.delete(new Path(originaldataformatpath), true);
            fs.delete(new Path(formatchingtmppath), true);
            fs.delete(new Path(finaloutputpath), true);
            fs.delete(new Path(newidmasterpath), true);
            fs.delete(new Path(saverecordsdir), true);
            fs.delete(new Path(keysummaryoutput), true);
            fs.delete(new Path(uniquecolumnvaluespath), true);
            fs.delete(new Path(args[1]), true);
            fs.delete(new Path(nonmatchingcolumnvaluespath), true);
            fs.delete(new Path(matchingcolumnvaluespath), true);
            fs.delete(new Path(unmatchedwithidspath), true);
            // master key file is in args[2]

            { // Step 1
                Job j = new Job(conf, "Step 1: Extract unique column values from appropriate columns");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(GenericRecord.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(LongWritable.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(args[0]));
                FileOutputFormat.setOutputPath(j, new Path(uniquecolumnvaluespath));
                j.setMapperClass(Normalize.ColumnValueMap.class);
                j.setCombinerClass(Normalize.ColumnValueCombine.class);
                j.setReducerClass(Normalize.ColumnValueReduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            { // Step 2
                Job j = new Job(conf, "Step 2: Find non-matching values");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(GenericRecord.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(GenericRecord.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(uniquecolumnvaluespath));
                FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
                FileOutputFormat.setOutputPath(j, new Path(nonmatchingcolumnvaluespath));
                j.setMapperClass(Normalize.GetNonmatchingMap.class);
                j.setReducerClass(Normalize.GetNonmatchingReduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            { // Step 3
                Job j = new Job(conf, "Step 3: Find maximum id for each column");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(LongWritable.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(LongWritable.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
                FileOutputFormat.setOutputPath(j, new Path(matchingcolumnvaluespath));
                j.setMapperClass(Normalize.CalcMaxIdMap.class);
                j.setCombinerClass(Normalize.CalcMaxIdReduce.class);
                j.setReducerClass(Normalize.CalcMaxIdReduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            // Read the maximum ids back in and pass them along in the configuration
            files = fs.globStatus(new Path(matchingcolumnvaluespath+ "/p*"));
            numvals = 0;
            HashMap<String, Long> hmIdOffsets = new HashMap<String, Long>();
            for (FileStatus fstat : files) {
                FSDataInputStream fsdis = fs.open(fstat.getPath());
                //BufferedReader br = new BufferedReader(new FileReader(fstat.getPath().toString()));
                String line = "";
                while ((line = fsdis.readLine()) != null) {
                    conf.set(PARAMETER_maxvals_nthvalue + numvals++, line);
                    String [] pieces = line.split("\t");
                    hmIdOffsets.put(pieces[0], Long.parseLong(pieces[1]));
                }
                fsdis.close();
            }
            conf.setInt(PARAMETER_maxvals_numvals, numvals);

            // This code block just prints out the offsets
            for (int i = 0; i < numvals; i++) {
                System.out.println("maxvals[" + i + "] = " + conf.get("normalize.maxvals." + i));
            }

            { // Step 4, part 1
                conf.set(PARAMETER_saverecordsdir, saverecordsdir);
                Job j = new Job(conf, "Step 4: Set ids for unmatched columns (part 1)");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(LongWritable.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(LongWritable.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(nonmatchingcolumnvaluespath));
                FileOutputFormat.setOutputPath(j, new Path(keysummaryoutput));
                j.setMapperClass(Normalize.RowNumberPhase1Map.class);
                j.setReducerClass(Normalize.RowNumberPhase1Reduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            // Now read the results in the key summary file
            files = fs.globStatus(new Path(keysummaryoutput+ "/p*"));
            HashMap<String, Long> hmcumsums = new HashMap<String, Long>();
            numvals = 0;
            for (FileStatus fstat : files) {
                FSDataInputStream fsdis = fs.open(fstat.getPath());
                String line = "";
                while ((line = fsdis.readLine()) != null) {
                    String [] pieces = line.split("\t");
                    String colname = pieces[0].split(":")[1];
                    long cumsum;
                    if (hmcumsums.containsKey(colname)) {
                        cumsum = hmcumsums.get(colname);
                    }
                    else {
                        cumsum = (hmIdOffsets.containsKey(colname) ? hmIdOffsets.get(colname) : 0);
                    }
                    conf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
                    String[] vals = line.split("\t");
                    hmcumsums.put(colname, cumsum + Long.parseLong(vals[1]));
                }
                fsdis.close();   // close the summary file (missing in the original)
            }
            conf.setInt(PARAMETER_cumsum_numvals, numvals);

            for (int i = 0; i < numvals; i++) {
                System.out.println("cumsum[" + i + "] = " + conf.get(PARAMETER_cumsum_nthvalue + i));
            }

            { // Step 4, part 2
                Job j = new Job(conf, "Step 4: Set ids for unmatched columns (part 2)");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(Text.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(Text.class);
                j.setInputFormatClass(SequenceFileInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(saverecordsdir));
                FileOutputFormat.setOutputPath(j, new Path(unmatchedwithidspath));
                j.setMapperClass(Normalize.RowNumberPhase2Map.class);
                j.setReducerClass(Normalize.RowNumberPhase2Reduce.class);
                j.setNumReduceTasks(5);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            // Next merge
            { // Step 5
                Job j = new Job(conf, "Step 5: Merge results from (4) with master keys");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(Text.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(Text.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(unmatchedwithidspath));
                FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
                FileOutputFormat.setOutputPath(j, new Path(newidmasterpath));
                j.setMapperClass(Normalize.MergeMap.class);
                j.setReducerClass(Normalize.MergeReduce.class);
                j.setNumReduceTasks(4);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            { // Step 6, part 1
                Job j = new Job(conf, "Step 6: Match back the original data, step 1");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(Text.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(Text.class);
                j.setInputFormatClass(GenericRecordInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(newidmasterpath));
                FileInputFormat.addInputPath(j, new Path(args[0]));
                FileOutputFormat.setOutputPath(j, new Path(formatchingtmppath));
                j.setMapperClass(Normalize.MatchColumnsMap.class);
                j.setReducerClass(Normalize.MatchColumnsReduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }

            { // Step 6, part 2
                Job j = new Job(conf, "Step 6: Match back the original data, step 2");
                j.setJarByClass(Normalize.class);
                j.setOutputKeyClass(Text.class);
                j.setOutputValueClass(Text.class);
                j.setMapOutputKeyClass(Text.class);
                j.setMapOutputValueClass(Text.class);
                j.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.addInputPath(j, new Path(formatchingtmppath));
                FileOutputFormat.setOutputPath(j, new Path(finaloutputpath));
                j.setMapperClass(Normalize.PutTogetherMap.class);
                j.setReducerClass(Normalize.PutTogetherReduce.class);
                System.out.println("RUNNING: "+j.getJobName());
                j.waitForCompletion(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    } // main
} // class Normalize
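
// Usage sketch (an assumption, not part of the original source): the exact jar name
// and paths depend on your build, but based on how main() reads its arguments an
// invocation would look roughly like:
//
//   hadoop jar normalize.jar normalize.Normalize \
//       <input data dir> <final output dir> <master key dir> [col1,col2,...]
//
// where:
//   args[0] = directory of input data records (read with GenericRecordInputFormat)
//   args[1] = directory for the final, normalized output
//   args[2] = directory holding the existing master key files
//   args[3] = optional comma-separated list of columns to normalize
//             (if omitted, every column in the records is normalized)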