- 浏览: 2148399 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。
下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。
对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:
map里面的核心代码如下:
reduce端的核心代码如下:
main函数里面的代码如下:
运行结果,如下所示:
下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:
reduce端代码如下:
下面,我们再来看下排序组件的代码:
main方法里面的实现代码如下所示:
输出结果,如下所示:
至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。
最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。
hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。
下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。
对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:
map里面的核心代码如下:
/** * 统计词频的map端 * 代码 * * **/ public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String [] data=value.toString().split(";");//按每行的分号拆分短语 for(String s:data){ if(s.trim().length()>0){//忽略空字符 word.set(s); context.write(word, one); } } }
reduce端的核心代码如下:
/** * reduce端的 * 代码 * **/ public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get();//累加词频 } result.set(sum); context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔 } }
main函数里面的代码如下:
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
运行结果,如下所示:
a good student:: 1 good student:: 3 patient:: 2 patient a:: 1
下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:
/** * 排序作业 * map的实现 * * **/ @Override protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException { String s[]=value.toString().split("::");//按两个冒号拆分每行数据 word.set(s[0]);// one.set(Integer.parseInt(s[1].trim()));// context.write(one, word);//注意,此部分,需要反转K,V顺序 }
reduce端代码如下:
/*** * * 排序作业的 * reduce代码 * **/ @Override protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2) throws IOException, InterruptedException { for(Text t:arg1){ result.set(t.toString()); arg2.write(result, arg0); } }
下面,我们再来看下排序组件的代码:
/*** * 按词频降序排序 * 的类 * * **/ public static class DescSort extends WritableComparator{ public DescSort() { super(IntWritable.class,true);//注册排序组件 } @Override public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) { return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序 } @Override public int compare(Object a, Object b) { return -super.compare(a, b);//注意使用负号来完成降序 } }
main方法里面的实现代码如下所示:
public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job=new Job(conf, "sort"); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setMapperClass(SortIntValueMapper.class); job.setReducerClass(SortIntValueReducer.class) ; job.setSortComparatorClass(DescSort.class);//加入排序组件 job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }
输出结果,如下所示:
good student 3 patient 2 a good student 1 patient a 1
至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。
最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。
发表评论
-
Apache Flink在阿里的使用(译)
2019-02-21 21:18 1059Flink是未来大数据实时 ... -
计算机图形处理的一些知识
2018-04-25 17:46 1192最近在搞opencv来做一些 ... -
如何在kylin中构建一个cube
2017-07-11 19:06 1208前面的文章介绍了Apache Kylin的安装及数据仓 ... -
Apache Kylin的入门安装
2017-06-27 21:27 2112Apache Kylin™是一个开源的分布式分析引擎,提供 ... -
ES-Hadoop插件介绍
2017-04-27 18:07 1948上篇文章,写了使用spark集成es框架,并向es写入数据,虽 ... -
如何在Scala中读取Hadoop集群上的gz压缩文件
2017-04-05 18:51 2090存在Hadoop集群上的文件,大部分都会经过压缩,如果是压缩 ... -
如何收集项目日志统一发送到kafka中?
2017-02-07 19:07 2749上一篇(http://qindongliang.iteye. ... -
Hue+Hive临时目录权限不够解决方案
2016-06-14 10:40 4641安装Hue后,可能会分配多个账户给一些业务部门操作hive,虽 ... -
Hadoop的8088页面失效问题
2016-03-31 11:21 4381前两天重启了测试的hadoop集群,今天访问集群的8088任 ... -
Hadoop+Hbase集群数据迁移问题
2016-03-23 21:00 2478数据迁移或备份是任何 ... -
如何监控你的Hadoop+Hbase集群?
2016-03-21 16:10 4834前言 监控hadoop的框架 ... -
Logstash与Kafka集成
2016-02-24 18:44 11508在ELKK的架构中,各个框架的角色分工如下: Elastic ... -
Kakfa集群搭建
2016-02-23 15:36 2592先来整体熟悉下Kafka的一些概念和架构 (一)什么是Ka ... -
大数据日志收集框架之Flume入门
2016-02-02 14:25 4147Flume是Cloudrea公司开源的一款优秀的日志收集框架 ... -
Apache Tez0.7编译笔记
2016-01-15 16:33 2430目前最新的Tez版本是0.8,但还不是稳定版,所以大家还 ... -
Bug死磕之hue集成的oozie+pig出现资源任务死锁问题
2016-01-14 15:52 3758这两天,打算给现有的 ... -
Hadoop2.7.1和Hbase0.98添加LZO压缩
2016-01-04 17:46 25521,执行命令安装一些依赖组件 yum install -y ... -
Hadoop2.7.1配置NameNode+ResourceManager高可用原理分析
2015-11-11 19:51 3130关于NameNode高可靠需要配置的文件有core-site ... -
设置Hadoop+Hbase集群pid文件存储位置
2015-10-20 13:40 2783有时候,我们对运行几 ... -
Hadoop+Maven项目打包异常
2015-08-11 19:36 1505先简单说下业务:有一个单独的模块,可以在远程下载Hadoop上 ...
相关推荐
这里提出了一种基于分布式计算技术进行管理和存储海量海洋科学数据方法,构建了海量海洋科学数据存储平台解决方案,采用Linux集群技术,设计开发一个基于Hadoop的海量数据存储平台.系统由五大模块组成,有系统管理模块、...
本书介绍了Hadoop技术的相关知识,并将理论知识与实际项目相结合。全书共分为三个部分:基础篇、应用篇和总结篇。
基于Hadoop的海量数据处理模型研究和应用
基于Hadoop的海量数据管理系统,是PDF文件。
接着,对基于 Hadoop 的海量交易记录查询系统进行了设计和实现, 主要包括数据接入层、存储层和查询层;为了解决集群中节点时间不同步的问题,设计并实 现了 TSS 时间同步子系统。最后,通过具体的测试用例对系统...
基于Hadoop的地震数据分析统计.rar
云计算大作业使用Hadoop对美国新冠肺炎疫情数据分析项目。 实验内容 统计指定日期下,美国每个州的累计确诊人数和累计死亡人数。 对实验1的结果按累计确诊人数进行倒序排序。(重写排序规则) 对实验1的结果再运算,...
hadoop海量数据处理技术详解,包括hdfs、MapReduce、hive、sqoop等相关技术和伪代码,代码是使用python语言写的。
基于Hadoop的海量数据分析系统设计吗,完整版学位论文。
Hadoop海量数据处理 Hadoop海量数据处理 Hadoop海量数据处理
豆瓣用户每天都在对“看过”的电影进行“很差”到“力荐”的评价,豆瓣根据每部影片看过的人数以及该影片所得的评价等综合数据,通过算法分析产生豆瓣电影 Top 250。 为了分析电影产业的发展趋势,本次实验需要对...
数据管理问题"Hadoop是依照coogle的oFS分布式文件系统与MapReduee并 行编程框架的开源实现,主要用于W七b数据的管理和挖掘,在存储与管理图像 数据方面存在不足"本文首先扩展了Hadoop的相应功能模块,设计和开发了一 个...
基于Hadoop的海量电信数据云计算平台研究.pdf
hadoop海量数据处理.pdfhadoop海量数据处理.pdfhadoop海量数据处理.pdfhadoop海量数据处理.pdfhadoop海量数据处理.pdfhadoop海量数据处理.pdf
这是一个Eclipse中复制出来的MapReduce工程文件,如果你下载了源代码,需要部署在Linux环境下的Eclipse中,并且,这个Eclipse必须已经安装了hadoop开发插件,能够做Hadoop开发。不然,会提示找不到hadoop开发包。
基于Hadoop的海量数据平台
Hadoop豆瓣电影数据分析(Hadoop)操作源码
在处理海量数据的时候,传统的单机方法面临着数据存储和计算的瓶颈。本文提出了 利用开源框架Hadoop 处理海量数据方法,以弥补传统方法在这方面的缺陷与不足.