`
qindongliang1922
  • 浏览: 2148237 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116343
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124610
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58490
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70377
社区版块
存档分类
最新评论

如何使用Hadoop的MultipleOutputs进行多文件输出

阅读更多
有时候,我们使用Hadoop处理数据时,在Reduce阶段,我们可能想对每一个输出的key进行单独输出一个目录或文件,这样方便数据分析,比如根据某个时间段对日志文件进行时间段归类等等。这时候我们就可以使用MultipleOutputs类,来搞定这件事,

下面,先来看下散仙的测试数据:

中国;我们
美国;他们
中国;123
中国人;善良
美国;USA
美国;在北美洲

输出结果:预期输出结果是:
中国一组,美国一组,中国人一组
核心代码如下:

package com.partition.test;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.qin.operadb.PersonRecoder;
import com.qin.operadb.ReadMapDB;
 
/***
 * @author qindongliang
 * 
 * 大数据技术交流群:324714439
 * **/
public class TestMultiOutput {
	
	
	/**
	 * map任务
	 * 
	 * **/
	public static class PMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			 String ss[]=value.toString().split(";");
			context.write(new Text(ss[0]), new Text(ss[1]));	
		}
		
		
	}
	
 
	 public static class PReduce extends Reducer<Text, Text, Text, Text>{
		 /**
		  * 设置多个文件输出
		  * */
		 private MultipleOutputs mos;
		 
		 @Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			  mos=new MultipleOutputs(context);//初始化mos
		}
		 @Override
		protected void reduce(Text arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			 
			  String key=arg0.toString();
			 for(Text t:arg1){
				   if(key.equals("中国")){ 
					   /**
					    * 一个参数
					    * **/
					   mos.write("china", arg0,t); 
				   } else if(key.equals("美国")){
					   mos.write("USA", arg0,t);    
				   } else if(key.equals("中国人")){
					   mos.write("cperson", arg0,t); 
					   
				   }
	     
				 //System.out.println("Reduce:  "+arg0.toString()+"   "+t.toString());
			 }
			   
			 
		}
		 
		 @Override
		protected void cleanup(
				 Context context)
				throws IOException, InterruptedException {
			 mos.close();//释放资源
		}
		 
	 }
	 
	 
	 public static void main(String[] args) throws Exception{
		 JobConf conf=new JobConf(ReadMapDB.class);
		 //Configuration conf=new Configuration();
	  	// conf.set("mapred.job.tracker","192.168.75.130:9001");
		//读取person中的数据字段
	  	// conf.setJar("tt.jar");
		//注意这行代码放在最前面,进行初始化,否则会报
	 
	 
		/**Job任务**/
		Job job=new Job(conf, "testpartion");
		job.setJarByClass(TestMultiOutput.class);
		System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		// job.setCombinerClass(PCombine.class);
		//job.setPartitionerClass(PPartition.class);
		//job.setNumReduceTasks(5);
		 job.setMapperClass(PMapper.class);
		 
		 /**
		  * 注意在初始化时需要设置输出文件的名
		  * 另外名称,不支持中文名,仅支持英文字符
		  * 
		  * **/
		 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);
		 MultipleOutputs.addNamedOutput(job, "USA", TextOutputFormat.class, Text.class, Text.class);
		 MultipleOutputs.addNamedOutput(job, "cperson", TextOutputFormat.class, Text.class, Text.class);
		 job.setReducerClass(PReduce.class);
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
	    
		String path="hdfs://192.168.75.130:9000/root/outputdb";
		FileSystem fs=FileSystem.get(conf);
		Path p=new Path(path);
		if(fs.exists(p)){
			fs.delete(p, true);
			System.out.println("输出路径存在,已删除!");
		}
		FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input");
		FileOutputFormat.setOutputPath(job,p );
		System.exit(job.waitForCompletion(true) ? 0 : 1);  
		 
		 
	}
	
	

}

如果是中文的路径名,则会报如下的一个异常:
模式:  local
输出路径存在,已删除!
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - JobClient.copyAndConfigureFiles(870) | No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_local1533332464_0001
INFO - LocalJobRunner$Job.run(340) | Waiting for map tasks
INFO - LocalJobRunner$Job$MapTaskRunnable.run(204) | Starting task: attempt_local1533332464_0001_m_000000_0
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - MapTask.runNewMapper(729) | Processing split: hdfs://192.168.75.130:9000/root/input/group.txt:0+91
INFO - MapTask$MapOutputBuffer.<init>(949) | io.sort.mb = 100
INFO - MapTask$MapOutputBuffer.<init>(961) | data buffer = 79691776/99614720
INFO - MapTask$MapOutputBuffer.<init>(962) | record buffer = 262144/327680
INFO - MapTask$MapOutputBuffer.flush(1289) | Starting flush of map output
INFO - MapTask$MapOutputBuffer.sortAndSpill(1471) | Finished spill 0
INFO - Task.done(858) | Task:attempt_local1533332464_0001_m_000000_0 is done. And is in the process of commiting
INFO - LocalJobRunner$Job.statusUpdate(466) | 
INFO - Task.sendDone(970) | Task 'attempt_local1533332464_0001_m_000000_0' done.
INFO - LocalJobRunner$Job$MapTaskRunnable.run(229) | Finishing task: attempt_local1533332464_0001_m_000000_0
INFO - LocalJobRunner$Job.run(348) | Map task executor complete.
INFO - Task.initialize(534) |  Using ResourceCalculatorPlugin : null
INFO - LocalJobRunner$Job.statusUpdate(466) | 
INFO - Merger$MergeQueue.merge(408) | Merging 1 sorted segments
INFO - Merger$MergeQueue.merge(491) | Down to the last merge-pass, with 1 segments left of total size: 101 bytes
INFO - LocalJobRunner$Job.statusUpdate(466) | 
WARN - LocalJobRunner$Job.run(435) | job_local1533332464_0001
java.lang.IllegalArgumentException: Name cannot be have a '一' char
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(MultipleOutputs.java:160)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(MultipleOutputs.java:186)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:363)
	at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(MultipleOutputs.java:348)
	at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:74)
	at com.partition.test.TestMultiOutput$PReduce.reduce(TestMultiOutput.java:1)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_local1533332464_0001
INFO - Counters.log(585) | Counters: 17
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=91
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=177
INFO - Counters.log(589) |     HDFS_BYTES_READ=91
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=71111
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=105
INFO - Counters.log(589) |     Map input records=6
INFO - Counters.log(589) |     Reduce shuffle bytes=0
INFO - Counters.log(589) |     Spilled Records=6
INFO - Counters.log(589) |     Map output bytes=87
INFO - Counters.log(589) |     Total committed heap usage (bytes)=227737600
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=0
INFO - Counters.log(589) |     Reduce input groups=0
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Reduce output records=0
INFO - Counters.log(589) |     Map output records=6


源码中关于名称的校验如下:
 /**
   * Checks if a named output name is valid token.
   *
   * @param namedOutput named output Name
   * @throws IllegalArgumentException if the output name is not valid.
   */
  private static void checkTokenName(String namedOutput) {
    if (namedOutput == null || namedOutput.length() == 0) {
      throw new IllegalArgumentException(
        "Name cannot be NULL or emtpy");
    }
    for (char ch : namedOutput.toCharArray()) {
      if ((ch >= 'A') && (ch <= 'Z')) {
        continue;
      }
      if ((ch >= 'a') && (ch <= 'z')) {
        continue;
      }
      if ((ch >= '0') && (ch <= '9')) {
        continue;
      }
      throw new IllegalArgumentException(
        "Name cannot be have a '" + ch + "' char");
    }
  }

程序运行成功输出:
模式:  192.168.75.130:9001
输出路径存在,已删除!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404101853_0006
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404101853_0006
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=9289
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=13645
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=0
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=105
INFO - Counters.log(589) |     HDFS_BYTES_READ=203
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=113616
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=87
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=91
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=105
INFO - Counters.log(589) |     Map input records=6
INFO - Counters.log(589) |     Reduce shuffle bytes=105
INFO - Counters.log(589) |     Spilled Records=12
INFO - Counters.log(589) |     Map output bytes=87
INFO - Counters.log(589) |     Total committed heap usage (bytes)=176033792
INFO - Counters.log(589) |     CPU time spent (ms)=1880
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=112
INFO - Counters.log(589) |     Reduce input records=6
INFO - Counters.log(589) |     Reduce input groups=3
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=278876160
INFO - Counters.log(589) |     Reduce output records=0
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=1460908032
INFO - Counters.log(589) |     Map output records=6

运行成功后,生成的文件如下所示:


china-r-00000里面的数据如下:
中国	我们
中国	123

USA-r-00000里面的数据如下:
美国	他们
美国	USA
美国	在北美洲

cperson-r-00000里面的数据如下:
中国人	 善良


在输出结果中,reduce自带的那个文件仍然会输出,但是里面没有任何数据,至此,我们已经在hadoop1.2.0的基于新的API里,测试多文件输出通过。
  • 大小: 72.7 KB
分享到:
评论
3 楼 zhanggl23456 2014-08-11  
import com.qin.operadb.PersonRecoder; 
import com.qin.operadb.ReadMapDB;
楼主你的代码没有完整还去缺2个类:ReadMapDB,PersonRecoder
2 楼 qindongliang1922 2014-05-22  
aiyan3344 写道
您好:
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);

这里要输出的文件名,不确定是什么的,要怎么做????



不确定,估计没法搞了,这些名字都是提前定制好的!
1 楼 aiyan3344 2014-05-22  
您好:
如果 MultipleOutputs.addNamedOutput(job, "china", TextOutputFormat.class, Text.class, Text.class);

这里要输出的文件名,不确定是什么的,要怎么做????

相关推荐

Global site tag (gtag.js) - Google Analytics