/* To change this template, choose Tools | Templates
 */
package normalize;
import java.util.concurrent.Callable;
import java.io.*;
import java.net.*;
import java.util.zip.*;
import java.util.*;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;

/**
 * Record reader that turns delimited text lines into {@link GenericRecord}
 * values, delegating the actual line reading to a {@link LineRecordReader}.
 *
 * <p>Configuration keys read from the task context:
 * <ul>
 *   <li>{@code genericrecord.numcolumns} -- number of columns; when greater
 *       than zero the column names are taken from
 *       {@code genericrecord.column.<i>} and every line is treated as data.</li>
 *   <li>{@code genericrecord.splitter} -- field delimiter (default tab). It is
 *       passed to {@link String#split(String, int)} and is therefore
 *       interpreted as a regular expression.</li>
 * </ul>
 * When {@code genericrecord.numcolumns} is absent or not positive, the first
 * line delivered by the underlying line reader is consumed as the header and
 * supplies the column names.
 *
 * <p>The key of each record is the 1-based count of data rows returned by
 * this reader (the header line is not counted), formatted as decimal text.
 *
 * @author Gordon S. Linoff (gordon@data-miners.com)
 * December 2009
 */

class GenericRecordRecordReader extends RecordReader<Text, GenericRecord> {
    private final LineRecordReader lineReader = new LineRecordReader();
    private final Text lineKey = new Text("");
    private GenericRecord lineValue;
    private GenericRecordMetadata metadata;
    private long rowNumber = 0;
    private String splitter = "\t";

    public GenericRecordRecordReader() {
    }  // GenericRecordRecordReader

    /**
     * Prepares the underlying line reader and builds the record metadata.
     *
     * @param split   the input split to read
     * @param context task context supplying the configuration
     * @throws IOException if the line reader fails, or if the header must be
     *         read from the input but the split contains no lines
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        lineReader.initialize(split, context);

        // The delimiter applies to the data rows no matter where the column
        // names come from, so read it before choosing a branch.
        splitter = context.getConfiguration().get("genericrecord.splitter", "\t");

        // The record format is stored either in the configuration or in the
        // first line of the input.
        int numcolumns = context.getConfiguration().getInt("genericrecord.numcolumns", 0);
        String[] colnames;
        if (numcolumns > 0) {
            colnames = new String[numcolumns];
            for (int i = 0; i < numcolumns; i++) {
                colnames[i] = context.getConfiguration().get("genericrecord.column." + i, "");
            }
        } else {
            // Consume the header line; fail with a clear message rather than a
            // NullPointerException when there is no line to read.
            if (!lineReader.nextKeyValue()) {
                throw new IOException("Cannot read column names: input split contains no header line");
            }
            colnames = lineReader.getCurrentValue().toString().split(splitter);
        }
        metadata = new GenericRecordMetadata("record", colnames);
        lineValue = new GenericRecord(metadata);
    }  // initialize()

    /**
     * Advances to the next data row and loads it into the current key/value.
     *
     * @return {@code true} if a row was read, {@code false} at end of input
     * @throws IOException if the line reader fails or the row does not have
     *         the number of fields declared by the metadata
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (!lineReader.nextKeyValue()) {
            return false;
        }
        rowNumber++;
        String[] pieces = splitLine(lineReader.getCurrentValue().toString());
        if (pieces.length != metadata.numColumns) {
            throw new IOException("Invalid record received -- expected "+metadata.numColumns+" fields, got " + pieces.length + " on row "+rowNumber + " ("+pieces[0]+")");
        }

        // now that we know we'll succeed, overwrite the output objects
        lineValue.init();
        lineKey.set(Long.toString(rowNumber)); // the row number is the output key
        lineValue.set(pieces);
        return true;
    }  // nextKeyValue()

    /**
     * Splits a line into fields, keeping trailing empty fields.
     *
     * <p>A limit of -1 stops {@code split} from discarding trailing empty
     * strings, so a row whose last columns are empty still has the right field
     * count. Surplus fields beyond the expected column count are dropped only
     * when all of them are empty (e.g. a trailing delimiter), which keeps every
     * row that the limit-less split accepted producing the same fields.
     */
    private String[] splitLine(String line) {
        String[] pieces = line.split(splitter, -1);
        final int expected = metadata.numColumns;
        if (pieces.length <= expected) {
            return pieces;
        }
        for (int i = expected; i < pieces.length; i++) {
            if (!pieces[i].isEmpty()) {
                return pieces; // real surplus data: let the caller report it
            }
        }
        return Arrays.copyOf(pieces, expected);
    }  // splitLine()

    /** Returns a new, empty key object. */
    public Text createKey() {
        return new Text("");
    }  // createKey()

    /** Returns a new record bound to this reader's metadata. */
    public GenericRecord createValue() {
        return new GenericRecord(metadata);
    } // createValue()

    /** Closes the underlying line reader. */
    @Override
    public void close() throws IOException {
        lineReader.close();
    }  // close()

    /** Returns the progress reported by the underlying line reader. */
    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }  // getProgress()

    /** Returns the record object, which is reused and overwritten on every row. */
    @Override
    public GenericRecord getCurrentValue() throws IOException, InterruptedException {
        return this.lineValue;
    }

    /** Returns the key object, which is reused and overwritten on every row. */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.lineKey;
    }
}  // class GenericRecordRecordReader

public class GenericRecordInputFormat extends FileInputFormat<Text, GenericRecord> {

    public RecordReader<Text, GenericRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {

        return new GenericRecordRecordReader();
    }  // RecordReader<Text, ZipCensus>
}  // class GenericRecordInputFormat


