mapreduce执行详细流程

回复

wolverine 发起了问题 • 0 人关注 • 0 个回复 • 1404 次浏览 • 2017-06-11 12:35 • 来自相关话题

二期学员报道贴

linchao 回复了问题 • 8 人关注 • 7 个回复 • 1368 次浏览 • 2017-06-07 15:26 • 来自相关话题

hadoop_streaming开发要点_提交作业jobconf常用配置

everything 发表了文章 • 0 个评论 • 1463 次浏览 • 2017-06-05 15:18 • 来自相关话题

Hadoop streaming是和hadoop一起发布的实用程序。它允许用户创建和执行使用任何程序或者脚本(例如: python,ruby,shell,php)编写的map或者reduce的mapreduce jobs。

# 脚本程序run.sh
[code]$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FI 查看全部


Hadoop streaming是和hadoop一起发布的实用程序。它允许用户创建和执行使用任何程序或者脚本(例如: python,ruby,shell,php)编写的map或者reduce的mapreduce jobs。


# 脚本程序run.sh
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-jobconf "mapred.reduce.tasks=2" \ # reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
-file ./map.py \
-file ./reduce.py
input: 指定作业的输入文件的HDFS路径,支持使用*通配符,支持指定多个文件或目录,可多次使用

output: 指定作业的输出文件的HDFS路径,路径必须存在,执行作业用户必须有创建该目录的权限,只能使用一次

mapper: 用户自己写的mapper程序

reducer: 用户自己写的reduce程序

file: 允许用户设置task的文件和文件档案,类似的配置还有-cacheFile, -cacheArchive分别用于向计算节点分发HDFS文件和HDFS压缩文件
分发的文件有如下:
  • map和reduce的执行文件
  • map和reduce要用到的数据文件、配置文件


jobconf: 提交作业的一些配置属性
  • mapred.map.tasks:map task数目
  • mapred.reduce.tasks:reduce task数目
  • stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
  • num.key.fields.for.partition指定对key分出来的前几部分做partition而不是整个key

?
#?jobconf常用配置
?
mapred.job.name 作业名
mapred.job.priority 作业优先级
mapred.job.map.capacity 最多同时运行map任务数
mapred.job.reduce.capacity 最多同时运行reduce任务数
mapred.task.timeout 任务没有响应(输入输出)的最大时间
mapred.compress.map.output map的输出是否压缩
mapred.map.output.compression.codec map的输出压缩方式
mapred.output.compress reduce的输出是否压缩
mapred.output.compression.codec reduce的输出压缩方式
stream.map.output.field.separator map输出分隔符

使用mapreduce计算白名单内单词的wordcount以及文件分发-file使用

everything 发表了文章 • 0 个评论 • 768 次浏览 • 2017-05-25 14:38 • 来自相关话题

-------------------------------------------
前提:?
hadoop集群搭建好
hadoop版本: 1.2.x
python版本: 2.6.x或2.7.x
-------------------------------------------
?
1.文件分发与打包?
?
如果程序运行所需要的可执行文件、脚本或者配置文件在Hadoop集群的 查看全部
-------------------------------------------
前提:?
hadoop集群搭建好
hadoop版本: 1.2.x
python版本: 2.6.x或2.7.x
-------------------------------------------

?
1.文件分发与打包?
?
如果程序运行所需要的可执行文件、脚本或者配置文件在Hadoop集群的计 算节点上不存在,则首先需要将这些文件分发到集群上才能成功进行计算。?Hadoop提供了自动分发文件和压缩包的机制,只需要在启动Streaming作 业时配置相应的参数。
?
2.文件分发与打包(-file)?
?
如果要分发的文件在本地且没有目录结构,可以使用-file /path/to/FILENAME选项分发文件,将本地文件/path/to/FILENAME分发 到每个计算节点。

? 在Streaming程序中通过./FILENAME就可以访问该文件
? 对于本地可执行的文件,除了指定的mapper或reducer程序外,可能分发后没有可执行权限,所以需要在包装程序如mapper.sh中运行chmod +x ./FILENAME设置可执行权限,然后设置-mapper “mapper.sh”。
#代码和测试数据放在附件



3.代码实例


目录mr_file_broadcast


├── map.py  # map程序 实现指定白名单内单词的分割、记录
├── reduce.py # reduce程序 实现白名单内单词数的统计
├── run.sh # 调用hadoop的运算
├── The_Man_of_Property.txt # 测试数据源
└── white_list # 测试白名单单词数据




map.py


#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
def read_loacl_file_func(f):
word_set = set()
file_in = open(f, 'r')
for line in file_in:
word = line.strip()
word_set.add(word)
return word_set
def mapper_func(white_list_fd):
word_set = read_loacl_file_func(white_list_fd)
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print "%s\t%s" % (s, 1)
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)


本地测试:


[root@master mr_file_broadcast]# cat The_Man_of_Property.txt | python map.py mapper_func white_list 


使用uniq统计单词的出现次数:


cat The_Man_of_Property.txt | python map.py mapper_func white_list | sort | uniq -c


reduce.py


#!/usr/bin/python
#-*- coding:utf-8 -*-
import sys
def reduce_func():
current_word = None
count_pool =
sum = 0
for line in sys.stdin:
word, val = line.strip().split('\t')
if current_word == None:
current_word = word
if current_word != word:
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, sum)
current_word = word
count_pool =
sum = 0
count_pool.append(int(val))
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, str(sum))
if __name__ == '__main__':
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)


