package characterize;
/**
*
* @author Gordon S. Linoff ([email protected])
* December 2009
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Characterize {
public static class ColumnValueMap extends Mapper {
static Text keytext = new Text("");
static final LongWritable valone = new LongWritable(1);
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
for (Map.Entry kv : value.getMetadata().entrySet()) {
keytext.set(kv.getKey()+":"+value.get(kv.getValue()));
context.write(keytext, valone);
}
} // map()
} // class ColumnValueMap
public static class ColumnValueCombine extends Reducer {
static LongWritable longval = new LongWritable();
public void combine(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
Long count = 0L;
for (LongWritable val : values) {
count += val.get();
}
longval.set(count);
context.write(key, longval);
}
} // ColumnValueCombine()
// This reduce sums the occurrence counts for each column:value pair
// (produced by ColumnValueMap and pre-aggregated by ColumnValueCombine)
// and emits one GenericRecord per pair, keyed by the column name.
public static class ColumnValueReduce extends
Reducer {
private static GenericRecordMetadata metadata;
private static GenericRecord genrec;
private static Text keytext = new Text();
private static boolean firsttime = false;
public void setup(Context context) {
String [] colnames = new String[] {"value", "count"};
metadata = new GenericRecordMetadata("summary", colnames);
genrec = new GenericRecord(metadata);
genrec.init();
genrec.set(colnames);
keytext.set("column");
firsttime = true;
} // setup()
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
if (firsttime) {
context.write(keytext, genrec);
firsttime = false;
}
Long count = 0L;
for (LongWritable val : values) {
count += val.get();
}
genrec.init();
String [] parts = key.toString().split(":");
genrec.set(0, parts[1]);
genrec.set(1, count.toString());
keytext.set(parts[0]);
context.write(keytext, genrec);
}
} // ColumnValueReduce()
/**
 * Builds the shared schema for the intermediate records passed from the
 * first job to the second: a "value" column and its occurrence "count".
 */
