/* * To change this template, choose Tools | Templates * and open the template in the editor. */ package demonormalize; import java.util.concurrent.Callable; import java.io.*; import java.net.*; import java.util.zip.*; import java.util.*; import java.util.regex.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.*; /** * * @author appadmin */ class ZipCensusRecordReader extends RecordReader { private LineRecordReader lineReader = new LineRecordReader(); private Text lineKey = new Text(""); private ZipCensus lineValue; private static int RowNumber = 0; public ZipCensusRecordReader() throws IOException { //lineReader = new LineRecordReader(conf, split); } // ZipCensusRecordReader public void initialize (InputSplit split, TaskAttemptContext context) throws IOException { lineReader.initialize(split, context); // Skip the first line lineReader.nextKeyValue(); } // initialize() public boolean nextKeyValue() throws IOException { // get the next line if (!lineReader.nextKeyValue()) { return false; } //lineKey = lineReader.getCurrentKey(); RowNumber++; String[] pieces = lineReader.getCurrentValue().toString().split("\t"); if (pieces.length != 113) { throw new IOException("Invalid record received -- expected 113 fields, got " + pieces.length + " on row "+RowNumber + " ("+pieces[0]+")"); } // now that we know we'll succeed, overwrite the output objects lineKey.set(pieces[0].trim()); // objName is the output key. int i = 0; lineValue = new ZipCensus(); lineValue.zipcode = pieces[i++].trim(); lineValue.state = pieces[i++].trim(); lineValue.population = (pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim())); i++; lineValue.housingunits = (pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim())); i++; lineValue.landareameters = Long.parseLong(pieces[i++].trim()); lineValue.waterareameters = Long.parseLong(pieces[i++].trim()); lineValue.landareamiles = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.waterareamiles = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.latitude = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.longitude = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.numstates = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.state1 = pieces[i++].trim(); lineValue.state2 = pieces[i++].trim(); lineValue.state3 = pieces[i++].trim(); lineValue.partpop1 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.partpop2 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.partpop3 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.pop = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.popedu = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.hh = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.fam = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.hhunits = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.hhuoccupied = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Integer.parseInt(pieces[i].trim()); i++; lineValue.purban = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pinurbanarea = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pinurbancluster = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.prural = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pruralfarm = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pruralnonfarm = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pracewhite = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.praceblack = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.praceamerind = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.praceasian = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pracepacind = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.praceother = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.pracemulti = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson1 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson1m = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson1f = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2pl = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2family = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2married = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2marriedkids = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2marriedonly = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2other = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2m = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2mkids = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2mnokids = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2f = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2fkids = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2fnokids = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2nonfamily = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2mnonfamily = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhperson2fnonfamily = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedunone = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedunohs = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedusomehs = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popeduhsgrad = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedusomecol = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popeduassoc = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedubach = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popedumast = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.popeduprofdoct = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhmedincome = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhssincome = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhnossincome = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhpubassist = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhnopubassist = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc000_010 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc010_015 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc015_020 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc020_025 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc025_030 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc030_035 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc035_040 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc040_045 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc045_050 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc050_060 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc060_075 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc075_100 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc100_125 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc125_150 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc150_200 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.faminc200 = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.fammedincome = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuowner = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuorenter = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo1per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo2per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo3per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo4per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo5per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo6per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuo7per = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhumedianyear = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelutilgas = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelbotgas = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelelec = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofueloil = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelcoal = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelwood = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelsolar = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelother = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuofuelnone = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuoplumbingcomplete = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhuoplumbinglacking = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhumediancashrent = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhunmarried = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhunmarriedmm = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhunmarriedmf = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhunmarriedff = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhunmarriedfm = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; lineValue.hhother = pieces[i].trim().compareToIgnoreCase("null") == 0 ? 0 : Double.parseDouble(pieces[i].trim()); i++; return true; } // class ObjPosRecordReader public Text createKey() { return new Text(""); } // createKey() public ZipCensus createValue() { return new ZipCensus(); } // createValue() public void close() throws IOException { lineReader.close(); } // close() public float getProgress() throws IOException { return lineReader.getProgress(); } // getProgress() @Override public ZipCensus getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.lineValue; } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.lineKey; } } // class ZipCensusRecordReader public class ZipCensusInputFormat extends FileInputFormat { public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { return new ZipCensusRecordReader(); } // RecordReader } // class ZipCensusInputFormat