使用mapreduce计算白名单内单词的wordcount以及文件分发-file使用

-------------------------------------------
前提:?
hadoop集群搭建好
hadoop版本: 1.2.x
python版本: 2.6.x或2.7.x
-------------------------------------------

?
1.文件分发与打包?
?
如果程序运行所需要的可执行文件、脚本或者配置文件在Hadoop集群的计 算节点上不存在,则首先需要将这些文件分发到集群上才能成功进行计算。?Hadoop提供了自动分发文件和压缩包的机制,只需要在启动Streaming作 业时配置相应的参数。
?
2.文件分发与打包(-file)?
?
如果要分发的文件在本地且没有目录结构,可以使用-file /path/to/FILENAME选项分发文件,将本地文件/path/to/FILENAME分发 到每个计算节点。

? 在Streaming程序中通过./FILENAME就可以访问该文件
? 对于本地可执行的文件,除了指定的mapper或reducer程序外,可能分发后没有可执行权限,所以需要在包装程序如mapper.sh中运行chmod +x ./FILENAME设置可执行权限,然后设置-mapper “mapper.sh”。
#代码和测试数据放在附件



3.代码实例


目录mr_file_broadcast


├── map.py  # map程序 实现指定白名单内单词的分割、记录
├── reduce.py # reduce程序 实现白名单内单词数的统计
├── run.sh # 调用hadoop的运算
├── The_Man_of_Property.txt # 测试数据源
└── white_list # 测试白名单单词数据




map.py


#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
def read_loacl_file_func(f):
word_set = set()
file_in = open(f, 'r')
for line in file_in:
word = line.strip()
word_set.add(word)
return word_set
def mapper_func(white_list_fd):
word_set = read_loacl_file_func(white_list_fd)
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print "%s\t%s" % (s, 1)
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)


本地测试:


[root@master mr_file_broadcast]# cat The_Man_of_Property.txt | python map.py mapper_func white_list 


使用uniq统计单词的出现次数:


cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort | uniq -c


reduce.py


#!/usr/bin/python
#-*- coding:utf-8 -*-
import sys
def reduce_func():
current_word = None
count_pool =
sum = 0
for line in sys.stdin:
word, val = line.strip().split('\t')
if current_word == None:
current_word = word
if current_word != word:
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, sum)
current_word = word
count_pool =
sum = 0
count_pool.append(int(val))
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, str(sum))
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)


本地测试:


cat The_Man_of_Property.txt | python map.py mapper_func white_list | python reduce.py reduce_func | sort


run.sh


#!/bin/bash
HADOOP_CMD="/usr/local/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func white_list" \
-reducer "python reduce.py reduce_func" \
-jobconf "mapred.reduce.tasks=2" \ # reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
-file ./map.py \
-file ./reduce.py \
-file ./white_list

4.查看结果
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00000 /output_file_broadcast/part-00001
give 47
the 5144
man 143
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/{part-00000,part-00001}
give 47
the 5144
man 143
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/{part-00000,part-00001} | sort -rn -k2
the 5144
man 143
give 47

#END

0 个评论

要回复文章请先登录注册