package characterize;

/**
*
* @author Gordon S. Linoff (gordon@data-miners.com)
* December 2009
*/

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Characterize {

	public static class ColumnValueMap extends Mapper<Text, GenericRecord, Text, LongWritable> {

		static Text keytext = new Text("");
		static final LongWritable valone = new LongWritable(1);

		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			for (Map.Entry<String, Integer> kv : value.getMetadata().entrySet()) {
				keytext.set(kv.getKey()+":"+value.get(kv.getValue()));
				context.write(keytext, valone);
			}
		} // map()
	} // class ColumnValueMap


	public static class ColumnValueCombine extends Reducer<Text, LongWritable, Text, LongWritable> {
		static LongWritable longval = new LongWritable();

		public void combine(Text key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
			Long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			longval.set(count);
			context.write(key, longval);
		}
	} // ColumnValueCombine()

	// This reduce counts the number of records in each partition by
	// taking the maximum of the row numbers.  This reduce function is
	
	public static class ColumnValueReduce extends
	Reducer<Text, LongWritable, Text, GenericRecord> {
		private static GenericRecordMetadata metadata;
		private static GenericRecord genrec;
		private static Text keytext = new Text();
		private static boolean firsttime = false;

		public void setup(Context context) {
			String [] colnames = new String[] {"value", "count"};
			metadata = new GenericRecordMetadata("summary", colnames);
			genrec = new GenericRecord(metadata);
			genrec.init();
			genrec.set(colnames);
			keytext.set("column");
			firsttime = true;
		}  // setup()

		public void reduce(Text key, Iterable<LongWritable> values, Context context)
		throws IOException, InterruptedException {
			if (firsttime) {
				context.write(keytext, genrec);
				firsttime = false;
			}
			Long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			genrec.init();
			String [] parts = key.toString().split(":");
			genrec.set(0, parts[1]);
			genrec.set(1, count.toString());
			keytext.set(parts[0]);
			context.write(keytext, genrec);
		}
	} // ColumnValueReduce()

	/**
	 * Describes the layout of the first-pass output records that the summary job
	 * reads: a record type named "summary" with the columns "value" and "count".
	 *
	 * @return a newly constructed metadata object on every call
	 */
	private static GenericRecordMetadata getColumnValueToSummarizeMetadata() {
		final String[] summaryColumns = { "value", "count" };
		return new GenericRecordMetadata("summary", summaryColumns);
	}  // getColumnValueToSummarizeMetadata()

	public static class SummarizeMap extends Mapper<Text, GenericRecord, Text, GenericRecord> {
		static Text keytext = new Text("");
		static GenericRecord genrec;
		static GenericRecordMetadata metadata;
		static boolean firsttime = false;
		
		public void setup (Context context) {
			metadata = getColumnValueToSummarizeMetadata();
			genrec = new GenericRecord(metadata);
			genrec.init();
		} // setup()
		
		public void map(Text key, GenericRecord value, Context context)
		throws IOException, InterruptedException {
			if (firsttime) {
				//context.write(keytext, genrec);
				firsttime = false;
			}
			keytext.set(value.get("column"));
			genrec.set("value", value.get("value"));
			genrec.set("count", value.get("count"));
			context.write(keytext, genrec);
		} // map()
	} // class SummarizeMap


	public static class SummarizeReduce
	extends Reducer<Text, GenericRecord, Text, GenericRecord> {
		private static GenericRecordMetadata metadata;
		private static GenericRecord genrec;
		private static Text keytext = new Text("");
		private static boolean firsttime = false;
		private static GenericRecordMetadata inmetadata;

		public void setup(Context context) {
			String [] colnames = new String[] {"statistic", "value"};
			metadata = new GenericRecordMetadata("statistics", colnames);
			genrec = new GenericRecord(metadata);
			genrec.init();
			genrec.set(colnames);
			firsttime = true;
			
			inmetadata = getColumnValueToSummarizeMetadata();
		}  // setup()
		
		private void setValue (String statname, String val, Text key, Context context) throws IOException, InterruptedException {
			genrec.set("statistic", statname);
			genrec.set("value", val);
			context.write(key, genrec);
		}  // setValue()

		public void reduce(Text key, Iterable<GenericRecord> values, Context context)
		throws IOException, InterruptedException {
			if (firsttime) {
				context.write(keytext, genrec);
				firsttime = false;
			}
			long numrecords = 0L;
			long numvalues = 0L;
			int minlength = Integer.MAX_VALUE;
			String minlengthval = "";
			long minlengthnumat = 0;
			int maxlength = Integer.MIN_VALUE;
			String maxlengthval = "";
			long maxlengthnumat = 0;
			String minval = "", maxval  = "";
			String mode1 = "", mode2 = "", mode3 = "";
			long mode1count = 0;
			long mode2count = 0;
			long mode3count = 0;
			long minvalnumat = 0, maxvalnumat = 0;
			boolean hasinteger = false;
			long intminval = Long.MAX_VALUE;
			long intmaxval = Long.MIN_VALUE;
			long intrecordcount = 0;
			long intvaluecount = 0;
			long intminvalnumat = 0;
			long intmaxvalnumat = 0;
			double doubleminval = Double.MAX_VALUE;
			double doublemaxval = Double.MIN_VALUE;
			long doublerecordcount = 0;
			long doublevaluecount = 0;
			long doubleminvalnumat = 0;
			long doublemaxvalnumat = 0;
			long doublepositivecount = 0;
			long doublenegativecount = 0;
			long doublezerocount = 0;
			double doublesum = 0;
			for (GenericRecord val : values) {
				val._setMetadata(inmetadata);
				long thiscount = Long.parseLong(val.get("count"));
				numvalues++;
				numrecords += thiscount;
				String valstr = val.get("value");
				if ((valstr.compareTo(minval) < 0) || (minval.compareTo("") == 0)) {
					minval = valstr;
					minvalnumat = thiscount;
				}
				else if (valstr.compareTo(minval) == 0) {
					minvalnumat += thiscount;
				}
				if ((valstr.compareTo(maxval) > 0) || (maxval.compareTo("") == 0)) {
					maxval = valstr;
					maxvalnumat = thiscount;
				}
				else if (valstr.compareTo(maxval) == 0) {
					maxvalnumat += thiscount;
				}
				if (valstr.length() < minlength) {
					minlength = valstr.length();
					minlengthval = valstr;
					minlengthnumat = thiscount;
				}
				else if (minlength == valstr.length()) {
					minlengthnumat += thiscount;
				}
				if (valstr.length() > maxlength) {
					maxlength = valstr.length();
					maxlengthval = valstr;
					maxlengthnumat = 1;
				}
				else if (maxlength == valstr.length()) {
					maxlengthnumat += thiscount;
				}
				if ((mode1count == 0) || thiscount > mode1count) {
					mode3 = mode2;
					mode3count = mode2count;
					mode2 = mode1;
					mode2count = mode1count;
					mode1count = thiscount;
					mode1 = valstr;
				}
				else if ((mode2count == 0) || (thiscount > mode2count)) {
					mode3 = mode2;
					mode3count = mode2count;
					mode2 = valstr;
					mode2count = thiscount;
				}
				else if ((mode3count == 0) || (thiscount > mode3count)) {
					mode3 = valstr;
					mode3count = thiscount;
				}
				try {
					long intval = Integer.parseInt(valstr);
					hasinteger = true;
					intvaluecount++;
					intrecordcount += Long.parseLong(val.get("count"));
					if (intval < intminval) {
						intminval = intval;
						intminvalnumat = thiscount;
					}
					else if (intval == intminval) {
						intminvalnumat += thiscount;
					}
					if (intval > intmaxval) {
						intmaxval = intval;
						intmaxvalnumat = thiscount;
					}
					else if (intval == intmaxval) {
						intmaxvalnumat += thiscount;
					}
				}
				catch (Exception NumberFormatException) {
					// we don't have to do anything here
					hasinteger = false;
					
				}
				try {
					double doubleval = Double.parseDouble(valstr);
					doublevaluecount++;
					doublerecordcount += thiscount;
					if (doubleval < doubleminval) {
						doubleminval = doubleval;
						doubleminvalnumat = thiscount;
					}
					else if (doubleval == doubleminval) {
						doubleminvalnumat += thiscount;
					}
					if (doubleval > doublemaxval) {
						doublemaxval = doubleval;
						doublemaxvalnumat = thiscount;
					}
					else if (doubleval == doublemaxval) {
						doublemaxvalnumat += thiscount;
					}
					if (doubleval > 0) {
						doublepositivecount += thiscount;
					}
					else if (doubleval < 0) {
						doublenegativecount += thiscount;
					}
					else doublezerocount += thiscount;
					doublesum += thiscount*doubleval;
				}
				catch (Exception NumberFormatException) {
					// we don't have to do anything here
					hasinteger = false;
					
				}
			}
			setValue("numberOfRecords", Long.toString(numrecords), key, context);
			setValue("numberOfValues", Long.toString(numvalues), key, context);
			setValue("minimumValue", minval, key, context);
			setValue("minimumValueCount", Long.toString(minvalnumat), key, context);
			setValue("maximumValue", maxval, key, context);
			setValue("maximumValueCount", Long.toString(maxvalnumat), key, context);
			setValue("minimumLength", Integer.toString(minlength), key, context);
			setValue("minimumLengthCount", Long.toString(minlengthnumat), key, context);
			setValue("minimumLengthExample", minlengthval, key, context);
			setValue("maximumLength", Integer.toString(maxlength), key, context);
			setValue("maximumLengthCount", Long.toString(maxlengthnumat), key, context);
			setValue("maximumLengthExample", maxlengthval, key, context);
			setValue("mode1Value", mode1, key, context);
			setValue("mode1Count", Long.toString(mode1count), key, context);
			setValue("mode2Value", mode2, key, context);
			setValue("mode2Count", Long.toString(mode2count), key, context);
			setValue("mode3Value", mode3, key, context);
			setValue("mode3Count", Long.toString(mode3count), key, context);
			if (intrecordcount > 0) {
				setValue("intNumberOfRecords", Long.toString(intrecordcount), key, context);
				setValue("intNumberOfValues", Long.toString(intvaluecount), key, context);
				setValue("intMinimumValue", Long.toString(intminval), key, context);
				setValue("intMinimumValueCount", Long.toString(intminvalnumat), key, context);
				setValue("intMaximumValue", Long.toString(intmaxval), key, context);
				setValue("intMaximumValueCount", Long.toString(intmaxvalnumat), key, context);
			}
			if (doublerecordcount > 0) {
				setValue("doubleNumberOfRecords", Long.toString(doublerecordcount), key, context);
				setValue("doubleNumberOfValues", Long.toString(doublevaluecount), key, context);
				setValue("doubleMinimumValue", Double.toString(doubleminval), key, context);
				setValue("doubleMinimumValueCount", Long.toString(intminvalnumat), key, context);
				setValue("doubleMaximumValue", Double.toString(doublemaxval), key, context);
				setValue("doubleMaximumValueCount", Long.toString(intmaxvalnumat), key, context);
				setValue("doublePositiveValueCount", Long.toString(doublepositivecount), key, context);
				setValue("doubleZeroValueCount", Long.toString(doublezerocount), key, context);
				setValue("doubleNegativeValueCount", Long.toString(doublenegativecount), key, context);
				setValue("doubleAverage", Double.toString(doublesum / doublevaluecount), key, context);
			}
		} // reduce()
	} // class SummarizeReduce

	/**
	 * Driver that runs the two chained jobs.
	 *
	 * <ol>
	 * <li>"Split records into fields" reads {@code args[0]} and writes per
	 *     (column, value) counts to the intermediate directory
	 *     {@code characterize/tmp}.</li>
	 * <li>"Summarize job" reads that directory and writes per-column
	 *     statistics to {@code args[1]}.</li>
	 * </ol>
	 *
	 * <p>Both output directories are deleted before the jobs run. The second
	 * job runs only if the first succeeds. The process exits with status 2 on
	 * bad usage, and with status 1 if a job fails or an exception is thrown, so
	 * calling scripts can detect failure.
	 *
	 * @param args {@code args[0]} = input path, {@code args[1]} = output path
	 */
	public static void main(String[] args) {
		if (args.length < 2) {
			System.err.println("Usage: Characterize <input path> <output path>");
			System.exit(2);
		}
		try {
			Configuration conf = new Configuration();
			// Intermediate (column, value, count) table passed between the two jobs.
			String columnvaluesummary = "characterize/tmp";
			// Before running the jobs, delete any previous output.
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(columnvaluesummary), true);
			fs.delete(new Path(args[1]), true);

			Job genrecjob = new Job(conf, "Split records into fields");
			genrecjob.setJarByClass(Characterize.class);

			genrecjob.setOutputKeyClass(Text.class);
			genrecjob.setOutputValueClass(GenericRecord.class);
			genrecjob.setMapOutputKeyClass(Text.class);
			genrecjob.setMapOutputValueClass(LongWritable.class);
			genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
			FileInputFormat.addInputPath(genrecjob, new Path(args[0]));
			FileOutputFormat.setOutputPath(genrecjob, new Path(columnvaluesummary));
			genrecjob.setMapperClass(Characterize.ColumnValueMap.class);
			genrecjob.setCombinerClass(Characterize.ColumnValueCombine.class);
			genrecjob.setReducerClass(Characterize.ColumnValueReduce.class);

			if (!genrecjob.waitForCompletion(true)) {
				// Don't summarize missing or partial intermediate output.
				System.err.println("Characterize: first job failed; summary job not run");
				System.exit(1);
			}

			Job summaryjob = new Job(conf, "Summarize job");
			summaryjob.setJarByClass(Characterize.class);
			summaryjob.setMapOutputKeyClass(Text.class);
			summaryjob.setMapOutputValueClass(GenericRecord.class);
			summaryjob.setOutputKeyClass(Text.class);
			summaryjob.setOutputValueClass(GenericRecord.class);
			summaryjob.setInputFormatClass(GenericRecordInputFormat.class);
			FileInputFormat.addInputPath(summaryjob, new Path(columnvaluesummary));
			FileOutputFormat.setOutputPath(summaryjob, new Path(args[1]));
			summaryjob.setMapperClass(Characterize.SummarizeMap.class);
			summaryjob.setReducerClass(Characterize.SummarizeReduce.class);

			if (!summaryjob.waitForCompletion(true)) {
				System.err.println("Characterize: summary job failed");
				System.exit(1);
			}

		} catch (Exception e) {
			e.printStackTrace();
			System.exit(1);
		}
	} // main
}  // class Characterize 