本地测试:


cat The_Man_of_Property.txt | python map.py mapper_func white_list | python reduce.py reduce_func | sort


run.sh


#!/bin/bash
HADOOP_CMD="/usr/local/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py mapper_func white_list" \
-reducer "python reduce.py reduce_func" \
-jobconf "mapred.reduce.tasks=2" \ # reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
-file ./map.py \
-file ./reduce.py \
-file ./white_list

4.查看结果
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/part-00000 /output_file_broadcast/part-00001
give 47
the 5144
man 143
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/{part-00000,part-00001}
give 47
the 5144
man 143
[root@master mr_file_broadcast]# hadoop fs -text /output_file_broadcast/{part-00000,part-00001} | sort -rn -k2
the 5144
man 143
give 47

#END

在windows上如何打开mapreduce的追踪网址?

回复

everything 发起了问题 • 2 人关注 • 0 个回复 • 1110 次浏览 • 2017-05-24 11:53 • 来自相关话题

小白如何使用hadoop1.2实现MapReduce第一个功能wordcount

everything 发表了文章 • 0 个评论 • 685 次浏览 • 2017-05-24 11:31 • 来自相关话题

-------------------------------------------
前提:?
hadoop集群搭建好
hadoop版本: 1.2.x
python版本: 2.6.x或2.7.x
-------------------------------------------
# 在master的根目录建立 learn_hadoop/python_wordcount 并进入目录[code][ro 查看全部
-------------------------------------------
前提:?
hadoop集群搭建好
hadoop版本: 1.2.x
python版本: 2.6.x或2.7.x
-------------------------------------------
# 在master的根目录建立 learn_hadoop/python_wordcount 并进入目录
[root@master learn_hadoop]# mkdir -p ~/learn_hadoop/python_wordcount
[root@master learn_hadoop]# cd ~/learn_hadoop/python_wordcount/


# 准备数据源
cat /etc/passwd | tr ':' ' ' | tr '/' ' ' > test.txt

# shell实现简单的wordcount
cat test.txt | tr ' ' '\n' | sort | uniq -c | sort -rn -k1 | head -10
# 编写map.py?用于数据的分割
#!/usr/local/bin/python
import sys
# 从标准输入流获取数据源 并遍历
for line in sys.stdin:
# strip() 表示删除空白符 相当于 java的String.trim()
# 每一行以空格为分割点 得到数组
ss = line.strip().split(' ')
# 遍历数组
for s in ss:
# 继续strip() 并判断字符串是否为""
if s.strip() != "":
# 输出 单词 和 1(表示次数)
print "%s\t%s" % (s, 1)

# 编写reduce.py?用于数据的合并
#!/usr/local/bin/python
import sys
# 定义一个变量用于存放当前记录的key
current_word = None
# 定义一个数组用于存放获取到key的值统计数组
count_pool =
# 定义一个变量用于存放单词出现的总次数
sum = 0
for line in sys.stdin:
# 每一行记录都切割成键值对
# 例如: the 1 变为 {"the", 1}
word, val = line.strip().split('\t')
# 当前key 为 none
if current_word == None:
# 赋值
current_word = word
# 当前key != 此次获取到key
if current_word != word:
# 遍历统计数组 得到单词出现的总次数 并且打印出来
for count in count_pool:
# 累计单词出现的总次数
sum += count
# 打印出来 当前单词 以及 总次数
print "%s\t%s" % (current_word, sum)
# 将此次得到的单词 赋值给 当前key这个变量
current_word = word
# 初始化统计数组
count_pool =
# 初始化总次数
sum = 0
# 将此次键值对的值追加到数组中
count_pool.append(int(val))
# 最后一行记录 会循环统计并打印出
# 当前单词 以及 出现次数
for count in count_pool:
sum += count
print "%s\t%s" % (current_word, str(sum))

?
# 本地测试的命令 得到输出数据 按倒序 单词 以及 出现次数?
cat test.txt | python map.py | sort -k1 | python reduce.py | sort -k2 -rn

# 编写run.sh 将脚本提交到hadoop运行
# 根据自己的实际hadoop安装目录定义变量
HADOOP_CMD="/usr/local/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"
# hdfs上如果已存在/output目录则需要下面的命令
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# 执行命令
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py

# 给run.sh赋予执行权限
chmod -R 775 run.sh
# 执行
./run.sh

?
# 查看hdfs上的输出文件?
[root@master python_wordcount]# hadoop fs -lsr /output
Warning: $HADOOP_HOME is deprecated.
-rw-r--r-- 3 root supergroup 0 2017-04-14 01:28 /output/_SUCCESS
drwxr-xr-x - root supergroup 0 2017-04-14 01:28 /output/_logs
drwxr-xr-x - root supergroup 0 2017-04-14 01:28 /output/_logs/history
-rw-r--r-- 3 root supergroup 16597 2017-04-14 01:28 /output/_logs/history/job_201704110143_0003_1492104524860_root_streamjob2185335538628447165.jar
-rw-r--r-- 3 root supergroup 52682 2017-04-14 01:28 /output/_logs/history/job_201704110143_0003_conf.xml
-rw-r--r-- 3 root supergroup 734 2017-04-14 01:28 /output/part-00000

# 发现hdfs上/output/part-00000?该文件为输出数据结果
# 查看hdfs上的输出数据 按倒序 单词 以及 出现次数
hadoop fs -text /output/part-00000 | sort -k2 -rn
# 输出结果与本地测试结果一致,大功告成
#END
?