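This post mainly uses Hadoop Streaming, which ships with Hadoop. First, a brief introduction to Hadoop Streaming: it lets you write the mapper and the reducer as any executable or script that reads its input from standard input and writes its output to standard output.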
By default, Streaming can only process text data (TextFile). For binary data, a good approach is to base64-encode the binary keys and values so that they become text.
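The base64 approach can be sketched as follows: each binary key and value is encoded separately, and the two are joined with a tab, which is the separator Streaming uses between key and value by default. Base64 output never contains a tab or a newline, so every record stays on exactly one line. This is a minimal sketch assuming Python 3; `encode_record` and `decode_record` are hypothetical helper names, not part of Hadoop.

```python
import base64


def encode_record(key: bytes, value: bytes) -> str:
    """Turn a binary key/value pair into one tab-separated text line.

    Base64 output uses only A-Z, a-z, 0-9, '+', '/' and '=', so it never
    contains a tab or newline and is safe for Streaming's line protocol.
    """
    k = base64.b64encode(key).decode("ascii")
    v = base64.b64encode(value).decode("ascii")
    return "%s\t%s" % (k, v)


def decode_record(line: str) -> tuple:
    """Inverse of encode_record: recover the original bytes from a text line."""
    k, v = line.rstrip("\n").split("\t", 1)
    return base64.b64decode(k), base64.b64decode(v)


if __name__ == "__main__":
    # The value deliberately contains a tab and a newline, which would
    # break the line-based protocol if written out unencoded.
    line = encode_record(b"\x00\x01key", b"\xff\xfe\tvalue\n")
    print(line)
    print(decode_record(line))
```

A mapper would call `encode_record` before printing, and the reducer would call `decode_record` on each input line.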
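Both the mapper and the reducer exchange data with Hadoop through standard input and standard output, which involves extra copying and parsing and therefore adds some overhead.

The basic command syntax is:

```shell
hadoop jar hadoop-streaming-2.6.5.jar \
    [generic options] [Streaming options]
```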
For details of the generic options and the Streaming options, see the following URL:
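The order `[generic options] [Streaming options]` in the syntax above matters: Hadoop parses generic options such as `-D` first, so they must be placed before Streaming options such as `-input` and `-mapper`, otherwise the command fails. A minimal sketch that assembles such a command as an argument list, assuming Python 3.8+ (for `shlex.join`). `build_streaming_command` is a hypothetical helper, and the two `-D` properties are only examples; the Streaming options mirror the job command used later in this post.

```python
import shlex


def build_streaming_command(jar, generic_opts, streaming_opts):
    """Assemble a Hadoop Streaming command line as an argv list.

    Generic options (e.g. -D) are emitted before Streaming options
    (-input, -output, -mapper, ...), as Hadoop requires.
    """
    cmd = ["hadoop", "jar", jar]
    for name, value in generic_opts:
        cmd += [name, value]
    for name, value in streaming_opts:
        cmd += [name, value]
    return cmd


if __name__ == "__main__":
    cmd = build_streaming_command(
        "hadoop-streaming-2.6.5.jar",
        generic_opts=[
            # example properties only
            ("-D", "mapreduce.job.name=wordcount"),
            ("-D", "mapreduce.job.reduces=2"),
        ],
        streaming_opts=[
            ("-mapper", "mapper.py"), ("-file", "mapper.py"),
            ("-reducer", "reducer.py"), ("-file", "reducer.py"),
            ("-input", "/ooxx/*"), ("-output", "/ooxx/output/"),
        ],
    )
    # shlex.join quotes each argument, e.g. '/ooxx/*', for safe copy-pasting
    print(shlex.join(cmd))
```

Because it is a plain list, the result could also be passed directly to `subprocess.run` to launch the job without going through a shell.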
```python
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
```
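Note that this script does not compute the total number of occurrences of each word. It immediately emits `<word>\t1` for every word it sees, even if the same word appears several times in the input; the summing is left to the subsequent Reduce step (program). Remember to make mapper.py executable: `chmod +x mapper.py`.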
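To check this behaviour without piping data through stdin, the per-line logic of mapper.py can be pulled out into a plain function. A minimal sketch assuming Python 3; `map_words` is a hypothetical helper name, not part of the original scripts. It shows that a repeated word produces repeated `word\t1` records rather than a combined count.

```python
def map_words(line):
    """Per-line logic of mapper.py as a pure function.

    Emits one "word<TAB>1" record per occurrence; no counting happens here.
    """
    return ["%s\t%s" % (word, 1) for word in line.strip().split()]


if __name__ == "__main__":
    # "foo" appears twice, so it is emitted twice
    for record in map_words("foo foo quux"):
        print(record)
```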
```python
#!/usr/bin/env python
import sys

current_word = None
current_count = 0

# input comes from STDIN (the sorted output of mapper.py)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # no tab, or count was not a number: silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))
```
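Save this code as /usr/local/hadoop/reducer.py. The script reads the output of mapper.py from STDIN, sums the occurrences of each word, and writes the totals to STDOUT. Because Hadoop sorts the map output by key before it reaches the reducer, all lines for the same word arrive consecutively, so keeping a running total per word is enough.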
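The running-total `if/else` in reducer.py can also be expressed with `itertools.groupby`, which groups consecutive items that share a key. This is an alternative formulation, not the author's script; `parse` and `reduce_counts` are hypothetical names, and the sketch assumes Python 3. Like the original, it only produces correct totals when its input is sorted by word.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs, skipping malformed lines."""
    for line in lines:
        try:
            word, count = line.strip().split("\t", 1)
            yield word, int(count)
        except ValueError:
            # no tab, or count is not a number
            continue


def reduce_counts(lines):
    """Sum the counts of each consecutive run of identical words."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    sample = ["bar\t1\n", "foo\t1\n", "foo\t1\n", "foo\t1\n",
              "labs\t1\n", "quux\t1\n", "quux\t1\n"]
    for word, total in reduce_counts(sample):
        print("%s\t%d" % (word, total))
```

To use it as the actual reducer, the `__main__` section would iterate over `reduce_counts(sys.stdin)` instead of the sample list. If the input were not sorted, `groupby` would emit the same word more than once, exactly as the original running-total loop would.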
Likewise, make the reducer executable with `chmod +x reducer.py`. Then test both scripts locally:

```shell
root@localhost:/root/pythonHadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1
root@localhost:/root/pythonHadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py | sort | ./reducer.py
bar	1
foo	3
labs	1
quux	2
```
If the output matches the above, the scripts work and you can run the MapReduce job on the cluster:
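The `mapper | sort | reducer` check can also be simulated inside a single Python process, which makes the role of the sort step explicit: the reducer sees each word as one consecutive run only because the pairs were sorted first, just as Hadoop sorts map output before the Reduce step. A minimal sketch assuming Python 3; `mapper`, `reducer` and `run_local` are hypothetical names that mirror mapper.py and reducer.py.

```python
def mapper(lines):
    """Same logic as mapper.py: emit (word, 1) for every word."""
    for line in lines:
        for word in line.split():
            yield word, 1


def reducer(pairs):
    """Same running-total logic as reducer.py, over (word, count) pairs."""
    current_word, current_count = None, 0
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_word, current_count = word, count
    if current_word is not None:
        yield current_word, current_count


def run_local(lines):
    """Simulate `mapper | sort | reducer` in one process."""
    # sorted() plays the role of `sort` (and of Hadoop's shuffle/sort)
    shuffled = sorted(mapper(lines))
    return list(reducer(shuffled))


if __name__ == "__main__":
    for word, total in run_local(["foo foo quux labs foo bar quux"]):
        print("%s\t%d" % (word, total))
```

Running it prints the same four lines as the shell pipeline above (bar 1, foo 3, labs 1, quux 2). Python's `sorted()` compares strings by code point, which can order mixed-case words differently from `sort` under some locales; either way, equal words end up adjacent, which is all the reducer needs.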
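```shell
[root@node01 pythonHadoop]# hadoop jar contrib/hadoop-streaming-2.6.5.jar \
    -mapper mapper.py -file mapper.py \
    -reducer reducer.py -file reducer.py \
    -input '/ooxx/*' -output /ooxx/output/
```

The input glob is quoted so that the local shell does not try to expand it; Hadoop expands it against HDFS. The output directory (/ooxx/output/ here) must not exist yet, otherwise the job fails.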
Reposted from: http://qscrn.baihongyu.com/