Here are some notes I've taken while watching Cloudera Hadoop Training: MapReduce Algorithms. At least for now, no code will be complementing these map/reduce algorithms.
The tutorial covers the following:
- Sort
- Search
- Index
- Classification
- Join
- TF - IDF
As a side note, the IdentityMapper and the IdentityReducer seem to be very commonly used. IdentityMapper implements the identity function, mapping inputs directly to outputs, while IdentityReducer performs no reduction, writing all input values directly to the output.
Sort (1:55)
~ takes advantage of reducer properties: (key, value) pairs are processed in order by key, and the reducers are themselves ordered. This tends to be very short, code-wise.
* Mapper:
Identity function for value: (k,v) -> (v, _)
* Reducer:
Identity function: (k', _) -> (k', "")
Choose the hash (partitioning) function carefully so that it keeps the order of the keys consistent: k1 > k2 => hash(k1) >= hash(k2). This way you take advantage of the fact that (key, value) pairs are sent to a reducer based on the output of hash(key), so concatenating the reducers' outputs in reducer order gives a fully sorted result.
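To make the ordering constraint concrete, here is a small plain-Java simulation (no Hadoop involved) of a sort with an order-preserving partitioner. The sorted split points, the class name RangePartitionSort and the use of strings as keys are my own assumptions for illustration: a key goes to reducer i when exactly i split points are less than or equal to it, so the reducers' outputs can simply be concatenated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class RangePartitionSort {
    // Reducer index for a key = number of split points <= key.
    // splitPoints must be sorted (and distinct), so k1 <= k2 implies partition(k1) <= partition(k2).
    static int partition(String key, String[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    // Identity map -> partition -> sort within each reducer -> concatenate reducer outputs.
    static List<String> sort(List<String> keys, String[] splitPoints) {
        List<List<String>> reducers = new ArrayList<>();
        for (int i = 0; i <= splitPoints.length; i++) {
            reducers.add(new ArrayList<String>());
        }
        for (String k : keys) {
            reducers.get(partition(k, splitPoints)).add(k); // the shuffle
        }
        List<String> output = new ArrayList<>();
        for (List<String> r : reducers) {
            Collections.sort(r); // the framework hands keys to each reducer in sorted order
            output.addAll(r);    // reducer i's output comes before reducer i+1's
        }
        return output;
    }
}
```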
This algorithm is used in the famous TeraSort benchmark. Hadoop won the 2008 terabyte sort, sorting 1 terabyte in 209 seconds, and had some spectacular results in 2009 as well: "Within the rules for the 2009 Gray sort, our 500 GB sort set a new record for the minute sort and the 100 TB sort set a new record of 0.578 TB/minute. The 1 PB sort ran after the 2009 deadline, but improves the speed to 1.03 TB/minute. The 62 second terabyte sort would have set a new record, but the terabyte benchmark that we won last year has been retired."
Search (5:30)
* Inputs
- a set of files containing lines of text
- a search pattern
- Map Key: file name, line number
- Map Value: contents of the line
- Search patterns will be sent as a special param
* Mapper:
Given (filename, some text) and "pattern",
if "text" matches "pattern" -> (filename,_)
* Reducer
Identity function
* Combiner (Optimization)
- fold the redundant pairs - this reduces network I/O
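A minimal sketch of the search job, simulated in plain Java (no Hadoop): each file plays the role of one mapper's input, the pattern is passed in as a parameter, and the combiner folds the duplicate (filename, _) pairs before they would cross the network. The class name and the use of java.util.regex for "matches" are my own assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class DistributedGrep {
    // Mapper: for one (filename, line) record, emit the filename if the line matches.
    static void map(String filename, String line, Pattern pattern, List<String> out) {
        if (pattern.matcher(line).find()) {
            out.add(filename);
        }
    }

    // Combiner: fold the redundant (filename, _) pairs produced by one mapper.
    static List<String> combine(List<String> mapOutput) {
        return new ArrayList<>(new LinkedHashSet<>(mapOutput));
    }

    // One "mapper" per file here; the Set plays the grouping done before the identity reducer.
    static Set<String> search(Map<String, List<String>> files, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Set<String> result = new LinkedHashSet<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            List<String> mapOut = new ArrayList<>();
            for (String line : file.getValue()) {
                map(file.getKey(), line, pattern, mapOut);
            }
            result.addAll(combine(mapOut));
        }
        return result;
    }
}
```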
Indexing (7:40)
* Inputs
- a set of files containing lines of text
- Map Key: the file name, line number
- Map Value: the content of the line
* Mapper
for each word in (filename, words) -> (word, filename)
* Reducer
Identity function
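Similarly, a plain-Java sketch of the indexing job, where a TreeMap stands in for the shuffle that groups (word, filename) pairs by word. Assumptions: words are lowercased and split on non-word characters, and file names are collected into a sorted set, which also drops duplicate pairs (something the identity reducer in the notes would not do).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class InvertedIndex {
    // Mapper: (filename, line) -> one (word, filename) pair per word.
    // Shuffle: pairs are grouped by word (the TreeMap keys).
    static Map<String, TreeSet<String>> build(Map<String, List<String>> files) {
        Map<String, TreeSet<String>> index = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            for (String line : file.getValue()) {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (word.isEmpty()) {
                        continue; // split() yields "" for leading separators
                    }
                    index.computeIfAbsent(word, w -> new TreeSet<>()).add(file.getKey());
                }
            }
        }
        return index;
    }
}
```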
Bayesian Classification (10:00)
* Inputs
- files containing classification instances are sent to the mappers
- can be seen as an embarrassingly parallel problem.
* Mapper
(filename, instance) -> (instance, class)
* Reducer
Identity function
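Since each instance is classified independently, the mapper only has to apply an already-trained model. Here is a plain-Java sketch assuming a naive Bayes model with hand-set class priors and per-word probabilities (hypothetical values, plus a fixed fallback probability for unseen words); training the model, e.g. with Mahout or Weka, is out of scope.

```java
import java.util.HashMap;
import java.util.Map;

final class NaiveBayesMapper {
    private final Map<String, Double> logPrior = new HashMap<>();
    private final Map<String, Map<String, Double>> logLikelihood = new HashMap<>();
    private final double unseenLogProb; // fallback for words the model has never seen

    NaiveBayesMapper(double unseenLogProb) {
        this.unseenLogProb = unseenLogProb;
    }

    // Registers one class of the (already trained, hypothetical) model.
    void addClass(String cls, double prior, Map<String, Double> wordProbs) {
        logPrior.put(cls, Math.log(prior));
        Map<String, Double> logs = new HashMap<>();
        for (Map.Entry<String, Double> e : wordProbs.entrySet()) {
            logs.put(e.getKey(), Math.log(e.getValue()));
        }
        logLikelihood.put(cls, logs);
    }

    // Picks the class with the highest log P(class) + sum of log P(word | class).
    String classify(String instance) {
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> prior : logPrior.entrySet()) {
            double score = prior.getValue();
            Map<String, Double> likelihood = logLikelihood.get(prior.getKey());
            for (String word : instance.toLowerCase().split("\\s+")) {
                score += likelihood.getOrDefault(word, unseenLogProb);
            }
            if (score > bestScore) {
                bestScore = score;
                best = prior.getKey();
            }
        }
        return best;
    }

    // Mapper body: (filename, instance) -> (instance, class); no reduce logic is needed.
    String[] map(String filename, String instance) {
        return new String[] {instance, classify(instance)};
    }
}
```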
Interesting to look at Mahout, Weka.
Joining (12:00)
* Inputs
- you have 2 data types, one of which includes references to elements of the other. We would like to incorporate the data by value, not by reference ("de-normalize" the data)
=> MapReduce Join Pass
* Join Mapper
- read in all values of the joiner (B) and the joinee (A)
- emit to the reducer based on the joinee's primary key (i.e. the reference in the joiner, or the joinee's id)
Let's consider:
A {
a_id;
a_data;
}
B{
a_id;
b_data;
a_data; // fill this field based on a_id
}
We'll end up having records of 2 different types going into the same reducer having the common key as their join key:
- A -> (a_id, A) - for every A we would emit the pair (a_id, A).
- B -> (a_id, B) - for B we would emit the pair using the same key (a_id, B) so it would end up in the same location as the corresponding A records.
* Join Reducer
- joinee objects are emitted as is
- joiner objects get additional fields populated from the joinee, which comes to the same reducer as they do
- it appears to be a strong requirement to do a secondary sort, so that the reducer reads the joinee before emitting the objects that join on to it
* Algorithm
The real key here is that we are setting up our hashing and comparing functions such that we guarantee that data comes to the same reducer if it needs to be joined together.
Because Hadoop does not necessarily guarantee the order in which the values get to the Reducer, we might get some B values in the reducer before the actual A record. This leads to a scalability issue, because we don't know how many records one Reducer will have to deal with at once: as the number of potential values increases, we might run out of memory trying to buffer everything. So it's important to minimize the amount of buffering we do in the reducer.
The trick here is to tweak the way we sort and partition data, so that the A record will arrive first, and the corresponding B record will come after.
So we need to modify the hash function so that related items arrive at the same Reducer, AND the compare function so that the A record comes first and all the B records associated with it follow. This way we bypass any need for buffering, and the Reducer can emit every value immediately.
A very light pseudo code would look like this:
- Map:
if type A emit ( PK(a_id), A )
if type B emit ( FK(a_id), B )
// it seems we need to have a way to tell the two apart in the Mapper code
Here we must apply the compare and hash constraints:
- if values are not equal, compareTo must yield different values: A != A' => A < A' (or A > A')
- PK( A ) and FK( A ) must hash to the same thing, but A items must come before B items.
- Reduce (here you will see values similar to the following A,B,B,B,A',B',B',B',....):
if PK
record a_data,
// save the current A until we get another A, because we know that from now on only its associated B(s) will follow
emit (a_id, a_data)
// Identity function, optional if you want to preserve this dataset
if FK
emit (id, JOIN[b_data, recorded a_data] )
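The pseudo code above can be simulated in plain Java (no Hadoop) to check that the ordering trick removes the need for buffering. Everything here is a hypothetical stand-in: Tagged plays the map output with a tag of 0 for A and 1 for B, partition() hashes only a_id, SORT orders by (a_id, tag) so each A comes before its B records, and reduce() keeps just the current A in memory.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ReduceSideJoin {
    // One map output record: key = (aId, tag), tag 0 for A, 1 for B.
    static final class Tagged {
        final int aId;
        final int tag;
        final String data;
        Tagged(int aId, int tag, String data) { this.aId = aId; this.tag = tag; this.data = data; }
    }

    // Partition on aId only, so an A and all of its Bs reach the same reducer.
    static int partition(Tagged r, int numReducers) {
        return Math.floorMod(Integer.hashCode(r.aId), numReducers);
    }

    // Sort on (aId, tag): within one aId, the A record precedes its B records.
    static final Comparator<Tagged> SORT =
            Comparator.<Tagged>comparingInt(r -> r.aId).thenComparingInt(r -> r.tag);

    // Reducer sees A, B, B, A', B', ...: remember the current A, join each following B to it.
    static List<String> reduce(List<Tagged> sortedValues) {
        List<String> out = new ArrayList<>();
        String currentAData = null;
        int currentAId = Integer.MIN_VALUE;
        for (Tagged r : sortedValues) {
            if (r.tag == 0) {
                currentAId = r.aId;
                currentAData = r.data;
                out.add(r.aId + "\t" + r.data);                       // A passes through (optional)
            } else if (r.aId == currentAId) {
                out.add(r.aId + "\t" + r.data + "\t" + currentAData); // B joined with its A
            }
            // a B whose A never showed up is dropped (inner-join behaviour)
        }
        return out;
    }

    // Drives the simulated shuffle: partition, sort each partition, reduce it.
    static List<String> join(List<Tagged> mapOutput, int numReducers) {
        List<List<Tagged>> parts = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            parts.add(new ArrayList<Tagged>());
        }
        for (Tagged r : mapOutput) {
            parts.get(partition(r, numReducers)).add(r);
        }
        List<String> out = new ArrayList<>();
        for (List<Tagged> p : parts) {
            p.sort(SORT);
            out.addAll(reduce(p));
        }
        return out;
    }
}
```

In a real Hadoop job these roles would be played by a custom Partitioner, a sort comparator and a grouping comparator; the sketch only mimics the order the reducer ends up seeing.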
TF - IDF (22:30)
Term frequency - Inverse Document Frequency. Can be seen as an example of a multi-stage pipeline.
- Relevant to text processing
- Common web analysis algorithm - Variations of the tf–idf weighting scheme are often used by search engines as a central tool in scoring and ranking a document's relevance given a user query.
Definitions
- term frequency: how often a word appears in a given document relative to the size of that document.
- inverse document frequency: the total number of documents divided by the number of documents this word appears in, usually taken as a logarithm.
Information we need ( # of jobs we need to write )
- number of times term X appears in a given document ( #1 ).
- number of terms in each document ( #2 ).
- number of documents a word X appears in ( #3 ).
- total number of documents (assumed to be known).
#1 (word frequency in docs)
* Mapper: In ( docname, contents) -> Out ( (word, docname), 1)
* Reducer:
- Sums counts for words in documents
- Output: ( (word, docname), n )
* Combiner: the same as the reducer
#2 ( word counts for docs)
* Mapper: In ( (word, docname), n) -> Out ( docname, ( word,n) )
* Reducer:
- Sums the individual n's for the same doc
- Feeds original data through
- Output: ( (word, docname), (n, N) )
#3 ( word frequency in corpus )
* Mapper: In ( (word, docname), (n,N) ) -> Out (word, (docname, n, N, 1) )
* Reducer
- Sums counts for word in corpus
- Output: ( (word, docname), (n, N, m) )
#4 ( Compute TF-IDF / this could be merged with #3 )
* Mapper: In ( (word, docname), (n, N, m) ) -> Out ( (word, docname), TF * IDF ) - we assume D, the number of documents is known
* Reducer: Identity function
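To check that the four jobs fit together, here is an in-memory Java sketch in which each loop stands in for one of the jobs above (no Hadoop involved). Assumptions: words are split on whitespace and lowercased, results are keyed as "word@docname" for brevity, and IDF is taken as log(D/m).

```java
import java.util.HashMap;
import java.util.Map;

final class TfIdfPipeline {
    // docs: docname -> contents; returns "word@docname" -> tf-idf.
    static Map<String, Double> compute(Map<String, String> docs) {
        // Job #1: ((word, docname), n), occurrences of each word in each document
        Map<String, Map<String, Integer>> n = new HashMap<>(); // docname -> word -> n
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            Map<String, Integer> counts = new HashMap<>();
            for (String word : doc.getValue().toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
            n.put(doc.getKey(), counts);
        }
        // Job #2: N, the total number of words in each document
        Map<String, Integer> bigN = new HashMap<>();
        for (Map.Entry<String, Map<String, Integer>> doc : n.entrySet()) {
            int total = 0;
            for (int c : doc.getValue().values()) {
                total += c;
            }
            bigN.put(doc.getKey(), total);
        }
        // Job #3: m, the number of documents each word appears in
        Map<String, Integer> m = new HashMap<>();
        for (Map<String, Integer> counts : n.values()) {
            for (String word : counts.keySet()) {
                m.merge(word, 1, Integer::sum);
            }
        }
        // Job #4: tf-idf = (n / N) * log(D / m), D = number of documents
        int d = docs.size();
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Map<String, Integer>> doc : n.entrySet()) {
            for (Map.Entry<String, Integer> wc : doc.getValue().entrySet()) {
                double tf = (double) wc.getValue() / bigN.get(doc.getKey());
                double idf = Math.log((double) d / m.get(wc.getKey()));
                result.put(wc.getKey() + "@" + doc.getKey(), tf * idf);
            }
        }
        return result;
    }
}
```

A word that appears in every document ("a" below) gets an IDF of log(1) = 0, so its score is 0 no matter how often it occurs.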
Just a little reminder on the variables:
- n: word frequency in doc
- N: word count for doc
- m: number of docs word appears in
- D: number of docs
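Putting the variables together, job #4 computes something like the following, assuming the common logarithmic form of IDF (the notes above don't spell out the exact formula):

```latex
\mathrm{tfidf}(word, doc) = \frac{n}{N} \times \log\frac{D}{m}
```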
Scale advice:
- ignore high frequency words ( maybe by using a stop words list)
- another MapReduce pass ( #2 job - to compute N)
Thursday, March 4, 2010
Wednesday, February 17, 2010
How to get the last value of a PERSISTENT_SEQUENTIAL node in ZooKeeper 3.2
This assumes a running ZooKeeper server. I am currently using version 3.2.
Persistent Sequential nodes are nodes that get an automatically incremented counter appended to a given fixed prefix. For some interesting use cases, take a look at Queues on the Recipes page of the wiki.
So basically you give ZooKeeper the start prefix, such as 'example', and it will append an increasing, zero-padded index starting with zero: example0000000000, example0000000001, example0000000002, you get the idea...
This is what you need to do (all in one line!):
zk.create("/root/element", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
ZooKeeper will pick up on this, and every time a client calls create, you'll get a new node with an incremented suffix. Note that ZooKeeper paths must be absolute and the parent node (/root here) must already exist.
Now for the interesting part: the node gets created auto-magically, but how do you find the last (or first) value?
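// needs java.util.List, org.apache.zookeeper.KeeperException and a connected ZooKeeper 'zk'
public byte[] getNewestChildData(String path) throws InterruptedException, KeeperException {
    List<String> list = zk.getChildren(path, false);
    if (list.isEmpty()) {
        return null;
    }
    // children come back in no particular order, so scan for the largest name;
    // same prefix + zero-padded counter means string order == numeric order
    String node = list.get(0);
    for (String s : list) {
        if (s.compareTo(node) > 0) {
            node = s;
        }
    }
    return zk.getData(path + "/" + node, null /* watcher */, null /* stat */);
}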
The example provided on the wiki (Producer-Consumer Queues) is a bit complicated, as it does a lot of string parsing to get the exact int value of the sequence.
But if you look at it, each name is a string (the same fixed prefix each time) ending with a big zero-padded number, so why not just use 'compareTo()'?
One thing to be aware of is that ZooKeeper will not return the children in any obvious order. I was counting on 'list.get(0)' to give me the latest one (not sure why, though), but as it turned out, you can get them in a very weird order. So just go through the list, compare, and you are done.
Also, for a Producer-Consumer-like process, you might be interested in comparing the other way (to get the oldest node). For my use case, I needed the latest one each time.
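Here is a plain-Java sketch (no ZooKeeper server needed) of why 'compareTo()' is enough in both directions. It assumes the naming shown above: a fixed prefix followed by a 10-digit, zero-padded counter, which is what makes string order agree with numeric order. sequentialName() only mimics that naming for the test; it is a hypothetical helper, not a ZooKeeper API.

```java
import java.util.List;

final class SequentialNodeNames {
    // Hypothetical helper mimicking a sequential node name: prefix + 10-digit zero-padded counter.
    static String sequentialName(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    // Latest node: the largest name. Only valid when every child shares the same prefix.
    static String newest(List<String> children) {
        String best = null;
        for (String s : children) {
            if (best == null || s.compareTo(best) > 0) {
                best = s;
            }
        }
        return best;
    }

    // Producer-Consumer direction: the smallest name is the oldest node.
    static String oldest(List<String> children) {
        String best = null;
        for (String s : children) {
            if (best == null || s.compareTo(best) < 0) {
                best = s;
            }
        }
        return best;
    }
}
```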
Friday, February 12, 2010
Implementing a Producer/Consumer example in Zookeeper 3.2.2 that actually works
Today I wanted to take Zookeeper for a spin.
If you haven't heard about it before, you can take a look at the wiki.
"ZooKeeper is a high-performance coordination service for distributed applications. It exposes common services - such as naming, configuration management, synchronization, and group services - in a simple interface so you don't have to write them from scratch."
That sounds really promising, but sadly there is not much you can actually find about it online.
There is also this old video on the Yahoo! Developer Blog that presents the basic ideas behind it: An Introduction to ZooKeeper Video
What I'll try to do is implement a Producer/Consumer using ZooKeeper 3.2.2 that will actually work.
This is heavily based on the quick tutorial provided on the wiki, which won't even compile because of several code refactorings. Also, you can't find out which version of ZooKeeper it is based on.
This assumes that you have a ZooKeeper server up and running on your computer.
Here goes...
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
// used by main()
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ProducerConsumerZookeeperTest implements Watcher {
    // one connection and one lock shared by all instances in this JVM
    private static ZooKeeper zk = null;
    private static final Object mutex = new Object();
    // path of the queue's parent node, e.g. "/test-zknode"
    private final String root;

    public ProducerConsumerZookeeperTest(String address, String name)
            throws IOException, KeeperException, InterruptedException {
        // init the ZooKeeper connection
        if (zk == null) {
            System.out.println("Starting ZK:");
            zk = new ZooKeeper(address, 3000, this);
            System.out.println("Finished starting ZK: " + zk);
        }
        this.root = name;
        // create the queue's parent node if it does not exist yet
        Stat s = zk.exists(name, false);
        if (s == null) {
            zk.create(name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // any event (connection or child change) wakes up a waiting consumer
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    public boolean produce(int i) throws KeeperException, InterruptedException {
        // encode i as 4 bytes and add it as a new sequential child of root
        byte[] value = ByteBuffer.allocate(4).putInt(i).array();
        zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    }

    public int consume() throws KeeperException, InterruptedException {
        while (true) {
            synchronized (mutex) {
                // 'true' sets a child watch, so process() fires on the next change
                List<String> list = zk.getChildren(root, true);
                if (list.isEmpty()) {
                    System.out.println("Going to wait");
                    mutex.wait();
                } else {
                    // pick the child with the smallest sequence number (the oldest);
                    // "element" is 7 characters long, so substring(7) is the counter
                    String node = list.get(0);
                    int min = Integer.parseInt(node.substring(7));
                    for (String s : list) {
                        int tempValue = Integer.parseInt(s.substring(7));
                        if (tempValue < min) {
                            node = s;
                            min = tempValue;
                        }
                    }
                    System.out.println("Temporary value: " + root + "/" + node);
                    byte[] b = zk.getData(root + "/" + node, false, null);
                    zk.delete(root + "/" + node, 0);
                    return ByteBuffer.wrap(b).getInt();
                }
            }
        }
    }
}
The code is a bit big, but I wanted to cram everything in one place, for brevity.
And now for the test part,
// add this method to ProducerConsumerZookeeperTest; it also needs
// java.util.concurrent.CountDownLatch, ExecutorService and Executors
public static void main(String[] args)
        throws KeeperException, IOException, InterruptedException {
    final CountDownLatch cdl = new CountDownLatch(2);
    ExecutorService executor = Executors.newCachedThreadPool();
    Runnable r1 = new Runnable() {
        @Override
        public void run() {
            try {
                ProducerConsumerZookeeperTest consumer =
                        new ProducerConsumerZookeeperTest("127.0.0.1", "/test-zknode");
                System.out.println("consume " + consumer.consume());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cdl.countDown();
            }
        }
    };
    executor.execute(r1);
    Runnable r2 = new Runnable() {
        @Override
        public void run() {
            try {
                // make the consumer wait a bit...
                Thread.sleep(500);
                ProducerConsumerZookeeperTest producer =
                        new ProducerConsumerZookeeperTest("127.0.0.1", "/test-zknode");
                System.out.println("produce 5 " + producer.produce(5));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                cdl.countDown();
            }
        }
    };
    executor.execute(r2);
    cdl.await();
    // release the pool's threads so the JVM can exit
    executor.shutdown();
    System.out.println("all done ");
}
The console output is...
Starting ZK:
Finished starting ZK: org.apache.zookeeper.ZooKeeper@137c60d
Going to wait
produce 5 true
found element0000000000 for node: element0000000000, val 0
Temporary value: /test-zknode/element0000000000
consume 5
all done
In conclusion, I leave you with the following advice: give a new framework a chance, even if the documentation is not great, or is missing. Just go with your gut feeling; taking chances leads to progress!
Tuesday, February 9, 2010
Pig 0.6 / Grunt shell script start problem
This is the email I've sent to the Pig mailing list. I'll post it here too, in case others have the same problem.
I have a problem starting the grunt shell. I think this affects the 0.6 branch and forward.
This is the error I get when I try to start the shell or when I try to run any script:
alex@alex-desktop:~/hadoop/pig/bin$ pig
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/pig/Main
Caused by: java.lang.ClassNotFoundException: org.apache.pig.Main
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
Could not find the main class: org.apache.pig.Main. Program will exit.
A variation of this is also:
alex@alex-desktop:~/hadoop/pig/bin$ pig
Exception in thread "main" java.lang.NoClassDefFoundError: jline/ConsoleReaderInputStream
Caused by: java.lang.ClassNotFoundException: jline.ConsoleReaderInputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:200)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:188)
at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:252)
at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:320)
Could not find the main class: org.apache.pig.Main. Program will exit.
I poked around a bit, and I think the 'bin/pig' script does not build the classpath correctly.
Case #1
If you use only the pig.jar file that gets generated by the build (PIG_HOME/pig.jar), you get a java.lang.ClassNotFoundException, the second error. This only happens if you don't have the 'build' directory in place.
This happened to me while I was trying to move the pig dir from my local machine to a test server; I was trying to cut down on the size.
Case #2
Let's say that you have the pig.jar and the build directory generated by the build. Now you get java.lang.NoClassDefFoundError: jline/ConsoleReaderInputStream
Now my pig install looks like this:
pig/pig.jar
pig/build/pig-0.6.0-dev.jar
pig/build/pig-0.6.0-dev-core.jar
...
As far as I can see, this also happens if you copy the dev jars into the main directory (replacing pig.jar):
pig/pig-0.6.0-dev.jar
pig/pig-0.6.0-dev-core.jar
I think the problem comes from the pig script that builds the classpath:
pig/bin/pig:
.......
# for releases, add core pig to CLASSPATH
for f in $PIG_HOME/pig-*-core.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
# during development pig jar might be in build
for f in $PIG_HOME/build/pig-*-core.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
......
As you can see, these patterns only match the 'light' version (pig-0.6.0-dev-core.jar), which does not include everything; instead, they should match the full version (pig-0.6.0-dev.jar).
This appears to be a change from the pig 0.5 release, which works fine for me and packs pig-0.5.0-core.jar as the full-dependencies version, whereas pig-0.6.0-dev-core.jar is the light version.
Also, the problem goes away if you just rename pig-0.6.0-dev.jar to pig-0.6.0-dev-core.jar, so you end up with a fully functioning console.
I hope this helps.
Tuesday, February 2, 2010
Null Handling in Pig 0.5.0
It is interesting to know how Pig actually handles null values.
This is the 'r.csv' file. This is what we'll use as input (\N represents a NULL value):
5137	1	"en"	50	50	50	50
5139	1	"en"	10	\N	\N	\N
5139	1	"en"	\N	\N	\N	\N
grunt> A = load 'r.csv' using PigStorage('\t') as (id,hid,locale,r1,r2,r3,r4);
grunt> dump A;
(5137L,1L,"en",50,50,50,50)
(5139L,1L,"en",10,,,)
(5139L,1L,"en",,,,)
First note: Pig will import the null values without a problem, even though I initially defined them as 'int'.
There is more.
grunt> B = foreach A generate id, hid, (r1 + r2 + r3 + r4)/4 as stats;
grunt> dump B;
(5137L,1L,50)
(5139L,1L,)
(5139L,1L,)
Notice the addition of the values results in a null value. So 'NULL + whatever = NULL'.
It seems this is the expected behaviour: almost anything that involves a null gives back a null. This is also true for:
- Comparison operators: ==, !=, >, <, >=, <=
- Comparison operator: matches
- Arithmetic operators: +, -, *, /, % (modulo), ? (bincond)
- Dereference operators: tuple (.) or map (#)
- Cast operator
- Functions: CONCAT, SIZE
A FILTER will not let a row through when its condition is null. Pay attention: if the input (or one of the inputs) is null, the result is null. Aggregates are different, though: AVG(NULL, any_other_value) = any_other_value.
grunt> B1 = GROUP B BY hid;
grunt> C = FOREACH B1 GENERATE group, COUNT(B) as cnt, AVG(B.stats) as statsavg;
grunt> dump C;
(1L,3L,50.0)
Maybe a special case? AVG, MIN, MAX and SUM just ignore nulls. Pay attention to this if you expect your processed values to carry the correct information.
The UDF Solution
I have written some words about UDF(s) in Pig.
My problem is with the functions that just ignore nulls. Take average (AVG), for example: how can I expect it to output the proper value if it just skips some rows?
Let's say we have the following values: (10, NULL). If the NULL counts as 0, the average should be 5, but according to Pig the average is 10.
Another example: (NULL + 10) / 2 equals NULL.
While I can agree that different people could expect different outcomes, I'd like to be able to choose what I get.
The only solution I have found for this issue is to write a special function (a UDF) that will handle the nulls, and return a proper value, something that makes sense in my working context.
This function will replace nulls with 0, so you get a 0 sum instead of the null value that Pig would give you.
I am wondering if adding some sort of 'replace null by ...' function is possible, that would certainly give you more control over the whole process.
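To put the two behaviours side by side, here is a plain-Java sketch (no Pig involved) of the (10, NULL) example: one average skips nulls the way AVG did in the runs above, the other treats a null as 0, which is what I want from my UDF. The class and method names are hypothetical.

```java
import java.util.List;

final class NullAwareAverage {
    // AVG-style: nulls are skipped, so they also shrink the denominator.
    // Here: returns null when nothing is left to average.
    static Double avgIgnoringNulls(List<Integer> values) {
        long sum = 0;
        int count = 0;
        for (Integer v : values) {
            if (v != null) {
                sum += v;
                count++;
            }
        }
        return count == 0 ? null : (double) sum / count;
    }

    // The behaviour I'm after: a null counts as 0 but still counts as a value.
    static double avgNullAsZero(List<Integer> values) {
        if (values.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (Integer v : values) {
            sum += (v == null) ? 0 : v;
        }
        return (double) sum / values.size();
    }
}
```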
A quick word on writing UDFs in Pig
A bit of steam-blowing about user-defined functions (UDFs) in Pig 0.5.0
When I started looking at writing my own UDF, and at what I have to implement, I had my own expectations about how this was supposed to work. As it turns out, I was a bit off. I am still in the dark as to what to expect from the function's input parameter.
It seems easy enough to write and use a UDF, but the docs aren't really where they should be.
This is my stab at a custom function. It is supposed to compute the average of the numbers you feed into it, treating nulls as integers with the value 0.
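package com.pfalabs.test;

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class ComputeAverage extends EvalFunc<Integer> {

    @Override
    public Integer exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        int averageRatingPercent = 0;
        try {
            int count = 0;
            int sum = 0;
            Object first = input.get(0);
            if (first instanceof DataBag) {
                // called with a single bag, e.g. ComputeAverage(B.stats)
                Iterator<Tuple> it = ((DataBag) first).iterator();
                while (it.hasNext()) {
                    Tuple t = it.next();
                    count++;
                    sum += stringToInt(t.get(0));
                }
            } else {
                // called with separate fields, e.g. ComputeAverage(r1,r2,r3,r4):
                // each argument arrives as one field of 'input'
                for (int i = 0; i < input.size(); i++) {
                    count++;
                    sum += stringToInt(input.get(i));
                }
            }
            if (count > 0) {
                averageRatingPercent = sum / count;
            }
        } catch (Exception e) {
            System.err.println("Failed to process input: " + e.getMessage());
        }
        return averageRatingPercent;
    }

    // just insert your String to int conversion logic here;
    // this placeholder treats null as 0 and parses everything else
    private static int stringToInt(Object value) {
        if (value == null) {
            return 0;
        }
        return Integer.parseInt(value.toString().trim());
    }

    @Override
    public Schema outputSchema(Schema input) {
        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
    }
}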
And this is how you call it (r1,r2,r3,r4 are just columns/fields from another variable)
grunt> B = foreach A generate id, hid, com.pfalabs.test.ComputeAverage(r1,r2,r3,r4);
Just make sure you pack this into a jar and run this first:
grunt> register /path/to/your/jar/my-udfs.jar;
The number one question here is, how do you iterate through the values you can receive? I can obviously push more fields into this function.
If it's a one-to-one function (for one input value you get one output value), you can look at pig-release-0.5.0/tutorial/src/org/apache/pig/tutorial/ExtractHour.java:
String timestamp = (String)input.get(0);
If it's a many-to-one function (just like my use case):
....
DataBag bag = (DataBag) input.get(0);
Iterator<Tuple> it = bag.iterator();
while (it.hasNext()) {
    Tuple t = it.next();
    String actualValue = (String) t.get(0);
}
.....
What do we have here? An input Tuple whose first element is a DataBag, whose Iterator gives you Tuple(s) that each hold your value as their first element...wow...
Next, we have the one to many functions. Luckily we can use pig-release-0.5.0/tutorial/src/org/apache/pig/tutorial/NGramGenerator.java as a reference.
....
// take the value
String query = (String)input.get(0);
// generate the output and push it to the return value
DataBag output = DefaultBagFactory.getInstance().newDefaultBag();
// it's a DataBag, so feel free to fill it up!
for (String ngram : ngrams) {
Tuple t = DefaultTupleFactory.getInstance().newTuple(1);
t.set(0, ngram);
output.add(t);
}
So I guess that a many to many function seems like a breeze now. I'll just leave that as an exercise for the next time ;)
Monday, February 1, 2010
Test driving Pig 0.5.0 with Hadoop 0.20.1
Continuing my little Hadoop study, I have come across Pig and Pig Latin, a scripting language for processing pipelines. I have to admit that it looks very promising, especially since, as soon as you want to build something a tiny bit more complex, you end up writing a lot of chained map/reduce jobs.
I started with this video from Cloudera: Cloudera Hadoop Training: Introduction to Pig. It was enough to get me interested, but I was a bit wary about the performance.
Even Alan Gates, an architect on the Pig team at Yahoo (he is the one giving the training session), warns about a 1.6x increase in duration when using Pig instead of hand-written Map/Reduce jobs. He adds that they are aiming at 1.2x, but that's in the future. I personally think that 1.6x is not that bad, if it can help me get those Map/Reduce jobs out the door fast.
So what better test than to rebuild my small, ongoing Hadoop experiment in Pig Latin? Just a little background info: I'm trying to build something more complicated than the usual hello world, a rating aggregation job.
I'm trying to go through a dataset of 1.5 million rows - not much for Hadoop but enough for a quick stab, and crunch the numbers so the output is an average rating and a count.
First, a bit of preparation: export the CSV file from MySQL, then import it into Hadoop:
Next, I am going to build this as a Pig Latin script:
Step 1: I tried taking some advice from the Pig Cookbook Performance Enhancers (http://hadoop.apache.org/pig/docs/r0.5.0/cookbook.html), such as using types, but it did not make the script faster.
Step 2 is pretty self-explanatory: compute the average rating of each review.
Step 3 is a demo of how you can do a group by. You can find a very good explanation of how this works in the Reference Manual (http://hadoop.apache.org/pig/docs/r0.5.0/piglatin_reference.html#GROUP).
The GROUP BY aggregation is a two-step (two-instruction) operation.
First you assign the GROUP BY to a variable.
This translates into: B1: {group: int, B: {id:int, hid:int, stats: float}}, this way you can reference the group column, and use any of the existing columns in a aggregation expression or as they are called in the manual, Relational Operators.
Next you iterate through that variable, and build another one with the final output. In my case the count and the average.
Please note that for aggregation you use the initial variable (B) if you need to reference fields, not the GROUP BY one (B1), hence the 'COUNT(B), AVG(B.stats)'.
Last Step: just export to a file, still into the hdfs.
Finally if you just want to make sure that this worked, just peek at the file
Free Takeaway
To run a Pig script as a Map/Reduce job you just need to type
where $PIGDIR is the directory where Pig resides and $HADOOPDIR is the location of the hadoop conf directory (such as $HADOOP_HOME/conf).
One thing that I've found missing is that there are no aggregation options for rows. I could have worked with an average(col1, col2, ...), instead of having to add and divide. But this is a minor issue, especially taking into account how easy it is to write a custom function and call it from Pig. Well, that may very well be the subject of another blog post.
Now for the sad part: one run lasts around 1m 30sec compared to ~20 seconds that my initial Hadoop Map/Reduce test lasts, and this is for a basic use-case, I did not even get to joins and stuff.
Having said that, I really like Pig, and I can't wait to see how it compares to Hive for simplicity /learning curve and ease of development.
I started with this video from Cloudera: Cloudera Hadoop Training: Introduction to Pig. It was enough to get me interested, but I was a bit wary about the performance.
Even Alan Gates, an architect on Yahoo's Pig team (and the presenter of the training session), warns that a Pig job takes about 1.6x as long as the equivalent native Map/Reduce job. He adds that they are aiming for 1.2x, but that's in the future. Personally, I think 1.6x is not that bad if it helps me get those Map/Reduce jobs out the door fast.
So what better test than rebuilding my small, ongoing Hadoop experiment in Pig Latin? A little background: I'm trying to build something more complicated than the usual hello world, namely a rating aggregation job.
I'm going through a dataset of 1.5 million rows (not much for Hadoop, but enough for a quick stab) and crunching the numbers so the output is a review count and an average rating per hotel.
First, a bit of preparation: export the data from MySQL into a tab-separated file (the script reads it with PigStorage('\t'), despite the .csv extension), then copy it into HDFS:
$HADOOP_HOME/bin/hadoop fs -copyFromLocal reviews_export_01.02.10.csv input/hotels/reviews_export_01.02.10.csv
Next, I'm going to build this as a Pig Latin script:
-- Step 1: load the tab-separated export
A = load 'input/hotels/reviews_export_01.02.10.csv'
    using PigStorage('\t') as
    (id: int, hid: int, locale: chararray, r1: int, r2: int, r3: int, r4: int);
-- Step 2: fold each row into one value, the average of its four ratings
-- (cast the sum to float first, otherwise int / int truncates the result)
B = foreach A generate id, hid, ((float)(r1 + r2 + r3 + r4)) / 4 as stats;
-- Step 3: group by hotel id (hid), then compute the count and average per hotel
B1 = GROUP B BY hid;
C = FOREACH B1 GENERATE group, COUNT(B), AVG(B.stats);
-- Step 4: save the results into an output file in HDFS
store C into 'output/pig/stats' using PigStorage('\t');
Step 1: I tried some advice from the Pig Cookbook's Performance Enhancers section (http://hadoop.apache.org/pig/docs/r0.5.0/cookbook.html), such as using types, but it did not make the script any faster.
Step 2 is pretty self-explanatory: it computes the average of the four ratings (r1 to r4) of each review.
Step 3 shows how to do a group by. You can find a very good explanation of how this works in the Reference Manual (http://hadoop.apache.org/pig/docs/r0.5.0/piglatin_reference.html#GROUP).
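One thing to watch in step 2: r1 to r4 are declared as int, so their sum divided by 4 is an integer division in Pig, just as it is in Java, and the per-review average is truncated unless one operand is floating-point. A small Java illustration (the class name and the sample ratings 4, 5, 5, 3 are made up for the example):

```java
public class RatingAverage {
    // int / int: the fractional part is discarded (17 / 4 == 4).
    public static int truncatedAverage(int r1, int r2, int r3, int r4) {
        return (r1 + r2 + r3 + r4) / 4;
    }

    // Casting the sum to float first makes it a floating-point division (17f / 4 == 4.25f).
    public static float floatAverage(int r1, int r2, int r3, int r4) {
        return (float) (r1 + r2 + r3 + r4) / 4;
    }

    public static void main(String[] args) {
        System.out.println(truncatedAverage(4, 5, 5, 3)); // prints 4
        System.out.println(floatAverage(4, 5, 5, 3));     // prints 4.25
    }
}
```

In Pig Latin the same fix is a cast on the sum, e.g. ((float)(r1 + r2 + r3 + r4)) / 4; dividing by 4.0 instead also works, but makes stats a double rather than a float.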
The GROUP BY aggregation is a two-step (two-statement) operation.
First, you assign the result of the GROUP BY to a variable:
B1 = GROUP B BY hid;
This produces B1: {group: int, B: {id: int, hid: int, stats: float}}, so you can reference the group column and use any of the original columns in an aggregate expression, through what the manual calls Eval Functions (AVG, COUNT and so on).
Next, you iterate over that variable with FOREACH and build another one with the final output; in my case, the count and the average.
Note that inside the aggregation you reference the original variable (B), not the GROUP BY one (B1): each B1 row carries its matching B rows in a bag named B, hence COUNT(B) and AVG(B.stats).
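To make that nested structure concrete, here is a plain-Java, in-memory analogy of steps 3 and 4 of the computation: GROUP B BY hid becomes a map from hotel id to the list (the "bag") of matching B rows, and the FOREACH turns each bag into a count and an average. This is only an illustration under my own assumptions (the Review and HotelStats classes and the sample rows are hypothetical); Pig itself runs the grouping as the shuffle of a Map/Reduce job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByHotel {
    // Hypothetical stand-in for one tuple of relation B: (id, hid, stats).
    public static final class Review {
        public final int id;
        public final int hid;
        public final float stats;

        public Review(int id, int hid, float stats) {
            this.id = id;
            this.hid = hid;
            this.stats = stats;
        }
    }

    // Hypothetical stand-in for one tuple of C: (group, COUNT(B), AVG(B.stats)).
    public static final class HotelStats {
        public final int hid;
        public final long count;
        public final double avg;

        HotelStats(int hid, long count, double avg) {
            this.hid = hid;
            this.count = count;
            this.avg = avg;
        }
    }

    // B1 = GROUP B BY hid: each key plays the role of "group", each list the bag B.
    // A TreeMap is used only for a deterministic order; Pig makes no such promise.
    public static Map<Integer, List<Review>> groupByHid(List<Review> b) {
        Map<Integer, List<Review>> b1 = new TreeMap<>();
        for (Review r : b) {
            b1.computeIfAbsent(r.hid, k -> new ArrayList<>()).add(r);
        }
        return b1;
    }

    // C = FOREACH B1 GENERATE group, COUNT(B), AVG(B.stats);
    public static List<HotelStats> countAndAverage(Map<Integer, List<Review>> b1) {
        List<HotelStats> c = new ArrayList<>();
        for (Map.Entry<Integer, List<Review>> e : b1.entrySet()) {
            List<Review> bag = e.getValue(); // the bag named B inside this B1 tuple
            double sum = 0;
            for (Review r : bag) {
                sum += r.stats; // B.stats: the stats field of every tuple in the bag
            }
            c.add(new HotelStats(e.getKey(), bag.size(), sum / bag.size()));
        }
        return c;
    }

    public static void main(String[] args) {
        List<Review> b = List.of(
                new Review(1, 10, 4.0f),
                new Review(2, 10, 3.0f),
                new Review(3, 20, 5.0f));
        for (HotelStats s : countAndAverage(groupByHid(b))) {
            System.out.println(s.hid + "\t" + s.count + "\t" + s.avg);
        }
    }
}
```

With the three sample rows, main prints one tab-separated line per hotel: 10, 2, 3.5 and then 20, 1, 5.0.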
Step 4: just store the result in a file, still in HDFS.
Finally, if you want to make sure this worked, peek at the file:
$HADOOP_HOME/bin/hadoop fs -cat output/pig/stats/part-00000 | head
Free Takeaway
To run a Pig script as a Map/Reduce job, you just need to type:
java -cp $PIGDIR/pig.jar:.:$HADOOPDIR org.apache.pig.Main test.pig
where $PIGDIR is the directory where Pig resides and $HADOOPDIR is the location of the Hadoop conf directory (such as $HADOOP_HOME/conf).
One thing I found missing is row-wise aggregation: I would have liked an average(col1, col2, ...) instead of having to add and divide. But this is a minor issue, especially considering how easy it is to write a custom function and call it from Pig. That may very well be the subject of another blog post.
Now for the sad part: one run takes around 1 min 30 s, compared to ~20 seconds for my initial Hadoop Map/Reduce test (about 4.5x, well above the 1.6x quoted in the training), and this is a basic use case; I haven't even gotten to joins yet.
Having said that, I really like Pig, and I can't wait to see how it compares to Hive in simplicity, learning curve and ease of development.