Hadoop权威指南-2.3 使用Hadoop来分析数据 - 高飞网

2.3 使用Hadoop来分析数据

2016-10-17 16:22:29.0

2.3.1 map和reduce

    MapReduce任务过程分为两个是阶段:map阶段和reduce阶段。每个阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还需要写两个函数:map函数和reduce函数。

    map阶段的输入是NCDC的原始数据。由于我们只对年份的气温属性感兴趣,所以只需取出这两个字段的数据。在本例中,map函数只是一个数据准备阶段,通过这种试来准备数据,使reducer函数能够继续对它进行处理:即找出每年的最高气温。map函数还是一个比较适合去除已损记录的地方:此处,我们筛掉缺失的可疑的或错误的气温数据。

    为了全面了解map的工作方式,我们考虑以下输入数据的示例数据,这些行以键/值对的方式作为map函数的输入:

    键(key)是文件中的行的偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):


    map函数的输出经由MapReduce 框架处理后,最后发送到reduce函数。这个处理过程过程基于键值对进行排序和分组。因此,这一示例中,reduce函数看到的是如下输入:

    每一年份后紧跟着一系列气温数据。reduce函数现在 要做的是遍历整个列表并从中找出最大的读数:

    这是最终的输出结果:每一年的全球最高气温记录。

    整个数据流如下图所示。在图的底部是Unix管线,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时再次涉及。



2.3.2 Java MapReduce

    明白MapReduce程序的工作原理之后,下一步就是写代码实现它。我们需要三样东西:一个map函数、一个reduce函数和一 些运行作业的代码。map函数通过继承org.apache.hadoop.mapreduce.Mapper类来 实现,并实现其中的map方法。

由于历史原因,Hadoop存在着新旧两种API,如下图所示:mapred包中的为旧API,而mapreduce包中的为新API。所以写代码时要注意这一点。


以笔者使用的Hadoop 2.7.3来说,运行本书中的示例并不通过,因为书中的样例代码中,使用了extends MapReduceBase implements Mapper,而在2.7.3版本中,Mapper是一个基类,并不是接口,显示不能编译通过。因此,下面的代码列出的都是笔者真实运行过代码,可能与书本上并不一致。

map代码:

package hadooptest;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemprature;
        if (line.charAt(87) == '+') {
            airTemprature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemprature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemprature != MISSING && quality.matches("[01459]")) {
            // 年为key,温度为值
            context.write(new Text(year), new IntWritable(airTemprature));
        }
    }
}

    可以看到,Mapper类的四个泛型,分别为LongWriteable,Text这两个表示输入数据的key-value类型,即行偏移量和一段文本;Text、IntWriteable,这两个表示输出结果的key-value类型,是年份和温度。

    如前所述,对于输入数据的键(key)即行偏移量不关心,因此这里只对value值(即line)进行了处理,提取出年份和温度值。

    提取并过滤掉非法的数据之后,使用Context类将输出的内容写入。这里把年份(用Text类型)和温度(用IntWriteable类型)写入。


类似方法来定义reduce函数。如下是找出最高气温的Reducer类

package hadooptest;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        int maxValue = Integer.MAX_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }

}

    同样,reduce函数也有四个形式参数类型用于指定输入和输出类型。reduce函数的输入类型必须匹配map函数的输出类型。即Text类型和IntWriteable类型。


    第三部分代码负责运行MapReduce作业,如下:

    这个用户程序在气象数据集中找出最高气温

package hadooptest;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.Job对象指定作业规范
        Job job = Job.getInstance();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max Temperature");
        //2. 输入数据的路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //3. 
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReduce.class);
        //4. 输入和输出数据的路径
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}

    Job对象指定作业执行规范。可以用它来控制整个作业的运行。我们在Hadoop集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop集群上发布这个文件)。不必明确指定JAR文件的名称,在Job对象的setJarByClass()方法中,传递一个类即可,Hadoop利用这个类来查找含有它的JAR文件,进而找到相关的JAR文件。

    构造Job对象之后,需要指定输入和输出数据的路径。这个路径可以是单个文件、一个目录(此时,将目录下所有 文件当作输入)或符合特定文件模式的一系列文件。

    调用FileOutputFormat类中的静态方法setOutputPath()来指定输出路径(只能有一个输出路径)。这个方法指定的是reduce函数输出文件的写入目录。在运行作业前该目录是不应该在的,否则Hadoop会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。

    接口,通过setMapperClass和setReducerClass指定map类型和reduce类型。

    如果map的输出类型与reduce的输入类型不一致,可以通过setOutputKeyClass()和setOutputValueClass()控制map和reduce函数的输出类型。输入的类型通过InputFormat类控制,示例中没有设置,因为使用的是默认的TextInputFormat(文本输入格式)。

    在设置定义map和reduce函数的类之后,可以开始运行作业。Job中的waitForCompletion方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。


