/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package demonormalize;
import java.util.concurrent.Callable;
import java.io.*;
import java.net.*;
import java.util.zip.*;
import java.util.*;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;

/**
 * Reads a tab-separated zip-code census file and exposes each data line as a
 * {@link ZipCensus} value keyed by the line's first field (the zip code).
 *
 * <p>Line handling is delegated to a {@link LineRecordReader}; this class only
 * skips the header line and converts the 113 tab-separated fields of every
 * remaining line into a freshly allocated {@code ZipCensus}.
 *
 * @author appadmin
 */
class ZipCensusRecordReader extends RecordReader<Text, ZipCensus> {

    /** Number of tab-separated fields every data line must contain. */
    private static final int EXPECTED_FIELDS = 113;

    private LineRecordReader lineReader = new LineRecordReader();
    private Text lineKey = new Text("");
    private ZipCensus lineValue;

    /**
     * Count of data lines read by this reader so far, used only in error
     * messages. Kept per instance so that several readers living in the same
     * JVM do not disturb each other's numbering.
     */
    private int rowNumber = 0;

    public ZipCensusRecordReader() throws IOException {
        // Nothing to do: the wrapped line reader is set up in initialize().
    }  // ZipCensusRecordReader

    /**
     * Initializes the wrapped line reader for the given split.
     *
     * @param split   the input split to read
     * @param context the task attempt context
     * @throws IOException if the wrapped line reader cannot be initialized
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        // The header line is skipped lazily in nextKeyValue(); discarding a
        // line here unconditionally would drop a data row from every split
        // that does not begin at the start of the file.
        lineReader.initialize(split, context);
    }  // initialize()

    /**
     * Advances to the next data line and parses it into the current key/value.
     * The current key and value are replaced only after the whole line has
     * been parsed successfully.
     *
     * @return {@code true} if a record was read, {@code false} at end of input
     * @throws IOException if the line cannot be read or does not contain
     *         exactly 113 tab-separated fields
     * @throws NumberFormatException if a numeric field is neither the literal
     *         {@code null} (any case) nor a parsable number
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (!lineReader.nextKeyValue()) {
            return false;
        }
        // LineRecordReader keys each line by its byte offset in the file, so
        // offset 0 identifies the header line; skip it and move on.
        if (lineReader.getCurrentKey().get() == 0L) {
            if (!lineReader.nextKeyValue()) {
                return false;
            }
        }

        rowNumber++;
        String[] pieces = lineReader.getCurrentValue().toString().split("\t");
        if (pieces.length != EXPECTED_FIELDS) {
            throw new IOException("Invalid record received -- expected " + EXPECTED_FIELDS
                    + " fields, got " + pieces.length + " on row " + rowNumber
                    + " (" + pieces[0] + ")");
        }

        // Build the record in a local so a parse failure part-way through
        // never leaves a half-populated value visible to the caller.
        ZipCensus record = new ZipCensus();
        int i = 0;

        // --- identification and geography ---
        record.zipcode = pieces[i++].trim();
        record.state = pieces[i++].trim();
        record.population = parseIntOrZero(pieces[i++]);
        record.housingunits = parseIntOrZero(pieces[i++]);
        // The two area-in-meters fields are parsed strictly: "null" is not
        // tolerated here, matching the original behavior.
        record.landareameters = Long.parseLong(pieces[i++].trim());
        record.waterareameters = Long.parseLong(pieces[i++].trim());
        record.landareamiles = parseDoubleOrZero(pieces[i++]);
        record.waterareamiles = parseDoubleOrZero(pieces[i++]);
        record.latitude = parseDoubleOrZero(pieces[i++]);
        record.longitude = parseDoubleOrZero(pieces[i++]);
        record.numstates = parseIntOrZero(pieces[i++]);
        record.state1 = pieces[i++].trim();
        record.state2 = pieces[i++].trim();
        record.state3 = pieces[i++].trim();
        record.partpop1 = parseIntOrZero(pieces[i++]);
        record.partpop2 = parseIntOrZero(pieces[i++]);
        record.partpop3 = parseIntOrZero(pieces[i++]);

        // --- integer counts ---
        record.pop = parseIntOrZero(pieces[i++]);
        record.popedu = parseIntOrZero(pieces[i++]);
        record.hh = parseIntOrZero(pieces[i++]);
        record.fam = parseIntOrZero(pieces[i++]);
        record.hhunits = parseIntOrZero(pieces[i++]);
        record.hhuoccupied = parseIntOrZero(pieces[i++]);

        // --- urban / rural ---
        record.purban = parseDoubleOrZero(pieces[i++]);
        record.pinurbanarea = parseDoubleOrZero(pieces[i++]);
        record.pinurbancluster = parseDoubleOrZero(pieces[i++]);
        record.prural = parseDoubleOrZero(pieces[i++]);
        record.pruralfarm = parseDoubleOrZero(pieces[i++]);
        record.pruralnonfarm = parseDoubleOrZero(pieces[i++]);

        // --- race ---
        record.pracewhite = parseDoubleOrZero(pieces[i++]);
        record.praceblack = parseDoubleOrZero(pieces[i++]);
        record.praceamerind = parseDoubleOrZero(pieces[i++]);
        record.praceasian = parseDoubleOrZero(pieces[i++]);
        record.pracepacind = parseDoubleOrZero(pieces[i++]);
        record.praceother = parseDoubleOrZero(pieces[i++]);
        record.pracemulti = parseDoubleOrZero(pieces[i++]);

        // --- household composition ---
        record.hhperson1 = parseDoubleOrZero(pieces[i++]);
        record.hhperson1m = parseDoubleOrZero(pieces[i++]);
        record.hhperson1f = parseDoubleOrZero(pieces[i++]);
        record.hhperson2pl = parseDoubleOrZero(pieces[i++]);
        record.hhperson2family = parseDoubleOrZero(pieces[i++]);
        record.hhperson2married = parseDoubleOrZero(pieces[i++]);
        record.hhperson2marriedkids = parseDoubleOrZero(pieces[i++]);
        record.hhperson2marriedonly = parseDoubleOrZero(pieces[i++]);
        record.hhperson2other = parseDoubleOrZero(pieces[i++]);
        record.hhperson2m = parseDoubleOrZero(pieces[i++]);
        record.hhperson2mkids = parseDoubleOrZero(pieces[i++]);
        record.hhperson2mnokids = parseDoubleOrZero(pieces[i++]);
        record.hhperson2f = parseDoubleOrZero(pieces[i++]);
        record.hhperson2fkids = parseDoubleOrZero(pieces[i++]);
        record.hhperson2fnokids = parseDoubleOrZero(pieces[i++]);
        record.hhperson2nonfamily = parseDoubleOrZero(pieces[i++]);
        record.hhperson2mnonfamily = parseDoubleOrZero(pieces[i++]);
        record.hhperson2fnonfamily = parseDoubleOrZero(pieces[i++]);

        // --- education ---
        record.popedunone = parseDoubleOrZero(pieces[i++]);
        record.popedunohs = parseDoubleOrZero(pieces[i++]);
        record.popedusomehs = parseDoubleOrZero(pieces[i++]);
        record.popeduhsgrad = parseDoubleOrZero(pieces[i++]);
        record.popedusomecol = parseDoubleOrZero(pieces[i++]);
        record.popeduassoc = parseDoubleOrZero(pieces[i++]);
        record.popedubach = parseDoubleOrZero(pieces[i++]);
        record.popedumast = parseDoubleOrZero(pieces[i++]);
        record.popeduprofdoct = parseDoubleOrZero(pieces[i++]);

        // --- household income ---
        record.hhmedincome = parseDoubleOrZero(pieces[i++]);
        record.hhssincome = parseDoubleOrZero(pieces[i++]);
        record.hhnossincome = parseDoubleOrZero(pieces[i++]);
        record.hhpubassist = parseDoubleOrZero(pieces[i++]);
        record.hhnopubassist = parseDoubleOrZero(pieces[i++]);

        // --- family income brackets ---
        record.faminc000_010 = parseDoubleOrZero(pieces[i++]);
        record.faminc010_015 = parseDoubleOrZero(pieces[i++]);
        record.faminc015_020 = parseDoubleOrZero(pieces[i++]);
        record.faminc020_025 = parseDoubleOrZero(pieces[i++]);
        record.faminc025_030 = parseDoubleOrZero(pieces[i++]);
        record.faminc030_035 = parseDoubleOrZero(pieces[i++]);
        record.faminc035_040 = parseDoubleOrZero(pieces[i++]);
        record.faminc040_045 = parseDoubleOrZero(pieces[i++]);
        record.faminc045_050 = parseDoubleOrZero(pieces[i++]);
        record.faminc050_060 = parseDoubleOrZero(pieces[i++]);
        record.faminc060_075 = parseDoubleOrZero(pieces[i++]);
        record.faminc075_100 = parseDoubleOrZero(pieces[i++]);
        record.faminc100_125 = parseDoubleOrZero(pieces[i++]);
        record.faminc125_150 = parseDoubleOrZero(pieces[i++]);
        record.faminc150_200 = parseDoubleOrZero(pieces[i++]);
        record.faminc200 = parseDoubleOrZero(pieces[i++]);
        record.fammedincome = parseDoubleOrZero(pieces[i++]);

        // --- occupied housing units ---
        record.hhuowner = parseDoubleOrZero(pieces[i++]);
        record.hhuorenter = parseDoubleOrZero(pieces[i++]);
        record.hhuo1per = parseDoubleOrZero(pieces[i++]);
        record.hhuo2per = parseDoubleOrZero(pieces[i++]);
        record.hhuo3per = parseDoubleOrZero(pieces[i++]);
        record.hhuo4per = parseDoubleOrZero(pieces[i++]);
        record.hhuo5per = parseDoubleOrZero(pieces[i++]);
        record.hhuo6per = parseDoubleOrZero(pieces[i++]);
        record.hhuo7per = parseDoubleOrZero(pieces[i++]);
        record.hhumedianyear = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelutilgas = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelbotgas = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelelec = parseDoubleOrZero(pieces[i++]);
        record.hhuofueloil = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelcoal = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelwood = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelsolar = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelother = parseDoubleOrZero(pieces[i++]);
        record.hhuofuelnone = parseDoubleOrZero(pieces[i++]);
        record.hhuoplumbingcomplete = parseDoubleOrZero(pieces[i++]);
        record.hhuoplumbinglacking = parseDoubleOrZero(pieces[i++]);
        record.hhumediancashrent = parseDoubleOrZero(pieces[i++]);

        // --- unmarried-partner and other households ---
        record.hhunmarried = parseDoubleOrZero(pieces[i++]);
        record.hhunmarriedmm = parseDoubleOrZero(pieces[i++]);
        record.hhunmarriedmf = parseDoubleOrZero(pieces[i++]);
        record.hhunmarriedff = parseDoubleOrZero(pieces[i++]);
        record.hhunmarriedfm = parseDoubleOrZero(pieces[i++]);
        record.hhother = parseDoubleOrZero(pieces[i++]);

        // Parsing succeeded: publish key and value together.
        lineKey.set(pieces[0].trim());
        lineValue = record;
        return true;
    }  // nextKeyValue()

    /**
     * Parses an integer field, mapping the literal {@code null} (any case,
     * surrounding whitespace ignored) to 0.
     */
    private static int parseIntOrZero(String raw) {
        String text = raw.trim();
        return "null".equalsIgnoreCase(text) ? 0 : Integer.parseInt(text);
    }  // parseIntOrZero()

