- 浏览: 2148237 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
有时候,我们使用Hadoop处理数据时,在Reduce阶段,我们可能想对每一个输出的key进行单独输出一个目录或文件,这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事,
下面,先来看下散仙的测试数据:
输出结果:预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:
如果是中文的路径名,则会报如下的一个异常:
源码中关于名称的校验如下:
程序运行成功输出:
运行成功后,生成的文件如下所示:
china-r-00000里面的数据如下:
USA-r-00000里面的数据如下:
cperson-r-00000里面的数据如下:
在输出结果中,reduce自带的那个文件仍然会输出,但是里面没有任何数据,至此,我们已经在hadoop1.2.0的基于新的API里,测试多文件输出通过。
不确定,估计没法搞了,这些名字都是提前定制好的!
下面,先来看下散仙的测试数据:
中国;我们 美国;他们 中国;123 中国人;善良 美国;USA 美国;在北美洲
输出结果:预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:
package com.partition.test; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /*** * @author qindongliang * * 大数据技术交流群:324714439 * **/ public class TestMultiOutput { /** * map任务 * * **/ public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String ss[]=value.toString().split(";"); context.write(new Text(ss[0]), new Text(ss[1])); } } public static class PReduce extends Reducer<Text, Text, Text, Text>{ /** * 设置多个文件输出 * */ private MultipleOutputs mos; @Override protected void setup(Context context) throws IOException, InterruptedException { mos=new MultipleOutputs(context);//初始化mos } @Override protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException { String key=arg0.toString(); for(Text t:arg1){ if(key.equals("中国")){ /** * 一个参数 * **/ mos.write("china", arg0,t); } else if(key.equals("美国")){ mos.write("USA", arg0,t); } else if(key.equals("中国人")){ mos.write("cperson", arg0,t); } //System.out.println("Reduce: "+arg0.toString()+" "+t.toString()); } } @Override protected void cleanup( Context context) throws IOException, InterruptedException { mos.close();//释放资源 } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); // conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 // conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(TestMultiOutput.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); //job.setPartitionerClass(PPartition.class); //job.setNumReduceTasks(5); job.setMapperClass(PMapper.class); /** * 注意在初始化时需要设置输出文件的名 * 另外名称,不支持中文名,仅支持英文字符 * * **/ MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class); job.setReducerClass(PReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
如果是中文的路径名,则会报如下的一个异常:
模式: local 输出路径存在,已删除! WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001 INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0 INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91 INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100 INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720 INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680 INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0 INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done. INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0 INFO - LocalJobRunner$Job.run(348) | Map task executor complete. INFO - Task.initialize(534) | Using ResourceCalculatorPlugin : null INFO - LocalJobRunner$Job.statusUpdate(466) | INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes INFO - LocalJobRunner$Job.statusUpdate(466) | WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001 java.lang.IllegalArgumentException: Name cannot be have a '一' char at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363) at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74) at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1) at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177) at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398) INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001 INFO - Counters.log(585) | Counters: 17 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=177 INFO - Counters.log(589) | HDFS_BYTES_READ=91 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=71111 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - Counters.log(589) | Reduce shuffle bytes=0 INFO - Counters.log(589) | Spilled Records=6 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=227737600 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=0 INFO - Counters.log(589) | Reduce input groups=0 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Map output records=6
源码中关于名称的校验如下:
/** * Checks if a named output name is valid token. * * @param namedOutput named output Name * @throws IllegalArgumentException if the output name is not valid. */ private static void checkTokenName(String namedOutput) { if (namedOutput == null || namedOutput.length() == 0) { throw new IllegalArgumentException( "Name cannot be NULL or emtpy"); } for (char ch : namedOutput.toCharArray()) { if ((ch >= 'A') && (ch <= 'Z')) { continue; } if ((ch >= 'a') && (ch <= 'z')) { continue; } if ((ch >= '0') && (ch <= '9')) { continue; } throw new IllegalArgumentException( "Name cannot be have a '" + ch + "' char"); } }
程序运行成功输出:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9289 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=13645 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=0 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=105 INFO - Counters.log(589) | HDFS_BYTES_READ=203 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=113616 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=87 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=91 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=105 INFO - Counters.log(589) | Map input records=6 INFO - Counters.log(589) | Reduce shuffle bytes=105 INFO - Counters.log(589) | Spilled Records=12 INFO - Counters.log(589) | Map output bytes=87 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=1880 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=6 INFO - Counters.log(589) | Reduce input groups=3 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=278876160 INFO - Counters.log(589) | Reduce output records=0 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1460908032 INFO - Counters.log(589) | Map output records=6
运行成功后,生成的文件如下所示:
china-r-00000里面的数据如下:
中国 我们 中国 123
USA-r-00000里面的数据如下:
美国 他们 美国 USA 美国 在北美洲
cperson-r-00000里面的数据如下:
中国人 善良
在输出结果中,reduce自带的那个文件仍然会输出,但是里面没有任何数据,至此,我们已经在hadoop1.2.0的基于新的API里,测试多文件输出通过。
评论
3 楼
zhanggl23456
2014-08-11
import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
楼主你的代码没有完整还去缺2个类:ReadMapDB,PersonRecoder
import com.qin.operadb.ReadMapDB;
楼主你的代码没有完整还去缺2个类:ReadMapDB,PersonRecoder
2 楼
qindongliang1922
2014-05-22
aiyan3344 写道
您好:
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
这里要输出的文件名,不确定是什么的,要怎么做????
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
这里要输出的文件名,不确定是什么的,要怎么做????
不确定,估计没法搞了,这些名字都是提前定制好的!
1 楼
aiyan3344
2014-05-22
您好:
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
这里要输出的文件名,不确定是什么的,要怎么做????
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
这里要输出的文件名,不确定是什么的,要怎么做????
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1059Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1190最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1208前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2112Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1947上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2090存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2749上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4641安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4380前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2478数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4834前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11508在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2592先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4147Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2430目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3758这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 25521,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3129关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2782有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1505先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
主要介绍了 Hadoop MultipleOutputs输出到多个文件中的实现方法的相关资料,希望通过本文能帮助到大家,需要的朋友可以参考下
hadoop的默认配置文件,下载记得关注我哦
Hadoop的MapReduce中多文件输出.pdf
hadoop的dll文件 hadoop.zip
Hadoop HA 集群搭建所需要的配置文件:core-site,hdfs-site,mapred-site,yarn-site四个xml文件和一个slaves文件
hadoop2.7.2安装依赖文件,用于在window下调试hadoop! hadoop2.7.2安装依赖文件,用于在window下调试hadoop hadoop2.7.2安装依赖文件,用于在window下调试hadoop
hadoop/etc/hadoop/6个文件 core-site.xml hadoop-env.sh hdfs-site.xml mapred-site.xml yarn-env.sh yarn-site.xml
Hadoop 2.2.0 配置文件 在4台CentOs 6.4版本下运行成功
windows平台上,使用Eclipse hadoop插件,开发基于hdfs文件的中文分词统计和排序功能,以唐诗三百首为例,找出其中使用频率最高的词语。
大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出 共9页.pdf
Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南Hadoop分布式文件系统使用指南...
hadoop web tomcat上传文件,自己写的一个WEB只有上传文件功能,hadoop.rar是已经发布的解压直接放到TOMCAT WEBAPPS下就可以运行
hadoop在windows上运行需要winutils支持和hadoop.dll等文件
Hadoop分布式文件系统的模型分析,Hadoop 分布式文件系统是遵循Google 文件系统原理进行开发和实现的,受到了业界极大关注,并 已被广泛应用。 鉴于当前缺乏从系统设计理论的角度对其开展的相关研究,本文从 Hadoop ...
十分详细的Hadoop虚拟集群的搭建,手把手教,从安装虚拟机到集群搭建完成每一步手把手教,包教包会。
hadoop2.x配置文件,一共八个文件,能够配置hadoop yarn等框架
hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件hadoop配置文件
基于Hadoop的在线文件管理系统-开题报告.pdf基于Hadoop的在线文件管理系统-开题报告.pdf基于Hadoop的在线文件管理系统-开题报告.pdf基于Hadoop的在线文件管理系统-开题报告.pdf基于Hadoop的在线文件管理系统-开题...
Hadoop3.1.3 配置文件
关键字:Linux CentOS Hadoop Java 版本: CentOS7 Hadoop2.8.0 JDK1.8 说明:Hadoop从版本2开始加入了Yarn这个资源管理器,Yarn并不需要单独安装。只要在机器上安装了JDK就可以直接安装Hadoop,单纯安装Hadoop...