/* To change this template, choose Tools | Templates */ package normalize; import java.util.concurrent.Callable; import java.io.*; import java.net.*; import java.util.zip.*; import java.util.*; import java.util.regex.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.*; /** * * @author Gordon S. Linoff (gordon@data-miners.com) * December 2009 */ class GenericRecordRecordReader extends RecordReader { private LineRecordReader lineReader = new LineRecordReader(); private Text lineKey = new Text(""); private GenericRecord lineValue; private GenericRecordMetadata metadata; private long RowNumber = 0; private boolean reuseline = false; private String splitter = "\t"; public GenericRecordRecordReader() { } // GenericRecordRecordReader public void initialize (InputSplit split, TaskAttemptContext context) throws IOException { lineReader.initialize(split, context); // The record format is stored in the context object or filename of the the first l int numcolumns = context.getConfiguration().getInt("genericrecord.numcolumns", 0); String [] colnames; if (numcolumns > 0) { colnames = new String[numcolumns]; for (int i = 0; i < numcolumns; i++) { colnames[i] = context.getConfiguration().get("genericrecord.column."+i, ""); } } else { lineReader.nextKeyValue(); splitter = context.getConfiguration().get("genericrecord.splitter", "\t"); colnames = lineReader.getCurrentValue().toString().split(splitter); } metadata = new GenericRecordMetadata("record", colnames); lineValue = new GenericRecord(metadata); // Skip the first line // if the first value on the line does not match the column name, then // assume the line contains real data. //String val = lineReader.getCurrentValue().toString(); //reuseline = ! (val.startsWith(colnames[0])); } // initialize() public boolean nextKeyValue() throws IOException { // get the next line if (! reuseline) { if (!lineReader.nextKeyValue()) { return false; } } else reuseline = false; RowNumber++; String[] pieces = lineReader.getCurrentValue().toString().split(splitter); if (pieces.length != metadata.numColumns) { throw new IOException("Invalid record received -- expected "+metadata.numColumns+" fields, got " + pieces.length + " on row "+RowNumber + " ("+pieces[0]+")"); } // now that we know we'll succeed, overwrite the output objects lineValue.init(); lineKey.set(Long.toString(RowNumber)); // objName is the output key. lineValue.set(pieces); return true; } // nextKeyValue() public Text createKey() { return new Text(""); } // createKey() public GenericRecord createValue() { return new GenericRecord(metadata); } // createValue() public void close() throws IOException { lineReader.close(); } // close() public float getProgress() throws IOException { return lineReader.getProgress(); } // getProgress() @Override public GenericRecord getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.lineValue; } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.lineKey; } } // class GenericRecordRecordReader public class GenericRecordInputFormat extends FileInputFormat { public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { return new GenericRecordRecordReader(); } // RecordReader } // class GenericRecordInputFormat