package normalize;
/**
*
* @author Gordon S. Linoff ([email protected])
* December 2009
*/
/* This set of Hadoop jobs normalizes the data in a given set of columns
* by doing the following:
*
* (1) It extracts all values in the columns being normalized.
*
* (2) It matches the extracted list against the master list, keeping only the
* column values that do not match.
* Note: this step is not combined with (1), so this step only uses data formats
* created by this process.
*
* (3) It reads the master key file again to calculate the maximum key value for each column.
* a) The maximum values are read back into the conf environment variables.
*
* (4) It assigns a new key value to each unmatched column value,
* a) first, by assigning a row number to the unmatched column values,
* b) and then by adding the column's master key offset back in (from 3a).
*
* (5) It merges the new keys with the existing keys.
* The merge also splits the master file into multiple pieces,
* each about the same size.
*
* (6) It processes the original file and the new master to insert the new keys
* for each column value.
*/
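/* For illustration (hypothetical data, not part of this code), suppose the input
* contains tab-separated records with the columns id, state, and product, such as
*     1   NY   A
*     2   NJ   A
* and "state" and "product" are the columns being normalized. Step (1) produces
* counts keyed by <column>:<value>, such as state:NY -> 1 and product:A -> 2.
* Values with no entry in the master key file are assigned new numeric keys in
* steps (2)-(5), and step (6) rewrites the original records with those keys
* substituted for the normalized column values.
*/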
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Normalize {
static final String PARAMETER_saverecordsdir = "normalize.saverecordsdir";
static final String PARAMETER_cumsum_numvals = "normalize.cumsum.numvals"; // number of offset entries from the first pass
static final String PARAMETER_cumsum_nthvalue = "normalize.cumsum."; // offset for each partition/column combination
static final String PARAMETER_maxvals_numvals = "normalize.maxvals.numvals";
static final String PARAMETER_maxvals_nthvalue = "normalize.maxvals.";
static final String PARAMETER_usecolumns_nthvalue = "normalize.usecolumns.";
static final String PARAMETER_usecolumns_numvals = "normalize.usecolumns.numvals";
static final String PARAMETER_original_dataformatfile = "normalize.dataformatfilename";
// Step 1: Extract the column names and values
// Reads the data records
// Map: Reads a generic record and creates outputs for all columns being normalized
//   Output key: <column name>:<column value>
//   Value: 1 as a number
// Combine: Adds the values for column/value pairs to count the number of occurrences
//   Output key: <column name>:<column value>
//   Value: count
// Reduce: Adds the values for column/value pairs to count the number of occurrences
//   Output key: <column name>
//   Value: GenericRecord with <column value>, <count>
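// For example (hypothetical values), a record with state=NY produces the map output
// ("state:NY", 1); if NY appears 3 times in all, the reduce emits the key "state" with
// a GenericRecord containing "NY" and "3".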
public static class ColumnValueMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
static Text keytext = new Text("");
static final LongWritable valone = new LongWritable(1);
static String [] usecolumns;
static int numcolumns = 0;
static boolean output_column_names = false;
public void setup (Context context) {
Configuration conf = context.getConfiguration();
numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
if (numcolumns > 0) {
usecolumns = new String[numcolumns];
for (int i = 0; i < numcolumns; i++) {
usecolumns[i] = conf.get(PARAMETER_usecolumns_nthvalue+i);
}
}
if (conf.getInt("mapred.task.partition", -1) == 0) {
output_column_names = true;
}
} // setup()
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
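// The first map task (partition 0) saves the record layout (the list of column names)
// to a file, so that the final step (PutTogetherReduce) can write a header row for the output.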
if (output_column_names) {
Configuration conf = context.getConfiguration();
String fname = conf.get(PARAMETER_original_dataformatfile);
GenericRecord genrec = new GenericRecord(value.getMetadata());
genrec.set(value.getMetadata().getColumnNames());
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream outs = fs.create(new Path(fname), true);
outs.writeUTF(genrec.toString());
outs.close();
output_column_names = false;
}
if (numcolumns == 0) {
for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
keytext.set(kv.getKey()+":"+value.get(kv.getValue()));
context.write(keytext, valone);
}
}
else for (int i = 0; i < numcolumns; i++) {
try {
keytext.set(usecolumns[i]+":"+value.get(usecolumns[i]));
context.write(keytext, valone);
}
catch (Exception e) {
// the record does not have this column; skip it
}
}
} // map()
} // class ColumnValueMap
public static class ColumnValueCombine extends Reducer<Text, LongWritable, Text, LongWritable> {
static LongWritable longval = new LongWritable();
// The combiner adds up the counts locally; as a Reducer subclass, it must override reduce().
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long count = 0L;
for (LongWritable val : values) {
count += val.get();
}
longval.set(count);
context.write(key, longval);
}
} // ColumnValueCombine()
public static class ColumnValueReduce extends
Reducer<Text, LongWritable, Text, GenericRecord> {
private static GenericRecordMetadata metadata;
private static GenericRecord genrec;
private static Text keytext = new Text();
private static boolean firsttime = false;
public void setup(Context context) {
String [] colnames = new String[] {"value", "count"};
metadata = new GenericRecordMetadata("summary", colnames);
genrec = new GenericRecord(metadata);
genrec.init();
genrec.set(colnames);
keytext.set("column");
firsttime = true;
} // setup()
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
if (firsttime) {
context.write(keytext, genrec);
firsttime = false;
}
Long count = 0L;
for (LongWritable val : values) {
count += val.get();
}
genrec.init();
String [] parts = key.toString().split(":");
genrec.set(0, parts[1]);
genrec.set(1, count.toString());
keytext.set(parts[0]);
context.write(keytext, genrec);
}
} // ColumnValueReduce()
// Step 2: Filter to keep only column name/value pairs that don't match
// Reads the output from Step 1 and the existing master file
// Map: Identifies the master records and the column value records
//   Output key: <column name>:<column value>
//   Value: GenericRecord with <count>, <ismaster>, <key>
// Reduce: Determines whether or not there is a master record for a given
//   <column name>:<column value> combination. Only outputs records when there is
//   no matching master record
//   Output key: <column name>
//   Value: GenericRecord with "value", "count", "hadmaster", "id"
static GenericRecordMetadata getNonmatchingMetadata() {
String [] colnames = new String[] {"count", "ismaster", "key"};
return new GenericRecordMetadata("normalize.nonmatching", colnames);
} // getNonmatchingMetadata()
public static class GetNonmatchingMap extends Mapper<Text, GenericRecord, Text, GenericRecord> {
static boolean ismaster = false;
static GenericRecord genrec;
static GenericRecordMetadata metadata;
static Text keytext = new Text("");
public void setup(Context context) {
FileSplit fs = (FileSplit) context.getInputSplit();
String path = fs.getPath().toString();
if (path.indexOf("/master") >= 0) {
ismaster = true;
}
else ismaster = false;
metadata = getNonmatchingMetadata();
genrec = new GenericRecord(metadata);
} // setup()
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
genrec.init();
if (ismaster) {
genrec.set("count", value.get("count"));
genrec.set("ismaster", "1");
genrec.set("key", value.get("key"));
keytext.set(value.get("column")+":"+value.get("value"));
}
else {
genrec.set("count", value.get("count"));
genrec.set("ismaster", "0");
genrec.set("key", "");
keytext.set(value.get("column")+":"+value.get("value"));
}
context.write(keytext, genrec);
} // map()
} // class GetNonmatchingMap()
public static class GetNonmatchingReduce extends Reducer<Text, GenericRecord, Text, GenericRecord> {
static Text keytext = new Text("");
static GenericRecord genrec;
static GenericRecordMetadata metadata;
static GenericRecordMetadata inmetadata;
static boolean firsttime = false;
public void setup (Context context) {
String [] colnames = new String[] {"value", "count", "hadmaster", "id"};
metadata = new GenericRecordMetadata("normalize.master", colnames);
genrec = new GenericRecord(metadata);
genrec.init();
genrec.set(colnames);
firsttime = true;
keytext.set("column");
inmetadata = getNonmatchingMetadata();
} // setup()
public void reduce(Text key, Iterable<GenericRecord> values, Context context)
throws IOException, InterruptedException {
if (firsttime) {
context.write(keytext, genrec);
firsttime = false;
}
boolean hasmaster = false;
long count = 0;
long id = -1;
for (GenericRecord val : values) {
val._setMetadata(inmetadata);
count += Long.parseLong(val.get("count"));
if (val.get("ismaster").compareTo("1") == 0) {
hasmaster = true;
id = Long.parseLong(val.get("key"));
}
}
if (! hasmaster) {
String [] parts = key.toString().split(":");
keytext.set(parts[0]);
genrec.init();
genrec.set("value", parts[1]);
genrec.set("count", Long.toString(count));
genrec.set("hadmaster", Boolean.toString(hasmaster));
genrec.set("id", Long.toString(id));
context.write(keytext, genrec);
}
} // reduce
} // class GetNonmatchingReduce
// Step 3: Calculates the maximum id value for each column
// This is straightforward map-reduce code
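// For example (hypothetical values), master rows with keys 7 and 12 for "state" and
// key 3 for "product" reduce to state -> 12 and product -> 3.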
public static class CalcMaxIdMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
static Text keytext = new Text("");
static LongWritable lw = new LongWritable();
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
keytext.set(value.get("column"));
lw.set(Long.parseLong(value.get("key")));
context.write(keytext, lw);
} // map()
} // class CalcMaxIdMap()
public static class CalcMaxIdReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long maxval = Long.MIN_VALUE;
for (LongWritable val : values) {
long l = val.get();
if (l > maxval) {
maxval = l;
}
}
context.write(key, new LongWritable(maxval));
} // reduce
} // class CalcMaxIdReduce
// Step 4: Assigns a new key value to each unmatched column value.
// This works by appending a row number to each unmatched
// <column name>:<column value> pair, and then adding the maximum existing id
// for the column onto that value.
// Calculating row numbers is a two-step process. The first step
// appends the partition number and local row number to the rows.
// The reduce calculates the maximum row number for each partition,
// which is added to the local row numbers in the second phase.
// This is the map job for the first pass of calculating the ids for the unmatched records.
// The map reads in the data and does two things:
// a) saves the original data in a sequence file, keyed by partition id and local row number
// b) calculates the number of rows for each partition and column
// Map: Appends a new key to each unmatched <column name>:<column value> pair.
// This key has the partition id and local row number. The records themselves are
// saved in a sequence file; the map only outputs the maximum local row number
// for each column, in the cleanup() routine.
//   Output key: <partition id>:<column name>
//   Value: local row number
// Reduce: Calculates the maximum local row number for each <partition id>:<column name>
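// For example (hypothetical values), suppose partition 0 holds 3 new "state" values,
// partition 1 holds 2, and the current maximum "state" id is 12. Phase 1 records the
// per-partition counts; the driver turns them into the offsets 12 and 15, so phase 2
// assigns ids 13-15 to partition 0's rows and ids 16-17 to partition 1's rows.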
public static class RowNumberPhase1Map extends Mapper<Text, GenericRecord, Text, LongWritable> {
static SequenceFile.Writer sfw;
static IntWritable partitionid = new IntWritable(0);
static Text outkey = new Text("");
static HashMap<String, Long> hm = new HashMap<String, Long>();
// This setup() routine extracts information from the
// configuration and opens the sequence file.
public void setup (Context context) {
String saverecordsdir;
partitionid.set(context.getConfiguration().getInt("mapred.task.partition", 0));
saverecordsdir = context.getConfiguration().get(PARAMETER_saverecordsdir);
if (saverecordsdir.endsWith("/")) {
saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
}
try {
FileSystem fs = FileSystem.get(context.getConfiguration());
sfw = SequenceFile.createWriter(fs, context.getConfiguration(),
new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
Text.class, Text.class);
} catch (Exception e) {
e.printStackTrace();
}
} // setup
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
String colname = value.get("column");
Long rownum = (hm.containsKey(colname) ? hm.get(colname) : 1);
hm.put(colname, rownum+1);
outkey.set(partitionid.toString()+":" + colname + ";" + rownum);
Text valtext = new Text(value.toString());
sfw.append(outkey, valtext);
} // map()
public void cleanup (Context context) throws IOException, InterruptedException {
Text keytext = new Text("");
for (Map.Entry<String, Long> val : hm.entrySet()) {
keytext.set(partitionid.toString()+":"+val.getKey());
context.write(keytext, new LongWritable(val.getValue()));
}
sfw.close();
}
} // class RowNumberPhase1Map()
// This reduce counts the number of records in each partition by
// taking the maximum of the row numbers.
public static class RowNumberPhase1Reduce extends
Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
LongWritable maxval = new LongWritable(Long.MIN_VALUE);
for (LongWritable val : values) {
if (maxval.get() < val.get()) {
maxval.set(val.get());
}
}
context.write(key, maxval);
}
} // RowNumberPhase1Reduce()
// Step 4 (phase 2): Adds the partition- and column-specific
// offset back to the local row number.
// The particular offset is calculated in the driver program (main) and passed in
// through configuration parameters.
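// For example (hypothetical values), a saved record with the key "0:state;2" and an
// offset of 12 for "0:state" is written out with the new id 14.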
public static class RowNumberPhase2Map extends Mapper<Text, Text, Text, Text> {
HashMap<String, Long> offsets = new HashMap<String, Long>();
static Text keytext = new Text("");
static GenericRecordMetadata metadata;
static GenericRecord genrec;
public void setup(Context context) {
// First, get all the columns
Configuration conf = context.getConfiguration();
int numvals = 0;
numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
offsets.clear();
for (int i = 0; i < numvals; i++) {
String val = conf.get(PARAMETER_cumsum_nthvalue + i);
String[] parts = val.split("\t");
offsets.put(parts[0], Long.parseLong(parts[2]));
}
String [] colnames = new String[]{"column", "value", "count"};
metadata = new GenericRecordMetadata("masterkey", colnames);
genrec = new GenericRecord(metadata);
genrec.set(colnames);
// We do *not* put the column names at the top of the file, since there may be more than
// one map task.
} // setup()
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
String[] parts = key.toString().split(";");
long newid = Long.parseLong(parts[1]) + offsets.get(parts[0]);
keytext.set(Long.toString(newid));
String [] valpieces = value.toString().split("\t");
genrec.init();
genrec.set("column", parts[0].split(":")[1]);
genrec.set("value", valpieces[1]);
genrec.set("count", valpieces[2]);
context.write(keytext, new Text(genrec.toString()));
} // map()
} // class RowNumberPhase2Map
public static class RowNumberPhase2Reduce extends Reducer<Text, Text, Text, Text> {
public void setup(Context context) {
try {
context.write(new Text("id"), new Text("column\tvalue\tcount"));
}
catch (Exception e) {
e.printStackTrace();
}
} // setup()
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
} // RowNumberPhase2Reduce()
// Step 5: Merges new column value ids into the master file
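// The map splits each record's text on tabs, uses the first field as the key, and passes
// the remaining fields through; the reduce writes the records back out, split across
// several reduce files to form the new master.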
public static class MergeMap extends Mapper<Text, GenericRecord, Text, Text> {
static Text keytext = new Text("");
static Text valtext = new Text("");
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
String val = value.toString();
String [] pieces = val.split("\t");
keytext.set(pieces[0]);
StringBuffer sb = new StringBuffer();
for (int i = 1; i < pieces.length; i++) {
if (i > 1) {
sb.append("\t");
}
sb.append(pieces[i]);
}
valtext.set(sb.toString());
context.write(keytext, valtext);
} // map()
} // class MergeMap
public static class MergeReduce extends Reducer<Text, Text, Text, Text> {
public void setup(Context context) {
try {
context.write(new Text("id"), new Text("column\tvalue\tcount"));
}
catch (Exception e) {
e.printStackTrace();
}
} // setup()
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
} // MergeReduce()
// Step 6 (part 1): This map denormalizes all columns, with the key being <column>:<value>
// and the value containing:
// * a marker: "master", "expect" (a master key is expected), or "nomaster"
// * for non-masters, the partitionid:rownumber (to identify the record again)
// * for non-masters, the column number
// * for masters, the master key (id)
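// For example (hypothetical values), a record with id 1 and state NY produces
// ("state:NY", "expect\t0:0\t1"), and a master row for state NY with key 7 produces
// ("state:NY", "master\t7"); the reduce then emits ("0:0", "1\t7"), so that the column
// can be replaced by its key when the record is reassembled in PutTogetherReduce.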
public static class MatchColumnsMap extends Mapper<Text, GenericRecord, Text, Text> {
static Text keytext = new Text("");
static Text valtext = new Text("");
static String partitionid;
static long rownumber = 0;
static boolean ismaster = false;
static HashMap<String, Integer> usecolumns = new HashMap<String, Integer>();
static int numcolumns = 0;
public void setup (Context context) {
FileSplit fs = (FileSplit) context.getInputSplit();
String path = fs.getPath().toString();
if ((path.indexOf("/master") >= 0) || (path.indexOf("/newmaster") >= 0)) {
ismaster = true;
}
else ismaster = false;
Configuration conf = context.getConfiguration();
partitionid = Integer.toString(conf.getInt("mapred.task.partition", 0));
rownumber = 0;
numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
if (numcolumns > 0) {
for (int i = 0; i < numcolumns; i++) {
usecolumns.put(conf.get(PARAMETER_usecolumns_nthvalue+i), i);
}
}
} // setup()
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
// A master record only needs to be written once, not once per column.
if (ismaster) {
keytext.set(value.get("column")+":"+value.get("value"));
valtext.set("master\t" + value.get("id"));
context.write(keytext, valtext);
return;
}
for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
String colname = kv.getKey();
boolean expectmaster = ((numcolumns == 0) || usecolumns.containsKey(colname));
keytext.set(colname+":"+value.get(kv.getValue()));
valtext.set((expectmaster ? "expect" : "nomaster") + "\t" +
partitionid+":"+Long.toString(rownumber) + "\t" +
kv.getValue().toString());
context.write(keytext, valtext);
}
rownumber++;
} // map()
} // class MatchColumnsMap
public static class MatchColumnsReduce extends Reducer<Text, Text, Text, Text> {
static Text keytext = new Text("");
static Text valtext = new Text("");
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
boolean foundmaster = false;
ArrayList<String> unmatchedvals = new ArrayList<String>();
String idstr = "";
String colval = key.toString().split(":")[1];
for (Text valt : values) {
String val = valt.toString();
if (val.startsWith("nomaster")) {
String [] pieces = val.split("\t");
keytext.set(pieces[1]);
valtext.set(pieces[2] + "\t" + colval);
context.write(keytext, valtext);
}
else if (val.startsWith("master")) {
String [] pieces = val.split("\t");
idstr = pieces[1];
foundmaster = true;
}
else if (foundmaster) {
String [] pieces = val.split("\t");
keytext.set(pieces[1]);
valtext.set(pieces[2] + "\t" + idstr);
context.write(keytext, valtext);
}
else {
unmatchedvals.add(val);
}
}
if (foundmaster) {
for (String val : unmatchedvals) {
String [] pieces = val.split("\t");
keytext.set(pieces[1]);
valtext.set(pieces[2] + "\t" + idstr);
context.write(keytext, valtext);
}
}
else {
for (String val : unmatchedvals) {
String [] pieces = val.split("\t");
keytext.set(pieces[1]);
valtext.set(pieces[2] + "\t" + colval);
context.write(keytext, valtext);
}
}
}
} // class MatchColumnsReduce
public static class PutTogetherMap extends Mapper<LongWritable, Text, Text, Text> {
static Text keytext = new Text("");
static Text valtext = new Text("");
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String [] pieces = value.toString().split("\t");
keytext.set(pieces[0]);
valtext.set(pieces[1] + "\t" + pieces[2]);
context.write(keytext, valtext);
} // map()
} // class PutTogetherMap
public static class PutTogetherReduce extends Reducer<Text, Text, Text, Text> {
static Text keytext = new Text("");
static Text valtext = new Text("");
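// setup() writes a header row for the final output, using the original column names
// saved in the data-format file by ColumnValueMap.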
public void setup (Context context) {
try {
Configuration conf = context.getConfiguration();
String fname = conf.get(PARAMETER_original_dataformatfile);
FileSystem fs = FileSystem.get(conf);
FSDataInputStream ins = fs.open(new Path(fname));
String line = ins.readUTF();
String [] pieces = line.split("\t");
String outval = "";
for (int i = 1; i < pieces.length; i++) {
outval = (i == 1 ? "" : outval + "\t") + pieces[i];
}
keytext.set(pieces[0]);
valtext.set(outval);
context.write(keytext, valtext);
ins.close();
} catch (Exception e) {
e.printStackTrace();
}
} // setup()
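// reduce() reassembles one output record: the value whose column number is 0 becomes
// the output key, and the remaining values are placed by column number (the parts
// array allows up to 1000 columns) and concatenated in order.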
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String [] parts = new String[1000];
int maxpart = 0;
for (Text valt : values) {
String [] pieces = valt.toString().split("\t");
int pos = Integer.parseInt(pieces[0]);
if (pos > maxpart) {
maxpart = pos;
}
if (pos == 0) {
keytext.set(pieces[1]);
}
else parts[pos] = "\t" + pieces[1];
}
String outstr = "";
for (int i = 1; i <= maxpart; i++) {
outstr = outstr + parts[i];
}
valtext.set(outstr);
context.write(keytext, valtext);
}
} // class PutTogetherReduce
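// The driver expects the following command-line arguments (used below):
//   args[0]  input data directory
//   args[1]  final output directory
//   args[2]  existing master key directory
//   args[3]  (optional) comma-separated list of columns to normalize
// A hypothetical invocation (the jar name is illustrative):
//   hadoop jar normalize.jar normalize.Normalize indata outdata masterkeys state,product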
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
FileSystem fs;
FileStatus[] files;
int numvals;
String uniquecolumnvaluespath = "normalize/tmp/newcolumnvalues";
String nonmatchingcolumnvaluespath = "normalize/tmp/nonmatching";
String matchingcolumnvaluespath = "normalize/tmp/matching";
String saverecordsdir = "normalize/tmp/saverecords";
String keysummaryoutput = "normalize/tmp/keysummaryoutput";
String unmatchedwithidspath = "normalize/tmp/unmatchedwithids";
String newidmasterpath = "normalize/tmp/newmaster";
String formatchingtmppath = "normalize/tmp/formatching";
String originaldataformatpath = "normalize/tmp/columnnames.txt";
String lookupmatchespath = args[2];
String finaloutputpath = args[1];
// args[3] has the list of columns
if (args.length > 3) {
String [] colpieces = args[3].split(",");
for (int i = 0; i < colpieces.length; i++) {
conf.set(PARAMETER_usecolumns_nthvalue + i, colpieces[i]);
}
conf.setInt(PARAMETER_usecolumns_numvals, colpieces.length);
}
conf.set(PARAMETER_original_dataformatfile, originaldataformatpath);
// Before running the job, delete the output files
fs = FileSystem.get(conf);
fs.delete(new Path(originaldataformatpath), true);
fs.delete(new Path(formatchingtmppath), true);
fs.delete(new Path(finaloutputpath), true);
fs.delete(new Path(newidmasterpath), true);
fs.delete(new Path(saverecordsdir), true);
fs.delete(new Path(keysummaryoutput), true);
fs.delete(new Path(uniquecolumnvaluespath), true);
fs.delete(new Path(args[1]), true);
fs.delete(new Path(nonmatchingcolumnvaluespath), true);
fs.delete(new Path(matchingcolumnvaluespath), true);
fs.delete(new Path(unmatchedwithidspath), true);
// master key file is in args[2]
{ // Step 1
Job j = new Job(conf, "Step 1: Extract unique column values from appropriate columns");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(GenericRecord.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(LongWritable.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(args[0]));
FileOutputFormat.setOutputPath(j, new Path(uniquecolumnvaluespath));
j.setMapperClass(Normalize.ColumnValueMap.class);
j.setCombinerClass(Normalize.ColumnValueCombine.class);
j.setReducerClass(Normalize.ColumnValueReduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
{
Job j = new Job(conf, "Step 2: Find non-matching values");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(GenericRecord.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(GenericRecord.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(uniquecolumnvaluespath));
FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
FileOutputFormat.setOutputPath(j, new Path(nonmatchingcolumnvaluespath));
j.setMapperClass(Normalize.GetNonmatchingMap.class);
j.setReducerClass(Normalize.GetNonmatchingReduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
{
Job j = new Job(conf, "Step 3: Find maximum id for each column");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(LongWritable.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(LongWritable.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
FileOutputFormat.setOutputPath(j, new Path(matchingcolumnvaluespath));
j.setMapperClass(Normalize.CalcMaxIdMap.class);
j.setCombinerClass(Normalize.CalcMaxIdReduce.class);
j.setReducerClass(Normalize.CalcMaxIdReduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
files = fs.globStatus(new Path(matchingcolumnvaluespath+ "/p*"));
numvals = 0;
HashMap<String, Long> hmIdOffsets = new HashMap<String, Long>();
for (FileStatus fstat : files) {
FSDataInputStream fsdis = fs.open(fstat.getPath());
//BufferedReader br = new BufferedReader(new FileReader(fstat.getPath().toString()));
String line = "";
while ((line = fsdis.readLine()) != null) {
conf.set(PARAMETER_maxvals_nthvalue + numvals++, line);
String [] pieces = line.split("\t");
hmIdOffsets.put(pieces[0], Long.parseLong(pieces[1]));
}
fsdis.close();
}
conf.setInt(PARAMETER_maxvals_numvals, numvals);
// This code block just prints out the offsets
for (int i = 0; i < numvals; i++) {
System.out.println("maxvals[" + i + "] = "
+ conf.get("normalize.maxvals." + i));
}
{ // Step 4, part 1
conf.set(PARAMETER_saverecordsdir, saverecordsdir);
Job j = new Job(conf, "Step 4: Set ids for unmatched columns (part 1)");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(LongWritable.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(LongWritable.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(nonmatchingcolumnvaluespath));
FileOutputFormat.setOutputPath(j, new Path(keysummaryoutput));
j.setMapperClass(Normalize.RowNumberPhase1Map.class);
j.setReducerClass(Normalize.RowNumberPhase1Reduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
// Now read the results in the key summary file
files = fs.globStatus(new Path(keysummaryoutput+ "/p*"));
HashMap<String, Long> hmcumsums = new HashMap<String, Long>();
numvals = 0;
for (FileStatus fstat : files) {
FSDataInputStream fsdis = fs.open(fstat.getPath());
String line = "";
while ((line = fsdis.readLine()) != null) {
String [] pieces = line.split("\t");
String colname = pieces[0].split(":")[1];
long cumsum;
if (hmcumsums.containsKey(colname)) {
cumsum = hmcumsums.get(colname);
}
else {
cumsum = (hmIdOffsets.containsKey(colname) ? hmIdOffsets.get(colname) : 0);
}
conf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
String[] vals = line.split("\t");
hmcumsums.put(colname, cumsum + Long.parseLong(vals[1]));
}
fsdis.close();
}
conf.setInt(PARAMETER_cumsum_numvals, numvals);
for (int i = 0; i < numvals; i++) {
System.out.println("cumsum[" + i + "] = " + conf.get(PARAMETER_cumsum_nthvalue + i));
}
{
Job j = new Job(conf, "Step 4: Set ids for unmatched columns (part 2)");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(Text.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(Text.class);
j.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(j, new Path(saverecordsdir));
FileOutputFormat.setOutputPath(j, new Path(unmatchedwithidspath));
j.setMapperClass(Normalize.RowNumberPhase2Map.class);
j.setReducerClass(Normalize.RowNumberPhase2Reduce.class);
j.setNumReduceTasks(5);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
// Next merge
{ // Step 5
Job j = new Job(conf, "Step 5: Merge results from (4) with master keys");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(Text.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(Text.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(unmatchedwithidspath));
FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
FileOutputFormat.setOutputPath(j, new Path(newidmasterpath));
j.setMapperClass(Normalize.MergeMap.class);
j.setReducerClass(Normalize.MergeReduce.class);
j.setNumReduceTasks(4);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
{ // Step 6, part 1
Job j = new Job(conf, "Step 6: Match back the original data, step 1");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(Text.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(Text.class);
j.setInputFormatClass(GenericRecordInputFormat.class);
FileInputFormat.addInputPath(j, new Path(newidmasterpath));
FileInputFormat.addInputPath(j, new Path(args[0]));
FileOutputFormat.setOutputPath(j, new Path(formatchingtmppath));
j.setMapperClass(Normalize.MatchColumnsMap.class);
j.setReducerClass(Normalize.MatchColumnsReduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
{ // Step 6, part 2
Job j = new Job(conf, "Step 6: Match back the original data, step 2");
j.setJarByClass(Normalize.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(Text.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(Text.class);
j.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(j, new Path(formatchingtmppath));
FileOutputFormat.setOutputPath(j, new Path(finaloutputpath));
j.setMapperClass(Normalize.PutTogetherMap.class);
j.setReducerClass(Normalize.PutTogetherReduce.class);
System.out.println("RUNNING: "+j.getJobName());
j.waitForCompletion(true);
}
} catch (Exception e) {
e.printStackTrace();
}
} // main
} // class Normalize