Python使用Hadoop集群

$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。

编写MapReduce函数

以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。

Mapper部分

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

文件从标准输入 sys.stdin 读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1) ,方便让随后的 $reducer$ 做统计工作。

对文件赋权以使其可以运行

chmod +x mapper.py

Reducer部分

!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:
    print "%s\t%s" % (current_word, current_count)

按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。

同样对文件赋权以使其可以运行

chmod +x reducer.py

本地测试

通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。

cat data | mapper | sort | reducer > output

实例:

cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret

第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。

使用 Hadoop 集群执行任务

集群运行

在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容

nohup sh -x hadoop.sh &

$hadoop.sh$ 的书写方法如下

对于含有 $mapper$ 和 $reducer$ 的任务

#!/bin/bash
HADOOP='/home/work/infra/infra-client/bin/hadoop'

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
    -D mapreduce.job.queuename=root.production.cloud_group.qabot  \
    -input /user/..../input-data \
    -output /user/.../output/\
    -mapper 'python mapper.py' \
    -reducer 'python reducer.py' \
    -file './mapper.py' \
        -file './reducer.py'

只有 $mapper$ 没有 $reducer$ 的任务

#!/bin/bash
HADOOP=/home/work/infra/infra-client/bin/hadoop

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
    -D mapreduce.job.queuename=root.production.cloud_group.qabot  \
    -input /user/..../input-data \
    -output /user/.../output/\
    -mapper 'python mapper.py' \
    -reducer 'cat' \
    -file './mapper.py' \

$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。

其中, -file 命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。

常用的 Hadoop Shell 命令

$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为

hadoop fs -shell

详细的命令可以查看这里: Hadoop Shell

get

hadoop fs -get [-ignorecrc] [-crc]  

复制文件到本地文件系统。可用 -ignorecrc 选项复制 CRC 校验失败的文件。使用 -crc 选项复制文件以及 CRC 信息。

hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

put

hadoop fs -put  ... 

从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。

hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile

参考文献:

[1] MapReduce: Simplified Data Processing on Large Clusters-Google

[2] 用python写MapReduce函数 以WordCount为例-CNBolgs

[3] Hadoop Shell命令-Apache Hadoop