    /**
     * Parses a floating-point field, mapping the literal {@code null} (any
     * case, surrounding whitespace ignored) to 0.
     */
    private static double parseDoubleOrZero(String raw) {
        String text = raw.trim();
        return "null".equalsIgnoreCase(text) ? 0 : Double.parseDouble(text);
    }  // parseDoubleOrZero()

    /** Returns a new, empty key object. */
    public Text createKey() {
        return new Text("");
    }  // createKey()

    /** Returns a new, empty value object. */
    public ZipCensus createValue() {
        return new ZipCensus();
    } // createValue()

    /** Closes the wrapped line reader. */
    @Override
    public void close() throws IOException {
        lineReader.close();
    }  // close()

    /** Returns the progress reported by the wrapped line reader. */
    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }  // getProgress()

    /**
     * Returns the value parsed by the most recent successful
     * {@link #nextKeyValue()} call, or {@code null} if none has succeeded yet.
     */
    @Override
    public ZipCensus getCurrentValue() throws IOException, InterruptedException {
        return this.lineValue;
    }

    /**
     * Returns the key object, which holds the trimmed first field of the most
     * recently parsed line (an empty string before the first record).
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.lineKey;
    }
}  // class ZipCensusRecordReader

public class ZipCensusInputFormat extends FileInputFormat<Text, ZipCensus> {

    public RecordReader<Text, ZipCensus> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {

        return new ZipCensusRecordReader();
    }  // RecordReader<Text, ZipCensus>
}  // class ZipCensusInputFormat


