/* To change this template, choose Tools | Templates
*/
package normalize;
import java.util.concurrent.Callable;
import java.io.*;
import java.net.*;
import java.util.zip.*;
import java.util.*;
import java.util.regex.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;
/**
 * Record reader that turns delimited text lines into {@link GenericRecord} values.
 *
 * <p>Lines are obtained from a wrapped {@link LineRecordReader}. Each record's key is the
 * running count (starting at 1) of rows returned by this reader, rendered as text; the value is
 * one {@code GenericRecord} instance that is re-initialized and refilled for every row.
 *
 * <p>Column names are taken from the job configuration when {@code genericrecord.numcolumns} is
 * positive ({@code genericrecord.column.0}, {@code genericrecord.column.1}, ...); otherwise the
 * first line read by this reader is split and used as the list of column names.
 *
 * @author Gordon S. Linoff ([email protected])
 * December 2009
 */
class GenericRecordRecordReader extends RecordReader<Text, GenericRecord> {

    /** Configuration key holding the number of columns; 0 or absent means "read a header line". */
    private static final String NUM_COLUMNS_KEY = "genericrecord.numcolumns";
    /** Prefix of the per-column configuration keys; the column index is appended. */
    private static final String COLUMN_KEY_PREFIX = "genericrecord.column.";
    /** Configuration key holding the field delimiter (used as a regular expression). */
    private static final String SPLITTER_KEY = "genericrecord.splitter";

    private final LineRecordReader lineReader = new LineRecordReader();
    private final Text lineKey = new Text("");
    private GenericRecord lineValue;
    private GenericRecordMetadata metadata;
    /** Number of rows handed out so far by this reader. */
    private long rowNumber = 0;
    /**
     * When true, the next {@link #nextKeyValue()} call processes the line already held by
     * {@code lineReader} instead of advancing. Nothing in this class currently sets it.
     */
    private boolean reuseline = false;
    /** Field delimiter, passed to {@link String#split(String, int)} as a regex. */
    private String splitter = "\t";

    public GenericRecordRecordReader() {
    } // GenericRecordRecordReader

    /**
     * Prepares the underlying line reader and builds the record metadata.
     *
     * @param split   the input split to read
     * @param context the task context whose configuration supplies the record format
     * @throws IOException if the line reader fails, or if column names must be read from the
     *                     input but the split contains no line at all
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        lineReader.initialize(split, context);
        Configuration conf = context.getConfiguration();

        // Read the delimiter up front so a configured value is honoured no matter where the
        // column names come from.
        splitter = conf.get(SPLITTER_KEY, "\t");

        // The record format is stored either in the configuration or in the first line read.
        int numcolumns = conf.getInt(NUM_COLUMNS_KEY, 0);
        String[] colnames;
        if (numcolumns > 0) {
            colnames = new String[numcolumns];
            for (int i = 0; i < numcolumns; i++) {
                colnames[i] = conf.get(COLUMN_KEY_PREFIX + i, "");
            }
        } else {
            // NOTE(review): every reader consumes the first line of its own split as the header,
            // so this mode presumably requires input that is not split -- confirm with the
            // input format in use.
            if (!lineReader.nextKeyValue()) {
                throw new IOException("Cannot read column names: the input split is empty and "
                        + NUM_COLUMNS_KEY + " is not set");
            }
            colnames = lineReader.getCurrentValue().toString().split(splitter);
        }
        metadata = new GenericRecordMetadata("record", colnames);
        lineValue = new GenericRecord(metadata);
    } // initialize()

    /**
     * Advances to the next row and loads it into the current key and value.
     *
     * @return {@code true} if a row was loaded, {@code false} when the input is exhausted
     * @throws IOException if the line reader fails or the row does not have exactly
     *                     {@code metadata.numColumns} fields
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (reuseline) {
            reuseline = false;
        } else if (!lineReader.nextKeyValue()) {
            return false;
        }
        rowNumber++;

        String[] pieces = splitFields(lineReader.getCurrentValue().toString());
        if (pieces.length != metadata.numColumns) {
            // Guard the sample value: the array can be empty when no columns are expected.
            String firstField = pieces.length > 0 ? pieces[0] : "";
            throw new IOException("Invalid record received -- expected " + metadata.numColumns
                    + " fields, got " + pieces.length + " on row " + rowNumber
                    + " (" + firstField + ")");
        }

        // now that we know we'll succeed, overwrite the output objects
        lineValue.init();
        lineKey.set(Long.toString(rowNumber));
        lineValue.set(pieces);
        return true;
    } // nextKeyValue()

    /**
     * Splits a line into fields without losing trailing empty columns.
     *
     * <p>{@code String.split(regex)} silently drops trailing empty strings, which makes a row
     * whose last columns are empty look too short. Splitting with a negative limit keeps them;
     * surplus trailing empty fields beyond the expected column count are then trimmed so that
     * rows accepted by the plain split are still accepted with the same fields.
     */
    private String[] splitFields(String line) {
        String[] fields = line.split(splitter, -1);
        int keep = fields.length;
        while (keep > metadata.numColumns && fields[keep - 1].isEmpty()) {
            keep--;
        }
        return keep == fields.length ? fields : Arrays.copyOf(fields, keep);
    }

    /** Returns a new, empty key object. This class does not call it itself. */
    public Text createKey() {
        return new Text("");
    } // createKey()

    /** Returns a new record bound to this reader's metadata. This class does not call it itself. */
    public GenericRecord createValue() {
        return new GenericRecord(metadata);
    } // createValue()

    /** Closes the underlying line reader. */
    @Override
    public void close() throws IOException {
        lineReader.close();
    } // close()

    /** Returns the progress reported by the underlying line reader. */
    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    } // getProgress()

    /** Returns the record filled by the last successful {@link #nextKeyValue()} call. */
    @Override
    public GenericRecord getCurrentValue() throws IOException, InterruptedException {
        return this.lineValue;
    }

    /** Returns the row-number key set by the last successful {@link #nextKeyValue()} call. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.lineKey;
    }
} // class GenericRecordRecordReader
public class GenericRecordInputFormat extends FileInputFormat {
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException {
return new GenericRecordRecordReader();
} // RecordReader
} // class GenericRecordInputFormat