package generictester;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
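// GenericRecordTester runs a MapReduce job over GenericRecord input: the
// mapper emits a ("column:value", 1) pair for every column of every record,
// the combiner pre-sums those counts, and the reducer writes out, for each
// column, how many times each value occurred.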
public class GenericRecordTester {
    public static class SplitMap
            extends Mapper<Text, GenericRecord, Text, LongWritable> {
        static Text keytext = new Text("");
        static final LongWritable valone = new LongWritable(1);

        // Emit one ("column:value", 1) pair for every column of the record.
        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            for (Map.Entry kv : value.getMetadata().entrySet()) {
                keytext.set(kv.getKey() + ":" + value.get(kv.getValue()));
                context.write(keytext, valone);
            }
        } // map()
    } // class SplitMap
    public static class SplitCombine
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        static LongWritable longval = new LongWritable();

        // Combiner: pre-sums the counts on the map side to cut down the data
        // shuffled to the reducers. Hadoop invokes a combiner through its
        // reduce() method, so that is the method overridden here.
        public void reduce(Text key, Iterable<LongWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            Long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            longval.set(count);
            context.write(key, longval);
        }
    } // class SplitCombine
    // This reducer totals the counts for each column:value pair and writes
    // each total out as a GenericRecord, preceded by a header record that
    // names the output columns.
    public static class SplitReduce
            extends Reducer<Text, LongWritable, Text, GenericRecord> {
        private static GenericRecordMetadata metadata;
        private static GenericRecord genrec;
        private static Text keytext = new Text();
        private static boolean firsttime = false;

        public void setup(Context context) {
            // Build the output record layout and prime the first (header)
            // record with the column names.
            String [] colnames = new String[] {"value", "count"};
            metadata = new GenericRecordMetadata("summary", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames);
            keytext.set("column");
            firsttime = true;
        } // setup()

        public void reduce(Text key, Iterable<LongWritable> values,
                           Context context)
                throws IOException, InterruptedException {
            // Write the header record once per reduce task.
            if (firsttime) {
                context.write(keytext, genrec);
                firsttime = false;
            }
            Long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            // The incoming key has the form "column:value"; split it back
            // apart so the column name becomes the output key (this assumes
            // column values do not themselves contain ':').
            genrec.init();
            String [] parts = key.toString().split(":");
            genrec.set(0, parts[1]);
            genrec.set(1, count.toString());
            keytext.set(parts[0]);
            context.write(keytext, genrec);
        }
    } // class SplitReduce
    public static void main(String[] args) {
        // args[0] = input path, args[1] = output path
        try {
            Configuration conf = new Configuration();
            FileSystem fs;
            // Before running the job, delete any existing output files
            fs = FileSystem.get(conf);
            fs.delete(new Path(args[1]), true);

            Job genrecjob = new Job(conf, "Generic Tester");
            genrecjob.setJarByClass(GenericRecordTester.class);
            genrecjob.setOutputKeyClass(Text.class);
            genrecjob.setOutputValueClass(GenericRecord.class);
            genrecjob.setMapOutputKeyClass(Text.class);
            genrecjob.setMapOutputValueClass(LongWritable.class);
            genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
            FileInputFormat.addInputPath(genrecjob, new Path(args[0]));
            FileOutputFormat.setOutputPath(genrecjob, new Path(args[1]));
            genrecjob.setMapperClass(GenericRecordTester.SplitMap.class);
            genrecjob.setCombinerClass(GenericRecordTester.SplitCombine.class);
            genrecjob.setReducerClass(GenericRecordTester.SplitReduce.class);
            genrecjob.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    } // main()
} // class GenericRecordTester
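
// ------------------------------------------------------------------------
// The job above depends on three helper classes that are not part of this
// listing: GenericRecordMetadata, GenericRecord, and GenericRecordInputFormat.
// The classes below are only a minimal sketch of the first two, inferred
// from how they are used in GenericRecordTester (column names mapped to
// positions, string-valued fields, Writable so records can move through
// MapReduce). They are illustrative stand-ins, not the original
// implementations; GenericRecordInputFormat, and whatever plumbing it needs
// to instantiate records, is not sketched at all.
// ------------------------------------------------------------------------

class GenericRecordMetadata {
    private final String name;
    private final Map<String, Integer> columns =
            new java.util.LinkedHashMap<String, Integer>();

    public GenericRecordMetadata(String name, String[] colnames) {
        this.name = name;
        for (int i = 0; i < colnames.length; i++) {
            columns.put(colnames[i], i);   // column name -> position
        }
    }

    public String getName() { return name; }
    public int size() { return columns.size(); }
    public java.util.Set<Map.Entry<String, Integer>> entrySet() {
        return columns.entrySet();
    }
}

class GenericRecord implements org.apache.hadoop.io.Writable {
    private final GenericRecordMetadata metadata;
    private String[] values;

    public GenericRecord(GenericRecordMetadata metadata) {
        this.metadata = metadata;
        init();
    }

    // Reset every column to an empty string.
    public void init() {
        values = new String[metadata.size()];
        java.util.Arrays.fill(values, "");
    }

    public GenericRecordMetadata getMetadata() { return metadata; }

    // The position may arrive as an Integer from a Map.Entry, as in SplitMap.
    public String get(Object position) { return values[(Integer) position]; }

    public void set(int position, String value) { values[position] = value; }

    // Fill all columns at once (used for the header record in SplitReduce).
    public void set(String[] newvalues) {
        System.arraycopy(newvalues, 0, values, 0, values.length);
    }

    public void write(java.io.DataOutput out) throws IOException {
        out.writeInt(values.length);
        for (String v : values) Text.writeString(out, v);
    }

    public void readFields(java.io.DataInput in) throws IOException {
        int n = in.readInt();
        values = new String[n];
        for (int i = 0; i < n; i++) values[i] = Text.readString(in);
    }

    // TextOutputFormat writes the value via toString(); tab-separate the columns.
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) sb.append("\t");
            sb.append(values[i]);
        }
        return sb.toString();
    }
}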