private static GenericRecordMetadata getColumnValueToSummarizeMetadata() {
    return new GenericRecordMetadata("summary", new String[] {"value", "count"});
} // getColumnValueToSummarizeMetadata()
public static class SummarizeMap extends Mapper {
static Text keytext = new Text("");
static GenericRecord genrec;
static GenericRecordMetadata metadata;
static boolean firsttime = false;
public void setup (Context context) {
metadata = getColumnValueToSummarizeMetadata();
genrec = new GenericRecord(metadata);
genrec.init();
} // setup()
public void map(Text key, GenericRecord value, Context context)
throws IOException, InterruptedException {
if (firsttime) {
//context.write(keytext, genrec);
firsttime = false;
}
keytext.set(value.get("column"));
genrec.set("value", value.get("value"));
genrec.set("count", value.get("count"));
context.write(keytext, genrec);
} // map()
} // class SummarizeMap
public static class SummarizeReduce
extends Reducer {
private static GenericRecordMetadata metadata;
private static GenericRecord genrec;
private static Text keytext = new Text("");
private static boolean firsttime = false;
private static GenericRecordMetadata inmetadata;
public void setup(Context context) {
String [] colnames = new String[] {"statistic", "value"};
metadata = new GenericRecordMetadata("statistics", colnames);
genrec = new GenericRecord(metadata);
genrec.init();
genrec.set(colnames);
firsttime = true;
inmetadata = getColumnValueToSummarizeMetadata();
} // setup()
private void setValue (String statname, String val, Text key, Context context) throws IOException, InterruptedException {
genrec.set("statistic", statname);
genrec.set("value", val);
context.write(key, genrec);
} // setValue()
public void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
if (firsttime) {
context.write(keytext, genrec);
firsttime = false;
}
long numrecords = 0L;
long numvalues = 0L;
int minlength = Integer.MAX_VALUE;
String minlengthval = "";
long minlengthnumat = 0;
int maxlength = Integer.MIN_VALUE;
String maxlengthval = "";
long maxlengthnumat = 0;
String minval = "", maxval = "";
String mode1 = "", mode2 = "", mode3 = "";
long mode1count = 0;
long mode2count = 0;
long mode3count = 0;
long minvalnumat = 0, maxvalnumat = 0;
boolean hasinteger = false;
long intminval = Long.MAX_VALUE;
long intmaxval = Long.MIN_VALUE;
long intrecordcount = 0;
long intvaluecount = 0;
long intminvalnumat = 0;
long intmaxvalnumat = 0;
double doubleminval = Double.MAX_VALUE;
double doublemaxval = Double.MIN_VALUE;
long doublerecordcount = 0;
long doublevaluecount = 0;
long doubleminvalnumat = 0;
long doublemaxvalnumat = 0;
long doublepositivecount = 0;
long doublenegativecount = 0;
long doublezerocount = 0;
double doublesum = 0;
for (GenericRecord val : values) {
val._setMetadata(inmetadata);
long thiscount = Long.parseLong(val.get("count"));
numvalues++;
numrecords += thiscount;
String valstr = val.get("value");
if ((valstr.compareTo(minval) < 0) || (minval.compareTo("") == 0)) {
minval = valstr;
minvalnumat = thiscount;
}
else if (valstr.compareTo(minval) == 0) {
minvalnumat += thiscount;
}
if ((valstr.compareTo(maxval) > 0) || (maxval.compareTo("") == 0)) {
maxval = valstr;
maxvalnumat = thiscount;
}
else if (valstr.compareTo(maxval) == 0) {
maxvalnumat += thiscount;
}
if (valstr.length() < minlength) {
minlength = valstr.length();
minlengthval = valstr;
minlengthnumat = thiscount;
}
else if (minlength == valstr.length()) {
minlengthnumat += thiscount;
}
if (valstr.length() > maxlength) {
maxlength = valstr.length();
maxlengthval = valstr;
maxlengthnumat = 1;
}
else if (maxlength == valstr.length()) {
maxlengthnumat += thiscount;
}
if ((mode1count == 0) || thiscount > mode1count) {
mode3 = mode2;
mode3count = mode2count;
mode2 = mode1;
mode2count = mode1count;
mode1count = thiscount;
mode1 = valstr;
}
else if ((mode2count == 0) || (thiscount > mode2count)) {
mode3 = mode2;
mode3count = mode2count;
mode2 = valstr;
mode2count = thiscount;
}
else if ((mode3count == 0) || (thiscount > mode3count)) {
mode3 = valstr;
mode3count = thiscount;
}
try {
long intval = Integer.parseInt(valstr);
hasinteger = true;
intvaluecount++;
intrecordcount += Long.parseLong(val.get("count"));
if (intval < intminval) {
intminval = intval;
intminvalnumat = thiscount;
}
else if (intval == intminval) {
intminvalnumat += thiscount;
}
if (intval > intmaxval) {
intmaxval = intval;
intmaxvalnumat = thiscount;
}
else if (intval == intmaxval) {
intmaxvalnumat += thiscount;
}
}
catch (Exception NumberFormatException) {
// we don't have to do anything here
hasinteger = false;
}
try {
double doubleval = Double.parseDouble(valstr);
doublevaluecount++;
doublerecordcount += thiscount;
if (doubleval < doubleminval) {
doubleminval = doubleval;
doubleminvalnumat = thiscount;
}
else if (doubleval == doubleminval) {
doubleminvalnumat += thiscount;
}
if (doubleval > doublemaxval) {
doublemaxval = doubleval;
doublemaxvalnumat = thiscount;
}
else if (doubleval == doublemaxval) {
doublemaxvalnumat += thiscount;
}
if (doubleval > 0) {
doublepositivecount += thiscount;
}
else if (doubleval < 0) {
doublenegativecount += thiscount;
}
else doublezerocount += thiscount;
doublesum += thiscount*doubleval;
}
catch (Exception NumberFormatException) {
// we don't have to do anything here
hasinteger = false;
}
}
setValue("numberOfRecords", Long.toString(numrecords), key, context);
setValue("numberOfValues", Long.toString(numvalues), key, context);
setValue("minimumValue", minval, key, context);
setValue("minimumValueCount", Long.toString(minvalnumat), key, context);
setValue("maximumValue", maxval, key, context);
setValue("maximumValueCount", Long.toString(maxvalnumat), key, context);
setValue("minimumLength", Integer.toString(minlength), key, context);
setValue("minimumLengthCount", Long.toString(minlengthnumat), key, context);
setValue("minimumLengthExample", minlengthval, key, context);
setValue("maximumLength", Integer.toString(maxlength), key, context);
setValue("maximumLengthCount", Long.toString(maxlengthnumat), key, context);
setValue("maximumLengthExample", maxlengthval, key, context);
setValue("mode1Value", mode1, key, context);
setValue("mode1Count", Long.toString(mode1count), key, context);
setValue("mode2Value", mode2, key, context);
setValue("mode2Count", Long.toString(mode2count), key, context);
setValue("mode3Value", mode3, key, context);
setValue("mode3Count", Long.toString(mode3count), key, context);
if (intrecordcount > 0) {
setValue("intNumberOfRecords", Long.toString(intrecordcount), key, context);
setValue("intNumberOfValues", Long.toString(intvaluecount), key, context);
setValue("intMinimumValue", Long.toString(intminval), key, context);
setValue("intMinimumValueCount", Long.toString(intminvalnumat), key, context);
setValue("intMaximumValue", Long.toString(intmaxval), key, context);
setValue("intMaximumValueCount", Long.toString(intmaxvalnumat), key, context);
}
if (doublerecordcount > 0) {
setValue("doubleNumberOfRecords", Long.toString(doublerecordcount), key, context);
setValue("doubleNumberOfValues", Long.toString(doublevaluecount), key, context);
setValue("doubleMinimumValue", Double.toString(doubleminval), key, context);
setValue("doubleMinimumValueCount", Long.toString(intminvalnumat), key, context);
setValue("doubleMaximumValue", Double.toString(doublemaxval), key, context);
setValue("doubleMaximumValueCount", Long.toString(intmaxvalnumat), key, context);
setValue("doublePositiveValueCount", Long.toString(doublepositivecount), key, context);
setValue("doubleZeroValueCount", Long.toString(doublezerocount), key, context);
setValue("doubleNegativeValueCount", Long.toString(doublenegativecount), key, context);
setValue("doubleAverage", Double.toString(doublesum / doublevaluecount), key, context);
}
} // reduce()
} // class SummarizeReduce
/**
 * Driver: runs two chained jobs. Job 1 splits records into fields and counts
 * occurrences of each column:value pair; job 2 summarizes those counts into
 * per-column statistics.
 *
 * args[0] = input path, args[1] = final output path.
 *
 * Fixes: validates the argument count, checks the first job's completion
 * status before launching the second (the original ignored both
 * waitForCompletion() results), and exits nonzero on failure so schedulers
 * can detect it.
 */
