Hadoop权威指南-2.4 横向扩展 - 高飞网

2.4 横向扩展

2016-10-20 10:51:49.0

    前面介绍了MapReduce针对海量输入数据是如何工作的,现在我们开始鸟瞰整个系统以及有大师输入时的数据流。为了简单起见,到目前为止,我们的例子都只是用了本地文件系统中的文件(笔者前面使用的示例已经用到了HDFS)。然后,为了实现横向扩展(scaling out),我们需要把数据存储在分布式文件系统中,一般为HDFS,由此允许Hadoop将MapReduce计算转移到存储有部分数据的各台机器上。

2.4.1 数据流

    首选定义一术语。MapReduce作业(job)是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务(task)来执行,其中包括两类任务:map任务和reduce任务

    有两类节点控制着作业执行过程:一个jobtracker及一系列tasktracker。jobtracker通过调度tasktracker上运行的任务来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告给jobtracker,jobtracker由此记录每项作业的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tracker节点上重新调度该任务。

    Hadoop将MapReduce的输入数据划分成行长的小数据块,称为输入分片(input split)或简称“分片”。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。    

    拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负责平衡,因为一台较快的计算机能处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,失败的进程或其他同时运行的作业能够实现满意的负责平衡,并且如果分片被切分得更细,负责平衡会更高。

    另一全面,如果分片切片得太小,那么管理分片的总时间和构建map任务的总时间将决定作业的整个执行时间。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块大小,默认是64MB,不可以针对集群调整这个默认值(对新建的所有文件),或对新建的每个文件具体指定。

    Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得最佳性能。这就是所谓的“数据本地化优化”(data locality optimization),因为它无需使用宝贵的集群带宽资源。但是有时对于一个map任务的输入来说,存储有某个HDFS数据块备份的三个节点可能正在运行其他map任务,此时作业调度需要在三个备份中的某个数据寻求同个机架中空闲的机器来运行该map任务。仅仅在非常偶然的情况下(该情况基本上不会发生),会使用其他机架中的机器运行该map任务,这将导致机架与机架之间的网络传输。下图显示了这三个可能性。


    现在我们应该清楚为什么最侍分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,这种方法显示效率更低。

    map任务将其输出写入本地硬盘,而非HDFS。这是为什么以?因为map的输入是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一量作业完成,map的输出结果就可以删除。因此如果把它存储在HDFS中并实现备份,难免有些小题大做。如果该节点上运行的map任务在将map中间结果传送给reduce任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。

    reduce任务并不具备数据本地化的优势——单个reduce任务的输入通常来自于所有mapper的输出。在本例中,我们仅有一个reduce任务,其输入是所有map任务的输出。因此,排过序的map输出需通过网络传输发送到运行reduce任务的节点。数据在reduce端合并,然后由用户定义的reduce函数处理。reduce的输出通常存储在HDFS中以实现可靠存储。如第3章所述,对于每个reduce输出的HDFS块,第一个复本存储在本地节点上,其他复本存储在其他机架节点中。因此,将reduce的输出写入HDFS确实需要占用网络带宽,但这与正常的HDFS流水线写入的消耗一样。

    一个reduce任务的完整数据流如图所示。虚线框表示节点,虚线箭头表示节点内部的数据传输,而实线箭头表示不同节点之间的数据传输。

    reduce任务的数量并非由输入数据的大小决定,而事实上是独立指定的。7.1.1节将介绍如何为指定的作业选择reduce任务的数量

    如果有好多个reduce任务,每个任务建一个分区。每个分区有许多键,但每个键对应的键/值对记录都在同一个分区。分区由用户定义的partition函数控制,但通常用默认的partitioner通过哈希函数来分区,很高效。

    一般情况下,多个reduce任务的数据流如图2-4所示。该图清楚地表明了为什么map任务和reduce任务之间的数据流称为shuffle(混洗)。因为每个reduce任务的输入都来自许多map任务。shuffle一般比图中所示的更复杂,而且调整混洗参数对作业总执行时间影响非常大。

    最后,当数据处理可以完全并行,即无需混洗时,可能会出现无reduce任务的情况。在这种情况下,唯一的非本地节点数据传输是map任务将结果写入HDFS,如下图所示:


 2.4.2 combiner函数

    集群上的可用带宽限制了MapReduce作业的数量,因此尽量避免map和reduce任务之间的数据传输是有利的。Hadoop允许用户针对map任务的输出指定一个combiner——combiner函数的输出作为reduce函数的输入。

    combiner的规则制约着可用的函数类型。还是以气温数据为例,比如有以下两个map,第一个map为:

    (1950 ,0)、(1950,20)、(1950,10)

    第二个map为:

    (1950,25)、(1950,15)

    正常情况下reduce函数时,输入如下:max(0,20,10,25,15) ,因为25为该列数据是大的,所以它的输出如下:(1950,25)

    如果使用combiner函数,则map的输出结果变为(1950,[20,25]),简单地说,可以通过下面的表达式来说明气温数值的函数调用:

    max(0,20,10,25,15) = max((max(0,20,10),max(25,15)) = max(20,25) = 25

    并非所有的函数都具有该属性。例如如果计算平均气温,就不能用平均数作为combiner。


    指定一个combiner

    combiner是通过Reducer类来定义的,并且在这个例子中,它的实现与MaxTemperatureReducer中的reduce函数相同。唯一的改动是Job中设置combiner类。

package hadooptest;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max Temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReduce.class);
        job.setCombinerClass(MaxTemperatureReduce.class);//

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

    这个程序无需要修改便可以在一个完整的数据侦查上直接运行。这是MapReduce的优势:它可以根据数据量的大小和硬件规模进行扩展。这里有一个运行结果:在一个10节点EC2集群运行,程序执行时间只花了6分钟。