package characterize;

/**
 * Characterize: profiles the columns of a data set of GenericRecords using
 * two chained MapReduce jobs.
 *
 * Job 1 counts how many records carry each distinct (column, value) pair.
 * Job 2 reads those counts and computes per-column summary statistics
 * (record/value counts, min/max value and length, top-three modes, and
 * integer/double statistics where values parse as numbers).
 *
 * @author Gordon S. Linoff (gordon@data-miners.com) December 2009
 */

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(fix): the old "import org.apache.hadoop.mapreduce.Reducer.Context"
// was removed.  Combined with the raw Mapper/Reducer supertypes it made every
// map()/reduce() below take Reducer.Context, so none of them actually
// overrode the framework methods and Hadoop ran the identity map/reduce.
// With parameterized supertypes, "Context" resolves to the inherited
// Mapper.Context / Reducer.Context member type.
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Characterize {

    /**
     * Job-1 mapper: for each input record, emits one ("column:value", 1)
     * pair per column, using the record's metadata to enumerate columns.
     */
    public static class ColumnValueMap
            extends Mapper<Text, GenericRecord, Text, LongWritable> {

        static Text keytext = new Text("");
        static final LongWritable valone = new LongWritable(1);

        @Override
        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            // Metadata entries appear to map column name -> column position;
            // the key becomes "<column>:<value>".  TODO confirm against
            // GenericRecordMetadata.
            for (Map.Entry kv : value.getMetadata().entrySet()) {
                keytext.set(kv.getKey() + ":" + value.get(kv.getValue()));
                context.write(keytext, valone);
            }
        } // map()
    } // class ColumnValueMap

    /**
     * Job-1 combiner: partial sum of the per-key counts.
     *
     * NOTE(fix): Hadoop invokes {@code reduce()} on the combiner class; the
     * original defined a method named {@code combine()}, which was never
     * called, so no combining actually happened.
     */
    public static class ColumnValueCombine
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        static LongWritable longval = new LongWritable();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            longval.set(count);
            context.write(key, longval);
        }
    } // class ColumnValueCombine

    /**
     * Job-1 reducer: sums the counts for each "column:value" key and writes
     * a GenericRecord (value, count) keyed by the column name.  A one-time
     * header record (keyed "column") carries the output column names.
     */
    public static class ColumnValueReduce
            extends Reducer<Text, LongWritable, Text, GenericRecord> {

        private static GenericRecordMetadata metadata;
        private static GenericRecord genrec;
        private static Text keytext = new Text();
        private static boolean firsttime = false;

        @Override
        public void setup(Context context) {
            String[] colnames = new String[] {"value", "count"};
            metadata = new GenericRecordMetadata("summary", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames); // header record: the column names themselves
            keytext.set("column");
            firsttime = true;
        } // setup()

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (firsttime) {
                // Emit the header record exactly once per reducer.
                context.write(keytext, genrec);
                firsttime = false;
            }
            long count = 0L;
            for (LongWritable val : values) {
                count += val.get();
            }
            genrec.init();
            // Split only on the FIRST colon: the value portion may itself
            // contain ':' and may be empty (limit 2 keeps a trailing "").
            String[] parts = key.toString().split(":", 2);
            genrec.set(0, parts.length > 1 ? parts[1] : "");
            genrec.set(1, Long.toString(count));
            keytext.set(parts[0]);
            context.write(keytext, genrec);
        }
    } // class ColumnValueReduce

    /** Metadata describing job 1's (value, count) output records. */
    private static GenericRecordMetadata getColumnValueToSummarizeMetadata() {
        String[] colnames = new String[] {"value", "count"};
        return new GenericRecordMetadata("summary", colnames);
    } // getColumnValueToSummarizeMetadata()

    /**
     * Job-2 mapper: re-keys each (value, count) record by its column name so
     * all of one column's values arrive at the same reducer.
     */
    public static class SummarizeMap
            extends Mapper<Text, GenericRecord, Text, GenericRecord> {

        static Text keytext = new Text("");
        static GenericRecord genrec;
        static GenericRecordMetadata metadata;

        @Override
        public void setup(Context context) {
            metadata = getColumnValueToSummarizeMetadata();
            genrec = new GenericRecord(metadata);
            genrec.init();
        } // setup()

        @Override
        public void map(Text key, GenericRecord value, Context context)
                throws IOException, InterruptedException {
            keytext.set(value.get("column"));
            genrec.set("value", value.get("value"));
            genrec.set("count", value.get("count"));
            context.write(keytext, genrec);
        } // map()
    } // class SummarizeMap

    /**
     * Job-2 reducer: computes summary statistics for one column from its
     * distinct (value, count) pairs and writes one (statistic, value)
     * record per statistic, keyed by the column name.
     */
    public static class SummarizeReduce
            extends Reducer<Text, GenericRecord, Text, GenericRecord> {

        private static GenericRecordMetadata metadata;
        private static GenericRecord genrec;
        private static Text keytext = new Text("");
        private static boolean firsttime = false;
        private static GenericRecordMetadata inmetadata;

        @Override
        public void setup(Context context) {
            String[] colnames = new String[] {"statistic", "value"};
            metadata = new GenericRecordMetadata("statistics", colnames);
            genrec = new GenericRecord(metadata);
            genrec.init();
            genrec.set(colnames); // header record
            firsttime = true;
            inmetadata = getColumnValueToSummarizeMetadata();
        } // setup()

        /** Writes one (statistic, value) output record under the column key. */
        private void setValue(String statname, String val, Text key, Context context)
                throws IOException, InterruptedException {
            genrec.set("statistic", statname);
            genrec.set("value", val);
            context.write(key, genrec);
        } // setValue()

        @Override
        public void reduce(Text key, Iterable<GenericRecord> values, Context context)
                throws IOException, InterruptedException {
            if (firsttime) {
                context.write(keytext, genrec); // header record, once per reducer
                firsttime = false;
            }

            // ---- string statistics --------------------------------------
            long numrecords = 0L;              // total rows = sum of counts
            long numvalues = 0L;               // number of distinct values
            int minlength = Integer.MAX_VALUE;
            String minlengthval = "";
            long minlengthnumat = 0;
            int maxlength = Integer.MIN_VALUE;
            String maxlengthval = "";
            long maxlengthnumat = 0;
            String minval = "", maxval = "";   // "" doubles as "not yet set"
            long minvalnumat = 0, maxvalnumat = 0;

            // ---- top-three most frequent values -------------------------
            String mode1 = "", mode2 = "", mode3 = "";
            long mode1count = 0, mode2count = 0, mode3count = 0;

            // ---- statistics over values that parse as (long) integers ---
            // FIX: widened from Integer.parseInt to Long.parseLong so the
            // long accumulators actually cover the long range.
            long intminval = Long.MAX_VALUE;
            long intmaxval = Long.MIN_VALUE;
            long intrecordcount = 0, intvaluecount = 0;
            long intminvalnumat = 0, intmaxvalnumat = 0;

            // ---- statistics over values that parse as doubles -----------
            double doubleminval = Double.MAX_VALUE;
            // FIX: Double.MIN_VALUE is the smallest *positive* double; the
            // old initializer broke the maximum for all-negative columns.
            double doublemaxval = -Double.MAX_VALUE;
            long doublerecordcount = 0, doublevaluecount = 0;
            long doubleminvalnumat = 0, doublemaxvalnumat = 0;
            long doublepositivecount = 0, doublenegativecount = 0, doublezerocount = 0;
            double doublesum = 0; // count-weighted sum, for the record mean

            for (GenericRecord val : values) {
                val._setMetadata(inmetadata);
                long thiscount = Long.parseLong(val.get("count"));
                numvalues++;
                numrecords += thiscount;
                String valstr = val.get("value");

                // Lexicographic minimum / maximum and their frequencies.
                if (minval.isEmpty() || valstr.compareTo(minval) < 0) {
                    minval = valstr;
                    minvalnumat = thiscount;
                } else if (valstr.compareTo(minval) == 0) {
                    minvalnumat += thiscount;
                }
                if (maxval.isEmpty() || valstr.compareTo(maxval) > 0) {
                    maxval = valstr;
                    maxvalnumat = thiscount;
                } else if (valstr.compareTo(maxval) == 0) {
                    maxvalnumat += thiscount;
                }

                // Shortest / longest value and how many records are at
                // that length.
                if (valstr.length() < minlength) {
                    minlength = valstr.length();
                    minlengthval = valstr;
                    minlengthnumat = thiscount;
                } else if (minlength == valstr.length()) {
                    minlengthnumat += thiscount;
                }
                if (valstr.length() > maxlength) {
                    maxlength = valstr.length();
                    maxlengthval = valstr;
                    maxlengthnumat = thiscount; // FIX: was hard-coded to 1
                } else if (maxlength == valstr.length()) {
                    maxlengthnumat += thiscount;
                }

                // Maintain the three most frequent values (counts are >= 1,
                // so a plain > comparison covers the "empty slot" case).
                if (thiscount > mode1count) {
                    mode3 = mode2;
                    mode3count = mode2count;
                    mode2 = mode1;
                    mode2count = mode1count;
                    mode1 = valstr;
                    mode1count = thiscount;
                } else if (thiscount > mode2count) {
                    mode3 = mode2;
                    mode3count = mode2count;
                    mode2 = valstr;
                    mode2count = thiscount;
                } else if (thiscount > mode3count) {
                    mode3 = valstr;
                    mode3count = thiscount;
                }

                // Integer statistics.  FIX: catch only NumberFormatException
                // (the old catch (Exception NumberFormatException) swallowed
                // everything, and also clobbered an unrelated flag).
                try {
                    long intval = Long.parseLong(valstr);
                    intvaluecount++;
                    intrecordcount += thiscount;
                    if (intval < intminval) {
                        intminval = intval;
                        intminvalnumat = thiscount;
                    } else if (intval == intminval) {
                        intminvalnumat += thiscount;
                    }
                    if (intval > intmaxval) {
                        intmaxval = intval;
                        intmaxvalnumat = thiscount;
                    } else if (intval == intmaxval) {
                        intmaxvalnumat += thiscount;
                    }
                } catch (NumberFormatException ignored) {
                    // value is not an integer; integer stats just skip it
                }

                // Double statistics.
                try {
                    double doubleval = Double.parseDouble(valstr);
                    doublevaluecount++;
                    doublerecordcount += thiscount;
                    if (doubleval < doubleminval) {
                        doubleminval = doubleval;
                        doubleminvalnumat = thiscount;
                    } else if (doubleval == doubleminval) {
                        doubleminvalnumat += thiscount;
                    }
                    if (doubleval > doublemaxval) {
                        doublemaxval = doubleval;
                        doublemaxvalnumat = thiscount;
                    } else if (doubleval == doublemaxval) {
                        doublemaxvalnumat += thiscount;
                    }
                    if (doubleval > 0) {
                        doublepositivecount += thiscount;
                    } else if (doubleval < 0) {
                        doublenegativecount += thiscount;
                    } else {
                        doublezerocount += thiscount;
                    }
                    doublesum += thiscount * doubleval;
                } catch (NumberFormatException ignored) {
                    // value is not numeric; double stats just skip it
                }
            }

            setValue("numberOfRecords", Long.toString(numrecords), key, context);
            setValue("numberOfValues", Long.toString(numvalues), key, context);
            setValue("minimumValue", minval, key, context);
            setValue("minimumValueCount", Long.toString(minvalnumat), key, context);
            setValue("maximumValue", maxval, key, context);
            setValue("maximumValueCount", Long.toString(maxvalnumat), key, context);
            setValue("minimumLength", Integer.toString(minlength), key, context);
            setValue("minimumLengthCount", Long.toString(minlengthnumat), key, context);
            setValue("minimumLengthExample", minlengthval, key, context);
            setValue("maximumLength", Integer.toString(maxlength), key, context);
            setValue("maximumLengthCount", Long.toString(maxlengthnumat), key, context);
            setValue("maximumLengthExample", maxlengthval, key, context);
            setValue("mode1Value", mode1, key, context);
            setValue("mode1Count", Long.toString(mode1count), key, context);
            setValue("mode2Value", mode2, key, context);
            setValue("mode2Count", Long.toString(mode2count), key, context);
            setValue("mode3Value", mode3, key, context);
            setValue("mode3Count", Long.toString(mode3count), key, context);
            if (intrecordcount > 0) {
                setValue("intNumberOfRecords", Long.toString(intrecordcount), key, context);
                setValue("intNumberOfValues", Long.toString(intvaluecount), key, context);
                setValue("intMinimumValue", Long.toString(intminval), key, context);
                setValue("intMinimumValueCount", Long.toString(intminvalnumat), key, context);
                setValue("intMaximumValue", Long.toString(intmaxval), key, context);
                setValue("intMaximumValueCount", Long.toString(intmaxvalnumat), key, context);
            }
            if (doublerecordcount > 0) {
                setValue("doubleNumberOfRecords", Long.toString(doublerecordcount), key, context);
                setValue("doubleNumberOfValues", Long.toString(doublevaluecount), key, context);
                setValue("doubleMinimumValue", Double.toString(doubleminval), key, context);
                // FIX: these two emitted the *int* counters before.
                setValue("doubleMinimumValueCount", Long.toString(doubleminvalnumat), key, context);
                setValue("doubleMaximumValue", Double.toString(doublemaxval), key, context);
                setValue("doubleMaximumValueCount", Long.toString(doublemaxvalnumat), key, context);
                setValue("doublePositiveValueCount", Long.toString(doublepositivecount), key, context);
                setValue("doubleZeroValueCount", Long.toString(doublezerocount), key, context);
                setValue("doubleNegativeValueCount", Long.toString(doublenegativecount), key, context);
                // FIX: doublesum is count-weighted, so the mean over records
                // divides by the record count (the old code divided by the
                // distinct-value count, which was neither mean).
                setValue("doubleAverage",
                         Double.toString(doublesum / doublerecordcount), key, context);
            }
        } // reduce()
    } // class SummarizeReduce

    /**
     * Driver: runs job 1 (count column/value pairs into characterize/tmp),
     * then job 2 (summarize those counts into args[1]).
     *
     * @param args args[0] = input path, args[1] = final output path
     */
    public static void main(String[] args) {
        try {
            if (args.length < 2) {
                System.err.println("usage: Characterize <input path> <output path>");
                return;
            }
            Configuration conf = new Configuration();
            String columnvaluesummary = "characterize/tmp";

            // Before running the jobs, delete any previous output.
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(columnvaluesummary), true);
            fs.delete(new Path(args[1]), true);

            Job genrecjob = new Job(conf, "Split records into fields");
            genrecjob.setJarByClass(Characterize.class);
            genrecjob.setOutputKeyClass(Text.class);
            genrecjob.setOutputValueClass(GenericRecord.class);
            genrecjob.setMapOutputKeyClass(Text.class);
            genrecjob.setMapOutputValueClass(LongWritable.class);
            genrecjob.setInputFormatClass(GenericRecordInputFormat.class);
            FileInputFormat.addInputPath(genrecjob, new Path(args[0]));
            FileOutputFormat.setOutputPath(genrecjob, new Path(columnvaluesummary));
            genrecjob.setMapperClass(Characterize.ColumnValueMap.class);
            genrecjob.setCombinerClass(Characterize.ColumnValueCombine.class);
            genrecjob.setReducerClass(Characterize.ColumnValueReduce.class);
            // FIX: don't launch the summary job if the first job failed.
            if (!genrecjob.waitForCompletion(true)) {
                System.err.println("column/value counting job failed");
                return;
            }

            Job summaryjob = new Job(conf, "Summarize job");
            summaryjob.setJarByClass(Characterize.class);
            summaryjob.setMapOutputKeyClass(Text.class);
            summaryjob.setMapOutputValueClass(GenericRecord.class);
            summaryjob.setOutputKeyClass(Text.class);
            summaryjob.setOutputValueClass(GenericRecord.class);
            summaryjob.setInputFormatClass(GenericRecordInputFormat.class);
            FileInputFormat.addInputPath(summaryjob, new Path(columnvaluesummary));
            FileOutputFormat.setOutputPath(summaryjob, new Path(args[1]));
            summaryjob.setMapperClass(Characterize.SummarizeMap.class);
            summaryjob.setReducerClass(Characterize.SummarizeReduce.class);
            summaryjob.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    } // main()
} // class Characterize