- 浏览: 2145273 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
在hadoop里面处理的数据,默认按输入内容的key进行排序的,大部分情况下,都可以满足的我们的业务需求,但有时候,可能出现类似以下的需求,输入内容:
要求输出1:
要求输出2:
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
在eclipse下,执行,打印日志内容如下:
执行完,我们在输出目录里里面查看
执行完,内容如下:
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
秦东亮;72 秦东亮;34 秦东亮;100 三劫;899 三劫;32 三劫;1 a;45 b;567 b;12
要求输出1:
a 45 b 12,567 三劫 1,32,899 秦东亮 34,72,100
要求输出2:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
注意上面的输出1,和输出2,其实都是一样的逻辑,只不过,输出的形式稍微改了下,那么今天散仙,就来分析下,怎么在hadoop里面,实现这样的需求。
其实这样的需求,就类似数据库的标准SQL分组
SELECT A,B FROM TABLE GROUP BY A,B ORDER BY A,B
当然也不一定,是2个字段分组,可能有2个或2个以上的多个字段分组。
下面,我们先来看下MapReduce内部执行2次排序的流程图,这图是散仙收集的,画的很不错。
由上图可知,Map在处理数据时,先由InputFormat组件提供输入格式,然后Split一行数据,默认的是TextInputFormat,Key为字节偏移量,Value为内容,然后把这行数据,传给Map,Map根据某种约定的分隔符,进行拆分数据,进行业务处理,如果是计数的直接在Value上输出1,在Map输出前,如果有Combine组件,则会执行Combine阶段,进行本地Reduce,一般是用来优化程序用的,Combine执行完后,会执行Partition组件,进行数据分区,默认的是HashPartition,按照输出的Key的哈希值与上Integer的最大值,然后对reduce的个数进行取余得到的值,经过Partition后,数据就会被按桶输出到本地磁盘上,在输出的时候,会按照Key进行排序,然后等所有的Map执行完毕后,就会进入Reduce阶段,这个阶段会进行一个大的混洗阶段,术语叫shuffle,每个reduce都会去每个map输出的分区里面,拉取对应的一部分数据,这个时候,是最耗网络IO,以及磁盘IO的,是影响性能的一个重要瓶颈,当Reduce把所有的数据拉取完毕后,就会进行分组并按照Key进行排序,每处理好一个分组,都会调用一次Reduce函数,进行累加,或其他的业务处理,处理完毕后,就会通过OutputFormat进行输出到HDFS上,至此,整个流程就执行完毕。
代码如下:
package com.qin.groupsort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.qin.operadb.PersonRecoder; import com.qin.operadb.ReadMapDB; /** * @author qindongliang * * 大数据交流群:376932160 * * * **/ public class GroupSort { /** * map任务 * * */ public static class GMapper extends Mapper<LongWritable, Text, DescSort, IntWritable>{ private DescSort tx=new DescSort(); private IntWritable second=new IntWritable(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("执行map"); // System.out.println("进map了"); //mos.write(namedOutput, key, value); String ss[]=value.toString().split(";"); String mkey=ss[0]; int mvalue=Integer.parseInt(ss[1]); tx.setFirstKey(mkey); tx.setSecondKey(mvalue); second.set(mvalue); context.write(tx, second); } } /*** * Reduce任务 * * **/ public static class GReduce extends Reducer<DescSort, IntWritable, Text, Text>{ @Override protected void reduce(DescSort arg0, Iterable<IntWritable> arg1, Context ctx) throws IOException, InterruptedException { System.out.println("执行reduce"); StringBuffer sb=new StringBuffer(); for(IntWritable t:arg1){ // sb.append(t).append(","); //con ctx.write(new Text(arg0.getFirstKey()), new Text(t.toString())); /**这种写法,是这种输出 *a 45 *b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100 */ } if(sb.length()>0){ sb.deleteCharAt(sb.length()-1);//删除最后一位的逗号 } // 在循环里拼接,在循环外输出是这种格式 // b 12,567 // 三劫 1,32,899 // 秦东亮 34,72,100 // ctx.write(new Text(arg0.getFirstKey()), new Text(sb.toString())); } } /*** * * 自定义组合键 * **/ public static class DescSort implements WritableComparable{ public DescSort() { // TODO Auto-generated constructor stub } private String firstKey; private int secondKey; public String getFirstKey() { return firstKey; } public void setFirstKey(String firstKey) { this.firstKey = firstKey; } public int getSecondKey() { return secondKey; } public void setSecondKey(int secondKey) { this.secondKey = secondKey; } // @Override // public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, // int arg4, int arg5) { // return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序 // } // // @Override // public int compare(Object a, Object b) { // // return -super.compare(a, b);//注意使用负号来完成降序 // } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub firstKey=in.readUTF(); secondKey=in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(firstKey); out.writeInt(secondKey); } @Override public int compareTo(Object o) { // TODO Auto-generated method stub DescSort d=(DescSort)o; //this在前代表升序 return this.getFirstKey().compareTo(d.getFirstKey()); } } /** * 主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 * * **/ public static class TextComparator extends WritableComparator{ public TextComparator() { // TODO Auto-generated constructor stub super(DescSort.class,true);//注册Comparator } @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("执行TextComparator分组排序"); DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; return d1.getFirstKey().compareTo(d2.getFirstKey()); } } /** * 组内排序的策略 * 按照第二个字段排序 * * */ public static class TextIntCompartator extends WritableComparator{ public TextIntCompartator() { super(DescSort.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { DescSort d1=(DescSort)a; DescSort d2=(DescSort)b; System.out.println("执行组内排序TextIntCompartator"); if(!d1.getFirstKey().equals(d2.getFirstKey())){ return d1.getFirstKey().compareTo(d2.getFirstKey()); }else{ return d1.getSecondKey()-d2.getSecondKey();//0,-1,1 } } } /** * 分区策略 * * */ public static class KeyPartition extends Partitioner<DescSort, IntWritable>{ @Override public int getPartition(DescSort key, IntWritable arg1, int arg2) { // TODO Auto-generated method stub System.out.println("执行自定义分区KeyPartition"); return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%arg2; } } public static void main(String[] args) throws Exception{ JobConf conf=new JobConf(ReadMapDB.class); //Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.168.75.130:9001"); //读取person中的数据字段 conf.setJar("tt.jar"); //注意这行代码放在最前面,进行初始化,否则会报 /**Job任务**/ Job job=new Job(conf, "testpartion"); job.setJarByClass(GroupSort.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; // job.setCombinerClass(PCombine.class); // job.setNumReduceTasks(3);//设置为3 job.setMapperClass(GMapper.class); job.setReducerClass(GReduce.class); /**设置分区函数*/ job.setPartitionerClass(KeyPartition.class); //分组函数,Reduce前的一次排序 job.setGroupingComparatorClass(TextComparator.class); //组内排序Map输出完毕后,对key进行的一次排序 job.setSortComparatorClass(TextIntCompartator.class); //TextComparator.class //TextIntCompartator.class // job.setGroupingComparatorClass(TextIntCompartator.class); //组内排序Map输出完毕后,对key进行的一次排序 // job.setSortComparatorClass(TextComparator.class); job.setMapOutputKeyClass(DescSort.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); String path="hdfs://192.168.75.130:9000/root/outputdb"; FileSystem fs=FileSystem.get(conf); Path p=new Path(path); if(fs.exists(p)){ fs.delete(p, true); System.out.println("输出路径存在,已删除!"); } FileInputFormat.setInputPaths(job, "hdfs://192.168.75.130:9000/root/input"); FileOutputFormat.setOutputPath(job,p ); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在eclipse下,执行,打印日志内容如下:
模式: 192.168.75.130:9001 输出路径存在,已删除! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404152114_0003 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 33% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 100% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404152114_0003 INFO - Counters.log(585) | Counters: 29 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | Launched reduce tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=7040 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=9807 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=86 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | FILE_BYTES_READ=162 INFO - Counters.log(589) | HDFS_BYTES_READ=205 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=111232 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=86 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=93 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map output materialized bytes=162 INFO - Counters.log(589) | Map input records=9 INFO - Counters.log(589) | Reduce shuffle bytes=162 INFO - Counters.log(589) | Spilled Records=18 INFO - Counters.log(589) | Map output bytes=138 INFO - Counters.log(589) | Total committed heap usage (bytes)=176033792 INFO - Counters.log(589) | CPU time spent (ms)=970 INFO - Counters.log(589) | Combine input records=0 INFO - Counters.log(589) | SPLIT_RAW_BYTES=112 INFO - Counters.log(589) | Reduce input records=9 INFO - Counters.log(589) | Reduce input groups=4 INFO - Counters.log(589) | Combine output records=0 INFO - Counters.log(589) | Physical memory (bytes) snapshot=258830336 INFO - Counters.log(589) | Reduce output records=9 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=1461055488 INFO - Counters.log(589) | Map output records=9
执行完,我们在输出目录里里面查看
执行完,内容如下:
a 45 b 12 b 567 三劫 1 三劫 32 三劫 899 秦东亮 34 秦东亮 72 秦东亮 100
我们发现,跟我们预期的结果一致,熟悉MapReduce的执行原理,可以帮助我们更好的使用Hive,因为Hive本身就是一个或多个MapReduce作业构成的,Hive语句的优化,对MapReduce作业的影响的性能也是不容忽视的,所以我们一定要多熟悉熟悉MapReduce编程的模型,以便于我们对它有一个更清晰的认识和了解。
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1053Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1187最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1204前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2105Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1942上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2084存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2742上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4631安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4371前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2472数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4828前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11502在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2587先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4140Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2425目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3750这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 25501,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3123关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2779有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1500先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
hadoop分区二次排序代码示例,包含基站数据集,对基站数据,按电话号码升序、到达时间降序进行排序,只需打包成jar,即可在hadoop集群中运行
Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻
hadoop分区二次排序示例,对基站数据,按电话号码升序、到达时间降序进行排序
mapreduce二次排序,年份升序,按照年份聚合,气温降序
NULL 博文链接:https://xjward.iteye.com/blog/1816821
主要介绍了hadoop二次排序的原理和实现,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户...
大数据学习资料全排序二次排序
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户 安装Hadoop 测试安装 SSH...
join技术点20 实现semi-join4.1.4 为你的数据挑选最优的合并策略4.2 排序4.2.1 二次排序技术点21 二次排序的实现4.2.2 整体并行排序技术点22 通过多个reducer 对key 进行排序4.3 抽样技术点23 蓄水...
# hadoop-word-predict ...3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 - 12、Mahout Kmeans简介 .................................... - 57 -
目录 第1章二次排序:简介 19 第2章二次排序:详细示例 42 第3章 Top 10 列表 54 第4章左外连接 96 第5章反转排序 127 第6章移动平均 137 第7章购物篮分析 155 第8章共同好友 182 第9章使用MapReduce实现推荐引擎 ...
技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 ...
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 12、Mahout Kmeans简介 .................................... - 57 -
本文将介绍一种通过使用地理网格进行数据关联,并利用Shuffle过程的二次排序实现高效的统计各条道路上位置点分布情况的方法。交通领域正产生着海量的车辆位置点数据。将这些车辆位置信息和道路进行关联的统计操作则...
四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数
03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...