Thursday, March 4, 2010

Notes on MapReduce Algorithms from Cloudera Hadoop Training

Here are some notes I've taken while watching Cloudera Hadoop Training: MapReduce Algorithms. The notes stay at the level of map/reduce descriptions and pseudo code rather than production Hadoop code.

The tutorial covers the following:
- Sort
- Search
- Index
- Classification
- Join
- TF - IDF

As a side note, the IdentityMapper and the IdentityReducer seem to be very commonly used. IdentityMapper implements the identity function, mapping inputs directly to outputs, while IdentityReducer performs no reduction, writing all input values directly to the output.

Sort (1:55)

~ takes advantage of reducer properties: (key, value) pairs are processed in order by key, and the reducers are themselves ordered. The code for this tends to be very short.

* Mapper:
Identity function for value: (k,v) -> (v, _)

* Reducer:
Identity function: (k', _) -> (k', "")

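Choose the hash (partition) function carefully so that it preserves the order of the keys: k1 > k2 -> hash(k1) >= hash(k2). Because (key, value) pairs are sent to a reducer based on the output of hash(key), every key handled by one reducer then sorts before every key handled by the next one, so reading the reducer outputs in reducer order gives a fully sorted result.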
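To make the ordering constraint concrete, here is a minimal in-memory sketch in Python. It is not Hadoop code: the names range_partition and simulate_sort, and the split points "h" and "p", are hypothetical and only serve to show how an order-preserving partitioner plus per-reducer key ordering yields a global sort.

```python
from bisect import bisect_right
from collections import defaultdict


def range_partition(key, split_points):
    """Order-preserving partitioner: k1 <= k2 implies partition(k1) <= partition(k2)."""
    return bisect_right(split_points, key)


def simulate_sort(records, split_points):
    """Simulate the sort job: map (k, v) -> (v, _), shuffle, identity reduce."""
    partitions = defaultdict(list)
    for _key, value in records:
        # Mapper: the value becomes the new key; the new value is empty.
        partitions[range_partition(value, split_points)].append(value)
    output = []
    # Reducers are ordered, and each one sees its keys in sorted order.
    for reducer_id in sorted(partitions):
        output.extend(sorted(partitions[reducer_id]))
    return output


records = [(0, "pear"), (1, "apple"), (2, "zebra"), (3, "mango"), (4, "kiwi")]
result = simulate_sort(records, split_points=["h", "p"])
print(result)
```

With a plain modulo hash instead of range_partition, each reducer's output would still be sorted, but concatenating the outputs would not be.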

This algorithm is used in the famous TeraSort benchmark. Hadoop was the 2008 winner, sorting 1 terabyte in 209 seconds, and had some spectacular results in 2009 as well: "Within the rules for the 2009 Gray sort, our 500 GB sort set a new record for the minute sort and the 100 TB sort set a new record of 0.578 TB/minute. The 1 PB sort ran after the 2009 deadline, but improves the speed to 1.03 TB/minute. The 62 second terabyte sort would have set a new record, but the terabyte benchmark that we won last year has been retired."


Search (5:30)

* Inputs
  - a set of files containing lines of text
  - a search pattern
  - Map Key: file name, line number
  - Map Value: contents of the line
  - The search pattern is passed to the mappers as a special parameter

* Mapper:
    Given (filename, some text) and "pattern",
    if "text" matches "pattern" -> (filename,_)

* Reducer
    Identity function

* Combiner (Optimization)
    - fold redundant (filename, _) pairs for the same file; this reduces network I/O
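As an illustration only, the mapper and combiner above could be sketched in plain Python as follows. The function names search_map and search_combine are hypothetical, and a regular expression is assumed as the pattern language; the training does not prescribe either.

```python
import re


def search_map(filename, line, pattern):
    """Mapper: emit (filename, None) when the line matches the pattern."""
    if re.search(pattern, line):
        yield (filename, None)


def search_combine(pairs):
    """Combiner: fold duplicate (filename, None) pairs emitted by one mapper."""
    seen = set()
    for filename, _ in pairs:
        if filename not in seen:
            seen.add(filename)
            yield (filename, None)


lines = [
    ("a.txt", "hello hadoop"),
    ("a.txt", "hadoop again"),
    ("b.txt", "nothing here"),
]
mapped = [pair for name, text in lines for pair in search_map(name, text, r"hadoop")]
combined = list(search_combine(mapped))
print(combined)
```

Without the combiner, a file with many matching lines would send one pair per matching line over the network; with it, each mapper sends at most one pair per file.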


Indexing (7:40)

* Inputs
  - a set of files containing lines of text
  - Map Key: the file name, line number
  - Map Value: the content of the line

* Mapper
    for each word in (filename, words) -> (word, filename)

* Reducer
    Identity function
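A small Python sketch of the same idea, run in memory. The names index_map and build_index are hypothetical, the dictionary stands in for the shuffle that groups values by key, and lower-casing the words is an assumption of this sketch rather than something the training specifies.

```python
from collections import defaultdict


def index_map(filename, line):
    """Mapper: emit (word, filename) for each word in the line."""
    for word in line.split():
        yield (word.lower(), filename)


def build_index(documents):
    """Group mapper output by word, as the shuffle would, then pass it through."""
    index = defaultdict(list)
    for filename, line in documents:
        for word, name in index_map(filename, line):
            index[word].append(name)
    # Identity reducer: each word keeps the list of file names it was seen in.
    return {word: names for word, names in sorted(index.items())}


documents = [("a.txt", "Hadoop map reduce"), ("b.txt", "map side join")]
index = build_index(documents)
print(index)
```

Looking up index["map"] then returns every file that contains the word.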


Bayesian Classification (10:00)

* Inputs
 - files containing classification instances are sent to the mappers
 - can be seen as an embarrassingly parallel problem.

 * Mapper
    (filename, instance) -> (instance, class)

 * Reducer
    Identity function

Interesting to look at Mahout, Weka.


Joining (12:00)

* Inputs
 - you have 2 data types, and one includes references to elements of the other. We would like to incorporate the data by value, not by reference ("de-normalize" the data)
        => MapReduce Join Pass

* Join Mapper
  - read in all values of the joinee (A) and the joiner (B)
  - emit to the reducer based on the primary key of the joinee (i.e. the reference in the joiner, or the joinee's id)


Let's consider:

A {
    a_id;
    a_data;
}

B{
    a_id;
    b_data;
    a_data;   // fill this field based on a_id
}

We'll end up having records of 2 different types going into the same reducer having the common key as their join key:
  - A -> (a_id, A)  - for every A we would emit the pair (a_id, A).
  - B -> (a_id, B)  - for B we would emit the pair using the same key (a_id, B) so it would end up in the same location as the corresponding A records.

* Join Reducer
  - joinee objects are emitted as is
  - joiner objects have additional fields populated by the joinee, which comes to the same reducer as them
  - a secondary sort in the reducer appears to be a strong requirement, so that the joinee is read before emitting any objects which join on to it.

* Algorithm
The real key here is that we are setting up our hashing and comparing functions such that we guarantee that data comes to the same reducer if it needs to be joined together.

Because Hadoop does not necessarily guarantee the order in which the values get to the Reducer, we might get some B values in the reducer before the actual A record. This leads to a scalability issue, because we don't know how many records one Reducer will have to deal with at once. As the number of potential values increases, we might run out of memory trying to buffer everything. So it's important that we minimize the amount of buffering that we do in the reducer.
The trick here is to tweak the way we sort and partition data, so that the A record will arrive first, and the corresponding B records will come after.

So we need to modify the hash function so that related items arrive at the same Reducer AND the compare function so that one A record comes first and then all the B records associated with it follow. This way we'll bypass any need for buffering, and we'll be able to emit all the Reducer values instantly.


A very light pseudo code would look like this:

 - Map:
    if type A emit ( PK(a_id), A )
    if type B emit ( FK(a_id), B ) 
// it seems we need to have a way to tell the two apart in the Mapper code

Here we must apply the compare and hash constraints:
 - if values are not equal, compareTo must order them strictly and never report them as equal:  A != A'  => A < A' (or A > A' )
 - PK( A ) and FK( A ) must hash to the same thing, but A items must come before B items.

 - Reduce (here you will see values similar to the following A,B,B,B,A',B',B',B',....):
    if PK
       record a_data,
         // save the current A until we get another A because we know that from now on, only its associated B(s) will follow
      emit (a_id, a_data)
       // Identity function, optional if you want to preserve this dataset

    if FK
      emit (id, JOIN[b_data, recorded a_data] )
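The pseudo code can be tried out with a short Python simulation. This is only a sketch under stated assumptions: records are plain dictionaries with a hypothetical "type" field to tell A and B apart, the names join_map, join_reduce, A_TAG and B_TAG are made up for illustration, and a single sorted() call stands in for Hadoop's partition and sort phase.

```python
from itertools import groupby

A_TAG, B_TAG = 0, 1  # A sorts before B within the same a_id


def join_map(record):
    """Mapper: key every record by a_id plus a tag telling A and B apart."""
    tag = A_TAG if record["type"] == "A" else B_TAG
    return ((record["a_id"], tag), record)


def join_reduce(sorted_pairs):
    """Reducer: remember the current A, then fill a_data into each following B."""
    for a_id, group in groupby(sorted_pairs, key=lambda pair: pair[0][0]):
        a_data = None
        for (_, tag), record in group:
            if tag == A_TAG:
                a_data = record["a_data"]  # only one value is buffered
            else:
                yield (a_id, {"b_data": record["b_data"], "a_data": a_data})


records = [
    {"type": "B", "a_id": 1, "b_data": "b1"},
    {"type": "A", "a_id": 2, "a_data": "two"},
    {"type": "A", "a_id": 1, "a_data": "one"},
    {"type": "B", "a_id": 2, "b_data": "b2"},
    {"type": "B", "a_id": 1, "b_data": "b3"},
]
# Sorting on (a_id, tag) stands in for the partition + secondary sort.
shuffled = sorted((join_map(r) for r in records), key=lambda pair: pair[0])
joined = list(join_reduce(shuffled))
print(joined)
```

Grouping uses only a_id (the hash constraint), while the ordering uses the full (a_id, tag) pair (the compare constraint), which is why the reducer never has to hold more than one A record.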



TF - IDF (22:30)

Term frequency - Inverse Document Frequency. Can be seen as an example of a multi-stage pipeline.
 - Relevant to text processing
 - Common web analysis algorithm - Variations of the tf–idf weighting scheme are often used by search engines as a central tool in scoring and ranking a document's relevance given a user query.

Definitions
 - term frequency:  how often a word appears in a given document relative to the size of that document.
 - inverse document frequency: 1 / (the number of documents this word appears in relative to the total number of documents), i.e. D / m in the notation used below; in practice this ratio is usually log-scaled.

Information we need (and the job, numbered below, that computes it)
 - number of times term X appears in a given document ( #1 ).
 - number of terms in each document ( #2 ).
 - number of documents a word X appears in ( #3 ).
 - total number of documents (assumed to be known).


#1 (word frequency in docs)

    * Mapper:  In ( docname, contents) -> Out ( (word, docname), 1)
    * Reducer:
        - Sums counts for words in documents
        - Output: ( (word, docname), n )
    * Combiner: the same as the reducer

#2 ( word counts for docs)

    * Mapper: In ( (word, docname), n) -> Out ( docname, ( word,n) )
    * Reducer:
        - Sums the individual n's in the same doc to get N
        - Feeds original data through
        - Output: ( (word, docname), (n, N) )

#3 ( word frequency in corpus )

    * Mapper: In ( (word, docname), (n,N) ) -> Out (word, (docname, n, N, 1) )
    * Reducer
        - Sums counts for word in corpus
        - Output: ( (word, docname), (n, N, m) )

#4 ( Compute TF - IDF / this could be merged with #3 )

    * Mapper: In ( (word, docname), (n, N, m) ) -> Out ( (word, docname), TF * IDF ) - we assume D, the number of documents, is known
    * Reducer: Identity function


Just a little reminder on the variables:
  - n: word frequency in doc
  - N: word count for doc
  - m: number of docs word appears in
  - D: number of docs
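The four jobs can be chained in a small in-memory Python sketch to check how n, N, m and D flow through the pipeline. The function names are hypothetical, each function collapses the mapper and reducer of one job into a single step, and the weighting TF = n / N with IDF = log(D / m) is an assumption: it is the common log-scaled variant, while the notes only describe IDF as an inverse ratio.

```python
import math
from collections import Counter


def job1_word_frequency(docs):
    """#1: (docname, contents) -> ((word, docname), n)."""
    counts = Counter()
    for docname, contents in docs.items():
        for word in contents.split():
            counts[(word, docname)] += 1
    return dict(counts)


def job2_doc_word_counts(job1):
    """#2: ((word, docname), n) -> ((word, docname), (n, N))."""
    totals = Counter()
    for (_word, docname), n in job1.items():
        totals[docname] += n
    return {(w, d): (n, totals[d]) for (w, d), n in job1.items()}


def job3_corpus_frequency(job2):
    """#3: ((word, docname), (n, N)) -> ((word, docname), (n, N, m))."""
    # Each (word, docname) key is unique, so counting words counts documents.
    docs_with_word = Counter(word for (word, _doc) in job2)
    return {(w, d): (n, N, docs_with_word[w]) for (w, d), (n, N) in job2.items()}


def job4_tf_idf(job3, D):
    """#4: TF * IDF, assuming TF = n / N and IDF = log(D / m)."""
    return {key: (n / N) * math.log(D / m) for key, (n, N, m) in job3.items()}


docs = {"d1": "hadoop map reduce map", "d2": "hadoop join"}
stage3 = job3_corpus_frequency(job2_doc_word_counts(job1_word_frequency(docs)))
scores = job4_tf_idf(stage3, D=len(docs))
print(scores)
```

In this toy corpus "hadoop" appears in every document, so its score is zero, while "map" scores highest in d1 because it is frequent there and absent elsewhere.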

Scale advice:
  - ignore high frequency words ( maybe by using a stop words list)
  - another MapReduce pass ( #2 job - to compute N)
