Saturday, January 9, 2010

Hadoop and Parallel Dataflow Programming

Over the past three months, I have been teaching myself enough Hadoop to get comfortable with using the environment for analytic purposes.

There has been a lot of commentary about Hadoop/MapReduce versus relational databases (such as the articles referenced in my previous post on the subject). I actually think this discussion is misplaced because comparing open-source software with commercial software aligns people on "religious" grounds. Some people will like anything that is open-source. Some people will attack anything that is open-source (especially people who work for commercial software vendors). And, the merits of real differences get lost. Both Hadoop and relational databases are powerful systems for analyzing data, and each has its own distinct set of advantages and disadvantages.

Instead, I think that Hadoop should be compared to a parallel dataflow style of programming. What is a dataflow style of programming? It is a style where we watch the data flow through different operations, forking and combining along the way, to achieve the desired goal. Not only is a dataflow a good way to understand relational databases (which is why I introduce it in Chapter 1 of Data Analysis Using SQL and Excel), but the underlying engines that run SQL queries are dataflow engines.

Parallel dataflows extend dataflow processing to grid computing. To my knowledge, the first commercial tool that implements parallel dataflows was developed by Ab Initio. This company was a spin-off from a bleeding-edge parallel supercomputer vendor called Thinking Machines that went bankrupt in 1994. As a matter of full disclosure: Ab Initio was actually formed from the group that I worked for at Thinking Machines. Although they are very, very, very resistant to sharing information about their technology, I am rather familiar with it. I believe that the only publicly available information about them (including screen shots) is published in our book Mastering Data Mining: The Art and Science of Customer Relationship Management.

I am confident that Apache has at least one dataflow project, since when I google "dataflow apache" I get a pointer to the Dapper project. My wish, however, is that Hadoop were the parallel dataflow project.

Much of what Hadoop does goes unheralded by the typical MapReduce user. On a massively parallel system, Hadoop keeps track of the different parts of an HDFS file and, when the file is being used for processing, Hadoop does its darndest to keep the processing local to each file part being processed. This is great, since data locality is key to achieving good performance.

Hadoop also keeps track of which processors and disk systems are working. When there is a failure, Hadoop tries again, insulating the user from sporadic hardware faults.

Hadoop also does a pretty good job of shuffling data around between the map and reduce operations. The shuffling method -- sort, send, and sort again -- may not be the most efficient, but it is quite general.

Alas, there are several things that Hadoop does not do, at least when accessed through the MapReduce interface. Supporting these features would allow it to move beyond the MapReduce paradigm, giving it the power to support more general parallel dataflow constructs.

The first thing that bothers me about Hadoop is that I cannot easily take a text file and just copy it with the Map/Reduce primitives. Copying a file seems like something that should be easy. The problem is that a key gets generated during the map processing. The original data gets output with a key prepended, unless I do a lot of work to parse out the first field and use it as a key.

Could the context.write() function be overloaded with a version that does not output a key? Perhaps this would only be possible in the reduce phase, since I understand the importance of the key for going from map to reduce.
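One possible workaround -- a sketch only, not something the framework documentation promises -- is to declare the reduce output key as NullWritable, since TextOutputFormat omits a NullWritable key and its separator and writes only the value. The class names below are placeholders of my own, and the driver would still have to declare the map output key class (LongWritable here) separately from the final output key class (NullWritable).

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class CopyFile {
    public static class CopyMapper
            extends Mapper<LongWritable, Text, LongWritable, Text> {
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            // pass the line through; the byte offset serves as a throwaway key
            context.write(offset, line);
        }
    }

    public static class CopyReducer
            extends Reducer<LongWritable, Text, NullWritable, Text> {
        protected void reduce(LongWritable offset, Iterable<Text> lines, Context context)
                throws IOException, InterruptedException {
            for (Text line : lines) {
                // TextOutputFormat drops a NullWritable key and its tab,
                // so only the original line appears in the output
                context.write(NullWritable.get(), line);
            }
        }
    }
}

Note that the copied lines come back ordered by byte offset within each reducer rather than in exactly the original order, so this is a copy of the data, not of the file layout.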

A performance issue with Hadoop is the shuffle phase between the map and the reduce. As I mentioned earlier, the sort-send-sort process is quite general. Alas, though, it requires a lot of work. An alternative that often works well is simply hashing. To maintain the semantics of map-reduce, I think this would be hash-send-combine or hash-send-sort. The beauty of using hashing is that the data can be sent to its destination while the map is still processing it. This allows concurrent use of the processing and network during this operation.

And, speaking of performance, why does the key have to go before the data? Why can't I just point to a sequence of bytes and use that for the key? This would enable a programming style that doesn't spend so much time parsing keys and duplicating information between values and keys.

Perhaps the most frustrating aspect of Hadoop is the MapReduce framework itself. The current version allows processing like (M+)(R)(M*). What this notation means is that the processing starts with one or more map jobs, goes to a reduce, and continues with zero or more map jobs.

THIS IS NOT GENERAL ENOUGH! I would like to have an arbitrary number of maps and reduces connected however I like. So, one map could feed two different reduces, each having different keys. At the same time, one of the reduces could feed another reduce without having to go through an intermediate map phase.

This would be a big step toward parallel dataflow programming, since Map and Reduce are two very powerful primitives for this purpose.

There are some other primitives that might be useful. One would be broadcast. This would take the output from one processing node during one phase and send it to all the other nodes (in the next phase). Let's just say that using broadcast, it would be much easier to send variables around for processing. No more defining weird variables using "set" in the main program, and then parsing them in setup() functions. No more setting up temporary storage space, shared by all the processors. No more using HDFS to store small serial files, local to only one node. Just send data through a broadcast, and it goes everywhere. (If the broadcast is running on more than one node, then the results would be concatenated together, everywhere.)

And, if I had a broadcast, then my two-pass row number code (here) would only require one pass.

I think Hadoop already supports having multiple different input files into one reduce operator. This is quite powerful, and a much superior way of handling join processing.

It would also be nice to have a final sort operator. In the real world, people often do want sorted results.

In conclusion, parallel dataflows are a very powerful, expressive, and efficient way of implementing complex data processing tasks. Relational databases use dataflow engines for their processing. With non-procedural languages such as SQL, the power of dataflows is hidden from the user -- and some relatively simple dataflow constructs can be quite difficult to express in SQL.

Hadoop is a powerful system that emulates parallel dataflow programming. Any step in a dataflow can be implemented using a MapReduce pass -- but this requires reading, writing, sorting, and sending the data multiple times. With a few more features, Hadoop could efficiently implement parallel dataflows. I feel this would be a big boost to both performance and utility, and it would leverage the power already provided by the Hadoop framework.


Tuesday, January 5, 2010

MapReduce versus Relational Databases?

The current issue of Communications of the ACM has articles on MapReduce and relational databases. One, MapReduce: A Flexible Data Processing Tool, is written by two Google fellows and explains the utility of MapReduce -- appropriate authors, since Google invented the parallel MapReduce paradigm.

The second article, MapReduce and Parallel DBMSs: Friends or Foes?, is written by a team of authors, with Michael Stonebraker listed as the first author. I am uncomfortable with this article, because it purports to show the superiority of a particular database system, Vertica, without mentioning -- anywhere -- that Michael Stonebraker is listed as the CTO and Co-Founder on Vertica's web site. For this reason, I believe that this article should be subject to much more scrutiny.

Before starting, let me state that I personally have no major relationships with any of the database vendors or with companies in the Hadoop/MapReduce space. I am an advocate of using relational databases for data analysis and have written a book called Data Analysis Using SQL and Excel. And, over the past three months, I have been learning Hadoop and MapReduce, as attested to by numerous blog postings on the subject. Perhaps because I am a graduate of MIT ('85), I am upset that Michael Stonebraker uses his MIT affiliation for this article, without mentioning his Vertica affiliation.

The first thing I notice about the article is the number of references to Vertica. In the main text, I count nine references to Vertica, as compared to thirteen mentions of other databases:
  • Aster (twice)
  • DataAllegro (once)
  • DB2 (twice)
  • Greenplum (twice)
  • Netezza (once)
  • ParAccel (once)
  • PostgreSQL (once)
  • SQL Server (once)
  • Teradata (once)
The paper describes a study which compares Vertica, another database, and Hadoop on various tasks. The paper never explains how these databases were chosen for this purpose. Configuration issues for the other database and Hadoop are mentioned. From the absence of any mention of problems, one assumes that the configuration and installation of Vertica were easy and smooth. I have not (yet) read the paper cited, which describes the work in more detail.

Also, the paper never describes costs for the different systems, and cost is a primary driver of MapReduce adoption. The software is free and runs on cheap clusters of computers, rather than on expensive servers and hardware. For a given amount of money, MapReduce may provide a much faster solution, since it can support much larger hardware environments.

The paper never describes issues in the loading of data. I assume this is a significant cost for the databases. Loading the data for Hadoop is much simpler . . . since it just reads text files, which is a common format.

From what I can gather, the database systems were optimized specifically for the tasks at hand, although this is not explicitly mentioned anywhere. For instance, the second task is a GROUP BY, and I suspect that the data is hash partitioned by the GROUP BY clause.

There are a few statements that I basically disagree with.

"Lastly, the reshuffle that occurs between the Map and Reduce tasks in MR is equivalent to a GROUP BY operation in SQL." The issue here at first seems like a technicality. In a relational database, an input row can only into one group. MR can output multiple records in the map stage, so a single row can go into multiple "groups". This functionality is important for the word count example, which is the canonical MapReduce example. I find it interesting that this example is not included in the benchmark.

"Given this, parallel DBMSs provide the same computing model as MR, with the added benefit of using a declarative language (SQL)." This is not true in several respects. First, MapReduce does have associated projects for supporting declarative languages. Second, in order for SQL to support the level of functionality that the authors claim, they need to use user defined functions. Is that syntax declarative?

More importantly, though, is that the computing model really is not exactly the same. Well, with SQL extensions such as GROUPING SETs and window functions, the functionality does come close. But, consider the ways that you can add a row number to data (assuming that you have no row number function built-in) using MapReduce versus traditional SQL. Using MapReduce you can follow the two-phase program that I described in an earlier posting. With traditional SQL, you have to do a non-equi-self join. MapReduce has a much richer set of built-in functions and capabilities, simply because it uses java, an established programming language with many libraries.
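To make the earlier word count point concrete, here is a minimal sketch of the canonical word count map, written against the 0.20 mapreduce interface: a single input row fans out into one output record per word, which is not something a GROUP BY over rows can do.

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class WordCountMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // one input row, many output records -- one per word
        for (String token : line.toString().split("\\s+")) {
            if (token.length() > 0) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}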

On the other hand, MapReduce does not have a concept of "null" built-in (although users can define their own data types and semantics). And, MapReduce handles non-equijoins poorly, because the key is used to direct both tables to the same node. In effect, you have to limit the MapReduce job to one node. SQL can still parallelize such queries.

"[MapReduce] still requires user code to parse the value portion of the record if it contains multiple attributes." Well, parse is the wrong term, since a Writable class supports binary representations of data types. I describe how to create such types here.

I don't actually feel qualified to comment on many of the operational aspects of optimizing Hadoop code. I do note that the authors do not explain the main benefit of Vertica, which is the support of column partitioning. Each column is stored separately, which makes it possible to apply very strong compression algorithms to the data. In many cases, the Vertica data will fit in memory. This is a huge performance boost (and one that another vendor, ParAccel, takes advantage of).

In the end, the benchmark may be comparing the in-memory performance of a database to general performance for MapReduce. The benchmark may not be including the ETL time for loading the data, partitioning data, and building indexes. The benchmark may not have allocated optimal numbers of map and reduce jobs for the purpose. And, it is possible that the benchmark is unbiased and relational databases really are better.

A paper that leaves out the affiliations between its authors and the vendors used for a benchmark is only going to invite suspicion.


Saturday, January 2, 2010

Hadoop and MapReduce: Normalizing Data Structures

In setting out to learn Hadoop and Map/Reduce, I tackled several different problems. The last of these problems is the challenge of normalizing data, a concept from the world of relational databases. The earlier problems were adding sequential row numbers and characterizing values in the data.

This posting describes data normalization, explains how I accomplished it in Hadoop/MapReduce, and points out some tricks in the code. I should emphasize here that the code is really "demonstration" code, meaning that I have not worked hard on being sure that it always works. My purpose is to demonstrate the idea of using Hadoop to do normalization, rather than producing 100% working code.


What is Normalization and Why Do We Want To Do It?

Data normalization is the process of extracting values from a single column and placing them in a reference table. The data used by Hadoop is typically unnormalized, meaning that data used in processing is in a single record, so there is no need to join in reference tables. In fact, doing a join is not obvious using the MapReduce primitives, although my understanding is that Hive and Pig -- two higher level languages based on MapReduce -- do incorporate this functionality.

Why would we want to normalize data? (This is a good place to plug my book Data Analysis Using SQL and Excel, which explains this concept in more detail in the first chapter.) In the relational world, the reason is something called "relational integrity", meaning that any particular value is stored in one, and only one, place. For instance, if the state of California were to change its name, we would not want to update every record from California. Instead, we'd rather go to the reference table and just change the name there, since the data field contains a state id rather than the state name itself. Relational integrity is particularly important when data is being updated.

Why would we want to normalize data used by Hadoop? There are two reasons. The first is that we may be using Hadoop processing to load a relational database -- one that is already designed with appropriate reference tables. This is entirely reasonable: relational databases are an attractive way to "publish" results from complex data processing, since they are better for creating end-user reports and building interactive GUI interfaces.

The second reason is performance. Extracting long strings and putting them in a separate reference table can significantly reduce the storage requirements for the data files. By far, most of the space taken up in typical log files, for instance, consists of long URIs (what I used to call URLs). When processing the log files, we might want to extract some features from the URIs, but keeping the entire string just occupies a lot of space -- even in a compressed file.


The Process of Normalizing Data

Normalizing data starts with data structures. The input records are assumed to be in a delimited format, with the column names in the first row (or provided separately, although I haven't tested that portion of the code yet). In addition, there is a "master" id file that contains the following columns:
  • id -- a unique id for every value by column.
  • column name -- the name of the column.
  • value -- the value in the column.
  • count -- the total number of times the value has so far occurred.
This is a rudimentary reference file. I could imagine, for instance, having more information than just the count as summary information -- perhaps the first and last date when the value occurs, for instance.

What happens when we normalize data? Basically, we look through the data file to find new values in each column being normalized. We append these new values into the master id file, and then go back to the original data and replace the values with the ids.

Hadoop is a good platform for this for several reasons. First, because the data is often stored as text files, the values and the ids have the same type -- text strings. This means that the file structures remain the same. Second, Hadoop can process multiple columns at the same time. Third, Hadoop can use inexpensive clusters and free software for this task, rather than relying on databases and tools, which are often more expensive.

How To Normalize Data Using Hadoop/MapReduce

The normalization process has six steps. Most of these correspond to a single Map-Reduce pass.

Step 1: Extract the column value pairs from the original data.

This step explodes the data, by creating a new data set with multiple rows for each row in the original data. Each output row contains a column, a value, and the number of times the value appears in the data. Only columns being normalized are included in the output.

This step also saves the column names for the data file in a temporary file. I'll return to why this is needed in Step 6.

Step 2: Extract Column-Value Pairs Not in the Master ID File

This step compares the column-value pairs produced in the first step with those in the master id file. This step is interesting, because it reads data from two different data source formats -- the master id file and the results from Step 1. Both sets of data files use the GenericRecord format.

To identify the master file, the map function looks at the path of the input file to see whether "/master" appears in it. Alternative methods would be to look at the GenericRecord that is created or to use MultipleInputs (which I didn't use because of a warning on Cloudera's web site).
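A minimal sketch of that path test is below. The "/master" convention and the idea of tagging each record with its source are from this posting; the class itself and the record layout are simplified placeholders, not the actual GenericRecord-based code.

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class TwoSourceMapper extends Mapper<LongWritable, Text, Text, Text> {
    private boolean isMaster;

    protected void setup(Context context) {
        // the input split knows which file this map task is reading,
        // so the test happens once per task rather than once per record
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        isMaster = (path.indexOf("/master") >= 0);
    }

    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // use the first field as a stand-in for the column:value key and
        // tag each record with its source so the reduce can tell them apart
        String[] fields = line.toString().split("\t", 2);
        context.write(new Text(fields[0]),
                      new Text((isMaster ? "master" : "new") + "\t" + line.toString()));
    }
}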


Step 3: Calculate the Maximum ID for Each Column in the Master File

This is a very simple Map-Reduce step that simply gets the maximum id for each column. New ids will be assigned starting at one more than this value.
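A sketch of such a maximum-id reduce is below, assuming the map has already emitted (column name, id) pairs; the real code works with GenericRecords, so this only shows the shape of the logic.

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class MaxIdReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    protected void reduce(Text column, Iterable<LongWritable> ids, Context context)
            throws IOException, InterruptedException {
        long maxid = Long.MIN_VALUE;
        for (LongWritable id : ids) {
            maxid = Math.max(maxid, id.get());
        }
        // new ids for this column will start at maxid + 1
        context.write(column, new LongWritable(maxid));
    }
}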

This is an instance where I would very much like to have two different reduces following a map step. If this were possible, then I could combine this step with step 2.


Step 4: Calculate a New ID for the Unmatched Values

This is a two-step process that follows the mechanism for adding row numbers discussed in one of my earlier posts, with one small modification. The final result has the maximum id value from Step 3 added onto it, so the result is a new id rather than just a row number.


Step 5: Merge the New Ids with the Existing Master IDs

This step merges the results from Step 4 into the existing master id file. Currently, the results are placed into another directory. Eventually, they could simply overwrite the master id file.

Because of the structure of the Hadoop file system, the merge could be as simple as copying the file with the new ids into the appropriate master id data space. However, this would result in an unbalanced master id file, which is probably not desirable for longer term processing.


Step 6: Replace the Values in the Original Data with IDs

This final step replaces the values with ids -- the actual normalization step. This is a two part process. The map phase of the first part takes both the original data and the master key file. All the column value pairs are exploded from the original data, as in Step 1, with the output consisting of:
  • key: <column name>:<column value>
  • value: <"expect"|"nomaster">, <partition id and row number>, <column number>
The first part ("expect" or "nomaster") is an indicator of whether this column should be normalized (that is, whether or not to expect a master id). The second field identifies the original data record, which is uniquely identified by the partition id and row number within that partition. The third is the column number in the row.

The master records are placed in the format:
  • key: <column name>:<column value>
  • value: "master", <id>
The reduce then reads through all the records for a given column-value combination. If one of them is a master, then it outputs the id for all records. Otherwise, it outputs the original value.
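Below is a rough sketch of that reduce logic, with the GenericRecord machinery replaced by tagged, tab-delimited strings; it is meant only to show the "find the master id, then emit either the id or the original value" decision, not the actual code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class SubstituteIdReducer extends Reducer<Text, Text, Text, Text> {
    protected void reduce(Text columnValue, Iterable<Text> tagged, Context context)
            throws IOException, InterruptedException {
        String masterId = null;
        List<String> locators = new ArrayList<String>();
        for (Text t : tagged) {
            String[] parts = t.toString().split("\t", 2);
            if ("master".equals(parts[0])) {
                masterId = parts[1];          // the id from the master file
            } else {
                locators.add(parts[1]);       // locator for an exploded data record
            }
        }
        // the original value is everything after the column name in the key
        String key = columnValue.toString();
        String original = key.substring(key.indexOf(':') + 1);
        for (String locator : locators) {
            // emit the record locator with either the id or the original value,
            // so the final pass can reassemble the rows
            context.write(new Text(locator),
                          new Text(masterId != null ? masterId : original));
        }
    }
}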

The last phase simply puts the records back together again, from their exploded form. The one trick here is that the metadata is read from a local file.


Tricks Used In This Code

The code is available in these files: Normalize.java, GenericRecordInputFormat.java, GenericRecord.java, and GenericRecordMetadata.java. This code uses several tricks along the way.

One trick that I use in Step 4, for the phase 1 map, makes the code more efficient. This phase of the computation extracts the maximum row number for each column. Instead of passing all the row numbers to a combine or reduce function, it saves them in a local hash-map data structure. I then use the cleanup() routine in the map function to output the maximum values.
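Here is a minimal sketch of that trick in isolation, assuming simple "column <tab> number" input lines rather than the actual GenericRecord data: the map accumulates maxima in a HashMap and only emits one record per column in cleanup().

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class MaxByColumnMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Map<String, Long> maxima = new HashMap<String, Long>();

    protected void map(LongWritable offset, Text line, Context context) {
        // assumes lines of the form "column <tab> number"; no error handling here
        String[] parts = line.toString().split("\t");
        long val = Long.parseLong(parts[1]);
        Long current = maxima.get(parts[0]);
        if (current == null || val > current) {
            maxima.put(parts[0], val);
        }
    }

    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // emit one (column, maximum) pair per map task
        for (Map.Entry<String, Long> e : maxima.entrySet()) {
            context.write(new Text(e.getKey()), new LongWritable(e.getValue()));
        }
    }
}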

Often the master code needs to pass variables to the map/reduce jobs. The best way to accomplish this is by using the "set" mechanism in the Configuration object. This allows variables to be assigned a string name. The names of all the variables that I use are stored in constants that start with PARAMETER_, defined at the beginning of the Normalize class.

In some cases, I need to pass arrays in, for instance, when passing in the list of columns that are to be normalized. In this case, one variable gives the number of values ("normalize.usecolumns.numvals"). Then each value is stored in a variable such as "normalize.usecolumns.0" and "normalize.usecolumns.1" and so on.
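A sketch of this mechanism, using the parameter names above, might look like the following; the surrounding class is just a container for the two helper functions, one used in the driver and one used in setup() via context.getConfiguration().

import org.apache.hadoop.conf.Configuration;

public class ColumnListExample {
    // in the driver (master) program
    public static void storeColumns(Configuration conf, String[] columns) {
        conf.setInt("normalize.usecolumns.numvals", columns.length);
        for (int i = 0; i < columns.length; i++) {
            conf.set("normalize.usecolumns." + i, columns[i]);
        }
    }

    // in the Mapper/Reducer setup(), via context.getConfiguration()
    public static String[] loadColumns(Configuration conf) {
        int n = conf.getInt("normalize.usecolumns.numvals", 0);
        String[] columns = new String[n];
        for (int i = 0; i < n; i++) {
            columns[i] = conf.get("normalize.usecolumns." + i);
        }
        return columns;
    }
}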

Some of the important processing actually takes place in the master loop, where results are gathered and then passed to subsequent steps using this environment mechanism.

The idea behind the GenericRecord class is pretty powerful, with the column names at the top of the file. GenericRecords make it possible to read multiple types of input in the same map class, for instance, which is critical functionality for combining data from two different input streams.

However, the Map-Reduce framework does not really recognize these column names as being different, once generic records are placed in a sequence file. The metadata has to be passed somehow.

When the code itself generates the metadata, this is simple enough. A function is used to create the metadata, and this function is used in both the map and reduce phases.

A bigger problem arises with the original data. In particular, Step 6 of the above framework re-creates the original records, but it has lost the column names, which poses a conundrum. The solution is to save the original metadata in Step 1, which first reads the records. This metadata is then passed into Step 6.

In this code, this is handled by simply using a file. The first map partition of Step 1 writes this file (this partition is used to guarantee that the file is written exactly once). The last reduce in Step 6 then reads this file.

This mechanism works, but is not actually the preferred mechanism, because all the reduce tasks in Step 6 are competing to read the same file -- a bottleneck.

A better mechanism is for the master program to read the file and to place the contents in variables in the configuration passed to the map reduce tasks. Although I do this for other variables, I don't bother to do this for the file.


Sunday, December 27, 2009

Hadoop and MapReduce: Characterizing Data

This posting describes using Hadoop and MapReduce to characterize data -- that is, to summarize the values in various columns to learn about the values in each column.

This post describes how to solve this problem using Hadoop. It also explains why Hadoop is better for this particular problem than SQL.

The code discussed in this post is available in these files: GenericRecordMetadata.java, GenericRecord.java, GenericRecordInputFormat.java, and Characterize.java. This work builds on the classes introduced in my previous post Hadoop and MapReduce: Method for Reading and Writing General Record Structures (the versions here fix some bugs in the earlier versions).

What Does This Code Do?

The purpose of this code is to provide summaries for data in a data file. Being Hadoop, the data is stored in a delimited text format, with one record per line, and the code uses GenericRecord to handle the specific data. The generic record classes are things that I wrote to handle this situation; the Apache java libraries apparently have other approaches to solving this problem.

The specific summaries for each column are:
  • Number of records.
  • Number of values.
  • Minimum and maximum values for string variables, along with the number of times the minimum and maximum values appear in the data.
  • Minimum and maximum lengths for string variables, along with the number of times these appear and an example of the value.
  • First, second, and third most common string values.
  • Number of times the column appears to be an integer.
  • Minimum and maximum values when treating the values as integers, along with the number of times that these appear.
  • Number of times the column appears to contain a real number.
  • Minimum and maximum values when treating the values as doubles, along with the number of times that these appear.
  • Count of negative, zero, and positive values.
  • Average value.
These summaries are arbitrary. The code should be readily extensible to other types and other summaries.

My ultimate intention is to use this code to easily characterize input and result files that I create in the process of writing Hadoop code.


Overview of the Code

The characterize problem is solved in two passes of map reduce. The first pass creates a histogram of all the values in all the columns, and the second summarizes that histogram of values.

The histogram step takes files with the following format:
  • Key: undetermined
  • Values: text values separated by a delimiter (by default a tab)
(This is the GenericRecord format.)
The Map phase produces a file of the format:
  • Key: column name and column value, separated by a colon
  • Value: "1"
Combine and Reduce then add up the "1"s, producing a file of the format:
  • Key: column name
  • Value: column value and count, separated by a tab
Using a tab as a separator is a convenience, because this is also the default separator for the key.
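Ignoring the GenericRecord machinery, a stripped-down sketch of this first pass might look like the following. The parameter name carrying the column names is hypothetical, and a combiner that sums counts keyed by "column:value" could be added separately (the reducer below cannot double as the combiner because it re-keys its output).

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class HistogramPass {
    public static class HistogramMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);
        private String[] columnNames;

        protected void setup(Context context) {
            // hypothetical parameter carrying the column names
            columnNames = context.getConfiguration()
                                 .get("characterize.columnnames").split("\t");
        }

        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] values = line.toString().split("\t", -1);
            for (int i = 0; i < columnNames.length && i < values.length; i++) {
                // key is "column:value", value is a count of one
                context.write(new Text(columnNames[i] + ":" + values[i]), ONE);
            }
        }
    }

    public static class HistogramReducer
            extends Reducer<Text, LongWritable, Text, Text> {
        protected void reduce(Text columnValue, Iterable<LongWritable> counts, Context context)
                throws IOException, InterruptedException {
            long total = 0;
            for (LongWritable c : counts) {
                total += c.get();
            }
            // re-key by column name; the value is "column value <tab> count"
            String key = columnValue.toString();
            int colon = key.indexOf(':');
            context.write(new Text(key.substring(0, colon)),
                          new Text(key.substring(colon + 1) + "\t" + total));
        }
    }
}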

The second phase of the Map/Reduce job takes the previous output and uses the reduce function to summarize all the different values in the histogram. This code is quite specific to the particular summaries. The GenericRecord format is quite useful because I can simply add new summaries in the code, without worrying about the layout of the records.

The code makes use of exception processing to handle particular data types. For instance, the following code block handles the integer summaries:

try {
....long intval = Long.parseLong(valstr);
....hasinteger = true;
....intvaluecount++;
....intrecordcount += Long.parseLong(val.get("count"));
}
catch (NumberFormatException e) {
....// we don't have to do anything here
}

This block tries to convert the value to an integer (actually to a long). When this works, then the code updates the various variables that characterize integer values. When this fails, the code continues working.

There is a similar block for real numbers, and I could imagine adding more such blocks for other formats, such as dates and times.

Why MapReduce Is Better Than SQL For This Task

Characterizing data is the process of summarizing data along each column, to get an idea of what is in the data. Normally, I think about data processing in terms of SQL (after all, my most recent book is Data Analysis Using SQL and Excel). SQL, however, is particularly poor for this purpose.

First, SQL has precious few functions for this task -- basically MIN(), MAX(), AVG() and judicious use of the CASE statement. Second, SQL generally has lousy support for string functions and inconsistent definitions for date and time functions across different databases.

Worse, though, is that traditional SQL can only summarize one column at a time. The traditional SQL approach would be to summarize each column individually in a query and then connect them using UNION ALL statements. The result is that the database has to do a full-table scan for each column.

Although not supported in all databases, SQL syntax does now support the GROUPING SETS keyword, which potentially helps alleviate this problem. However, GROUPING SETS is messy, since each grouping key ends up in its own column. That is, I want the results in the format "column name, column value". With GROUPING SETS, I get "column1, column2 ... columnN", with NULLs for all unused columns, except for the one with a value.

The final problem with SQL occurs when the data starts out in text files. Much of the problem of characterizing and understanding the data happens outside the database during the load process.


Friday, December 18, 2009

Hadoop and MapReduce: Method for Reading and Writing General Record Structures

I'm finally getting more comfortable with Hadoop and java, and I've decided to write a program that will characterize data in parallel files.

To be honest, I find that I am spending a lot of time writing new Writable and InputFormat classes, every time I want to do something. Every time I introduce a new data structure used by the Hadoop framework, I have to define two classes. Yucch!

So, I put together a simple class called GenericRecord that can store a set of column names (as strings) and a corresponding set of column values (as strings). These are stored in delimited files, and the various classes understand how to parse these files. In particular, the code can read any tab-delimited file that has column names on the first row (and changing the delimiter should be easy). One nice aspect is the ability to use the GenericRecord as the output of a reduce function, which means that the number and names of the output columns can be specified in the code -- rather than in additional files with additional classes.

I wouldn't be surprised if similar code already exists with more functionality than the code I have here. This effort is also about my learning Hadoop.

This posting provides the code and explains important features of how it works. The code is available in these files: GenericRecord.java, GenericRecordMetadata.java, GenericRecordInputFormat.java, and GenericRecordTester.java.

What This Code Does

This code is analogous to the word count code, which must be familiar to anyone starting to learn MapReduce (since it seems to be the first example in all the documentation I've seen). Instead of counting words, this code counts the occurrence of values in the columns.

The code reads input files and produces output records with three columns:
  • A column name in the original data.
  • A value in the column.
  • The number of times the value appears.
Do note that for data with many unique values in many columns, the number of output records is likely to far exceed the number of input records. So, the output file can be bigger than the input file.

The input records are assumed to be in a text file with one record per row. The first row contains the names of the columns, delimited by a tab (although this could easily be changed to another delimiter). The rest of the rows contain values. Note that this assumes that the input files are all read from the beginning; that is, that a single input file is not split among multiple map tasks.

One irony of this code and the Hadoop framework is that the input files do not have to be in the same format. So, I could upload a bunch of different files, with different numbers of columns, and different column names, and run them all in parallel. I would have to be careful that the column names are all different, for this to work well.

Examples of such files are available on the companion page for my book Data Analysis Using SQL and Excel. These are small files by the standards of Hadoop (measured in megabytes) but quite sufficient for testing and demonstrating code.


Overview of Approach

There are four classes defined for this code:
  • GenericRecordMetadata stores the metadata (column names) for a record.
  • GenericRecord stores the values for a particular record.
  • GenericRecordInputFormat provides the interface for reading the data into Hadoop.
  • GenericRecordTester provides the functions for the MapReduce framework.
The metadata consists of the names of the columns, which can be accessed either by a column index or by a column name. The metadata has functions to translate a column name into a column index. Because it uses a HashMap, the functions should run quite fast, although they are not optimal in memory space. This is okay, because the metadata is stored only once, rather than once per row.
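The idea, reduced to a sketch (the real GenericRecordMetadata class presumably does more than this), is an ordered list of names plus a HashMap for the name-to-index lookup:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataSketch {
    private final List<String> names = new ArrayList<String>();
    private final Map<String, Integer> indexes = new HashMap<String, Integer>();

    public void addColumn(String name) {
        // keep the original column order and a fast lookup by name
        indexes.put(name, names.size());
        names.add(name);
    }

    public String getName(int index) { return names.get(index); }

    public int getIndex(String name) {
        Integer i = indexes.get(name);
        return (i == null) ? -1 : i;
    }
}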

The generic record itself stores the data as an array of strings. It also contains a pointer to the metadata object, in order to fetch the names. The array of strings minimizes both memory overhead and time, but does require access using an integer. The other two classes are needed for the Hadoop framework.

One small challenge is getting this to work without repeating the metadata information for each row of data. This is handled by including the column names as the first row in any file created by the Hadoop framework, and not by putting the column names in the output for each row.


Setting Up The Metadata When Reading

The class GenericRecordInputFormat basically does all of its work in a private class called GenericRecordRecordReader. This class has two important functions: initialize() and nextKeyValue().

The initialize() function sets up the metadata, either by reading environment variables in the context object or by parsing the first line of the input file (depending on whether or not the environment variable genericrecord.numcolumns is defined). I haven't tested passing in the metadata using environment variables, because setting up the environment variables poses a challenge. These variables have to be set in the master routine in the configuration before the map function is called.

The nextKeyValue() function reads a line of the text file, parses it using the function split(), and sets the values in the line. The verification on the number of items read matching the number of expected items is handled in the function lineValue.set(), which raises an exception (currently unhandled) when there is a mismatch.


Setting Up The Metadata When Writing

Perhaps more interesting is the ability to set up the metadata dynamically when writing. This is handled mostly in the setup() function of the SplitReduce class, which sets up the metadata using various function calls.

Writing the column names out at the beginning of the results file uses a couple of tricks. First, this does not happen in the setup() function but rather in the reduce() function itself, for the simple reason that the latter handles IOException.

The second trick is that the metadata is written out by putting it into the values of a GenericRecord. This works because the values are all strings, and the record itself does not care if these are actually for the column names.

The third trick is to be very careful with the function GenericRecord.toString(). Each column is separated by a tab character, because the tab is used to separate the key from the value in the Hadoop framework. In the reduce output files, the key appears first (the name of the column in the original data), followed by a tab -- as put there by the Hadoop framework. Then, toString() adds the values separated by tabs. The result is a tab-delimited file that looks like column names and values, although the particular pieces are put there through different mechanisms. I imagine that there is a way to tell Hadoop to use a different character to separate the key and value, but I haven't researched this point.

The final trick is to be careful about the ordering of the columns. The code iterates through the values of the GenericRecord table manually using an index rather than a for-in loop. This is quite intentional, because it allows the code to control the order in which the columns appear -- which is presumably the original order in which they were defined. Using the for-in is also perfectly valid, but the columns may appear in a different order (which is fine, because the column names also appear in the same order).

The result of all this machinery is that the reduce function can now return values in a GenericRecord. And, I can specify these in the reduce function itself, without having to mess around with other classes. This is likely to be a big benefit as I attempt to develop more code using Hadoop.


Tuesday, December 15, 2009

Hadoop 0.20: Creating Types

In various earlier posts, I wrote code to read and write zip code data (which happens to be part of the companion page to my book Data Analysis Using SQL and Excel). This provides sample data for use in my learning Hadoop and mapreduce.

Originally, I wrote the code using Hadoop 0.18, because I was using the Yahoo virtual machine. I have since switched to the Cloudera virtual machine, which runs the most recent version of Hadoop, V0.20.

I thought switching my code would be easy. The issue is less the difficulty of the switch than some nuances in Hadoop and java. This post explains some of the differences between the two versions, when adding a new type into the system. I explained my experience with the map, reduce, and job interface in another post.

The structure of the code is simple. I have a java file that implements a class called ZipCensus, which holds the zip code data and implements the Writable interface (which I include using import org.apache.hadoop.io.*). Another class called ZipCensusInputFormat implements the read/writable version so ZipCensus can be used as input and output in MapReduce functions. The input format class uses another, private class called ZipCensusRecordReader, which does all the work. Because of the rules of java, these need to be in two different files, which have the same name as the class. The files are available in ZipCensus.java and ZipCensusInputFormat.java.
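For readers who have not written one, the general shape of a custom Writable is sketched below. The fields here are hypothetical and not the actual layout of the ZipCensus class, but the write()/readFields() pair is what the framework requires in both 0.18 and 0.20.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class ZipExample implements Writable {
    private String zipCode = "";
    private long population = 0;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(zipCode);          // binary, not text, representation
        out.writeLong(population);
    }

    public void readFields(DataInput in) throws IOException {
        // must read the fields back in exactly the order they were written
        zipCode = in.readUTF();
        population = in.readLong();
    }

    public String getZipCode() { return zipCode; }
    public long getPopulation() { return population; }
}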

These files now use the Apache mapreduce interface rather than the mapred interface, so I must import the right packages into the java code:

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.InputSplit;


And then I had a problem when defining the ZipCodeInputFormat class using the code:

public class ZipCensusInputFormat extends FileInputFormat {
....public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
........return new ZipCensusRecordReader();
....} // RecordReader
} // class ZipCensusInputFormat


The specific error given by Eclipse/Ganymede is: "The type org.apache.commons.logging.Log cannot be resolved. It is indirectly referenced from required .class files." This is a bug in Eclipse/Ganymede, because the code compiles and runs using javac/jar. At one point, I fixed this by including various Apache commons jars. However, since I didn't need them when compiling manually, I removed them from the Eclipse project.

The interface for the RecordReader class itself has changed. The definition for the class now looks like:

class ZipCensusRecordReader extends RecordReader

Previously, this used the syntax "implements" rather than "extends". For those familiar with java, this is the difference between an interface and an abstract class, a nuance I don't yet fully appreciate.

The new interface (no pun intended) includes two new functions, initialize() and cleanup(). I like this change, because it follows the same convention used for map and reduce classes.

As a result, I changed the constructor to take no arguments. This has moved to initialize(), which takes two arguments of type InputSplit and TaskAttemptContext. The purpose of this code is simply to skip the first line of the data file, which contains column names.

The most important function for the class is now called nextKeyValue() rather than next(). The new function takes no arguments, putting the results in local private variables accessed using getCurrentKey() and getCurrentValue(). The function next() took two arguments, one for the key and one for the value, although the results could be accessed using the same two functions.
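The following sketch shows the shape of the new interface, delegating the actual reading to Hadoop's LineRecordReader; the zip code parsing is omitted, so it only illustrates where the header line gets skipped and how initialize(), nextKeyValue(), and the getCurrent*() accessors fit together.

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

class SkipHeaderRecordReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader lines = new LineRecordReader();

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lines.initialize(split, context);
        // skip the first line (column names) -- in real code this should only
        // happen for the split that starts at the beginning of the file
        lines.nextKeyValue();
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        return lines.nextKeyValue();
    }

    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return lines.getCurrentKey();
    }

    public Text getCurrentValue() throws IOException, InterruptedException {
        return lines.getCurrentValue();
    }

    public float getProgress() throws IOException, InterruptedException {
        return lines.getProgress();
    }

    public void close() throws IOException {
        lines.close();
    }
}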

Overall the changes are simple modifications to the interface, but they can be tricky for the new user. I did not find a simple explanation for the changes anywhere on the web; perhaps this posting will help someone else.


Saturday, December 5, 2009

Hadoop and MapReduce: What Country is an IP Address in?

I have started using Hadoop to sessionize web log data. It has surprised me that there is not more written on this subject on the web, since I thought this was one of the more prevalent uses of Hadoop. Because I'm doing this work for a client, using Amazon EC2, I do not have sample web log data files to share.

One of the things that I want to do in the sessionization code is to include what country the user is in. Typically, the only source of location information in such logs is the IP address used for connecting to the internet. How can I look up the country the IP address is in?

This posting describes three things: the source of the IP geography information, new things that I'm learning about java, and how to do the lookup in Hadoop.


The Source of IP Geolocation Information

MaxMind is a company that has a specialty in geolocation data. I have no connection to MaxMind, other than a recommendation to use their software from someone at the client where I have been doing this work. There may be other companies with similar products.

One way they make money is by offering a product called GeoIP Country, which has very, very accurate information about the country where an IP is located (they also offer more detailed geographies, such as regions, states, and cities, but country is sufficient for my purposes). Their claim is that GeoIP Country is 99.8% accurate.

Although quite reasonably priced, I am content to settle for the free version, called GeoLite Country, for which the claim is 99.5% accuracy.

These products come in two parts. The first part is an interface, which is available for many languages, with the java version here. I assume the most recent version is the best, although I happen to be using an older version.

Both the free and paid versions use the same interface, which is highly convenient, in case I want to switch between them. The difference is the database, which is available from this download page. The paid version has more complete coverage and is updated more frequently.

The interface consists of two important components:
  • Creating a LookupService object, which is instantiated with an argument that names the database file.
  • Using LookupService.getCountry() to do the lookup.
Simple enough interface; how do we get it to work in java, and in particular, in java for Hadoop?


New Things I've Learned About Java

As I mentioned a few weeks ago in my first post on learning Hadoop, I had never used java prior to this endeavor (although I am familiar with other object-oriented programming languages such as C++ and C#). I have been learning java on an "as needed" basis, which is perhaps not the most efficient way overall but has been the fastest way to get started.

When programming java, there are two steps. I am using the javac command to compile code into class files. Then I'm using the jar command to create a jar file. I have been considering this the equivalent of "compiling and linking code", which also takes two steps.

However, the jar file is much more versatile than a regular executable image. In particular, I can put any files there. These files are then available in my application, although java calls them "resources" instead of "files". This will be very important in getting MaxMind's software to work with Hadoop. I can include the IP database in my application jar file, which is pretty cool.

There is a little complexity, though, which involves the paths where these resources are located. When using hadoop, I have been using statements such as "org.apache.hadoop.mapreduce" without really understanding them. This statement brings in classes associated with the mapreduce package, because three things have happened:
  • The original work (at apache) was done in a directory structure that included ./org/apache/hadoop/mapreduce.
  • The jar file was created in that (higher-level) directory. Note that this could be buried deep down in the directory hierarchy. Everything is relative to the directory where the jar file is created.
  • I am including that jar file explicitly in my javac command, using the -cp argument which specifies a class path.
All of this worked without my having to understand it, because I had some examples of working code. The MaxMind code then poses a new problem. This is the first time that I have to get someone else's code to work. How do we do this?

First, after you uncompress their java code, copy the com directory to the place where you create your java jar file. Actually, you could just link the directories. Or, if you know what you are doing, then you may have another solution.

Next, for compiling the files, I modified the javac command line, so it read: javac -cp .:/opt/hadoop/hadoop-0.20.1-core.jar:com/maxmind/geoip [subdirectory]/*.java. That is, I added the geoip directory to the class path, so java can find the class files.

The class path can accept either a jar file or a directory. When it is a jar file, javac looks for classes in the jar file. When it is a directory, it looks for classes in the directory (but not in subdirectories). That is simple enough. I do have to admit, though, that it wasn't obvious when I started. I don't think of jar files and directories as being equivalent. But they are.

Once the code compiles, just be sure to include the com/maxmind/geoip/* files in the jar command. In addition, I also copied over the GeoLite Country database and included it in the jar file. Do note that the path used to put things in the jar file makes a difference! So, "jar ~/maxmind/*.dat" behaves differently from "jar ./*.dat", when we want to use the data file.


Getting MaxMind to Work With Hadoop

Things are a little complicated in the Hadoop world, because we need to pass in a database file to initialize the MaxMind classes. My first attempt was to initialize the lookup service in the map class using code like:

iplookup = new LookupService("~/maxmind/GeoIP.dat",
.............................LookupService.GEOIP_MEMORY_CACHE |
.............................LookupService.GEOIP_CHECK_CACHE);


This looked right to me and was similar to code that I found in various places on the internet.

Guess what? It didn't work. And it didn't work for a fundamentally important reason. Map classes are run on the distributed nodes, and the distributed nodes do not have access to the local file system of the machine where the job is launched. Duh, this is why the HDFS (hadoop distributed file system) was invented!

But now, I have a problem. There is a reasonably sized data file -- about 1 Mbyte. Copying it to the HDFS does not really solve my problem, because it is not an "input" into the Map routine. I suppose, I could copy it and then figure out how to open it as a sequence file, but that is not the route I took.

Up to this point, I had found three ways to get information into the Map classes:
  1. Compile it in using constants.
  2. Pass small amounts on the Conf structure, using the various set and get functions. I have examples of this in the row number code.
  3. Use the distributed cache. I haven't done this yet, because there is a warning about setting it up correctly using configuration xml files. Wow, that is something that I can easily get wrong. I'll learn this when I think it is absolutely necessary, knowing that it might take a few hours to get it right.
But now, I've discovered that java has an amazing fourth way: I can pass files in through the jar file. Remember, when we use Hadoop, we call a function "setJarByClass()". Well, this function takes the class that is passed in and sends the entire jar file with the class to each of the distributed nodes (for both the Map and Reduce classes). Now, if that jar file just happens to contain a data file with ip address to country lookup data, then java has conspired to send my database file exactly where it is needed!

Thank you java! You solved this problem. (Or, should I be thanking Hadoop?)

The only question is how to get the file out of the jar file. Well, the things in the jar file are called "resources". Resources are accessed using uniform resource identifiers (URI). And, the URI is conveniently built out of the file name. Life is not so convenient that the URI is the file name. But, it is close enough. The URI prepends the file name with a scheme (in this case, "file:").

So, to get the data file out of the jar file (which we put in using the jar command), we need to:
  • figure out the name for the resource in the jar file;
  • convert the resource name to a file name; and then,
  • open this just as we would a regular file (by passing it into the constructor).
The code to do this is:

import com.maxmind.geoip.*;
...
if (iplookup == null) {
....String filename = getClass().getResource("/GeoIP.dat").toExternalForm().substring(5);
....iplookup = new LookupService(filename, LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE);
}

The import tells the java code where to find the LookupService class. To make this work, we have to include the appropriate directory in the class path, as described earlier.

The first statement creates the file name. The resource name "/GeoIP.dat" says that the resource is a file, located in the directory where the jar file was created. The rest of the statement converts this to a file name. The function "toExternalForm()" creates a URI, which is the filename prepended with the scheme "file:". The substring(5) removes those five characters, leaving just the file name. The original example code I found had substring(6), which did not work for me on EC2.

The second statement passes this into the lookup service constructor.

Now the lookup service is available, and I can use it via this code:

this.ipcountry = iplookup.getCountry(sale.ip).getCode();

Voila! From the IP address, I am able to use free code downloaded from the internet to look up the country for that address, using the distributed power of Hadoop.


Sunday, November 29, 2009

Hadoop and MapReduce: Switching to 0.20 and Cloudera

Recently, I decided to switch from Hadoop 0.18 to 0.20 for several reasons:
  1. I'm getting tired of using deprecated features -- it is time to learn the new interface.
  2. I would like to use some new features, specifically MultipleInputFormats.
  3. The Yahoo! Virtual Machine (which I recommended in my first post) is not maintained, whereas the Cloudera training machine is.
  4. And, for free software, I have so far found the Cloudera community support quite effective.
I chose the Cloudera Virtual Machine for a simple reason: it was recommended by Jeff, who works there and describes himself as "a big fan of [my data mining] books". I do not know if there are other VMs that are available, and I am quite happy with my Cloudera experience so far. Their community support provided answers to key questions, even over the Thanksgiving long weekend.

That said, there are a few downsides to the upgrade:
  • The virtual machine has the most recent version of Eclipse (called Ganymede), which does not work with Hadoop.
  • Hence, the virtual machine requires using command lines for compiling the java code.
  • I haven't managed to get the virtual machine to share disks with the host (instead, I send source files through gmail).
The rest of this post explains how I moved the code that assigns consecutive row numbers (from my previous post) to Hadoop 0.20. It starts with details about the new interface and then talks about updating to the Cloudera virtual machine.


Changes from Hadoop 0.18 to 0.20

The updated code with the Hadoop 0.20 API is in RowNumberTwoPass-0.20.java.

Perhaps the most noticeable change is the packages. Before 0.20, Hadoop used classes in a package called "mapred". Starting with 0.20, it uses classes in "mapreduce". These have a different interface, although it is pretty easy to switch from one to the other.

The reason for this change has to do with future development for Hadoop. This change will make it possible to separate releases of HDFS (the distributed file system) and releases of MapReduce. The following are packages that contain the new interface:

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.map.*;
import org.apache.hadoop.mapreduce.lib.reduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

In the code itself, there are both subtle and major code differences. I have noticed the following changes in the Map and Reduce classes:
  • The classes no longer need the "implements" syntax.
  • The function called before the map/reduce is now called setup() rather than configure().
  • The function called after the map/reduce is called cleanup().
  • The functions all take an argument whose class is Context; this is used instead of Reporter and OutputCollector.
  • The map and reduce functions can also throw InterruptedException.
The driver function has more changes, caused by the fact that JobConf is no longer part of the interface. Instead, the work is set up using Job. Variables and values are passed into the Map and Reduce classes through the Configuration object rather than JobConf. Also, the code for the Map and Reduce classes is added in using the call Job.setJarByClass().
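A minimal sketch of a 0.20-style driver is below. It uses the identity Mapper and Reducer classes as placeholders, and the parameter name and paths are made up, so it only illustrates the Job/Configuration calls just described.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExampleDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("example.parameter", "some value");   // read later via context.getConfiguration()

        Job job = new Job(conf, "example job");
        job.setJarByClass(ExampleDriver.class);        // ships this jar to the nodes
        job.setMapperClass(Mapper.class);              // identity map (placeholder)
        job.setReducerClass(Reducer.class);            // identity reduce (placeholder)
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}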

There are a few other minor coding differences. However, the code follows the same logic as in 0.18, and the code ran the first time after I made the changes.


The Cloudera Virtual Machine

First, I should point out that I have no connection to Cloudera, which is a company that makes money (or intends to make money) by providing support and training for Hadoop.

The Cloudera Virtual Machine is available here. It requires running a VMWare virtual machine, which is available here. Between the two, these are about 1.5 Gbytes, so have a good internet connection when you want to download them.

The machine looks different from the Yahoo! VM, because it runs X rather than just a terminal interface. The desktop is pre-configured with a terminal, Eclipse, Firefox, and perhaps some other stuff. When I start the VM, I open the terminal and run emacs in the background. Emacs is a text editor that I know well from my days as a software programmer (more years ago than I care to admit). To use the VM, I would suggest that you have some facility with either emacs or VI.

The version of Hadoop is 0.20.1. Note that as new versions are released, Cloudera will probably introduce new virtual machines. Any work you do on this machine will be lost when you replace the VM with a newer version. As I said, I am sending source files back and forth via gmail. Perhaps you can get the VM to share disks with the host machine. The libraries for Hadoop are in /usr/lib/hadoop-0.20.

Unfortunately, the version of Eclipse installed in the VM does not fully support Hadoop (if you want to see the bug reports, google something like "Hadoop Ganymede"). Fortunately, you can use Eclipse/Ganymede to write code, and it does full syntax checking. However, you'll have to compile and run the code outside the Eclipse environment. I believe this is a bug in this version of Eclipse, which will hopefully be fixed sometime in the near future.

I suppose that I could download the version of Eclipse that does work (Europa, which I think is version 3.3). But, that was too much of a bother. Instead, I learned to use the command line interface for compiling and running code.


Compiling and Running Programs

To compile and run programs, you will need to use the command line.

To build a new project, create a new Java project in Eclipse. The one thing it needs is a pointer to the Hadoop 0.20 library (actually a "jar"). To add this pointer, do the following after creating the project:
  • Right click on the project name and choose "Properties".
  • Click on "Java Build Path" and go to the "Libraries" tab.
  • Click on "Add External JARs".
  • Navigate to /usr/lib/hadoop-0.20 and choose hadoop-0.20.1+133-core.jar.
  • Click "OK" on the windows until you are out.
You'll see the new library in the project listing.

Second, create a package and then put your source code in the package.

After you have created the project, you can compile and run the code from a command line by doing the following.
  • Go to the project directory (~/workshop/).
  • Issue the following command: "javac -cp /usr/lib/hadoop-0.20/hadoop-0.20.1+133-core.jar -d bin/ src/*/*.java" [note: there is a space after "bin/"].
  • Create the jar: "cd bin; jar cvf ../<jarname>.jar */*; cd ..". So, for the RowNumberTwoPass project, I use: "cd bin; jar cvf ../RowNumberTwoPass.jar */*; cd ..".
  • Run the code using the command: "hadoop jar RowNumberTwoPass.jar RowNumberTwoPass/rownumbertwopass". The first argument after "hadoop jar" is the jar file with the code. The second is the class and package where the main() function is located.
Although this seems a little bit complicated, it is only cumbersome the first time you run it. After that, you have the commands and running them again is simple.


Wednesday, November 25, 2009

Hadoop and MapReduce: A Parallel Program to Assign Row Numbers

This post discusses (and solves) the problem of assigning consecutive row numbers to data, with no holes. Along the way, it also introduces some key aspects of the Hadoop framework:
  • Using the FileSystem package to access HDFS (a much better approach than in my previous posting).
  • Reading configuration parameters in the Map function.
  • Passing parameters from the main program to the Map and Reduce functions.
  • Writing out intermediate results from the Map function.
These are all important pieces of functionality for using the Hadoop framework. In addition, I plan on using this technique for assigning unique ids to values in various columns.


The "Typical" Approach

The "typical" approach is to serialize the problem, by creating a Reducer function that adds the row number. By limiting the framework to only a single reducer (using setNumReduceTasks(1) in the JobConf class), this outputs the row number.

There are several problems with this solution. The biggest issue is, perhaps, aesthetic. Shouldn't a parallel framework, such as Hadoop, be able to solve such a simple problem? Enforced serialization is highly inefficient, since the value of Hadoop is in the parallel programming capabilities enabled when multiple copies of maps and reduces are running.

Another issue is the output file. Without some manual coding, the output is a single file, which may perhaps be local to a single cluster node (depending on how the file system is configured). This can slow down subsequent map reduce tasks that use the file.


An Alternative Fully Parallel Approach


There is a better way, a fully parallel approach that uses two passes through the Map-Reduce framework. Actually, the full data is only passed once through the framework, so this is a much more efficient alternative to the first approach.

Let me describe the approach using three passes through the data, since this makes for a simpler explanation (the actual implementation combines the first two steps).

The first pass through the data consists of a Map phase that assigns a new key to each row, and no Reduce phase. The key consists of two parts: the partition id and the row number within the partition.

The second pass counts the number of rows in each partition, by extracting the maximum row number for each partition key.

These counts are then combined to get cumulative sums of counts up to each partition. Although I could do this in the reduce step, I choose not to (which I'll explain below). Instead, I do the work in the main program.

The third pass adds the offset to the row number and outputs the results. Note that the number of map tasks in the first pass can be different from the number in subsequent passes, since the code always uses the original partition number for its calculations.


More Detail on the Approach -- Pass 1

The code is available in this file RowNumberTwoPass.java. It contains one class with two Map phases and one Reduce phase. This code assumes that the data is stored in a text file. This assumption simplifies the code, because I do not have to introduce any auxiliary classes to read the data. However, the same technique would work for any data format.

The first map phase, NewKeyOutputMap, does two things. The simpler task is to output the partition id and the row number within the partition for use in subsequent processing. The second is to save a copy of the data, with this key, for the second pass.

Assigning the Partition ID

How does any Map function figure out its partition id? The partition id is stored in the job configuration, and is accessed using the code:

....partitionid = conf.getInt("mapred.task.partition", 0);

In the version of Hadoop that I'm using (0.18.3, through the Yahoo! virtual machine), the job configuration is only visible to the configure() function. This is an optional function that can be overridden when extending the MapReduceBase class. It gets called once to initialize the environment, and it takes one argument: the job configuration. I just store the result in a static variable local to the NewKeyOutputMap class.

In more recent versions of Hadoop, the configuration is available in the context argument to the map function.
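
For the 0.18 interface used in this post, the pattern might look like the following sketch (illustrative code, not the exact contents of RowNumberTwoPass.java):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class PartitionKeyMapper extends MapReduceBase
........implements Mapper<LongWritable, Text, Text, LongWritable> {
....private static int partitionid = 0;
....private long rownumber = 0;

....// configure() is called once, before any calls to map()
....public void configure(JobConf conf) {
........partitionid = conf.getInt("mapred.task.partition", 0);
....}

....public void map(LongWritable key, Text value,
............OutputCollector<Text, LongWritable> output,
............Reporter reporter) throws IOException {
........// emit (partition id, row number within the partition); saving the
........// full row to a sequence file is covered in the next section
........output.collect(new Text(Integer.toString(partitionid)),
............new LongWritable(++rownumber));
....}
}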

Using Sequence Files in the Map Phase

The second task is to save the original rows with the new key values. For this, I need a sequence file. Or, more specifically, I need a different sequence file for each Map task. Incorporating the partition id into the file name accomplishes this.

Sequence files are data stores specific to the Hadoop framework, which contain key-value pairs. At first, I found them a bit confusing: Did the term "sequence file" refer to a collection of files available to all map tasks or to a single instance of one of these files? In fact, the term refers to a single instance file. To continue processing, we will actually need a collection of sequence files, rather than a single "sequence file".

They are almost as simple to use as any other files, as the following code in the configure() function shows:

....FileSystem fs = FileSystem.get(conf);
....sfw = SequenceFile.createWriter(fs, conf,
........new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
........Text.class, Text.class);

The first statement simply retrieves the appropriate file system for creating the file. The second statement uses the SequenceFile.createWriter() function to open the file and saves the resulting writer in the sfw variable. There are several versions of this function, with various additional options; I've chosen the simplest. The specific file goes in the directory referred to by the variable saverecordsdir, which will contain a series of files with the names "records#####", where ##### is a five-digit, zero-padded number.

This is all enclosed in try-catch logic to catch appropriate exceptions.

Later in the code, the map() writes to the sequence file using the logic:

....sfw.append(outkey, value);

Very simple!
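
Reading one of these files back later -- in the second pass, or just to inspect the contents -- is nearly as simple. The following sketch mirrors the writer snippet above, reusing its conf, saverecordsdir, and partitionid variables (and, like the writer, it should be wrapped in try-catch logic):

....SequenceFile.Reader sfr = new SequenceFile.Reader(fs,
........new Path(saverecordsdir+"/"+String.format("records%05d", partitionid.get())),
........conf);
....Text readkey = new Text();
....Text readvalue = new Text();
....while (sfr.next(readkey, readvalue)) {
........// readkey holds the partition id / row number key; readvalue holds the original row
....}
....sfr.close();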

Pass1: Reduce and Combine Functions

The purpose of the reduce function is to count the number of rows in each partition. Instead of counting, the function actually takes the maximum of the row numbers within each partition. By taking this approach, I can use the same function for both reducing and combining.

For efficiency purposes, the combine phase is very important to this operation. The way the problem is structured, the combine output should be a single record for each map instance -- and sending this data around for the reduce phase should incur very little overhead.
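
A sketch of what such a reduce/combine function might look like, assuming the partition id arrives as a Text key and the row numbers as LongWritable values (again, illustrative code rather than the posted file):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxRowNumberReducer extends MapReduceBase
........implements Reducer<Text, LongWritable, Text, LongWritable> {
....public void reduce(Text key, Iterator<LongWritable> values,
............OutputCollector<Text, LongWritable> output,
............Reporter reporter) throws IOException {
........// the maximum row number within a partition equals the row count, and
........// a maximum can safely be taken at both the combine and reduce steps
........long maxrow = 0;
........while (values.hasNext()) {
............maxrow = Math.max(maxrow, values.next().get());
........}
........output.collect(key, new LongWritable(maxrow));
....}
}

The same class can then be registered with both setCombinerClass() and setReducerClass() on the JobConf.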


More Detail on the Approach -- Offset Calculation and Pass 2

At the end of the first pass, the summary key result files contain a single row for each partition, giving the number of rows in that partition. For instance, from my small test data, the data looks like:


0 2265
1 2236
2 3

The first column is the partition id, the second is the count. The offset is the cumulative sum of previous values. So, I want this to be:


0 2265 0
1 2236 2265
2 3 4501

To accomplish this, I read the data in the main loop, after running the first job. The following loop in main() gets the results, does the calculation, and saves the results as parameters in the job configuration:

....int numvals = 0;
....long cumsum = 0;
....FileStatus[] files = fs.globStatus(new Path(keysummaryoutput+ "/p*"));
....for (FileStatus fstat : files) {
........FSDataInputStream fsdis = fs.open(fstat.getPath());
........String line = "";
........while ((line = fsdis.readLine()) != null) {
............finalconf.set(PARAMETER_cumsum_nthvalue + numvals++, line + "\t" + cumsum);
............String[] vals = line.split("\t");
............cumsum += Long.parseLong(vals[1]);
........}
....}
....finalconf.setInt(PARAMETER_cumsum_numvals, numvals);

Perhaps the most interesting part of this code is the use of the function fs.globStatus() to get a list of HDFS files that match a wildcard (in this case, anything that starts with "p" in the keysummaryoutput directory).
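
On the receiving side, the second-pass Map class can rebuild a partition-to-offset lookup in its configure() function. The sketch below follows the snippet above; the string values of the two parameter-name constants are made up for the example, and it assumes an import of java.util.HashMap:

....// hypothetical parameter names; the real constants are defined in RowNumberTwoPass.java
....private static final String PARAMETER_cumsum_nthvalue = "rownumber.cumsum.value.";
....private static final String PARAMETER_cumsum_numvals = "rownumber.cumsum.numvals";
....private static HashMap<Integer, Long> offsets = new HashMap<Integer, Long>();

....public void configure(JobConf conf) {
........int numvals = conf.getInt(PARAMETER_cumsum_numvals, 0);
........for (int i = 0; i < numvals; i++) {
............// each stored value looks like "<partition>\t<count>\t<cumulative offset>"
............String[] vals = conf.get(PARAMETER_cumsum_nthvalue + i).split("\t");
............offsets.put(Integer.parseInt(vals[0]), Long.parseLong(vals[2]));
........}
....}

The map() function can then look up the offset for its original partition id and add it to the within-partition row number.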


Conclusion

Parallel Map-Reduce is a powerful programming paradigm that makes it possible to solve many different types of problems using parallel dataflow constructs.

Some problems seem, at first sight, to be inherently serial. Appending a sequential row number onto each row is one of those problems. After all, don't you have to process the previous row to get the number for the next row? And isn't this a hallmark of inherently serial problems?

The answers to these questions are "no" and "not always". The algorithm described here should scale to very large data sizes and very large machine sizes. For large volumes of data, it is much, much more efficient than the serial version, since almost all the processing is in parallel. The only "serial" part of the algorithm is the calculation of the offsets between the passes. However, this involves such a small amount of data, relative to the overall data, that its effect on overall efficiency is negligible.

The offsets are passed into the second pass through the job configuration. There are other ways of passing this data. One method would be to use the distributed cache. However, I have not learned how to use this yet.

Another distribution method would be to do the calculations in the first pass reduce phase (by using only one reducer in this phase). The results would be in a file. This file could then be read by subsequent map tasks to extract the offset data. However, such an approach introduces a lot of contention, because suddenly there will be a host of tasks all trying to open the same file -- contention that can slow processing considerably.


Saturday, November 21, 2009

Hadoop and MapReduce: Controlling the Hadoop File System from the MapReduce Program

[The first comment on this post explains that Hadoop really does have a supported interface to the HDFS file system, through the FileSystem package ("import org.apache.hadoop.fs.FileSystem"). Yeah! I knew such an interface should exist -- and even stumbled across it myself after writing this post. Unfortunately, there is not as simple an interface for the "cat" operation, but you can't have everything.]
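
For the record, here is a minimal sketch of that interface in action: deleting an output directory (the equivalent of "hadoop fs -rmr") and printing its part files (a rough "cat"). The class name and the "part-*" pattern are just for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSketch {
....// the equivalent of "hadoop fs -rmr <dir>"
....public static void removeDir(Configuration conf, String dir) throws Exception {
........FileSystem fs = FileSystem.get(conf);
........fs.delete(new Path(dir), true);
....}

....// a rough equivalent of "hadoop fs -cat <dir>/part-*"
....public static void catDir(Configuration conf, String dir) throws Exception {
........FileSystem fs = FileSystem.get(conf);
........for (FileStatus fstat : fs.globStatus(new Path(dir + "/part-*"))) {
............FSDataInputStream in = fs.open(fstat.getPath());
............String line;
............while ((line = in.readLine()) != null) {
................System.out.println(line);
............}
............in.close();
........}
....}
}

Calling removeDir() before submitting the job and catDir() after it finishes gives, from inside java, the two behaviors this post sets out to get.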

In my previous post, I explained some of the challenges in getting a Hadoop environment up and running. Since then, I have succeeded in using Hadoop both on my home machine and on Amazon EC2.

In my opinion, one of the major shortcomings of the programming framework is the lack of access to the HDFS file system from MapReduce programs. More concretely, if you have attempted to run the WordCount program, you may have noticed that you can run it once without a problem. The second time you get an error saying that the output files already exist.

What do you do? You go over to the machine running HDFS -- which may or may not be your development machine -- and you delete the files using the "hadoop fs -rmr" command. Can't java do this?

You may also have noticed that you cannot see the output. Files get created, somewhere. What fun. To see them, you need to use the "hadoop fs -cat" command. Can't java do this?

Why can't we create a simple WordCount program that can be run multiple times in a row, without error, and that prints out the results? And, to further this question, I want to do all the work in java. I don't want to work with an additional scripting language, since I already feel that I've downloaded way too many tools on my machine to get all this to work.

By the way, I feel that both of these are very, very reasonable requests, and the hadoop framework should support them. It does not. For those who debate whether hadoop is better or worse than parallel databases, recognize that the master process in parallel databases typically supports functionality similar to what I'm asking for here.

Why is this not easy? Java, Hadoop, and the operating systems seem to conspire to prevent this. But I like a challenge. This posting, which will be rather long, is going to explain my solution. Hey, I'll even include some code so other people don't have to suffer through the effort.

I want to do this on the configuration I'm running from home. This configuration consists of:
  • Windows Vista, running Eclipse
  • Ubuntu Linux virtual machine, courtesy of Yahoo!, running Hadoop 0.18
However, I also want the method to be general and work regardless of platform. So, I want it to work if I write the code directly on my virtual machine, or if I write the code on Amazon EC2. Or, if I decide to use Karmasphere instead of Eclipse to write the code, or if I just write the code in a Java IDE. In all honesty, I've only gotten the system to work on my particular configuration, but I think it would not be difficult to get it to work on Unix.


Overview of Solution


The overview of the solution is simple enough. I am going to do the following:
  • Create a command file called "myhadoop.bat" that I can call from java.
  • Write a class in java that will run this bat file with the arguments to do what I want.
Boy, that's simple. NOT!

Here is a sample of the problems:
  • Java has to call the batch file without any path. This is because Windows uses the backslash to separate directories whereas Unix uses forward slashes. I lose platform independence if I use full paths.
  • The batch file has to connect to a remote machine. Windows Vista does not have a command to do this. Unix uses the command "rsh".
  • The java method for executing commands (Runtime.getRuntime().exec()) does not execute batch files easily.
  • The java method for executing commands hangs, after a few lines are output. And, the lines could be in either the standard output stream (stdout) or the error output stream (stderr), and it is not obvious how to read both of them at the same time.
This post is going to resolve these problems, step by step.


What You Need

To get started, you need to do a few things to your computer so everything will work.

First, install the program PuTTY (from here). Actually, choose the option for "A Windows installer for everything except PuTTYtel". You can accept all the defaults. As far as I know, this runs on all versions of Windows.

Next, you need to change the system path so it can find two things by default:
  • The PuTTY programs.
  • The batch file you are going to write.
The system path variable specifies where the operating system looks for executable files, whether you are typing at a command prompt or executing a command from java.

Decide on the directory where you want the batch file. I chose "c:\users\gordon".

To change the system path, go to the "My Computer" or "Computer" icon on your desktop, right click to get "Properties", and then choose "Advanced System Settings". Click on the "Environment Variables" button, scroll down to find "Path" in the variables, and edit the "Path" variable.

BE VERY CAREFUL NOT TO DELETE THE PREVIOUS VALUES IN THE PATH VARIABLE!!! ONLY ADD ONTO THEM!!!

At the end of the path variable, I appended the following (without the double quotes): ";c:\Program Files (x86)\PuTTY\;c:\users\gordon". The part after the second semicolon should be where you want to put your batch file. The first part is where the putty commands are located (which may vary on different versions of Windows).

Then, I found that I had to reboot my machine in order for Eclipse to know about the new path. I speculate that this is because there is a java program running somewhere that picks up the path when it starts, and this is where Eclipse gets the path. If I'm correct, all that needs to be done is to restart that program. Rebooting the machine was easier than tracking down a simpler solution.


Test the Newly Installed Software

The equivalent of rsh in this environment is called plink. To see if things work, you need the following:
  • IP address of the other machine. On the Linux virtual machine, you can find this using "ifconfig" (on Windows, the equivalent command is "ipconfig"). In my case, the IP address is 192.168.65.128. This is the address of the virtual machine, but this should work even if you are connecting to a real machine.
  • The user name to login as. In my case, this is "hadoop-user", which is provided by the virtual machine.
  • The password. In my case, this is "hadoop".
Here is a test command to see if you get to the right machine:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hostname
If this works by returning the name of the machine you are connecting to, then everything is working correctly. In my case, it returns "hadoop-desk".

Since we are going to be connecting to the hadoop file system, we might as well test that as well. I noticed that the expected command:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 hadoop fs -ls
Does not work. This is because the remote shell does not initialize the environment properly, so it cannot find the command. On the Yahoo! virtual machine, the initializations are in the ".profile" file. So, the correct command is:
  • plink -ssh -pw hadoop hadoop-user@192.168.65.128 source .profile; hadoop fs -ls
Voila! That magically seems to work, indicating that we can, indeed, connect to another machine and run the hadoop commands.


Write the Batch File

I call the batch file "myhadoop.bat". This file contains the following line:

"c:\Program Files (x86)\PuTTY\plink.exe" -ssh -pw %3 %2@%1 source .profile; hadoop fs %4 %5 %6 %7 %8 %9

This file takes the following arguments in the following order:
  • host ip address (or hostname, if it can be resolved)
  • user name
  • password
  • commands to be executed (in arguments %4 through %9)
Yes, the password is in clear text. If this is a problem, learn about PuTTY ssh with security and encryption.

You can test this batch file in the same way you tested plink.


Write a Java Class to Run the Batch File

This is more complicated than it should be for two reasons. First, the available exec() command does not execute batch files. So, you need to use "cmd /q /c myhadoop.bat" to run it. This invokes a command interpreter to run the command (the purpose of the "/c" option). It also does not echo the commands being run, courtesy of the "/q" option.

The more painful part is the issue with stdout and stderr. Windows blocks a process when either of these buffers is full. What that means is that your code hangs, without explanation, rhyme, or reason. This problem, as well as others, is explained and solved in this excellent article, When Runtime.exec() won't.

The solution is to create separate threads to read each of the streams. With the example from the article, this isn't so hard. It is available in this file: HadoopFS.java.

Let me explain a bit how this works. The class HadoopFS has four fields:
  • command is the command that is run.
  • exitvalue is the integer code returned by the running process. Typically, processes return 0 when they are successful and an error code otherwise.
  • stdout is a list of strings containing the standard output.
  • stderr is a list of strings containing the standard error.
Constructing an object requires a string. This is the part of the hadoop command that appears after the "fs". So, for "hadoop fs -ls", this would be "-ls". As you can see, this could be easily modified to run any command, either under Windows or on the remote box, but I'm limiting it to Hadoop fs commands.

This file also contains a private class called threadStreamReader. (Hmmm, I don't think I have the standard java capitalization down, since classes often start with capital letters.) This is quite similar to the StreamGobbler class in the above mentioned article. The difference is that my class stores the strings in a data structure instead of writing them to the console.
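
Here is a sketch of the same idea -- not the exact contents of HadoopFS.java, but the shape of it: a reader thread that stores lines in a list, plus the exec() call that wires two such threads to stdout and stderr. The host, user, password, and command variables stand in for however the class stores them.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

class ThreadStreamReader extends Thread {
....private final InputStream stream;
....public final List<String> lines = new ArrayList<String>();

....ThreadStreamReader(InputStream stream) {
........this.stream = stream;
....}

....public void run() {
........try {
............BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
............String line;
............while ((line = reader.readLine()) != null) {
................lines.add(line); // store the line instead of writing it to the console
............}
........} catch (Exception e) {
............// a real implementation should at least record the error
........}
....}
}

And the command itself is launched along these lines (inside a method that declares throws IOException, InterruptedException):

....Process p = Runtime.getRuntime().exec("cmd /q /c myhadoop.bat "
........+ host + " " + user + " " + password + " " + command);
....ThreadStreamReader stdoutReader = new ThreadStreamReader(p.getInputStream());
....ThreadStreamReader stderrReader = new ThreadStreamReader(p.getErrorStream());
....stdoutReader.start();
....stderrReader.start();
....int exitvalue = p.waitFor(); // safe, because both streams are being drained
....stdoutReader.join();
....stderrReader.join();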


Using the HadoopFS Class

At the beginning of this posting, I said that I wanted to do two things: (1) delete the output files before running the Hadoop job and (2) output the results. The full example for the WordCount driver class is in this file:
WordCount.java.

To delete the output files, I use the following code before the job is run:

....HadoopFS hdfs_rmr = new HadoopFS("-rmr "+outputname);
....hdfs_rmr.callCommand();

I've put the name of the output files in the string outputname.

To show the results, I use:

....HadoopFS hdfs_cat = new HadoopFS("-cat "+outputname+"/*");
....hdfs_cat.callCommand();
....for (String line : hdfs_cat.stdout) {
........System.out.println(line);
....}

This is pretty simple and readable. More importantly, both commands seem to work.


Conclusion


The hadoop framework does not allow us to do some rather simple things. There are typically three computing environments when running parallel code -- the development environment, the master environment, and the grid environment. The master environment controls the grid, but does not provide useful functionality for the development environment. In particular, the master environment does not give the development environment critical access to the parallel distributed files.

I want to develop my code strictly in java, so I need more control over the environment. Fortunately, I can extend the environment to support the "hadoop fs" commands in the development environment. I believe this code could easily be extended for the Unix world (by writing appropriate "cmd" and "myhadoop.bat" files). This code would then be run in exactly the same way from the java MapReduce code.

This mechanism is going to prove much more powerful than merely affecting the aesthetics of the WordCount program. In the next post, I will probably explain how to use this method to return arbitrary data structures between MapReduce runs.
