- 浏览: 1777085 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
奔跑的小牛:
例子都打不开
如何使用JVisualVM进行性能分析 -
蜗牛coder:
好东西[color=blue][/color]
Lucene学习:全文检索的基本原理 -
lovesunweina:
不在haoop中是在linux系统中,映射IP的时候,不能使用 ...
java.io.IOException: Incomplete HDFS URI, no host -
evening_xxxy:
挺好的, 谢谢分享
如何利用 JConsole观察分析Java程序的运行,进行排错调优 -
di1984HIT:
学习了~~~
ant使用ssh和linux交互 如:上传文件
Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。
研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径:
FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out);
从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。
设置输入、输出格式:
job.setInputFormat(SequenceFileInputFormat.class); job.setOutputFormat(MapFileOutputFormat.class);
输出格式
看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。
接口 :org.apache.hadoop.mapred.OutputFormat<K , V >
public interface OutputFormat<K, V> { RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress) throws IOException; void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; }
checkOutputSpecs :检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。
getRecordWriter :把输出键值对 output <key, value> 写入到输出路径中。
mapred下面的实现有三个,如下图:
基类FileOutputFormat :org.apache.hadoop.mapred.FileOutputFormat
public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> { public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress) throws IOException; public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { // Ensure that the output directory is set and not already there Path outDir = getOutputPath(job); if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } if (outDir != null) { FileSystem fs = outDir.getFileSystem(job); // normalize the output directory outDir = fs.makeQualified(outDir); setOutputPath(job, outDir); // check its existence if (fs.exists(outDir)) { throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } } }
这是个抽象类,实现了检查输入路径是否存在的方法,具体输出方式写成抽象方法预留给了子类。
子类见下图:
子类MapFileOutputFormat :org.apache.hadoop.mapred.MapFileOutputFormat
public class MapFileOutputFormat extends FileOutputFormat<WritableComparable, Writable> { public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { // find the kind of compression to do compressionType = SequenceFileOutputFormat.getOutputCompressionType(job); // find the right codec Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); } // ignore the progress parameter, since MapFile is local final MapFile.Writer out = new MapFile.Writer(job, fs, file.toString(), job.getOutputKeyClass().asSubclass(WritableComparable.class), job.getOutputValueClass().asSubclass(Writable.class), compressionType, codec, progress); return new RecordWriter<WritableComparable, Writable>() { public void write(WritableComparable key, Writable value) throws IOException { out.append(key, value); } public void close(Reporter reporter) throws IOException { out.close();} }; } }
关键点在于获取分布式文件输出句柄MapFile.Writer,完成输出任务后会关闭输出。每个实现都有特定用途,都需要弄清楚,在这里就不再一一介绍了。
上面是hadoop自己的实现,在具体的编程过程中,我们肯定会有自己的实现去定义输出格式。上面也讲到了job只能设置输出路径,不能添加多个输出路径,那么有什么解决措施呢?来看看nutch中的精彩实现,会给我们启示:
自己的实现: org.apache.nutch.parse.ParseOutputFormat
public class ParseOutputFormat implements OutputFormat<Text, Parse> { //这里不是检查输出路径,是检查数据路径下的子路径,改变了接口中的定义 public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { Path out = FileOutputFormat.getOutputPath(job); if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME))) throw new IOException("Segment already parsed!"); } //下面获取了三个输入句柄,分别向三个路径中输出键值对 public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException { ...... Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // 一个输出路径 Path data = new Path(new Path(out, ParseData.DIR_NAME), name); //两个输出路径 Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//三个输出路径 //一个写入 final MapFile.Writer textOut = new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class, CompressionType.RECORD, progress); //第二个写入 final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class, compType, progress); //第三个写入 final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress); return new RecordWriter<Text, Parse>() { public void write(Text key, Parse parse)throws IOException { ...... crawlOut.append(key, d); ....... crawlOut.append(new Text(newUrl), newDatum); ...... crawlOut.append(key, adjust); ...... dataOut.append(key, parseData); ...... crawlOut.append(key, datum); } } //关闭三个句柄 public void close(Reporter reporter) throws IOException { textOut.close(); dataOut.close(); crawlOut.close(); } }; } }
ParseOutputFormat实现了OutputFormat接口,改变了job中设置的输出路径,并且把不同的内容输出到不同的路径,从而达到了多个输出(并且根据逻辑划分)。这个我觉得值得借鉴。
关于输入以及输入输出的各个实现都有什么用处,以后有机会再来写写。本人现在还是一知半解,见笑了。
发表评论
-
HBase配置LZO压缩
2011-07-10 22:40 6115系统: gentoo HDFS: hadoop:hado ... -
HBase RegionServer 退出 ( ZooKeeper session expired)
2011-04-23 08:32 9014RegionServer 由于 ZooKeeper sessi ... -
HBase迁移数据方案1(两个集群不能通信)
2011-03-30 18:23 3814前一篇文章里面介绍了 两个可以直接通信的集群之间很容易拷贝数据 ... -
HBase如何迁移数据
2011-03-10 13:42 6460HBase如何迁移数据?这里有个方案:http://blog. ... -
HBase如何存取多个版本的值
2011-03-07 16:11 27174HBase如何存取多个版本 ... -
HBase简介(很好的梳理资料)
2011-01-30 10:18 130581一、 简介 history s ... -
Google_三大论文中文版(Bigtable、 GFS、 Google MapReduce)
2010-11-28 16:30 22144做个中文版下载源: http://dl.iteye.c ... -
hadoop主节点(NameNode)备份策略以及恢复方法
2010-11-11 19:35 27726一、dits和fsimage 首先要提到 ... -
HRegionServer: ZooKeeper session expired
2010-11-01 14:21 11407Hbase不稳定,分析日志 ... -
Bad connect ack with firstBadLink
2010-10-25 13:20 8283hbase报的错误,经过分析是Hadoop不能写入数据了。可恶 ... -
hbase0.20.支持多个主节点容灾切换功能(只激活当前某个节点,其他节点备份)
2010-09-09 14:53 2835http://wiki.apache.org/hadoop/H ... -
java.io.IOException: Incomplete HDFS URI, no host
2010-09-07 08:31 16123ERROR org.apache.hadoop.hdfs.se ... -
升级hadoop0.20.2到hadoop-0.21.0
2010-09-05 11:52 7719按照新的文档来 更新配置: http://hadoop.apa ... -
hadoop-hdfs启动又自动退出的问题
2010-05-20 10:45 6059hadoop-hdfs启动又自动退出的问题,折腾了我1天时间啊 ... -
在windows平台下Eclipse调试Hadoop/Nutch
2010-04-29 14:34 3263即让碰到这个问题说明 准备工作都做好了,软件包,环境什么的这里 ... -
Hadoop运行mapreduce实例时,抛出错误 All datanodes xxx.xxx.xxx.xxx:xxx are bad. Aborting…
2010-04-29 14:26 6366Hadoop运行mapreduce实例时,抛出错误 All d ... -
cygwin 添加用户
2010-04-13 17:48 7347http://hi.baidu.com/skychen1900 ... -
nutch总体输入输出流程图解析
2010-04-12 16:58 2426附件里面有word文档,请下 ... -
nutch分布式搭建
2010-04-06 17:54 6788如何在eclipse中跑nutch :http://jiaj ... -
解析Nutch插件系统
2010-03-31 16:31 6488nutch系统架构的一个亮点就是插件,借鉴这个架构我们 ...
相关推荐
#hadoop-map-reduce-job:网址: :
Hadoop Map-reduce Job Scheduler Resources Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 ...
18/05/25 19:51:49 INFO mapreduce.Job: map 0% reduce 0% 18/05/25 19:52:20 INFO mapreduce.Job: map 100% reduce 0% 18/05/25 19:52:29 INFO mapreduce.Job: map 100% reduce 100% 18/05/25 19:52:31 INFO ...
hadoop 1.2 api 伪中文版。支持即时查询,高级查询。方便编码学习。 大数据炙手可热!hadoop是一个大数据分布式系统基础架构,由...虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。
修改mapper和reducer数量,如何使用combiner,什么时候该选择哪个writeable等。资料里很详细说明了。
依次选择open " "perspective,other,Map、Reduce,如下图所示: " " " "(3)设置Map/Reduce location,选择Map/Reduce locations,new hadoop " "location,将其中的内容设置成下图所示的内容: " " " "设置...
Map是把输入Input分解成中间的Key/Value对,Reduce把Key/Value合成最终输出Output。这两个函数由程序员提供给系统,下层设施把Map和Reduce操作分布在集群上运行,并把结果存储在GFS上。 3、BigTable。一个大型的...
Hadoop集群安装的详细说明文档, 實作七: Hadoop 叢集安裝 前言 您手邊有兩台電腦,假設剛剛操作的電腦為"主機一" ,另一台則為"主機二" 。則稍後的環境如下 • 管理Data的身份 管理Job的身份 "主機一" namenode ...
017 查看Hadoop 日志以及日志的格式和命名组成 018 Hadoop 守护进程服务三种启动停止方式 019 测试环境(HDFS Shell基本命令和运行WordCount程序) 020 结合WordCount实例讲解Hadoop的数据存储和数据计算 021 Hadoop ...
org.apache.hadoop.mapreduce.lib.reduce org.apache.hadoop.mapreduce.security org.apache.hadoop.mapreduce.server.jobtracker org.apache.hadoop.mapreduce.server.tasktracker org.apache.hadoop.mapreduce...
注意这里的作业在运行时,需要等所有的Map任务完成时才能运行Reduce任务。 配置如下: mapred.reduce.slowstart.completed.maps : 1.0 执行命令: hadoop jar hadoop-wm-1.0.0-job.jar input-video watermark-image ...
You will also learn about optimizing map and reduce tasks by using Combiners and compression. The book ends with best practices and recommendations on how to use your Hadoop cluster optimally. What...
hadoop权威指南第三版(英文版)。 Foreword . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xiii Preface . . . . . . ....
Map and Reduce Java MapReduce Scaling Out Data Flow Combiner Functions Running a Distributed MapReduce Job Hadoop Streaming Ruby Python import import import import import import org.apache.hadoop.fs...
Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...
Dive into map/reduce mechanics and build your first map/reduce job in Python Understand how to run chains of map/reduce jobs in the form of Pig scripts Use a real-world dataset—baseball performance ...
web调用hadoop集群的改进版,修改了job中map和reduce进度的获取方式;修改了页面跳转,添加了servlet,servlet处理数据,jsp专注展现;解压后有两个文件,一个是web工程直接导入myeclipse,另外一个是jar放在hadoop...
用户只要继承MapReduceBase,提供分别实现Map和Reduce的两个类,并注册Job即可自动分布式运行。 目前Release版本是0.20.203.0。还不成熟,但是已经集群规模已经可以达到4000个节点,是由Yahoo!实验室中构建的。下面...
Hadoop 2 (YARN API) 中带有 Map Reduce 示例的存储库 目前的例子: 如何执行示例? 我假设你克隆了这个存储库,你用 netbeans 编译并构建了一个 jar 文件,并且你已经安装了 Hadoop 2.X。 如果之前没问题,则应...
Master将M份Job分给Idle状态的M个worker来处理; 对于输入中的每一个, value> pair 进行Map操作,将中间结果Buffer在Memory里; 定期的(或者根据内存状态),将Buffer中的中间信息Dump到本地磁盘上,并且把文件...