2.3.2.1 运行测试

    写好MapReduce作业之后,通常要拿一个数据集进行测试以排除代码问题。首选,以独立(本机)模式安装Hadoop,详细说明参见附录A。在这种模式下,Hadoop在本地文件系统上运行作业程序。然后使用本书网站上的指令安装和编译示例。

    笔者在这省去了打包过程,直接把打好的包(hadooptest-0.0.1-SNAPSHOT.jar)和需要的示例文本资源(https://github.com/tomwhite/hadoop-book/blob/master/input/ncdc/sample.txt)传到hadoop所在机器上。


    由于运行作业要基于hdfs文件系统,因此,第一步要将sample.txt文件传到hdfs中。

[hadoop@localhost example]$ hdfs dfs -put input /
[hadoop@localhost example]$ hdfs dfs -ls /input/ncdc
Found 1 items
-rw-r--r--   1 hadoop supergroup        533 2016-10-19 18:34 /input/ncdc/sample.txt

    确保文件上传以后,就可以运行hadoop作业了:

% export HADOOP_CLASSPATH=hadooptest-0.0.1-SNAPSHOT.jar
% [hadoop@localhost example]$ hadoop hadooptest.MaxTemperature /input/ncdc/sample.txt /ouput

运行后会打印如下的日志:

16/10/19 18:38:04 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/10/19 18:38:04 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/10/19 18:38:05 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/10/19 18:38:05 INFO input.FileInputFormat: Total input paths to process : 1
16/10/19 18:38:05 INFO mapreduce.JobSubmitter: number of splits:1
16/10/19 18:38:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local319556793_0001
16/10/19 18:38:06 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/10/19 18:38:06 INFO mapreduce.Job: Running job: job_local319556793_0001
16/10/19 18:38:06 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/10/19 18:38:06 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/19 18:38:06 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/10/19 18:38:07 INFO mapred.LocalJobRunner: Waiting for map tasks
16/10/19 18:38:07 INFO mapred.LocalJobRunner: Starting task: attempt_local319556793_0001_m_000000_0
16/10/19 18:38:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/19 18:38:07 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/10/19 18:38:07 INFO mapred.MapTask: Processing split: hdfs://10.10.93.233:50040/input/ncdc/sample.txt:0+533
16/10/19 18:38:07 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/10/19 18:38:07 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/10/19 18:38:07 INFO mapred.MapTask: soft limit at 83886080
16/10/19 18:38:07 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/10/19 18:38:07 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/10/19 18:38:07 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/10/19 18:38:07 INFO mapred.LocalJobRunner: 
16/10/19 18:38:07 INFO mapred.MapTask: Starting flush of map output
16/10/19 18:38:07 INFO mapred.MapTask: Spilling map output
16/10/19 18:38:07 INFO mapred.MapTask: bufstart = 0; bufend = 45; bufvoid = 104857600
16/10/19 18:38:07 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
16/10/19 18:38:07 INFO mapred.MapTask: Finished spill 0
16/10/19 18:38:07 INFO mapred.Task: Task:attempt_local319556793_0001_m_000000_0 is done. And is in the process of committing
16/10/19 18:38:07 INFO mapreduce.Job: Job job_local319556793_0001 running in uber mode : false
16/10/19 18:38:07 INFO mapreduce.Job:  map 0% reduce 0%
16/10/19 18:38:07 INFO mapred.LocalJobRunner: map
16/10/19 18:38:07 INFO mapred.Task: Task 'attempt_local319556793_0001_m_000000_0' done.
16/10/19 18:38:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local319556793_0001_m_000000_0
16/10/19 18:38:07 INFO mapred.LocalJobRunner: map task executor complete.
16/10/19 18:38:07 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/10/19 18:38:07 INFO mapred.LocalJobRunner: Starting task: attempt_local319556793_0001_r_000000_0
16/10/19 18:38:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/19 18:38:07 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/10/19 18:38:07 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@128918ba
16/10/19 18:38:07 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=363285696, maxSingleShuffleLimit=90821424, mergeThreshold=239768576, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/10/19 18:38:08 INFO reduce.EventFetcher: attempt_local319556793_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/10/19 18:38:08 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local319556793_0001_m_000000_0 decomp: 57 len: 61 to MEMORY
16/10/19 18:38:08 INFO reduce.InMemoryMapOutput: Read 57 bytes from map-output for attempt_local319556793_0001_m_000000_0

    此时再查看hdfs中,就多了一个输入/output/,如下:

[hadoop@localhost example]$ hdfs dfs -ls /output
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2016-10-19 18:40 /output/_SUCCESS
-rw-r--r--   1 hadoop supergroup         32 2016-10-19 18:40 /output/part-r-00000

  查看文件part-r-00000就可以到结果了:

[hadoop@localhost example]$ hdfs dfs -cat /output/part-r-00000
1949	2147483647
1950	2147483647


2.3.2.2 旧的和新的Java MapReduce API

    前一小节中使用的Java  MapReduce API率先在Hadoop0.20.0中发布。本书的前两个版本基于0.20发行版本的,一直使用的是旧的API。除了极少几个地方,本书中将新的API作为主要使用的API。

    新旧API之间有如下几个明显的区别:

  1. 新API倾向于使用抽象类而不是接口,因为更有利于扩展。这意味着不用修改类的实现,即可在抽象类中加一个方法(即默认的实现)。在旧的API中使用Mapper和Reducer接口,而在新的API中使用抽象类。
  2. 新API放在org.apache.hadoop.mapreduce包中,旧的API放在org.apache.hadoop.mapred包中。
  3. 新API充分使用上下文对象,使用用户代码能与MapReduce系统通信。例如,新的Context基本统一了旧API中的JobConf、OutputCollector和Reporter的功能
  4. 键/值对记录在这两类API中都被推给mapper和reducer,但除此之外,新的API通过重写run()方法允许mapper和reducer控制执行流程。例如,既可批处理记录,也可以在处理完所有的记录之前停止。旧的API中可以通过MapRunnable类在mapper中实现上述功能,但在reducer中没有对待的实现。
  5. 新的API中作业控制由Job类实现,而非旧的API中的JobClient类,新的API中删除了JobClient类
  6. 新增的API实现了配置的统一。旧API通过一个特殊的JobConf对象配置作业,该对象是Hadoop配置对象的一个扩展。在新的API中,作业的配置由Configuration来完成。
  7. 输出文件的命名方式稍有不同。在旧API中map和reduce的输出被统一命名为part-nnum,但在新的API中map的输出文件名为part-m-nnnn,而reduce的输出文件名为part-r-nnnn(其中nnnn是从0开始表示 分块序号的整数)。
  8. 新的API中的用户重载函数 被声明为抛出异常java.lang.InterruptedException。这意味着可以用代码来实现中断响应,从而使该框架在必要时可以优雅地取消长时间运行的作业
  9. 在新的API中,reduce()传递的值是java.lang.Iterable类型,而非java.lang.Iterator类型。这一改变使我们容易通过java的for-each循环结构来迭代这些值。

以下用表格重要总结一下新旧API的不同



新API旧API
基类倾向于使用抽象类使用接口
所在包org.apache.hadoop.mapreduce
org.apache.hadoop.mapred
与架构的通信充分使用上下文对象
控制执行流程通过重要run方法MapRunnable中可,而在Reducer中没有该功能
作业的实现Job类JobClient
配置统一性实现了统一
输出文件名命名方式part-m-nnnn/part-r-nnnnpart-nnum
是否可中断可以不可
是否可for-each可以不可