package generictester;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenericRecordTester {

    // The map emits one key-value pair per column of each input record.
    // The key is "<column name>:<column value>" and the value is a count
    // of 1, so the job as a whole counts how often each value appears in
    // each column.
    public static class SplitMap
            extends Mapper<Text, GenericRecord, Text, LongWritable> {
        private Text keytext = new Text("");
        private static final LongWritable valone = new LongWritable(1);

        @Override
        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            for (Map.Entry kv : value.getMetadata().entrySet()) {
                keytext.set(kv.getKey() + ":" + value.get(kv.getValue()));
                context.write(keytext, valone);
            }
        } // map()
    } // class SplitMap

    // The combiner sums the partial counts on the map side. Note that a
    // combiner must override reduce(); a method named combine() is never
    // called by the framework.
    public static class SplitCombine
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        private LongWritable longval = new LongWritable();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable val : values) {
                count += val.get();
            }
            longval.set(count);
            context.write(key, longval);
        }
    } // class SplitCombine

    // The reduce sums the counts for each "<column>:<value>" key and writes
    // one GenericRecord per key, with the column name as the output key and
    // the value and its count as the record fields. The first call also
    // writes a header record naming the output columns.
    public static class SplitReduce
            extends Reducer<Text, LongWritable, Text, GenericRecord> {
        private GenericRecordMetadata metadata;
        private GenericRecord genrec;
        private Text keytext = new Text();
        private boolean firsttime = false;

        @Override
        public void setup(Context context) {
            String[] colnames = new String[] {"value", "count"};
            metadata = new GenericRecordMetadata("summary", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames);
            keytext.set("column");
            firsttime = true;
        } // setup()

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            if (firsttime) {
                // Write the header record exactly once.
                context.write(keytext, genrec);
                firsttime = false;
            }
            long count = 0;
            for (LongWritable val : values) {
                count += val.get();
            }
            genrec.init();
            String[] parts = key.toString().split(":");
            genrec.set(0, parts[1]);
            genrec.set(1, Long.toString(count));
            keytext.set(parts[0]);
            context.write(keytext, genrec);
        }
    } // class SplitReduce

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();

            // Before running the job, delete any existing output directory
            fs = FileSystem.get(conf);
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(args[1]), true);

            Job genrecjob = new Job(conf, "Generic Tester");
            genrecjob.setJarByClass(GenericRecordTester.class);
            genrecjob.setOutputKeyClass(Text.class);
            genrecjob.setOutputValueClass(GenericRecord.class);
            genrecjob.setMapOutputKeyClass(Text.class);
            genrecjob.setMapOutputValueClass(LongWritable.class);
            genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
            FileInputFormat.addInputPath(genrecjob, new Path(args[0]));
            FileOutputFormat.setOutputPath(genrecjob, new Path(args[1]));
            genrecjob.setMapperClass(GenericRecordTester.SplitMap.class);
            genrecjob.setCombinerClass(GenericRecordTester.SplitCombine.class);
            genrecjob.setReducerClass(GenericRecordTester.SplitReduce.class);
            genrecjob.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    } // main()
} // class GenericRecordTester