`MapReduce` is a programming model, together with an associated implementation, for processing and generating very large data sets. The user first writes a `Map` function that turns the input into a set of intermediate `key/value` pairs, and then a `Reduce` function that merges all the `value`s associated with the same `key`.
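The model can be sketched in plain Python, independent of Hadoop: a map phase that emits `(word, 1)` pairs, a shuffle that groups pairs by key (here a simple `sorted` + `groupby`), and a reduce phase that sums each group. This is a minimal word-count illustration of the semantics, not part of the Hadoop API.

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    """Emit a (word, 1) pair for every word in the input."""
    for line in lines:
        for word in line.split():
            yield (word, 1)

def reduce_phase(pairs):
    """Group the sorted pairs by key and sum the counts per key."""
    for word, group in groupby(sorted(pairs), key=itemgetter(0)):
        yield (word, sum(count for _, count in group))

counts = dict(reduce_phase(map_phase(["hello world", "hello mapreduce"])))
print(counts)  # {'hello': 2, 'mapreduce': 1, 'world': 1}
```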

## Writing the MapReduce Functions

### The Mapper

```python
#!/usr/bin/env python
import sys

# Emit one "word<TAB>1" line for every word read from stdin.
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print("%s\t%s" % (word, 1))
```


```bash
chmod +x mapper.py
```
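Note that the mapper does not aggregate anything: it emits `word<TAB>1` once per occurrence, and all counting is deferred to the reducer. A quick sanity check of the same logic, inlined as a function over a sample string:

```python
def map_words(text):
    """Same logic as mapper.py: emit one 'word<TAB>1' line per token."""
    out = []
    for line in text.splitlines():
        for word in line.strip().split():
            out.append("%s\t%s" % (word, 1))
    return out

print(map_words("to be or not to be"))
# ['to\t1', 'be\t1', 'or\t1', 'not\t1', 'to\t1', 'be\t1']
```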


### The Reducer

```python
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# Input arrives sorted by key, so identical words are adjacent;
# accumulate each run and print its total when the key changes.
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        # Silently skip lines whose count is not an integer.
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print("%s\t%s" % (current_word, current_count))
        current_count = count
        current_word = word

# Flush the final group.
if word == current_word:
    print("%s\t%s" % (current_word, current_count))
```


```bash
chmod +x reducer.py
```
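The trailing `if word == current_word` block is what flushes the final group; without it the last key would never be printed. A small check of the same algorithm on pre-sorted input (in the order `sort` would deliver it):

```python
def reduce_lines(lines):
    """Same logic as reducer.py: sum consecutive counts for each key."""
    results = []
    current_word, current_count, word = None, 0, None
    for line in lines:
        word, count = line.strip().split('\t', 1)
        count = int(count)
        if current_word == word:
            current_count += count
        else:
            if current_word:
                results.append("%s\t%s" % (current_word, current_count))
            current_count = count
            current_word = word
    # Flush the final group, exactly as reducer.py does.
    if word == current_word and current_word is not None:
        results.append("%s\t%s" % (current_word, current_count))
    return results

print(reduce_lines(["be\t1", "be\t1", "or\t1", "to\t1", "to\t1"]))
# ['be\t2', 'or\t1', 'to\t2']
```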


### Local Testing

The Hadoop Streaming pipeline can be simulated locally with a pipe:

```bash
cat data | mapper | sort | reducer > output
```


```bash
cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret
```


### Running on the Cluster

```bash
nohup sh -x hadoop.sh &
```


`hadoop.sh` is written as follows:

```bash
#!/bin/bash

${HADOOP} fs -rmr /user/.../output
${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
    -D mapreduce.job.queuename=root.production.cloud_group.qabot \
    -input /user/..../input-data \
    -output /user/.../output \
    -mapper 'python mapper.py' \
    -reducer 'python reducer.py' \
    -file './mapper.py' \
    -file './reducer.py'
```


To inspect only the mapper's output on the cluster, the reducer can be replaced with `cat`:

```bash
#!/bin/bash

${HADOOP} fs -rmr /user/.../output
${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
    -D mapreduce.job.queuename=root.production.cloud_group.qabot \
    -input /user/..../input-data \
    -output /user/.../output \
    -mapper 'python mapper.py' \
    -reducer 'cat' \
    -file './mapper.py'
```

The `nohup` log shows the cluster's progress on the job in real time; when the job finishes, a `part-00000` file is written to the specified `output` directory.

All of `Hadoop`'s file operations go through its file system, so the commands take the form

```bash
hadoop fs -<command> [args]
```


### get

```bash
hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
```


```bash
hadoop fs -get /user/hadoop/file localfile
```


### put

```bash
hadoop fs -put <localsrc> ... <dst>
```


```bash
hadoop fs -put localfile /user/hadoop/hadoopfile
```