Simple MapReduce with Python (1)
All of the steps below assume a Hadoop cluster is already deployed and running normally.
Python source code
mapper.py
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    line = line.strip()
    # split the line into words
    words = line.split()
    for word in words:
        # emit one tab-separated (word, 1) pair per word
        print '%s\t%s' % (word, 1)
reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys

word2count = {}

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort by word and print the totals
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
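Before submitting anything to the cluster, the logic of the two scripts can be smoke-tested locally: a Streaming word-count job is logically equivalent to the pipeline `cat input | mapper.py | sort | reducer.py`. The sketch below (an addition for illustration, written in Python 3 syntax; the sample input is made up) simulates that map → sort → reduce flow in memory:

```python
# Simulate the Streaming pipeline: map -> sort (shuffle) -> reduce,
# entirely in memory, without a Hadoop cluster.

def map_phase(lines):
    # Same logic as mapper.py: emit tab-separated (word, 1) pairs.
    for line in lines:
        for word in line.strip().split():
            yield '%s\t%s' % (word, 1)

def reduce_phase(pairs):
    # Same logic as reducer.py: sum the counts per word.
    word2count = {}
    for pair in pairs:
        word, count = pair.split('\t', 1)
        word2count[word] = word2count.get(word, 0) + int(count)
    return sorted(word2count.items())

sample = ['hello world', 'hello hadoop']      # hypothetical input lines
shuffled = sorted(map_phase(sample))          # Hadoop sorts by key between phases
print(reduce_phase(shuffled))                 # [('hadoop', 1), ('hello', 2), ('world', 1)]
```

If the printed totals look right here, the same scripts should behave identically under Hadoop Streaming, which merely replaces the in-memory lists with STDIN/STDOUT.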
Save both scripts under /home/src, then cd into that directory.
Create a test directory on HDFS:
List the existing HDFS directory:
hadoop fs -ls /user/hdfs
Create the test directory:
hadoop fs -mkdir /user/hdfs/test
Copy the test files from the local disk to HDFS:
hadoop fs -copyFromLocal /home/src/*.txt /user/hdfs/test/
Run the MapReduce job with hadoop-streaming.jar:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -file mapper.py -mapper mapper.py -file reducer.py -reducer reducer.py -input /user/hdfs/test/* -output /user/hdfs/test/reducer
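One property of Streaming worth knowing: the framework sorts the mapper output by key before it reaches the reducer, so equal words always arrive on consecutive lines. That means the reducer does not strictly need to hold the whole word2count dict in memory; it can stream through one group at a time. A hedged alternative sketch (not from the original post; Python 3 syntax) using itertools.groupby:

```python
from itertools import groupby

def stream_reduce(lines):
    # Hadoop Streaming delivers reducer input sorted by key, so equal
    # words are adjacent and groupby sees each word as one group.
    pairs = (line.rstrip('\n').split('\t', 1) for line in lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(int(count) for _, count in group)

# In the real job this would iterate sys.stdin; a list stands in here.
for word, total in stream_reduce(['hadoop\t1', 'hello\t1', 'hello\t1']):
    print('%s\t%s' % (word, total))
```

This version uses constant memory per key instead of memory proportional to the vocabulary, which matters once inputs no longer fit on one machine.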
Execution output:
......
14/11/26 12:54:52 INFO mapreduce.Job: map 0% reduce 0%
14/11/26 12:54:59 INFO mapreduce.Job: map 100% reduce 0%
14/11/26 12:55:04 INFO mapreduce.Job: map 100% reduce 100%
14/11/26 12:55:04 INFO mapreduce.Job: Job job_1415798121952_0179 completed successfully
......
14/11/26 12:55:04 INFO streaming.StreamJob: Output directory: /user/hdfs/test/reducer
......
Inspect the result files:
hadoop fs -ls /user/hdfs/test
......
drwxr-xr-x - root Hadoop 0 2014-11-26 12:55 /user/hdfs/test/reducer
......