package normalize;

/**
*
* @author Gordon S. Linoff (gordon@data-miners.com)
* December 2009
*/

/* This set of Hadoop jobs normalizes the data in a given set of columns
 * by doing the following:
 * 
 * (1) It extracts all values of the columns being normalized.
 *     
 * (2) It matches the extracted list to the master list, keeping only the column values
 *     that do not match
 *     Note: this step is not combined with (1) so this step only uses data formats
 *     created by this process
 *     
 * (3) Read the master key file again to calculate the maximum key value for each column
 *     a) read the maximum values back into the conf environment variables
 *     
 * (4) It assigns a new key value to the unmatched column value
 *     a) first, by assigning a rownumber to the unmatched column values
 *     b) and then by adding the column master key offset back in (from 3a)
 *     
 * (5) It merges the new keys with the existing keys
 * 		The goal here is probably to split the file into multiple pieces,
 * 		each about the same size.
 * 
 * (6) It processes the original file and the new master to insert the new keys
 *     for each column value
 */


import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ArrayList;
import java.io.*;
import java.io.BufferedReader.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Normalize {
	// Configuration keys used to pass values from main() to the map and reduce tasks.

	// Directory in which RowNumberPhase1Map writes its per-partition sequence files.
	static final String PARAMETER_saverecordsdir = "normalize.saverecordsdir";
	static final String PARAMETER_cumsum_numvals = "normalize.cumsum.numvals";	// number of cumsum entries (one per line of the Step 4, part 1 output)
	static final String PARAMETER_cumsum_nthvalue = "normalize.cumsum.";			// prefix; key <prefix><i> holds "<partition>:<column> TAB <count> TAB <offset>"
	// Number of maxvals entries set by main() from the Step 3 output.
	static final String PARAMETER_maxvals_numvals = "normalize.maxvals.numvals";
	// Prefix; key <prefix><i> holds the i-th line of the Step 3 output.
	static final String PARAMETER_maxvals_nthvalue = "normalize.maxvals.";
	// Prefix; key <prefix><i> holds the name of the i-th column to normalize.
	static final String PARAMETER_usecolumns_nthvalue = "normalize.usecolumns.";
	// Number of columns to normalize; 0 (or unset) is treated as "all columns" by the mappers.
	static final String PARAMETER_usecolumns_numvals = "normalize.usecolumns.numvals";
	// Path of the file holding the original column names; written by ColumnValueMap
	// and read back by PutTogetherReduce.
	static final String PARAMETER_original_dataformatfile = "normalize.dataformatfilename";
	

	// Step 1:  Extract the column names and values
	//          Reads the data records
	// Map:      Reads a generic record and creates outputs for all columns being normalized
	//           Output key:  <column name>:<column value>
	//           Value:  1 as a number
	// Combine:  Adds the values for column value pairs to count the number
	//           Output key:  <column name>:<column value>
	//           Value:  count
	// Reduce:   Adds the values for column value pairs to count the number
	//           Output key:  <column name>:<column value>
	//           Value:  GenericRecord with <column name>, <column value>, <count>
	
	/**
	 * Step 1 mapper.  For every input record it emits one
	 * (&lt;column name&gt;:&lt;column value&gt;, 1) pair per column being normalized.
	 * When no columns are configured, every column of the record is emitted.
	 * The task whose "mapred.task.partition" is 0 additionally writes the
	 * original column names to the file named by
	 * PARAMETER_original_dataformatfile, once, on its first record.
	 */
	public static class ColumnValueMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
		static Text keytext = new Text("");
		static final LongWritable valone = new LongWritable(1);
		static String [] usecolumns;
		static int numcolumns = 0;
		static boolean output_column_names = false;
		
		/** Reads the list of columns to normalize from the job configuration. */
		public void setup (Context context) {
			Configuration conf = context.getConfiguration();
			numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
			if (numcolumns > 0) {
				usecolumns = new String[numcolumns];
				for (int i = 0; i < numcolumns; i++) {
					usecolumns[i] = conf.get(PARAMETER_usecolumns_nthvalue+i);
				}
			}
			// Assigned unconditionally so a stale value in this static field
			// cannot leak from an earlier task run in the same JVM.
			output_column_names = (conf.getInt("mapred.task.partition", -1) == 0);
		}  // setup()

		/**
		 * Emits (&lt;column&gt;:&lt;value&gt;, 1) for each column of interest.
		 *
		 * @throws IOException if writing the column-name file or the output fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			if (output_column_names) {
				writeColumnNames(value, context.getConfiguration());
				output_column_names = false;
			}
			if (numcolumns == 0) {
				for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
					keytext.set(kv.getKey()+":"+value.get(kv.getValue()));
					context.write(keytext, valone);
				}
			}
			else {
				for (int i = 0; i < numcolumns; i++) {
					String colvalue;
					try {
						colvalue = value.get(usecolumns[i]);
					}
					catch (Exception e) {
						// Best effort, as before: a configured column that cannot be
						// read from this record is skipped.
						continue;
					}
					// The write is deliberately outside the try block so that
					// output failures and interrupts are no longer swallowed.
					keytext.set(usecolumns[i]+":"+colvalue);
					context.write(keytext, valone);
				}
			}
		} // map()

		/** Writes the record's column names to the configured data format file. */
		private static void writeColumnNames(GenericRecord value, Configuration conf)
		throws IOException {
			String fname = conf.get(PARAMETER_original_dataformatfile);
			GenericRecord genrec = new GenericRecord(value.getMetadata());
			genrec.set(value.getMetadata().getColumnNames());
			FileSystem fs = FileSystem.get(conf);
			FSDataOutputStream outs = fs.create(new Path(fname), true);
			try {
				outs.writeUTF(genrec.toString());
			}
			finally {
				outs.close();
			}
		} // writeColumnNames()
	} // class ColumnValueMap


	public static class ColumnValueCombine extends Reducer<Text, LongWritable, Text, LongWritable> {
		static LongWritable longval = new LongWritable();

		public void combine(Text key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
			Long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			longval.set(count);
			context.write(key, longval);
		}
	} // ColumnValueCombine()

	/**
	 * Step 1 reducer: totals the counts for each
	 * &lt;column name&gt;:&lt;column value&gt; key and writes the column name as
	 * the output key with a ("value", "count") record as the output value.
	 * A header row (key "column", record holding the column names) is written
	 * before the first data row of each reduce task.
	 */
	public static class ColumnValueReduce extends
	Reducer<Text, LongWritable, Text, GenericRecord> {
		private static GenericRecordMetadata metadata;
		private static GenericRecord genrec;
		private static Text keytext = new Text();
		private static boolean firsttime = false;

		/** Builds the output record layout and prepares the header row. */
		public void setup(Context context) {
			String [] colnames = new String[] {"value", "count"};
			metadata = new GenericRecordMetadata("summary", colnames);
			genrec = new GenericRecord(metadata);
			genrec.init();
			genrec.set(colnames);
			keytext.set("column");
			firsttime = true;
		}  // setup()

		/**
		 * Sums the counts for one key and writes the result.
		 *
		 * @throws IOException if the write fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void reduce(Text key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
			if (firsttime) {
				// keytext/genrec still hold the header prepared in setup()
				context.write(keytext, genrec);
				firsttime = false;
			}
			long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			genrec.init();
			// Split on the first ':' only.  A plain split(":") drops a trailing
			// empty value (parts[1] then does not exist) and truncates values
			// that themselves contain ':'.
			String [] parts = key.toString().split(":", 2);
			genrec.set(0, (parts.length > 1 ? parts[1] : ""));
			genrec.set(1, Long.toString(count));
			keytext.set(parts[0]);
			context.write(keytext, genrec);
		}
	} // ColumnValueReduce()
	
	
	// Step 2:  Filter to keep only column name values that don't match
	//          Reads the output from Step 1 and the existing master file
	// Map:      Identifies the master records and the column value records 
	//           Output key:  <column name>:<column value>
	//           Value:  GenericRecord <count>, <ismaster>, <key>
	// Reduce:   Determines whether or not there is a master record for a given <column name>:<column value>
	//           combination.  Only outputs records when there is no matching master record
	//           Output key:  <column name>:<column value>
	//           Value:  GenericRecord with "value", "count", "hadmaster", "id"
	
	/**
	 * Returns the record layout ("count", "ismaster", "key") shared by the
	 * Step 2 mapper output and the Step 2 reducer input.
	 */
	static GenericRecordMetadata getNonmatchingMetadata() {
		return new GenericRecordMetadata("normalize.nonmatching",
				new String[] {"count", "ismaster", "key"});
	}  // getNonmatchingMetadata()
	
	
	/**
	 * Step 2 mapper: tags each record as coming from the master file or from
	 * the Step 1 output, and re-keys it by &lt;column name&gt;:&lt;column value&gt;.
	 * An input split is treated as master data when its path contains "/master".
	 */
	public static class GetNonmatchingMap extends Mapper<Text, GenericRecord, Text, GenericRecord> {
		static boolean ismaster = false;
		static GenericRecord genrec;
		static GenericRecordMetadata metadata;
		static Text keytext = new Text("");
		
		/** Decides from the split's path whether this task reads master data. */
		public void setup(Context context) {
			FileSplit split = (FileSplit) context.getInputSplit();
			ismaster = split.getPath().toString().contains("/master");
			metadata = getNonmatchingMetadata();
			genrec = new GenericRecord(metadata);
		}  // setup()

		/**
		 * Writes (&lt;column&gt;:&lt;value&gt;, record of count / ismaster / key).
		 * Master records carry their existing key; other records carry an empty key.
		 */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			String masterflag;
			String existingkey;
			if (ismaster) {
				masterflag = "1";
				existingkey = value.get("key");
			}
			else {
				masterflag = "0";
				existingkey = "";
			}
			genrec.init();
			genrec.set("count", value.get("count"));
			genrec.set("ismaster", masterflag);
			genrec.set("key", existingkey);
			keytext.set(value.get("column")+":"+value.get("value"));
			context.write(keytext, genrec);
		} // map()
	}  // class GetNonmatchingMap()
	
	/**
	 * Step 2 reducer: for each &lt;column name&gt;:&lt;column value&gt; key, checks
	 * whether any incoming record came from the master file.  Only keys with
	 * no master record are written, keyed by column name, with a
	 * ("value", "count", "hadmaster", "id") record.  A header row is written
	 * before the first data row of each reduce task.
	 */
	public static class GetNonmatchingReduce extends Reducer<Text, GenericRecord, Text, GenericRecord> {
		static Text keytext = new Text("");
		static GenericRecord genrec;
		static GenericRecordMetadata metadata;
		static GenericRecordMetadata inmetadata;
		static boolean firsttime = false;
		
		/** Builds the output layout, prepares the header row, and loads the input layout. */
		public void setup (Context context) {
			String [] colnames = new String[] {"value", "count", "hadmaster", "id"};
			metadata = new GenericRecordMetadata("normalize.master", colnames);
			genrec = new GenericRecord(metadata);
			genrec.init();
			genrec.set(colnames);
			firsttime = true;
			keytext.set("column");
			inmetadata = getNonmatchingMetadata();
			
		} // setup()

		/**
		 * Writes the key's value and total count when no master record exists for it.
		 *
		 * @throws IOException if the write fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void reduce(Text key, Iterable<GenericRecord> values, Context context)
		throws IOException, InterruptedException {
			if (firsttime) {
				// keytext/genrec still hold the header prepared in setup()
				context.write(keytext, genrec);
				firsttime = false;
			}
			boolean hasmaster = false;
			long count = 0;
			long id = -1;
			for (GenericRecord val : values) {
				// attach the mapper-side layout so fields can be read by name
				val._setMetadata(inmetadata);
				count += Long.parseLong(val.get("count"));
				if ("1".equals(val.get("ismaster"))) {
					hasmaster = true;
					id = Long.parseLong(val.get("key"));
				}
			}
			if (! hasmaster) {
				// Split on the first ':' only, so empty values do not cause an
				// out-of-bounds access and values containing ':' are kept whole.
				String [] parts = key.toString().split(":", 2);
				keytext.set(parts[0]);
				genrec.init();
				genrec.set("value", (parts.length > 1 ? parts[1] : ""));
				genrec.set("count", Long.toString(count));
				genrec.set("hadmaster", Boolean.toString(hasmaster));
				genrec.set("id", Long.toString(id));
				context.write(keytext, genrec);
			}
		}  // reduce
		
	}  // class GetNonmatchingReduce

	
	// Step 3:  Calculates the maximum id value for each column
	//          This is straight-forward map-reduce code
	
	/**
	 * Step 3 mapper: emits (column name, key as a long) for every record read.
	 */
	public static class CalcMaxIdMap extends Mapper<Text, GenericRecord, Text, LongWritable> {
		static Text keytext = new Text("");
		static LongWritable lw = new LongWritable();

		/** Reads the "column" and "key" fields and writes them as (Text, LongWritable). */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			String columnname = value.get("column");
			long idvalue = Long.parseLong(value.get("key"));
			keytext.set(columnname);
			lw.set(idvalue);
			context.write(keytext, lw);
		} // map()
	}  // class CalcMaxIdMap()
	
	/**
	 * Step 3 reducer (also used as the combiner): writes the largest value
	 * seen for each key.
	 */
	public static class CalcMaxIdReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
		/** Writes (key, maximum of values); Long.MIN_VALUE if there are no values. */
		public void reduce(Text key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
			long largest = Long.MIN_VALUE;
			Iterator<LongWritable> it = values.iterator();
			while (it.hasNext()) {
				largest = Math.max(largest, it.next().get());
			}
			context.write(key, new LongWritable(largest));
		}  // reduce
		
	}  // class CalcMaxidReduce
	
	// This is the map job for the first pass of calculating the ids for the unmatched records.
	// Overall, this works by calculating a rownumber for each row, and then adding in the offset
	// calculated in the previous step.
	// This map reads in the data and does two things:
	// a) saving the original data using partition id and local row number as key
	// b) calculating the total for each partition
	
	// Step 4:  Assigns a new key value to the unmatched column value
	//          This is basically by appending the row number onto each unmatched
	//          <column name>:<column value> pair, and then adding the maximum id
	//          for the column onto that value.
	//          Calculating row numbers is a two step process.  The first step
	//          appends partition number and local row number to the rows.
	//          The reduce calculates the maximum row number for each partition,
	//          which is added to the local row number in the second phase
	// Map:      Appends a new key to each unmatched <column name>:<column value> pair
	//           This key has the partition id and local row number.  The results are
	//           actually saved in a sequence file.
	//           The map just outputs the maximum local row number value, in the 
	//           cleanup() routine
	//           Output key:  <column name>:<column value>
	//           Value:  local row number
	// Reduce:   Calculates the overall master
	
	/**
	 * Step 4, phase 1 mapper.  Numbers the unmatched records locally, per
	 * column, within this map task.  Each record is saved to a sequence file
	 * under the key &lt;partition id&gt;:&lt;column&gt;;&lt;local row number&gt;.  The only
	 * map output is produced in cleanup(): one
	 * (&lt;partition id&gt;:&lt;column&gt;, counter) pair per column seen.
	 */
	public static class RowNumberPhase1Map extends Mapper<Text, GenericRecord, Text, LongWritable> {
		static SequenceFile.Writer sfw;
		static IntWritable partitionid = new IntWritable(0);
		static Text outkey = new Text("");
		static HashMap<String, Long> hm = new HashMap<String, Long>();

		/**
		 * Reads the partition id and save directory from the configuration
		 * and opens this task's sequence file.
		 *
		 * @throws IOException if the save directory is not configured or the
		 *         sequence file cannot be created.  Previously the failure was
		 *         only printed and map() then failed on a null writer.
		 */
		public void setup (Context context) throws IOException {
			Configuration conf = context.getConfiguration();
			partitionid.set(conf.getInt("mapred.task.partition", 0));
			// hm is static; clear it so counters from an earlier task run in
			// the same JVM cannot carry over.
			hm.clear();
			String saverecordsdir = conf.get(PARAMETER_saverecordsdir);
			if (saverecordsdir == null) {
				throw new IOException("Missing configuration value: " + PARAMETER_saverecordsdir);
			}
			if (saverecordsdir.endsWith("/")) {
				// Strings are immutable: the result must be assigned back
				// (the original discarded it).
				saverecordsdir = saverecordsdir.substring(0, saverecordsdir.length() - 1);
			}
			FileSystem fs = FileSystem.get(conf);
			sfw = SequenceFile.createWriter(fs, conf,
					new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
					Text.class,	Text.class);
		} // setup

		/** Assigns the next local row number for the record's column and saves the record. */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			String colname = value.get("column");
			Long previous = hm.get(colname);
			long rownum = (previous == null ? 1L : previous.longValue());
			// the map stores the NEXT row number to hand out for this column
			hm.put(colname, rownum+1);
			outkey.set(partitionid.toString()+":" + colname + ";" + rownum);
			Text valtext = new Text(value.toString());
			sfw.append(outkey, valtext);
		} // map()
		
		/**
		 * Emits the per-column counters and closes the sequence file.
		 * The emitted value is the next unused row number, i.e. one more than
		 * the number of rows written for that column.
		 */
		public void cleanup (Context context) throws IOException, InterruptedException {
			try {
				Text keytext = new Text("");
				for (Map.Entry<String, Long> val : hm.entrySet()) {
					keytext.set(partitionid.toString()+":"+val.getKey());
					context.write(keytext, new LongWritable(val.getValue()));
				}
			}
			finally {
				// close the writer even if emitting the counters fails
				sfw.close();
			}
		}
	}  // class RowNumberPhase1Map()
	
	// This reduce counts the number of records in each partition by
	// taking the maximum of the row numbers. 
	public static class RowNumberPhase1Reduce extends
			Reducer<Text, LongWritable, Text, LongWritable> {
		
		public void reduce(Text key, Iterator<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			LongWritable maxval = new LongWritable(Long.MIN_VALUE);
			while (values.hasNext()) {
				long val = values.next().get();
				if (maxval.get() < val) {
					maxval.set(val);
				}
			}
			context.write(key, maxval);
		}
	} // RowNumberPhase1Reduce()

	
	// Step 4:  This is phase 2 of calculating the id.  It adds the partition and column-specific
	//          offset back to the local row number.
	//          The particular offset is calculated in the master program and passed in
	//          using parameters.
	/**
	 * Step 4, phase 2 mapper.  Reads the records saved by phase 1 (key
	 * &lt;partition id&gt;:&lt;column&gt;;&lt;local row number&gt;) and turns the local row
	 * number into a final id by adding the offset that main() stored in the
	 * configuration for that &lt;partition id&gt;:&lt;column&gt; pair.
	 * Output: (new id, "column / value / count" record rendered as text).
	 */
	public static class RowNumberPhase2Map extends Mapper<Text, Text, Text, Text> {
		HashMap<String, Long> offsets = new HashMap<String, Long>();
		static Text keytext = new Text("");
		static GenericRecordMetadata metadata;
		static GenericRecord genrec;

		/**
		 * Loads the offsets from the configuration.  Each entry has the form
		 * &lt;partition id&gt;:&lt;column&gt; TAB &lt;count&gt; TAB &lt;offset&gt;.
		 *
		 * @throws IOException if a configured entry is missing or malformed
		 */
		public void setup(Context context) throws IOException {
			Configuration conf = context.getConfiguration();
			int numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
			offsets.clear();
			for (int i = 0; i < numvals; i++) {
				String val = conf.get(PARAMETER_cumsum_nthvalue + i);
				String[] parts = (val == null ? new String[0] : val.split("\t"));
				if (parts.length < 3) {
					throw new IOException("Malformed configuration value for "
							+ PARAMETER_cumsum_nthvalue + i + ": " + val);
				}
				offsets.put(parts[0], Long.parseLong(parts[2]));
			}
			String [] colnames = new String[]{"column", "value", "count"};
			metadata = new GenericRecordMetadata("masterkey", colnames);
			genrec = new GenericRecord(metadata);
			genrec.set(colnames);
			// We do *not* put the column names at the top of the file, since there may be
			// more than one map task.
		} // setup()

		/**
		 * Computes the new id for one saved record and writes it.
		 *
		 * @throws IOException if the key is malformed, no offset is configured
		 *         for it, or the write fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void map(Text key, Text value, Context context)
				throws IOException, InterruptedException {
			String keystr = key.toString();
			// The row number is the last ';'-separated field; using the last
			// ';' keeps column names that contain ';' intact.
			int sep = keystr.lastIndexOf(';');
			if (sep < 0) {
				throw new IOException("Malformed row key (no ';'): " + keystr);
			}
			String partcol = keystr.substring(0, sep);
			Long offset = offsets.get(partcol);
			if (offset == null) {
				// was an opaque NullPointerException from auto-unboxing
				throw new IOException("No id offset configured for " + partcol);
			}
			long newid = Long.parseLong(keystr.substring(sep + 1)) + offset.longValue();
			keytext.set(Long.toString(newid));
			// limit -1 keeps trailing empty fields so the indexes below stay valid
			String [] valpieces = value.toString().split("\t", -1);
			genrec.init();
			// everything after the first ':' is the column name
			genrec.set("column", partcol.substring(partcol.indexOf(':') + 1));
			genrec.set("value", valpieces[1]);
			genrec.set("count", valpieces[2]);
			context.write(keytext, new Text(genrec.toString()));
		} // map()
	} // class RowNumberPhase2Map


	/**
	 * Step 4, phase 2 reducer: writes a header row, then passes every
	 * (id, record) pair through unchanged.
	 */
	public static class RowNumberPhase2Reduce extends Reducer<Text, Text, Text, Text> {
		/**
		 * Writes the header row for this reduce task's output.
		 *
		 * Failures now propagate instead of being printed and ignored, so a
		 * task can no longer silently produce output without a header, and an
		 * InterruptedException is no longer swallowed.
		 */
		public void setup(Context context) throws IOException, InterruptedException {
			context.write(new Text("id"), new Text("column\tvalue\tcount"));
		}  // setup()
		
		/** Writes each value under its key, unchanged. */
		public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
			for (Text val : values) {
				context.write(key, val);
			}
		}
	} // RowNumberPhase2Reduce()
	
	// Step 5:  Merges new column value ids into the master file

	/**
	 * Step 5 mapper: splits the tab-separated text form of each record into
	 * its first field (the output key) and the remaining fields (the output
	 * value, re-joined with tabs).
	 */
	public static class MergeMap extends Mapper<Text, GenericRecord, Text, Text> {
		static Text keytext = new Text("");
		static Text valtext = new Text("");

		/** Writes (first field, remaining fields joined by tabs). */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			String [] fields = value.toString().split("\t");
			StringBuilder rest = new StringBuilder();
			String separator = "";
			for (int i = 1; i < fields.length; i++) {
				rest.append(separator).append(fields[i]);
				separator = "\t";
			}
			keytext.set(fields[0]);
			valtext.set(rest.toString());
			context.write(keytext, valtext);
		} // map()
	} // class MergeMap

	public static class MergeReduce extends Reducer<Text, Text, Text, Text> {
		public void setup(Context context) {
			try {
				context.write(new Text("id"), new Text("column\tvalue\tcount"));
			}
			catch (Exception e) {
				e.printStackTrace();
			}
		}  // setup()
		
		public void reduce(Text key, Iterator<Text> values, Context context)
			throws IOException, InterruptedException {
			while (values.hasNext()) { //values.next();
				context.write(key, values.next());
			}
		}
	} // MergeReduce()
	
	
	// This map denormalizes all columns, with the key being "column:value":
	//     * ismaster flag
	//     * for nonmasters the partitionid/rownumber (to identify the records again) (or empty for the master key)
	//     * for nonmasters the column number
	//     * for masters, the key
	/**
	 * Step 6, part 1 mapper.  Keys everything by &lt;column&gt;:&lt;value&gt;.
	 *
	 * A master record produces one output, "master TAB &lt;id&gt;".
	 * A data record produces one output per column:
	 * "&lt;expect|nomaster&gt; TAB &lt;partition id&gt;:&lt;row number&gt; TAB &lt;column index&gt;".
	 * "expect" marks a column that is being normalized; "nomaster" marks one
	 * that is not.
	 *
	 * An input split is treated as master data when its path contains
	 * "/master" or "/newmaster".
	 */
	public static class MatchColumnsMap extends Mapper<Text, GenericRecord, Text, Text> {
		static Text keytext = new Text("");
		static Text valtext = new Text("");
		static String partitionid;
		static long rownumber = 0;
		static boolean ismaster = false;
		static HashMap<String, Integer> usecolumns = new HashMap<String, Integer>();
		static int numcolumns = 0;
		
		/** Determines the input kind and loads the list of columns being normalized. */
		public void setup (Context context) {
			FileSplit fs = (FileSplit) context.getInputSplit();
			String path = fs.getPath().toString();
			ismaster = (path.indexOf("/master") >= 0) || (path.indexOf("/newmaster") >= 0);
			Configuration conf = context.getConfiguration();
			partitionid = Integer.toString(conf.getInt("mapred.task.partition", 0));
			rownumber = 0;
			numcolumns = conf.getInt(PARAMETER_usecolumns_numvals, 0);
			// usecolumns is static; clear it so entries from an earlier task
			// run in the same JVM cannot carry over.
			usecolumns.clear();
			for (int i = 0; i < numcolumns; i++) {
				usecolumns.put(conf.get(PARAMETER_usecolumns_nthvalue+i), i);
			}
		} // setup()

		/**
		 * Emits the master marker or the per-column markers for one record.
		 *
		 * @throws IOException if the write fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			if (ismaster) {
				// The original emitted this identical pair once per metadata
				// column; a single copy carries the same information.
				keytext.set(value.get("column")+":"+value.get("value"));
				valtext.set("master\t" + value.get("id"));
				context.write(keytext, valtext);
				return;
			}
			for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
				String colname = kv.getKey();
				boolean expectmaster = ((numcolumns == 0) || usecolumns.containsKey(colname));
				keytext.set(colname+":"+value.get(kv.getValue()));
				valtext.set((expectmaster ? "expect" : "nomaster") + "\t" +
						    partitionid+":"+Long.toString(rownumber) + "\t" +
							kv.getValue().toString());
				context.write(keytext, valtext);
			}
			rownumber++;
		} // map()
	} // class MatchColumnsMap

	
	/**
	 * Step 6, part 1 reducer.  For one &lt;column&gt;:&lt;value&gt; key it writes, for
	 * every data record that had this value, the pair
	 * (&lt;partition id&gt;:&lt;row number&gt;, &lt;column index&gt; TAB &lt;replacement&gt;).
	 * The replacement is the master id when a master record exists for the
	 * key and the record expects one; otherwise it is the original value.
	 */
	public static class MatchColumnsReduce extends Reducer<Text, Text, Text, Text> {
		static Text keytext = new Text("");
		static Text valtext = new Text("");

		/**
		 * Resolves all records for one key.
		 *
		 * @throws IOException if a write fails
		 * @throws InterruptedException if the framework interrupts a write
		 */
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			boolean foundmaster = false;
			// "expect" records seen before the master record (if any) arrives
			ArrayList<String> unmatchedvals = new ArrayList<String>();
			String idstr = "";
			// Split on the first ':' only.  A plain split(":") fails with an
			// out-of-bounds access on empty values and truncates values that
			// contain ':'.
			String [] keyparts = key.toString().split(":", 2);
			String colval = (keyparts.length > 1 ? keyparts[1] : "");
			for (Text valt : values) {
				String val = valt.toString();
				if (val.startsWith("nomaster")) {
					// column is not being normalized: keep the original value
					emit(val, colval, context);
				}
				else if (val.startsWith("master")) {
					idstr = val.split("\t", -1)[1];
					foundmaster = true;
				}
				else if (foundmaster) {
					emit(val, idstr, context);
				}
				else {
					// the Text instance is reused by the framework, so keep the String
					unmatchedvals.add(val);
				}
			}
			String replacement = (foundmaster ? idstr : colval);
			for (String val : unmatchedvals) {
				emit(val, replacement, context);
			}
		}

		/**
		 * Writes (row identifier, column index TAB newval) for one mapper value
		 * of the form &lt;flag&gt; TAB &lt;row identifier&gt; TAB &lt;column index&gt;.
		 */
		private void emit(String val, String newval, Context context)
		throws IOException, InterruptedException {
			String [] pieces = val.split("\t");
			keytext.set(pieces[1]);
			valtext.set(pieces[2] + "\t" + newval);
			context.write(keytext, valtext);
		}
	} // class MatchColumnsReduce


	/**
	 * Step 6, part 2 mapper.  Reads text lines of the form
	 * &lt;row identifier&gt; TAB &lt;column index&gt; TAB &lt;value&gt; and re-keys them by
	 * row identifier so the reducer can reassemble each row.
	 */
	public static class PutTogetherMap extends Mapper<LongWritable, Text, Text, Text> {
		static Text keytext  = new Text("");
		static Text valtext = new Text("");
		
		/** Writes (row identifier, column index TAB value). */
		public void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
			// limit -1 keeps a trailing empty value; without it a line ending
			// in a tab yields only two pieces and pieces[2] is out of bounds
			String [] pieces = value.toString().split("\t", -1);
			keytext.set(pieces[0]);
			valtext.set(pieces[1] + "\t" + pieces[2]);
			context.write(keytext, valtext);
		} // map()
	} // class PutTogetherMap

	
	/**
	 * Step 6, part 2 reducer.  Reassembles one output row per row identifier
	 * from its (column index, value) pairs: the value at index 0 becomes the
	 * output key and the remaining values, in index order and joined by tabs,
	 * become the output value.  Each reduce task first writes a header row
	 * read from the column-name file saved in Step 1.
	 */
	public static class PutTogetherReduce extends Reducer<Text, Text, Text, Text> {
		static Text keytext = new Text("");
		static Text valtext = new Text("");
		
		/**
		 * Writes the header row.  Reading the column-name file stays best
		 * effort (a failure is printed and the header is skipped), but a
		 * failure to write the header now propagates.
		 */
		public void setup (Context context) throws IOException, InterruptedException {
			String line = null;
			try {
				Configuration conf = context.getConfiguration();
				String fname = conf.get(PARAMETER_original_dataformatfile);
				FileSystem fs = FileSystem.get(conf);
				FSDataInputStream ins = fs.open(new Path(fname));
				try {
					line = ins.readUTF();
				}
				finally {
					ins.close();
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			if (line == null) {
				return;
			}
			String [] pieces = line.split("\t");
			StringBuilder outval = new StringBuilder();
			for (int i = 1; i < pieces.length; i++) {
				if (i > 1) {
					outval.append("\t");
				}
				outval.append(pieces[i]);
			}
			keytext.set(pieces[0]);
			valtext.set(outval.toString());
			context.write(keytext, valtext);
		}  // setup()

		/**
		 * Rebuilds one row.
		 *
		 * Changes from the original: no fixed limit of 1000 columns; a
		 * missing column index yields an empty field instead of the text
		 * "null"; empty values no longer cause an out-of-bounds access; and
		 * the value no longer starts with a tab, so data rows line up with
		 * the header written in setup().
		 *
		 * @throws IOException if the write fails
		 * @throws InterruptedException if the framework interrupts the write
		 */
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			// parts.get(i) holds the value for column index i; slot 0 is unused
			ArrayList<String> parts = new ArrayList<String>();
			for (Text valt : values) {
				// limit -1 keeps a trailing empty value
				String [] pieces = valt.toString().split("\t", -1);
				int pos = Integer.parseInt(pieces[0]);
				String colval = (pieces.length > 1 ? pieces[1] : "");
				if (pos == 0) {
					keytext.set(colval);
				}
				else {
					while (parts.size() <= pos) {
						parts.add("");
					}
					parts.set(pos, colval);
				}
			}
			StringBuilder outstr = new StringBuilder();
			for (int i = 1; i < parts.size(); i++) {
				if (i > 1) {
					outstr.append("\t");
				}
				outstr.append(parts.get(i));
			}
			valtext.set(outstr.toString());
			context.write(keytext, valtext);
		}
	} // class PutTogetherReduce

	
	
	/**
	 * Driver: runs the normalization jobs in sequence.
	 *
	 * Arguments:
	 *   args[0]  input data path
	 *   args[1]  final output path (deleted before the run)
	 *   args[2]  existing master key file path
	 *   args[3]  optional comma-separated list of columns to normalize
	 *
	 * Intermediate results are written under normalize/tmp/ and are deleted
	 * at the start of each run.  If a job fails, the remaining jobs are not
	 * run and the JVM exits with status 1; missing arguments exit with
	 * status 2.
	 */
	public static void main(String[] args) {
		if (args.length < 3) {
			System.err.println("Usage: Normalize <input path> <output path> <master key path> [<column>,<column>,...]");
			System.exit(2);
		}
		try {
			Configuration conf = new Configuration();
			String uniquecolumnvaluespath = "normalize/tmp/newcolumnvalues";
			String nonmatchingcolumnvaluespath = "normalize/tmp/nonmatching";
			String matchingcolumnvaluespath = "normalize/tmp/matching";
			String saverecordsdir = "normalize/tmp/saverecords";
			String keysummaryoutput = "normalize/tmp/keysummaryoutput";
			String unmatchedwithidspath = "normalize/tmp/unmatchedwithids";
			String newidmasterpath = "normalize/tmp/newmaster";
			String formatchingtmppath = "normalize/tmp/formatching";
			String originaldataformatpath = "normalize/tmp/columnnames.txt";
			String inputpath = args[0];
			String finaloutputpath = args[1];
			String lookupmatchespath = args[2];

			// args[3] has the list of columns
			if (args.length > 3) {
				String [] colpieces = args[3].split(",");
				for (int i = 0; i < colpieces.length; i++) {
					conf.set(PARAMETER_usecolumns_nthvalue + i, colpieces[i]);
				}
				conf.setInt(PARAMETER_usecolumns_numvals, colpieces.length);
			}
			conf.set(PARAMETER_original_dataformatfile, originaldataformatpath);
			
			// Before running the jobs, delete the output and intermediate files.
			// The master key file (args[2]) is an input and is left alone.
			FileSystem fs = FileSystem.get(conf);
			String [] stalepaths = new String[] {
					originaldataformatpath, formatchingtmppath, finaloutputpath,
					newidmasterpath, saverecordsdir, keysummaryoutput,
					uniquecolumnvaluespath, nonmatchingcolumnvaluespath,
					matchingcolumnvaluespath, unmatchedwithidspath
			};
			for (String stale : stalepaths) {
				fs.delete(new Path(stale), true);
			}
			
			{  // Step 1
				Job j = new Job(conf, "Step 1:  Extract unique column values from appropriate columns");
				j.setJarByClass(Normalize.class);

				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(GenericRecord.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(LongWritable.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(inputpath));
				FileOutputFormat.setOutputPath(j, new Path(uniquecolumnvaluespath));
				j.setMapperClass(Normalize.ColumnValueMap.class);
				j.setCombinerClass(Normalize.ColumnValueCombine.class);
				j.setReducerClass(Normalize.ColumnValueReduce.class);

				runJob(j);
			}
			
			{  // Step 2
				Job j = new Job(conf, "Step 2:  Find non-matching values");
				j.setJarByClass(Normalize.class);

				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(GenericRecord.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(GenericRecord.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(uniquecolumnvaluespath));
				FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
				FileOutputFormat.setOutputPath(j, new Path(nonmatchingcolumnvaluespath));
				j.setMapperClass(Normalize.GetNonmatchingMap.class);
				j.setReducerClass(Normalize.GetNonmatchingReduce.class);

				runJob(j);
			}

			{  // Step 3
				Job j = new Job(conf, "Step 3:  Find maximum id for each column");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(LongWritable.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(LongWritable.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
				FileOutputFormat.setOutputPath(j, new Path(matchingcolumnvaluespath));
				j.setMapperClass(Normalize.CalcMaxIdMap.class);
				j.setCombinerClass(Normalize.CalcMaxIdReduce.class);
				j.setReducerClass(Normalize.CalcMaxIdReduce.class);

				runJob(j);
			}
			
			// Read the per-column maximum ids ("<column> TAB <max id>") back
			// into the configuration and into a lookup table.
			int numvals = 0;
			HashMap<String, Long> hmIdOffsets = new HashMap<String, Long>();
			for (String line : readOutputLines(fs, matchingcolumnvaluespath)) {
				conf.set(PARAMETER_maxvals_nthvalue + numvals++, line);
				String [] pieces = line.split("\t");
				hmIdOffsets.put(pieces[0], Long.parseLong(pieces[1]));
			}
			conf.setInt(PARAMETER_maxvals_numvals, numvals);
			
			// This code block just prints out the offsets
			for (int i = 0; i < numvals; i++) {
				System.out.println("maxvals[" + i + "] = "
						+ conf.get(PARAMETER_maxvals_nthvalue + i));
			}
			
			{ // Step 4, part 1
				// must be set before the Job is created: the Job copies conf
				conf.set(PARAMETER_saverecordsdir, saverecordsdir);
				Job j = new Job(conf, "Step 4:  Set ids for unmatched columns (part 1)");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(LongWritable.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(LongWritable.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(nonmatchingcolumnvaluespath));
				FileOutputFormat.setOutputPath(j, new Path(keysummaryoutput));
				j.setMapperClass(Normalize.RowNumberPhase1Map.class);
				j.setReducerClass(Normalize.RowNumberPhase1Reduce.class);

				runJob(j);
			}

			// Now read the results in the key summary file.  Each line is
			// "<partition>:<column> TAB <count>".  For each line the running
			// offset for its column is appended; the offset starts at the
			// column's maximum existing id (0 if the column has none).
			HashMap<String, Long> hmcumsums = new HashMap<String, Long>();
			numvals = 0;
			for (String line : readOutputLines(fs, keysummaryoutput)) {
				String [] pieces = line.split("\t");
				// everything after the first ':' is the column name
				String colname = pieces[0].split(":", 2)[1];
				long cumsum;
				if (hmcumsums.containsKey(colname)) {
					cumsum = hmcumsums.get(colname);
				}
				else if (hmIdOffsets.containsKey(colname)) {
					cumsum = hmIdOffsets.get(colname);
				}
				else {
					cumsum = 0;
				}
				conf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
				hmcumsums.put(colname, cumsum + Long.parseLong(pieces[1]));
			}
			conf.setInt(PARAMETER_cumsum_numvals, numvals);
			
			for (int i = 0; i < numvals; i++) {
				System.out.println("cumsum[" + i + "] = " + conf.get(PARAMETER_cumsum_nthvalue + i));				
			}
			
			{  // Step 4, part 2
				Job j = new Job(conf, "Step 4:  Set ids for unmatched columns (part 2)");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(Text.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(Text.class);
				j.setInputFormatClass(SequenceFileInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(saverecordsdir));
				FileOutputFormat.setOutputPath(j, new Path(unmatchedwithidspath));
				j.setMapperClass(Normalize.RowNumberPhase2Map.class);
				j.setReducerClass(Normalize.RowNumberPhase2Reduce.class);
				j.setNumReduceTasks(5);

				runJob(j);
			}

			{  // Step 5
				Job j = new Job(conf, "Step 5:  Merge results from (4) with master keys");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(Text.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(Text.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(unmatchedwithidspath));
				FileInputFormat.addInputPath(j, new Path(lookupmatchespath));
				FileOutputFormat.setOutputPath(j, new Path(newidmasterpath));
				j.setMapperClass(Normalize.MergeMap.class);
				j.setReducerClass(Normalize.MergeReduce.class);
				j.setNumReduceTasks(4);
			
				runJob(j);
			}
			
			{  // Step 6, part 1
				Job j = new Job(conf, "Step 6:  Match back the original data, step 1");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(Text.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(Text.class);
				j.setInputFormatClass(GenericRecordInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(newidmasterpath));
				FileInputFormat.addInputPath(j, new Path(inputpath));
				FileOutputFormat.setOutputPath(j, new Path(formatchingtmppath));
				j.setMapperClass(Normalize.MatchColumnsMap.class);
				j.setReducerClass(Normalize.MatchColumnsReduce.class);
			
				runJob(j);
			}

			{  // Step 6, part 2
				Job j = new Job(conf, "Step 6:  Match back the original data, step 2");
				j.setJarByClass(Normalize.class);
				j.setOutputKeyClass(Text.class);
				j.setOutputValueClass(Text.class);
				j.setMapOutputKeyClass(Text.class);
				j.setMapOutputValueClass(Text.class);
				j.setInputFormatClass(TextInputFormat.class);
				FileInputFormat.addInputPath(j, new Path(formatchingtmppath));
				FileOutputFormat.setOutputPath(j, new Path(finaloutputpath));
				j.setMapperClass(Normalize.PutTogetherMap.class);
				j.setReducerClass(Normalize.PutTogetherReduce.class);
			
				runJob(j);
			}

		} catch (Exception e) {
			e.printStackTrace();
			// report the failure to the caller instead of exiting with status 0
			System.exit(1);
		}
	} // main

	/**
	 * Announces and runs one job, waiting for it to finish.
	 *
	 * @throws IOException if the job reports failure, so that later steps
	 *         are not run against missing or partial input
	 */
	private static void runJob(Job j)
	throws IOException, InterruptedException, ClassNotFoundException {
		System.out.println("RUNNING:  "+j.getJobName());
		if (! j.waitForCompletion(true)) {
			throw new IOException("Job failed: " + j.getJobName());
		}
	} // runJob

	/**
	 * Returns every line of the files matching &lt;outputdir&gt;/p*, in the order
	 * globStatus() lists them.  Each file is closed after reading, even on
	 * failure.  Text is decoded as UTF-8.
	 */
	private static ArrayList<String> readOutputLines(FileSystem fs, String outputdir)
	throws IOException {
		ArrayList<String> lines = new ArrayList<String>();
		FileStatus[] files = fs.globStatus(new Path(outputdir + "/p*"));
		if (files == null) {
			return lines;
		}
		for (FileStatus fstat : files) {
			BufferedReader br = new BufferedReader(
					new InputStreamReader(fs.open(fstat.getPath()), "UTF-8"));
			try {
				String line;
				while ((line = br.readLine()) != null) {
					lines.add(line);
				}
			}
			finally {
				br.close();
			}
		}
		return lines;
	} // readOutputLines
}  // class Normalize 