public static void main(String[] args) {
    try {
        if (args.length < 2) {
            System.err.println("usage: Characterize <input path> <output path>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        String columnvaluesummary = "characterize/tmp";
        // Before running the jobs, delete any previous output.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(columnvaluesummary), true);
        fs.delete(new Path(args[1]), true);

        Job genrecjob = new Job(conf, "Split records into fields");
        genrecjob.setJarByClass(Characterize.class);
        genrecjob.setOutputKeyClass(Text.class);
        genrecjob.setOutputValueClass(GenericRecord.class);
        genrecjob.setMapOutputKeyClass(Text.class);
        genrecjob.setMapOutputValueClass(LongWritable.class);
        genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
        FileInputFormat.addInputPath(genrecjob, new Path(args[0]));
        FileOutputFormat.setOutputPath(genrecjob, new Path(columnvaluesummary));
        genrecjob.setMapperClass(Characterize.ColumnValueMap.class);
        genrecjob.setCombinerClass(Characterize.ColumnValueCombine.class);
        genrecjob.setReducerClass(Characterize.ColumnValueReduce.class);
        if (!genrecjob.waitForCompletion(true)) {
            // Don't summarize a partial/failed intermediate result.
            System.exit(1);
        }

        Job summaryjob = new Job(conf, "Summarize job");
        summaryjob.setJarByClass(Characterize.class);
        summaryjob.setMapOutputKeyClass(Text.class);
        summaryjob.setMapOutputValueClass(GenericRecord.class);
        summaryjob.setOutputKeyClass(Text.class);
        summaryjob.setOutputValueClass(GenericRecord.class);
        summaryjob.setInputFormatClass(GenericRecordInputFormat.class);
        FileInputFormat.addInputPath(summaryjob, new Path(columnvaluesummary));
        FileOutputFormat.setOutputPath(summaryjob, new Path(args[1]));
        summaryjob.setMapperClass(Characterize.SummarizeMap.class);
        summaryjob.setReducerClass(Characterize.SummarizeReduce.class);
        System.exit(summaryjob.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
} // main
} // class Characterize