HBase 中地图reduce join的使用
HBase 中mapreduce join的使用
首先介绍常用的集中 mapreduce 方法
reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
数据模型
data.txt
info.txt
期望输出
以reduce端join为例
不使用hbase 进行join 伪代码
Map端:
map端输出结果
reduce端输出
由此可见最大的开销应该是实在shuffle阶段,若每个ID有上百万个键值对,IO开销是非常大的
以mapper端join为例
不使用hbase 进行join 伪代码
Map端代码
使用hbase进行map join
思路就是将cache 用 hbase代替,这样就不用每个分片都去复制一份cache,极大的节省了磁盘开销
如此将info.txt看作一个散列表,不必为每个任务都创建散列表。
首先介绍常用的集中 mapreduce 方法
reduce side join
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。
map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
SemiJoin
SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。
reduce side join + BloomFilter
在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。
数据模型
data.txt
编号 ID 数值1 201001 1003 abc 201002 1005 def 201003 1006 ghi 201004 1003 jkl 201005 1004 mno 201006 1005 pqr
info.txt
ID 数值2 1003 kaka 1004 da 1005 jue 1006 zhao
期望输出
ID 编号 数值1 数值2 1003 201001 abc kaka 1003 201004 jkl kaka 1004 201005 mno da 1005 201002 def jue 1005 201006 pqr jue 1006 201003 ghi zhao
以reduce端join为例
不使用hbase 进行join 伪代码
Map端:
mapper{ String filename = data.txt || info.txt //为不同的来源数据打上不同标识 if(filename is data.txt){ mapper context output(ID+tag1,编号+数值1) }else if(filename is info.txt){ mapper context output(ID+tag2,数值2) } }
map端输出结果
ID TAG 数值1/编号 数值2 1003,0 kaka 1004,0 da 1005,0 jue 1006,0 zhao 1003,1 201001 abc 1003,1 201004 jkl 1004,1 201005 mon 1005,1 201002 def 1005,1 201006 pqr 1006,1 201003 ghi
按照key进行忽略tag 按照key进行分组 忽略tag 最终输出 同一组: 1003,0 kaka 1003,0 201001 abc 1003,0 201004 jkl 同一组: 1004,0 da 1004,0 201005 mon 同一组: 1005,0 jue 1005,0 201002 def 1005,0 201006 pqr 同一组: 1006,0 zhao 1006,0 201003 ghi
reduce端输出
reducer{ reducer context output() }
由此可见最大的开销应该是实在shuffle阶段,若每个ID有上百万个键值对,IO开销是非常大的
以mapper端join为例
不使用hbase 进行join 伪代码
Map端代码
Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);//获取info.txt BufferedReader joinReader = new BufferedReader( new FileReader(cacheFiles[0].toString())); while ((line = joinReader.readLine()) != null) { tokens = line.split(" "); hashmap.put(ID, 数值2); } mapper { //读取data.TXT 获取hashmap.get(ID) 拼接map输出 }
使用hbase进行map join
思路就是将cache 用 hbase代替,这样就不用每个分片都去复制一份cache,极大的节省了磁盘开销
info = HBase.connect("info.txt") 获取相应值 context.output(拼接字符串)
如此将info.txt看作一个散列表,不必为每个任务都创建散列表。