package generictester;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenericRecordTester {
	/**
	 * Mapper that walks the metadata entries of every incoming record and emits
	 * one {@code ("<entry key>:<looked-up value>", 1)} pair per entry.
	 *
	 * NOTE(review): the metadata map presumably goes from field name to field
	 * index, with {@code value.get(index)} returning the field content; confirm
	 * against GenericRecord.
	 */
	public static class SplitMap
		extends Mapper<Text, GenericRecord, Text, LongWritable> {

		// Output key buffer, refilled for every emitted pair.
		static Text keytext = new Text("");
		// Constant count of one attached to every emitted pair.
		static final LongWritable valone = new LongWritable(1);

		/**
		 * Emits one pair per metadata entry of {@code value}.
		 *
		 * @param key     input key; not used by this mapper
		 * @param value   record whose metadata entries are enumerated
		 * @param context sink for the emitted pairs
		 * @throws IOException          if writing to the context fails
		 * @throws InterruptedException if writing to the context is interrupted
		 */
		public void map(Text key, GenericRecord value, Context context)
				throws IOException, InterruptedException {
			for (Map.Entry<String, Integer> field : value.getMetadata().entrySet()) {
				String composite = field.getKey() + ":" + value.get(field.getValue());
				keytext.set(composite);
				context.write(keytext, valone);
			}
		} // map()
	} // class SplitMap

	
	public static class SplitCombine extends Reducer<Text, LongWritable, Text, LongWritable> {
		static LongWritable longval = new LongWritable();

		public void combine(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
			Long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			longval.set(count);
			context.write(key, longval);
		}
	} // SplitCombine()


	
	// This reduce counts the number of records in each partition by
	// taking the maximum of the row numbers.  This reduce function is
	// used both as a combiner and reducer.
	public static class SplitReduce extends
			Reducer<Text, LongWritable, Text, GenericRecord> {
		private static GenericRecordMetadata metadata;
		private static GenericRecord genrec;
		private static Text keytext = new Text();
		private static boolean firsttime = false;
		
		public void setup(Context context) {
			String [] colnames = new String[] {"value", "count"};
			metadata = new GenericRecordMetadata("summary", colnames);
			genrec = new GenericRecord(metadata);
			genrec.init();
			genrec.set(colnames);
			keytext.set("column");
			firsttime = true;
		}  // setup()
		
		public void reduce(Text key, Iterable<LongWritable> values, Context context)
				throws IOException, InterruptedException {
			if (firsttime) {
				context.write(keytext, genrec);
				firsttime = false;
			}
			Long count = 0L;
			for (LongWritable val : values) {
				count += val.get();
			}
			genrec.init();
			String [] parts = key.toString().split(":");
			genrec.set(0, parts[1]);
			genrec.set(1, count.toString());
			keytext.set(parts[0]);
			context.write(keytext, genrec);
		}
	} // SplitReduce()
	
	
	/**
	 * Command-line entry point: configures and runs the "Generic Tester" job.
	 *
	 * Any existing output path is deleted recursively before the job starts.
	 * The process exit status is 0 when the job succeeds, 1 when it fails or
	 * an exception is raised, and 2 when the arguments are missing.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 */
	public static void main(String[] args) {
		if (args.length < 2) {
			System.err.println("Usage: GenericRecordTester <input path> <output path>");
			System.exit(2);
		}

		int status = 1;
		try {
			Configuration conf = new Configuration();
			Path inputPath = new Path(args[0]);
			Path outputPath = new Path(args[1]);

			// Before running the job, delete the output files. Resolve the
			// filesystem from the path itself so an output location outside
			// the default filesystem is handled too.
			FileSystem fs = outputPath.getFileSystem(conf);
			fs.delete(outputPath, true);

			Job genrecjob = new Job(conf, "Generic Tester");
			genrecjob.setJarByClass(GenericRecordTester.class);

			genrecjob.setOutputKeyClass(Text.class);
			genrecjob.setOutputValueClass(GenericRecord.class);
			genrecjob.setMapOutputKeyClass(Text.class);
			genrecjob.setMapOutputValueClass(LongWritable.class);
			genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
			FileInputFormat.addInputPath(genrecjob, inputPath);
			FileOutputFormat.setOutputPath(genrecjob, outputPath);
			genrecjob.setMapperClass(GenericRecordTester.SplitMap.class);
			genrecjob.setCombinerClass(GenericRecordTester.SplitCombine.class);
			genrecjob.setReducerClass(GenericRecordTester.SplitReduce.class);

			// waitForCompletion reports whether the job succeeded; surface it
			// through the exit status instead of discarding it.
			if (genrecjob.waitForCompletion(true)) {
				status = 0;
			}
		} catch (Exception e) {
			// Top-level boundary: report the failure and exit non-zero below.
			e.printStackTrace();
		}
		System.exit(status);
	} // main
}  // class GenericRecordTester 

