`

解析hadoop框架下的Map-Reduce job的输出格式的实现

阅读更多

      Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。

 

    研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径:

FileInputFormat.addInputPath(job, in);

FileOutputFormat.setOutputPath(job, out);

 

从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。

设置输入、输出格式:

job.setInputFormat(SequenceFileInputFormat.class);

job.setOutputFormat(MapFileOutputFormat.class);

 

输出格式

看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。

 

接口 :org.apache.hadoop.mapred.OutputFormat<K , V >

public interface OutputFormat<K, V> {

RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress)
throws IOException;

void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

}

 checkOutputSpecs :检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。

 getRecordWriter     :把输出键值对 output <key, value> 写入到输出路径中。

 

mapred下面的实现有三个,如下图:

 

基类FileOutputFormat :org.apache.hadoop.mapred.FileOutputFormat

public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {

public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress) 
throws IOException;

public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, 
InvalidJobConfException, IOException {

    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);
      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir +  " already exists");
      }
    }
  }

 这是个抽象类,实现了检查输入路径是否存在的方法,具体输出方式写成抽象方法预留给了子类。

 子类见下图:

 

子类MapFileOutputFormat :org.apache.hadoop.mapred.MapFileOutputFormat

public class MapFileOutputFormat 
extends FileOutputFormat<WritableComparable, Writable> {

  public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                      String name, Progressable progress)
    throws IOException {
    // get the path of the temporary output file 
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    
    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

      // find the right codec
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
	  DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    
    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
      new MapFile.Writer(job, fs, file.toString(),
                         job.getOutputKeyClass().asSubclass(WritableComparable.class),
                         job.getOutputValueClass().asSubclass(Writable.class),
                         compressionType, codec,
                         progress);

    return new RecordWriter<WritableComparable, Writable>() {

        public void write(WritableComparable key, Writable value)
          throws IOException {

          out.append(key, value);
        }

        public void close(Reporter reporter) throws IOException { out.close();}
      };
  }
}

   关键点在于获取分布式文件输出句柄MapFile.Writer,完成输出任务后会关闭输出。每个实现都有特定用途,都需要弄清楚,在这里就不再一一介绍了。

 

    上面是hadoop自己的实现,在具体的编程过程中,我们肯定会有自己的实现去定义输出格式。上面也讲到了job只能设置输出路径,不能添加多个输出路径,那么有什么解决措施呢?来看看nutch中的精彩实现,会给我们启示:

 

自己的实现: org.apache.nutch.parse.ParseOutputFormat

public class ParseOutputFormat implements OutputFormat<Text, Parse> {

//这里不是检查输出路径,是检查数据路径下的子路径,改变了接口中的定义
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
      throw new IOException("Segment already parsed!");
  }

//下面获取了三个输入句柄,分别向三个路径中输出键值对
public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
 throws IOException {

   ......
    Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // 一个输出路径
    Path data = new Path(new Path(out, ParseData.DIR_NAME), name); //两个输出路径
 Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//三个输出路径
     
//一个写入
    final MapFile.Writer textOut =
      new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
          CompressionType.RECORD, progress);
    
//第二个写入
    final MapFile.Writer dataOut =
      new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
          compType, progress);
    
//第三个写入
    final SequenceFile.Writer crawlOut =
      SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
          compType, progress);
    
    return new RecordWriter<Text, Parse>() {

        public void write(Text key, Parse parse)throws IOException {

......
              crawlOut.append(key, d);
.......
             crawlOut.append(new Text(newUrl), newDatum);
......
             crawlOut.append(key, adjust);
......
              dataOut.append(key, parseData);
......
              crawlOut.append(key, datum);

          }
        }
//关闭三个句柄
 public void close(Reporter reporter) throws IOException {
          textOut.close();
          dataOut.close();
          crawlOut.close();
        }
      };
 }
}

   ParseOutputFormat实现了OutputFormat接口,改变了job中设置的输出路径,并且把不同的内容输出到不同的路径,从而达到了多个输出(并且根据逻辑划分)。这个我觉得值得借鉴。

 

   关于输入以及输入输出的各个实现都有什么用处,以后有机会再来写写。本人现在还是一知半解,见笑了。

  • 大小: 2.9 KB
  • 大小: 4.5 KB
分享到:
评论

相关推荐

    hadoop-map-reduce-demo

    #hadoop-map-reduce-job:网址: :

    大数据云计算技术 优酷网Hadoop及Mapreduce入门教程(共35页).pptx

    Hadoop Map-reduce Job Scheduler Resources Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 ...

    hadoop 2.7.6 eclipse插件

    18/05/25 19:51:49 INFO mapreduce.Job: map 0% reduce 0% 18/05/25 19:52:20 INFO mapreduce.Job: map 100% reduce 0% 18/05/25 19:52:29 INFO mapreduce.Job: map 100% reduce 100% 18/05/25 19:52:31 INFO ...

    hadoop 1.2.1 api 最新chm 伪中文版

    hadoop 1.2 api 伪中文版。支持即时查询,高级查询。方便编码学习。 大数据炙手可热!hadoop是一个大数据分布式系统基础架构,由...虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

    提高hadoop的mapreduce job效率笔记

    修改mapper和reducer数量,如何使用combiner,什么时候该选择哪个writeable等。资料里很详细说明了。

    基于Hadoop的数据分析.doc

    依次选择open " "perspective,other,Map、Reduce,如下图所示: " " " "(3)设置Map/Reduce location,选择Map/Reduce locations,new hadoop " "location,将其中的内容设置成下图所示的内容: " " " "设置...

    Apress - Pro Hadoop

    Map是把输入Input分解成中间的Key/Value对,Reduce把Key/Value合成最终输出Output。这两个函数由程序员提供给系统,下层设施把Map和Reduce操作分布在集群上运行,并把结果存储在GFS上。  3、BigTable。一个大型的...

    Hadoop集群安装

    Hadoop集群安装的详细说明文档, 實作七: Hadoop 叢集安裝 前言 您手邊有兩台電腦,假設剛剛操作的電腦為"主機一" ,另一台則為"主機二" 。則稍後的環境如下 • 管理Data的身份 管理Job的身份 "主機一" namenode ...

    Hadoop从入门到上手企业开发

    017 查看Hadoop 日志以及日志的格式和命名组成 018 Hadoop 守护进程服务三种启动停止方式 019 测试环境(HDFS Shell基本命令和运行WordCount程序) 020 结合WordCount实例讲解Hadoop的数据存储和数据计算 021 Hadoop ...

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.lib.reduce org.apache.hadoop.mapreduce.security org.apache.hadoop.mapreduce.server.jobtracker org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce...

    hadoop-wm:基于Hadoop视频水印应用

    注意这里的作业在运行时,需要等所有的Map任务完成时才能运行Reduce任务。 配置如下: mapred.reduce.slowstart.completed.maps : 1.0 执行命令: hadoop jar hadoop-wm-1.0.0-job.jar input-video watermark-image ...

    Optimizing Hadoop for MapReduce(PACKT,2014)

    You will also learn about optimizing map and reduce tasks by using Combiners and compression. The book ends with best practices and recommendations on how to use your Hadoop cluster optimally. What...

    hadoop 权威指南(第三版)英文版

    hadoop权威指南第三版(英文版)。 Foreword . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xiii Preface . . . . . . ....

    hadoop_the_definitive_guide_3nd_edition.pdf

    Map and Reduce Java MapReduce Scaling Out Data Flow Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python import import import import import import org.apache.hadoop.fs...

    hadoop倒排索引实现 完整代码+报告

    Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...

    Big.Data.for.Chimps.A.Guide.to.Massive-Scale.Data.Processing.in.Practice.epub

    Dive into map/reduce mechanics and build your first map/reduce job in Python Understand how to run chains of map/reduce jobs in the form of Pig scripts Use a real-world dataset—baseball performance ...

    web调用hadoop

    web调用hadoop集群的改进版,修改了job中map和reduce进度的获取方式;修改了页面跳转,添加了servlet,servlet处理数据,jsp专注展现;解压后有两个文件,一个是web工程直接导入myeclipse,另外一个是jar放在hadoop...

    Hadoop权威指南(第2版).

    用户只要继承MapReduceBase,提供分别实现Map和Reduce的两个类,并注册Job即可自动分布式运行。 目前Release版本是0.20.203.0。还不成熟,但是已经集群规模已经可以达到4000个节点,是由Yahoo!实验室中构建的。下面...

    YarnExamples:Hadoop 2 (YARN API) 中带有 Map Reduce 示例的存储库

    Hadoop 2 (YARN API) 中带有 Map Reduce 示例的存储库 目前的例子: 如何执行示例? 我假设你克隆了这个存储库,你用 netbeans 编译并构建了一个 jar 文件,并且你已经安装了 Hadoop 2.X。 如果之前没问题,则应...

    hadoop5hadoop5

    Master将M份Job分给Idle状态的M个worker来处理; 对于输入中的每一个, value&gt; pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件...

Global site tag (gtag.js) - Google Analytics