概述
本例使用 MapReduce 对 Web 日志中客户端 IP 进行统计
Mapper 的输入是 Web 日志,如下
|
|
Mapper 程序以数据的偏移为 Key,日志行为 Value 输入。数据偏移对于本例可以忽略,对输入行以空格分隔后第一个元素即为 IP。
Mapper 的输出是以 IP 为 Key,由于一行只有一个 IP,直接以 1 作为 Value。如下
|
|
经过 Shuffle (混洗),数据被处理为
|
|
混洗的过程可以描述为
Mapper 对相同 Key 的数据进行聚合,然后复制到 Reducer 节点,Reducer 节点对来自多个 Mapper 节点的相同 Key 数据再进一步聚合。这里保证了相同 Key 的数据会被放到一个 Reducer 中。
Reducer 接收到数据,对每个 Key 的 Value 相加,输出结果
|
|
至此,整个 MapReduce 完成。
处理大数据量数据时,为减少网路 IO, Mapper 程序会在靠近数据的节点上运行,优先在数据节点上运行,若数据节点上存在繁忙的 Mapper 程序,则会在同一机架的节点上运行,最后才会挑选不同机架的节点上运行。
Mapper 处理完成时需要将输出传输到 Reducer 节点,这部分需要进行网络传输。为减少这部分的网络 IO,引入了 Combiner 函数。
Combiner 函数就像运行在 Mapper 节点上的 Reducer 一样,将一个 Mapper 的输出先进行处理。
在本例中,Combiner 函数可以将 Mapper 输出的次数先进行相加。也即:
Mapper 1 输出为
|
|
Mapper 2 输出为
|
|
在不使用 Combiner 时,直接将 Mapper 输出传输到 Reducer 节点。若使用 Combiner 函数,
Mapper 1 Combiner 输出为
|
|
Mapper 2 Combiner 输出为
|
|
可以减少传输到 Reducer 的数据量。
Combiner 函数并不是在任何时刻都可以使用,例如在计算平均值时,各个 Mapper 的平均值在 Reducer 上再求平均值与整体求平均值很可能是不同的。
代码
样例代码为
|
|
输入数据
|
|
分别执行命令
|
|
可以看到结果
|
|
参考链接
《Hadoop 权威指南》