- 浏览: 5422 次
文章分类
最新评论
MapReduce 顺序组合, 迭代式,组合式,链式
1、顺序组合式
顺序组合式就是按照指定顺序执行任务如:mapreduce1 --> mapreduce2 --> mapreduce3
即:mapreduce1的输出是mapreduce2的输入,mapreduce2的输出式mapreduce3的输入
代码片段如下:
String inPath1 = "hdfs://hadoop0:9000/user/root/3D/"; String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/"; String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/"; String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/"; // job1配置 Job job1 = Job.getInstance(conf); job1.setJarByClass(Mode.class); job1.setMapperClass(Map1.class); job1.setReducerClass(Reduce1.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(IntWritable.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job1, new Path(inPath1)); FileOutputFormat.setOutputPath(job1, new Path(outPath1)); job1.waitForCompletion(true); // job2配置 Job job2 = Job.getInstance(conf); job2.setJarByClass(Mode.class); job2.setMapperClass(Map2.class); job2.setReducerClass(Reduce2.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(IntWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job2, new Path(inPath1)); FileOutputFormat.setOutputPath(job2, new Path(outPath2)); job2.waitForCompletion(true); // job3配置 Job job3 = Job.getInstance(conf); job3.setJarByClass(Mode.class); job3.setMapperClass(Map3.class); job3.setReducerClass(Reduce3.class); job3.setMapOutputKeyClass(Text.class); job3.setMapOutputValueClass(IntWritable.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job3, new Path(outPath2)); FileOutputFormat.setOutputPath(job3, new Path(outPath3)); job3.waitForCompletion(true);
子任务作业配置代码运行后,将按照顺序逐个执行每个子任务作业。由于后一个子任务需要使用前一个子任务的输出数据,因此,每一个子任务
都需要等前一个子任务执行执行完毕后才允许执行,这是通过job.waitForCompletion(true)方法加以保证的。
2、迭代组合式
迭代也可以理解为for循环或while循环,当满足某些条件时,循环结束
mapreduce的迭代算法正在研究中,后续提供完整源码....
代码如下:
3、复杂的依赖组合式
处理复杂的要求的时候,有时候一个mapreduce程序完成不了,往往需要多个mapreduce程序 这个时候就牵扯到各个任务之间的依赖关系,
所谓依赖就是一个M/R job的处理结果是另外一个M/R的输入,以此类推,
这里的顺序是 job1 和 job2 单独执行, job3依赖job1和job2执行后的结果
代码如下:
package com.hadoop.mapreduce; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Mode { // 第一个Job public static class Map1 extends Mapper<Object, Text, Text, IntWritable>{ Text word = new Text(); @Override protected void map(Object key, Text value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } // 第二个Job public static class Map2 extends Mapper<Object, Text, Text, IntWritable>{ Text word = new Text(); @Override protected void map(Object key, Text value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } // 第三个Job public static class Map3 extends Mapper<Object, Text, Text, IntWritable>{ Text word = new Text(); @Override protected void map(Object key, Text value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws IOException{ String inPath1 = "hdfs://hadoop0:9000/user/root/3D/"; String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/"; String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/"; String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/"; String[] inOut = {inPath1, outPath1}; Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } // 判断输出路径是否存在,如存在先删除 FileSystem hdfs = FileSystem.get(conf); Path findFile = new Path(outPath1); boolean isExists = hdfs.exists(findFile); if(isExists){ hdfs.delete(findFile, true); } if(hdfs.exists(new Path(outPath2))){ hdfs.delete(new Path(outPath2), true); } if(hdfs.exists(new Path(outPath3))){ hdfs.delete(new Path(outPath3), true); } // job1配置 Job job1 = Job.getInstance(conf); job1.setJarByClass(Mode.class); job1.setMapperClass(Map1.class); job1.setReducerClass(Reduce1.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(IntWritable.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job1, new Path(inPath1)); FileOutputFormat.setOutputPath(job1, new Path(outPath1)); // 将job1加入控制容器 ControlledJob ctrljob1 = new ControlledJob(conf); ctrljob1.setJob(job1); // job2配置 Job job2 = Job.getInstance(conf); job2.setJarByClass(Mode.class); job2.setMapperClass(Map2.class); job2.setReducerClass(Reduce2.class); job2.setMapOutputKeyClass(Text.class); job2.setMapOutputValueClass(IntWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job2, new Path(inPath1)); FileOutputFormat.setOutputPath(job2, new Path(outPath2)); // 将job2加入控制容器 ControlledJob ctrljob2 = new ControlledJob(conf); ctrljob2.setJob(job2); // job3配置 Job job3 = Job.getInstance(conf); job3.setJarByClass(Mode.class); job3.setMapperClass(Map3.class); job3.setReducerClass(Reduce3.class); job3.setMapOutputKeyClass(Text.class); job3.setMapOutputValueClass(IntWritable.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job3, new Path(outPath2)); FileOutputFormat.setOutputPath(job3, new Path(outPath3)); ControlledJob ctrljob3 = new ControlledJob(conf); // 设置job3依赖job1和job2 ctrljob3.addDependingJob(ctrljob1); ctrljob3.addDependingJob(ctrljob2); ctrljob3.setJob(job3); // 主控制器 JobControl jobCtrl = new JobControl("myctrl"); jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); jobCtrl.addJob(ctrljob3); // 在启动线程,记住一定要有这个 Thread t = new Thread(jobCtrl); t.start(); while(true){ // 如果作业全部完成,就打印成功作业的信息 if(jobCtrl.allFinished()){ System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } }
3、链式组合式
所谓连式MapReduce就是用多个Mapper处理任务,最后用一个Reducer输出结果,注意和迭代式和组合式MapReduce的不同之处
一个MapReduce作业可能会有一些前处理和后处理步骤,将这些前后处理步骤以单独的MapReduce任务实现也可以达到目的,但由于
增加了多个MapReduce作业,将增加整个作业的处理周期,而且还会增加很多I/O操作,因此处理效率不高。
Hadoop为此提供了专门的链式Mapper(ChainMapper)和链式Reducer(ChainReducer)来完成这种处理。
ChainMapper允许在一个单一Map任务中添加和使用多个Map子任务;而ChainReducer则允许在一个单一Reduce任务执行了Reduce处理
后,继续使用多个Map子任务完成一些后续处理。
package com.hadoop.mapreduce; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.chain.ChainReducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Chain { // 第一个Job public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>{ Text word = new Text(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } // 第二个Job public static class Map2 extends Mapper<Text, IntWritable, Text, IntWritable>{ Text word = new Text(); @Override protected void map(Text key, IntWritable value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } // 第三个Job public static class Map3 extends Mapper<Text, IntWritable, Text, IntWritable>{ Text word = new Text(); @Override protected void map(Text key, IntWritable value,Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken()); context.write(word, new IntWritable(1)); } } } public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{ IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val : values){ sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ String inPath1 = "hdfs://hadoop0:9000/user/root/input/"; String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/"; String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/"; String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/"; String[] inOut = {inPath1, outPath1}; Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } // 判断输出路径是否存在,如存在先删除 FileSystem hdfs = FileSystem.get(conf); Path findFile = new Path(outPath1); boolean isExists = hdfs.exists(findFile); if(isExists){ hdfs.delete(findFile, true); } if(hdfs.exists(new Path(outPath2))){ hdfs.delete(new Path(outPath2), true); } if(hdfs.exists(new Path(outPath3))){ hdfs.delete(new Path(outPath3), true); } // job1配置 Job job1 = Job.getInstance(conf); job1.setJarByClass(Chain.class); job1.setJobName("ChainJob"); FileInputFormat.addInputPath(job1, new Path(inPath1)); FileOutputFormat.setOutputPath(job1, new Path(outPath1)); // 连式编程要注意的是,可以有多个个Mapper,且后面Mapper的输入是是上一个Mapper的输出,最后一个Mapper的输出是Reducer的输入, // 但全局只有一个Reducer ChainMapper.addMapper(job1, Map1.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf); ChainMapper.addMapper(job1, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf); ChainMapper.addMapper(job1, Map3.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf); // 执行顺序 map1 --> map2 --> map3 --> reduce1 ChainReducer.setReducer(job1, Reduce1.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf); job1.waitForCompletion(true); } }
相关推荐
近几年,研究者扩展和改进原始MapReduce,已开发了若干迭代式MapReduce以更好地为大数据处理而支持迭代计算。对迭代式MapReduce编程框架进行综合评述,较详细地阐述了这些研究成果,给出了它们各自的基本思想,并...
Guagua目前主要支持的是同步的Master-Workers结构的迭代式计算框架,今后我们希望能够支持异步方式的迭代计算框架,2012年Google MapReduce之父Jeff Dean发表了一篇论文,上面提到了对神经网络深度模型的支持,文章...
MapReduce求行平均值--标准差--迭代器处理--MapReduce案例
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf基于MapReduce作业拆分组合机制的并行ETL组件实现.pdf...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业...比较Spark和MapReduce执行迭代应用的性能差异源码+学习说明(课程作业).zip
与迭代 MR 框架比较的实验 我们为在迭代 MapReduce 框架上运行迭代算法而实现的示例代码 来自 KAIST 菲尔
1. 启 动 全 分 布 模 式 Hadoop 集 群 , 守护进程 包 括 NameNode 、 DataNode 、 SecondaryNameNode、ResourceManager、NodeManager 和 JobHistoryServer。 2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 ...
提出一种改进的链式MapReduce 框架,并将此框架应用于一个并行ETL 工具,同时提出一些针对ETL 处理的流程级优化规则,使ETL流程产生更少的MapReduce作业,从而减少I/O以及网络传输的消耗;利用某省份手机上网数据与...
Mapreduce编程模型是Google采用的云计算编程模式,本论文阐述了Mapreduce编程模型
MapReduce发明人关于MapReduce的介绍
2.使用代码简述迭代式、组合式的job执行方法 3.HBASE的Map、Reduce继承类和序列化类是什么 4.简述容量调度的配置方法 5.简述mapreduce流程 6.简述二次排序算法 有输入数据如下所示: 1 2 2 3 2 1 4 6 3 1 3 8 3 2 ...
图解MapReduce,系统介绍Hadoop MapReduce工作过程原理
MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型 上升到构架:统一构架,为程序员隐藏系统层细节 MPI等...
【MapReduce篇07】MapReduce之数据清洗ETL1
Hadoop 用mapreduce实现Wordcount实例,绝对能用
MapReduce简单程序示例
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
MapReduce的实现细节,对mapreduce的具体实现讲解
mapreduce example