Pig: flatten and cogroup

Original article: http://blog.csdn.net/zythy/article/details/18426347

Two datasets are used.
student, with schema (classNo, stuNo, score):

C01,	N0101,	82
C01, 	N0102,	59
C01, 	N0103,	65
C02, 	N0201,	81
C02, 	N0202,	82
C02, 	N0203,	79
C03, 	N0301,	56
C03, 	N0302,	92
C03, 	N0306,	72

teacher, with schema (classNo, name):

C01,	Zhang
C02, 	Sun
C03, 	Wang
C04, 	Dong
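The article does not show how the two relations are loaded. A minimal sketch, assuming the data sits in comma-separated files called student.txt and teacher.txt (hypothetical names) with the padding whitespace around the values removed, might look like this:

```pig
-- Hypothetical file names; PigStorage(',') splits each line on commas.
-- Assumes no stray spaces or tabs around the values, otherwise the
-- whitespace would become part of the field values.
student = load 'student.txt' using PigStorage(',')
          as (classNo:chararray, stuNo:chararray, score:int);
teacher = load 'teacher.txt' using PigStorage(',')
          as (classNo:chararray, name:chararray);
```

Declaring the schema at load time is what allows later statements to refer to fields by name (classNo, score) rather than only by position ($0, $2).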



GROUP
Run the following commands:

grouped_student = group student by classNo parallel 2;
dump grouped_student;


The result:

(C02, {(C02,N0203,79), (C02,N0202,82), (C02,N0201,81)} )
(C01, {(C01,N0103,65), (C01,N0102,59), (C01,N0101,82)} )
(C03, {(C03,N0306,72), (C03,N0302,92), (C03,N0301,56)} )



Here, parallel 2 tells Pig to use 2 reduce tasks for the grouping.
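To see the structure that group produces, describe can be run on the result. Assuming a schema was declared when student was loaded, it should report two fields: group, holding the classNo key, and a bag named student (after the grouped relation) holding the original tuples.

```pig
-- Print the schema of the grouped relation.
describe grouped_student;
```

The bag name matters because it is the name to reference inside a nested foreach block.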
How do we count the failing and the excellent students in each class? Run the following two commands:

result = foreach grouped_student {
         -- the bag inside each group is named after the grouped relation: student
         fail = filter student by score < 60;
         excellent = filter student by score >= 90;
         generate group, COUNT(fail) as fail, COUNT(excellent) as excellent;
};
dump result;


The result:

(C01, 1, 0)
(C02, 0, 0)
(C03, 1, 1)

 

FLATTEN
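The flatten operator removes one level of nesting from the data. Let's look at its effect on a tuple and on a bag in turn:
1)  Flatten applied to a tuple
Run the following commands: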

a= foreach student generate $0, ($1,$2);
dump a;


The output:

(C01, (N0101,82))
(C01, (N0102,59))
(C01, (N0103,65))
(C02, (N0201,81))
(C02, (N0202,82))
(C02, (N0203,79))
(C03, (N0301,56))
(C03, (N0302,92))
(C03, (N0306,72))



Then run:

b = foreach a generate $0, flatten($1);
dump b;



The result:

(C01, N0101,82)
(C01, N0102,59)
(C01, N0103,65)
(C02, N0201,81)
(C02, N0202,82)
(C02, N0203,79)
(C03, N0301,56)
(C03, N0302,92)
(C03, N0306,72)


As this shows, when flatten is applied to a tuple, the fields inside that tuple are promoted to top-level fields of the relation.
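The flattened fields can also be given names with an as clause. The following is a small sketch building on relation a from above; the alias b_named and the field names are chosen here for illustration only:

```pig
-- Name the fields produced by flattening the tuple in $1.
b_named = foreach a generate $0 as classNo, flatten($1) as (stuNo, score);
describe b_named;
```

Naming the fields this way lets later statements refer to stuNo and score instead of positional references.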


2)  Flatten applied to a bag
Run the following commands:

c = foreach student generate $0, {($1), ($1,$2)};
dump c;



The result:

(C01, {(N0101),  (N0101,82)})
(C01, {(N0102),  (N0102,59)})
(C01, {(N0103),  (N0103,65)})
(C02, {(N0201),  (N0201,81)})
(C02, {(N0202),  (N0202,82)})
(C02, {(N0203),  (N0203,79)})
(C03, {(N0301),  (N0301,56)})
(C03, {(N0302),  (N0302,92)})
(C03, {(N0306),  (N0306,72)})



Next, run:

d = foreach c generate $0, flatten($1);
dump d;



The result:

(C01, N0101)
(C01, N0101,82)
(C01, N0102)
(C01, N0102,59)
(C01, N0103)
(C01, N0103,65)
(C02, N0201)
(C02, N0201,81)
(C02, N0202)
(C02, N0202,82)
(C02, N0203)
(C02, N0203,79)
(C03, N0301)
(C03, N0301,56)
(C03, N0302)
(C03, N0302,92)
(C03, N0306)
(C03, N0306,72)



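As this shows, when flatten is applied to a bag, the nesting is removed: each tuple in the bag becomes a separate output row, combined with the other fields of the record, much like a Cartesian product.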
 
COGROUP
The result of a join is flat (a set of tuples), whereas the result of cogroup is nested.
Run the following commands:

r1 = cogroup  student by classNo,  teacher by classNo;
dump r1;



The result:

(C01, {(C01,N0103,65),(C01,N0102,59), (C01,N0101,82)},{(C01,Zhang)})
(C02, {(C02,N0203,79),(C02,N0202,82), (C02,N0201,81)},{(C02,Sun)})
(C03, {(C03,N0306,72),(C03,N0302,92), (C03,N0301,56)},{(C03,Wang)})
(C04, {},                             {(C04,Dong)})



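The result shows that:
1)  cogroup is similar to join.
2)  The resulting relation has 3 fields. The first field is the key the relations were grouped on; the second field is a bag holding all tuples from relation 1 (student) that match the key; the third field is also a bag, holding all matching tuples from relation 2 (teacher).
3)  It behaves like an outer join. In the fourth record, for example, the second field is an empty bag because relation 1 has no records for that key. In fact, the first statement is equivalent to: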

r1= cogroup  student by classNo outer,   teacher by classNo outer;
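Because unmatched keys are kept with an empty bag, cogroup offers a simple way to find keys that appear in only one relation. The following is a sketch using the built-in IsEmpty function; the aliases by_class, no_students, and empty_classes are made up for this example:

```pig
-- Cogroup both relations (outer on both sides by default).
by_class = cogroup student by classNo, teacher by classNo;
-- Keep only the keys whose student bag is empty.
no_students = filter by_class by IsEmpty(student);
empty_classes = foreach no_students generate group as classNo;
dump empty_classes;
```

With the sample data above, only class C04, which has a teacher but no students, should remain.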

 
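If you want keys with no matching records in relation 1 or relation 2 to be left out of the result, add the inner keyword to the corresponding relation.
Run the following statements: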

r1 = cogroup  student by classNo inner,  teacher by classNo outer;
dump r1;


The result:

(C01, {(C01,N0103,65),  (C01,N0102,59),  (C01,N0101,82)},{(C01,Zhang)})
(C02, {(C02,N0203,79),  (C02,N0202,82),  (C02,N0201,81)},{(C02,Sun)})
(C03, {(C03,N0306,72),  (C03,N0302,92),  (C03,N0301,56)},{(C03,Wang)})

 
Applying flatten, as discussed earlier, run the following commands:

r2 = foreach r1 generate flatten($1), flatten($2);
dump r2;


The result:

(C01, N0103,65, C01,Zhang)
(C01, N0102,59, C01,Zhang)
(C01, N0101,82, C01,Zhang)
(C02, N0203,79, C02,Sun)
(C02, N0202,82, C02,Sun)
(C02, N0201,81, C02,Sun)
(C03, N0306,72, C03,Wang)
(C03, N0302,92, C03,Wang)
(C03, N0301,56, C03,Wang)
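Cogrouping and then flattening both bags yields the same rows as an inner join on the same key, since flattening an empty bag produces no output row. For comparison, a sketch of the direct join (the alias joined is chosen here for illustration) would be:

```pig
-- Inner join on classNo; each student row is paired with its teacher row.
joined = join student by classNo, teacher by classNo;
dump joined;
```

The row order may differ between the two approaches, but the set of rows should match.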

 
UNION
Run the following statements:

r_union = union  student,  teacher;
dump r_union;


The result:

(C01, N0101,82)
(C01, N0102,59)
(C01, N0103,65)
(C02, N0201,81)
(C02, N0202,82)
(C02, N0203,79)
(C03, N0301,56)
(C03, N0302,92)
(C03, N0306,72)
(C01, Zhang)
(C02, Sun)
(C03, Wang)
(C04, Dong)


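This shows that:
1)  union concatenates the records of the two relations.
2)  The schema of relation r_union is unknown, because the two relations being unioned have different schemas. If the two relations had the same schema, the result of the union would have that schema as well.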
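When both inputs have a declared schema, Pig also provides union onschema, which merges the relations by field name instead of by position. A short sketch, assuming student and teacher were loaded with named fields (the alias r_named is made up here):

```pig
-- Merge by field name; requires both inputs to have a declared schema.
r_named = union onschema student, teacher;
describe r_named;
```

Fields that are missing from one input (stuNo and score for teacher rows, name for student rows) are filled with null, so the result keeps a known schema.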