大数据生态圈

主要把精力集中在hadoop生态系统和spark生态系统上:
大数据技术本身是个很宽泛的概念,简单来说都是为了处理超过单机的内存和运算的数据的处理和应用而诞生的。
大数据技术的介绍:
1、存储,我们需要了解在大数据的架构下,数据大致是怎么进行存储的,传统的文件系统是单机的,不能横跨不同的机器。HDFS(Hadoop Distributed FileSystem)的设计本质上是为了大量的数据能横跨成百上千台机器,但是用户在实际的应用中,看到的是一个文件系统而不是多个文件系统。比如要获取/hdfs/tmp/file1的数据,看起来和单机无异,引用的是一个文件路径,但是实际的数据存放在很多不同的机器上。作为用户,不需要知道数据具体是怎么存储在分布式系统中的,就好比在单机上我们不关心文件分散在什么磁道什么扇区一样。HDFS会自动为你管理这些数据;
2、数据处理,解决了存储的问题之后,我们就要开始考虑如何处理数据了,虽然HDFS可以整体管理不同机器上的数据,但是这些数据太大了。一台机器承载着也是巨量的数据,使用一台机器跑也许需要好几天甚至好几周。那么,一个非常自然的想法,既然数据可以在多台机器上存储,难道不能使用多台机器来处理吗?这就好比我们常使用的多核计算,类比来看,在大数据中,一台机器对应的就是一个核,因此同样,和多核计算一样,我们面临着如何分配不同机器工作的问题,如果一台机器挂了如何重新启动相应的任务,机器之间如何互相通信交换数据以完成复杂的计算等等,这就是MapReduce架构;
3、Mapreduce mapreduce不仅仅提供了技术上的架构,更重要的是其思想,对于巨大型任务的处理思路,其实很简单,先map,然后reduce,它的思想和多核计算是非常相似的:
Mapreduce
MapReduce实际上是一种编程模型,主要是用于处理大规模数据集,其实现核心逻辑实际上是跟分治方法是统一的。在这个编程模型下,用户只需要关心两个函数,一个是map函数,用于处理一个键值对,然后生成一个中间键值对的数据集合。另一个是reduce函数,是用来将map产生的中间键值对数据集根据相同中间键来进行合并操作。这种编程模型自然而然的是可以通过在一个集群上进行并行的处理。整个系统需要做的是划分输入数据、调度作业任务与机器、处理机器故障以及管理机器间的通信等等,但是这些问题并不是用户需要操心的,当一个完备的集群方案确定后,用户完全不用去了解并行分布式系统的操作流程,便可以进行最大化利用分布式集群的功能(作为一个应用人员,还是需要深入了解以下这些基本概念的,重点在使用工具的熟练度和性能优化的基本方法的掌握上)。
一、编程模型
用户在使用MapReduce的时候只需要定义map和reduce函数。map函数会处理输入键值对(类似于python中dict的概念)然后产生一个中间键值对数据集,然后这个编程模型会将所有中间键相同的数据组合起来,并将它们传递给reduce函数。而reduce函数则是接受某个中间键和该键的一个数据集合,他会根据需求去合并这些数据,从而产生一个更小的数据集,中间数据是通过迭代的方式来提供给reduce函数的,这样就可以处理内存无法满足的大量数据。
二、pyspark中map reduce的一个直观的代码demo:
##################################### 例子1 #####################################
## 建立第一个RDD --- sparkContext
sc=spark.sparkcontext
wordsList = ['cat','elephant','rat','cat']
wordsRDD = sc.parallelize(wordsList,4)
print(type(wordsRDD))
## map函数 --- 将function 套用到rdd中的每个元素
def makePlural(word):
return word + 's'
## 调用该函数
print(makePlural('cat)) --- cats
appliedRDD = wordsRDD.map(makePlural)
## 可以看到这里没有一个显式的key value的字典式的数据结构,map-reduce是一种分而治之的分布式数据处理的思想,而不是纯粹
## 流于形式的一些解释,当然,这里也可以简单认为每一个value都有一个“隐式”的key ,这个key可以是value的index。
## collect ---- 将RDD元素送回master并转换为list
## reduce操作,将map端计算的结果进行聚合
print(appliedRDD.collect())
print(type(appliedRDD.collect())) --list
## 使用lambda函数 --- lambda函数逻辑
lambdaRDD = wordsRDD.map(lambda word: word + 's')
print(appliedRDD.collect())
##################################### 例子2 #####################################
## 这个例子就具有显式的key-value的字典式数据结构了
## 使用成对rdd来做计算 -- (key,values)存储RDD
[('key',value),('key2',value2)]
pairRDD = wordsRDD.map(lambda word: (word,1))
print(pairRDD)
print(pairRDD.take(1))
pairRDD.collect()
## 将资料依照key重新排序
wordsGrouped = pairRDD.groupByKey()
for key,value in wordsGrouped.collect():
print({0}:{1}.format(key,list(value)))
## 依照key值加总
wordCountsGrouped = wordsGrouped.map(lambda (k,v):(k,sum(v)))
wordCountsGrouped.collect()
## 寻找不重复值 --distinct
uniquewords = wordRDD.map(lambda word:(word,1)).distinct()
print(uniquewords.collect())
## count
countuniquewords = wordRDD.map(lambda word:(word,1)).distinct().count()
print(countuniquewords)
## 计算每个字平均出现几次
wordscounts = {('cat',2),('elephant',1),('rat',2)}
wordCountRDD = sc.parallelize(wordCounts)
## map -- 找到值,reduce--求和
totalCount = wordCountRDD.map(lambda (x,y):y).reduce(lambda x,y:x+y)
average = totalCount / wordCountRDD.distinct().count()
print(average)
map-reduce的精髓在于提供了分布式的一种解决问题的思路,即将一个大的任务,通过map方式拆分成大量的子任务分别对应到集群中不同的计算节点上去计算,然后将不同计算节点的计算结果进行reduce的聚合得到最终的计算结果。因此,map-reduce时代我们要解决某些分布式计算的问题,就要按照这样的思维方式/编程框架/编程思想。。。etc 去将任务拆解为map-reduce的范式。
三、一个生动的案例:词频统计
Hive的问世
上述的过程听起来简单,但是实现上是非常繁琐的,很多时候,例如对于词频的计算,我们要自己去手动编写一个词频计算的map function,这对于数据处理与分析来说,实在太麻烦了,这就好比我想要去计算某一个特征的取值情况,通过pandas的value_counts()功能就可以轻松实现,但是现在我要自己先写个map函数将feature拆分成一大堆的键值对并进行grouping,然后还要写一个reduce函数对map的输出结构进行求和;
我们希望有个更高层更抽象的语言层来描述算法和数据处理流程,于是,就诞生了Hive和pig。Pig是接近脚本方式去描述MapReduce(没用过),Hive则用的是类SQL的语法(hive sql,对于经常写sql同学来说非常简单容易上手)。pig和hive分别把脚本和SQL语言翻译成MapReduce程序,丢给计算引擎去计算,这样我们就从繁琐的MapReduce程序中解脱出来。(hive sql 转化为map reduce的的原理可见:菜鸟:hive sql语句转换成mapreduce)
Hive逐渐成长成了大数据仓库的核心组件(因为sql本身没什么技术门槛,一般的非技术人员也可以很快上手),甚至很多公司的流水线作业集完全是用hive SQL描述,因为易写易改,一看就懂,容易维护。
但是hive也有一个问题,Hive底层执行使用的是MapReduce引擎,仍然是一个批处理过程,难以满足查询的交互性,比如我们写一个数据分析报告,我们想要去统计譬如每个用户分别浏览了多少商品或者每个商品被多少用户浏览过,当数据量非常大的时候,hive的查询效率是偏低的,于是又诞生了impala等分布式交互查询数据库,其底层做了非常多的性能优化大大提升了交互式查询,数据分析的效率;
再后来,spark的横空出世支撑了hive on spark和sparksql(也就是我们熟悉的spark dataframe的父集),尤其是spark dataframe,不仅对于sql使用者来说友好,对于pandas的使用者也是较为友好的;
这个时候,一个完整的大数据处理分析的框架就已经跃然纸上了:
1、最底层的hdfs存储数据解决数据存储问题
2、底层的mapreduce的计算架构或spark的计算架构;
3、中层的hive或pig或者impala等;
4、上层的hive on spark或是直接使用sparksql(目前最常用的spark dataframe)
5. 在spark dataframe之上,后续又出现了进一步的优化和api的简化,其中最有名的就是koalas了,在spark dataframe上进一步的优化以及更加简单的几乎和pandas完全一样的api
最后,作为小白用户,我们直接可以通过例如sparksql或hivesql on spark来进行海量数据的快速分析与数据分析报告甚至是基本的特征工程等;这个时候,我们实际上已经较好的解决了没有实时需求的海量数据的离线处理和分析问题,后续的模型训练,我们可以通过分布式的xgb、lgb,tf等框架或者是spark ml等进行模型的训练和验证等工作;
streaming computation:
关于流计算的描述,具体可见:
Hadoop 生态圈总结
初代的hadoop1.0只有一个简单的mapreduce编程框架构建在HDFS分布式文件管理系统上,而在Hadoop2.0中,YARN负责管理MapReduce中的资源(例如有十个数据分析师同时使用一个集群来做一些海量数据的分析建模的任务,每个人都申请了一部分资源,此时yarn就负责从当前集群中闲置的内存,CPU等中拿出一部分分配给不同的分析师使用)并且将其打包成Container,这样可以精简MapReduce,使用户专注于其擅长的数据处理任务,无需考虑资源调度的麻烦问题。
组件1:YARN
1、提交MapReduce作业的客户端
2、YARN资源管理器,负责协调集群上计算资源的分配
3、YARN节点管理器,负责启动和监视集群中机器上的计算容器(container)
4、MapReduce应用程序master负责协调运行MapReduce作业的任务,它和MapReduce任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理分布式文件系统(HDFS)用来于其他实体间共享作业文件;
组件2:HDFS
1、超大文件存储
2、流式数据访问(一次写入,多次读取最高效)
3、对硬件要求低,可运行在廉价的商用服务器上
4、数据访问延迟高(HDFS是为高数据吞吐量应用优化的,可能会以提高时间延迟为代价,而HBase更适用低延迟的访问需求)
5、Namenode将文件系统的元数据存在内存中,因此该文件系统的存储量受限于Namenode的内存容量,小 文件多的情况。
6、HDFS中的文件可能只有一个writer,而且写操作总是将数据添加在文件的末尾。不支持多个写入者操作,也不支持在文件的任意位置进行修改。
组件3:HIVE
Hive是Facebook开发的,构建于Hadoop集群上的数据仓库应用。 2008年年Facebook将Hive项目贡献给Apache,成为开源项目; Hive是一个SQL解析引擎,他将SQL语句句转译成MapReduce作业并 在Hadoop上执行; Hive表是HDFS的一个文件目录,一个表名对应一个目录名,如果有分区表的话,则分区值对应子目录名称;
因为平常打交道最多的还是hive和pyspark,所以主要关注这两个大数据工具;;
hive的由来
随着数据量量的增加,mysql的某些查询需要几个小时甚至几天才能完成。 当数据达到1T的时候,Mysql进程垮掉。
对数据库进行底层的大量优化,使得ORACLE可以应付几T的数据,但收集用户点击流的数据(每天大约400G)时,ORACLE 也开始支撑不住;
hadoop的问世,有效解决了了数据大规模存储与统计分析的问题,但是MapReduce程序对于普通分析人员的使用过于复杂以及繁琐;
hive对外提供类似SQL语法的HQL语句句数据接口,自动将HQL语句句转化为MR作业,在Hadoop上执行,大大降低了分析人员使用Hadoop进行行数据分析的难度,因为hivesql和sql有非常多语法上的相似甚至相同之处,而sql是一门跟python一样非常容易上手的语言;
hive的特点:
1、支持索引,加快数据查询;
2、支持不同的存储类型,例如,纯文本文件、HBase 中的文件等;
3、将元数据保存在关系数据库中,这大大减少了在查询过程中执行语义检查的时间;
4、可以直接使用存储在Hadoop文件系统即HDFS中的数据;
5、内置大量常用函数来操作时间、字符串和其它类型数据,可以进行基本的数据挖掘工具来使用,支持用户扩展UDF函数来完成内置函数无法实现的操作;
6、类SQL的查询方式,将SQL查询转换为MapReduce的job从而在Hadoop集群上执行。
hive数据类型
简单数据类型:
复杂数据类型:
复杂数据类型的声明必须使用尖括号指明其中数据字段的类型。定义三列,每列对应一种复杂的数据类型,如下所示。
CREATE TABLE complex(
col1 ARRAY< INT>,
col2 MAP< STRING,INT>,
col3 STRUCT< a:STRING,b:INT,c:DOUBLE>
)
hive的文件格式
TEXTFILE //文本,默认值
SEQUENCEFILE // 二进制序列文件
RCFILE //列式存储格式文件 Hive0.6以后开始支持
ORC //列式存储格式文件,比RCFILE有更高的压缩比和读写效率,Hive0.11以后开始支持
PARQUET //列出存储格式文件,Hive0.13以后开始支持
hivesql
hive sql常见命令大全:
Spark生态圈:
在介绍spark生态圈之前,我们总结一下hadoop:
Hadoop就是存储海量数据和分析海量数据的工具,Hadoop是由java语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架,其核心部件是HDFS与MapReduce。HDFS是一个分布式文件系统:引入存放文件元数据信息的服务器Namenode和实际存放数据的服务器Datanode,对数据进行分布式储存和读取。
MapReduce是一个计算框架:MapReduce的核心思想是把计算任务分配给集群内的服务器里执行。通过对计算任务的拆分(Map计算/Reduce计算)再根据任务调度器(JobTracker)对任务进行分布式计算。(mapreduce框架的作用 类似于在github上做contribute,作者已经写好了框架,然后给contributer开放了 contribute的框架代码,你只要把逻辑写到框架代码里就可以,不需要再考虑别的事情)
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。
把HDFS理解为一个分布式的,有冗余备份的,可以动态扩展的用来存储大规模数据的大硬盘。 把MapReduce理解成为一个计算引擎,按照MapReduce的规则编写Map计算/Reduce计算的程序,可以完成计算任务。
Hadoop能干什么
大数据存储:
分布式存储日志处理:擅长日志分析
ETL:数据抽取到oracle、mysql、DB2、mongdb及主流数据库
机器学习: 比如Apache Mahout项目
搜索引擎:Hadoop + lucene实现
数据挖掘:目前比较流行的广告推荐,个性化广告推荐
Hadoop是专为离线和大规模数据分析而设计的,并不适合那种对几个记录随机读写的在线事务处理模式。
然后我们来谈谈spark,首先大家需要知道的是,虽然我们常说spark生态这个词,但是spark和hadoop并不是等同的概念,前者从属于后者,Hadoop当初作为一种大数据技术横空出世,经过多年的发展,Hadoop已经不单单指某一个技术,而是一个完整的大数据生态。实际上Spark对应的是Hadoop中的MapReduce部分。 那么为什么后世要创造spark来取代mapreduce呢?
1. Reduce需要在Map后完成,如果数据没有合理的分割,则整个流程将会大大延时
2. Map与Reduce在处理复杂逻辑上有些力不从心
3. 性能瓶颈,因为MapReduce处理的中间结果需要存放在HDFS上,所以写入写出时间大大影响了性能
对于机器学习和深度学习来说,map-reduce的第三个缺点是致命的,我们知道主流的机器学习算法,无论是lr、gbdt还是深度学习,模型训练的过程中都需要经过大量的迭代,早期使用hadoop的mahout——基于mapreduce编程范式的机器学习框架,因为大量的迭代,每次迭代都需要把中间结果(例如梯度)存放到hdfs上,然后下一次迭代的时候再读取出来,通信开销非常大,导致整个模型训练的过程非常的慢:
spark为什么会比hadoop快,原因太多了,就类似于lightgbm为什么比xgb快的问题,主要原因在于直方图算法,但是工程上的优化也有很多,比如说直方图做差加速,Cache命中率优化,多线程优化etc,
这些因素共同决定了lgb的速度快于xgb,那么延申来看,spark比hadoop快也有一个核心原因,
Spark和MapReduce的计算都发生在内存中,区别在于:
MapReduce通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。
Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。
当然还有很多其它的原因,但是个人认为,尤其是对于机器学习或深度学习应用而言,加速的最主要原因在此。
Apache Spark是一个开源的、强大的分布式查询和处理引擎。它提 供MapReduce的灵活性和可扩展性,但速度明显更高:当数据存储在内存中时,它比Apache Hadoop快100倍,访问磁盘时高达10倍。
Apache Spark允许用户读取、转换、聚合数据,还可以轻松地训练和部署复杂的统计模型。Java、Scala、Python、R和SQL都可以访问 Spark API。Apache Spark可用于构建应用程序,或将其打包成为要部署 在集群上的库,或通过笔记本(notebook)(例如Jupyter、Spark- Notebook、Databricks notebooks和Apache Zeppelin)交互式执行快速的分析。我们可以简单的理解为,spark的功能之一是大数据版本的pandas(不过和pandas相似度更高的是dask,dask在国外的一些公司已经在使用了,国内目前主要还是spark)
Apache Spark提供的很多库会让那些使用过Python的pandas或R语言 的data.frame或者data.tables的数据分析师、数据科学家或研究人员觉得 熟悉。非常重要的一点是,虽然Spark DataFrame会让pandas或 data.frame、data.tables用户感到熟悉,但是仍有一些差异,所以不要期望过高。具有更多SQL使用背景的用户也可以用该语言来塑造其数据(spark sql)。 此外,Apache Spark还提供了几个已经实现并调优过的算法、统计模型 和框架:为机器学习提供的MLlib和ML,为图形处理提供的GraphX和 GraphFrames(python api),以及Spark Streaming(DStream和Structured)。Spark允许 用户在同一个应用程序中随意地组合使用这些库。
Apache Spark可以方便地在本地笔记本电脑上运行,而且还可以轻 松地在独立模式下通过YARN或Apache Mesos于本地集群或云中进行部 署。它可以从不同的数据源读取和写入,包括(但不限于)常规的数据库、csv、txt格式文件、Hive、HDFS、 Apache Cassandra、Apache HBase和S3:
spark的执行过程和hadoop的mapreduce过程类似,不过spark是基于内存运行的,较少涉及到频繁的hdfs的读写操作,因此效率高得多;
RDD和DAG
前面提到过spark是基于内存运算的,不需要硬盘多次读写,这里就引出了RDD的概念,基于分布式内存的数据抽象。RDD的全称叫做Resilient Distributed Datasets,即弹性分布式数据集,基于RDD,Apache Spark就是围绕着RDD而构建的。我们使用 Python时,尤为重要的是要注意Python数据是存储在JVM对象中的。这些对象允许作业非常快速地执行计算。对RDD的计算依据缓存和存储在内存中的模式进行,与其他传统分布式框架(如Apache Hadoop)相比,该模式使得计算速度快了一个数量级。
当然,RDD这个概念十分难以理解,它并不是一个实际存在的东西,而是一个逻辑上的概念,在实际的物理存储中,真实的数据仍然是存放在不同的节点中。它具有以下几个特性:
分区
不可变
能并行操作
分区
区的意思是,同一个RDD中的数据存储在集群不同的节点中,正是这个特性,才能保证它能够被并行处理。前面提到,RDD是一个逻辑上的概念,它只是一种数据的组织形式,我们可以用下图来说明这个组织结构:
不可变
数据仍然是分布在集群中的各个节点,RDD中不存放任何数据,但是每个分区有它在RDD中的一个index,通过RDD自己的ID和分区的index可以确定每个数据块的编号,从而能够提取到相应的数据进行操作。
每一个RDD都是只读的,包含的分区信息不可以被改变。因为已有的RDD无法被改变,所以每次对数据的操作,会产生新的RDD作为结果。每次产生的新RDD,我们需要记录它是通过哪个RDD进行转换操作得来,因此新老RDD存在依赖关系,这样做的一个好处是不需要将每一步产生的数据结果进行存储,如果某一步失败了,只需要回滚至它的前一步RDD再次进行操作,而不需要重复所有的操作。具体依赖的细节这里不再阐述,实现逻辑比较复杂,我们不需要太过深入探讨和研究,只要了解概念即可。
并行操作
之前提到同一个RDD中的数据存储在集群不同的节点中,正是这个特性,才能保证它能够被并行处理。因为不同节点的数据可以被分别处理,比如现在一群人手中都分别拿着几种水果,如果现在要给这些水果按照种类顺序削皮,例如先削苹果,后削梨,最后削桃子,肯定是一种水果分别在不同的人手上才能完成并行的任务。如果一个人手上都是苹果,一个人手上都是梨,那只能等一个人削完另一个人才能继续。
总结
相比MapReduce,Spark做出了几个改进,从而获得了性能大幅度的提升。 Spark将操作的数据放入内存中,而不是硬盘,这让读写速度大大提升;
Spark任务中每一步操作产生的结果并不需要写入硬盘,而是只记录操作之间的依赖关系,因此提高了容错率,并大大降低了恢复任务的成本;
使用分区的方式,让数据能够并行处理;
所以,Spark是一个分布式计算框架,相当于MapReduce的改进版,支持基于内存的迭代计算,大多数情况下Spark要搭配Hadoop来处理HDFS上的数据,因此,做海量数据存储,无疑只能选Hadoop了,Hadoop的HDFS可以看作是业内的分布式存储标准,而Spark只能用来跑计算无法取代Hadoop。如果涉及到HDFS上的数据处理,那么Hadoop + Spark是最佳选择。相比MapReduce,使用Spark处理数据不仅可以得到10倍以上的性能提升,而且Spark的RDD相关API丰富且支持SQL对数据做处理(此外还支持python 、R),MapReduce在开发的效率上无法与之匹敌。
Spark dataframe 和 Spark sql
DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。SchemaRDD作为Apache Spark 1.0 版本中的试验性功能,它在Apache Spark 1.3版本中被命名为 DataFrame。对于熟悉Python pandas DataFrame或者R DataFrame的读 者,Spark DataFrame是一个近似的概念,即允许用户轻松地使用结构化 数据(如数据表)。
通过在分布式数据集上施加结构,让Spark用户利用Spark SQL来查询结构化的数据或使用Spark表达式方法。下面,我们将给出两种方法的代码示例。通过构建数据,使得Apache Spark引擎——具体来说就是catalyst优化器(Catalyst Optimizer)——显 著提高了Spark的查询性能。Spark早期的API中(即RDD),由于Java JVM和Py4J之间的通信开销,使用Python执行的查询会明显变慢。
每当使用RDD执行PySpark程序时,潜在地需要巨大的开销来执行 作业。如下图所示,在PySpark驱动器中,Spark Context通过Py4j启动一 个使用JavaSparkContext的JVM。所有的RDD转换最初都映射到Java中的 PythonRDD对象。 一旦这些任务被推送到Spark工作节点,PythonRDD对象就使用管 道(pipe)启动Python的子进程(subprocess),发送代码和数据到 Python中进行处理;
虽然该方法允许PySpark将数据处理分布到多个工作节点的多个 Python子进程中,但是如你所见,Python和JVM之间还是有很多上下文 切换和通信开销的。
spark sql拯救世界! 因为spark sql在各个语言之间的性能都很高并且达到了较好的平衡
Spark SQL引擎如此之快的主要原因之一是 Catalyst优化器。对于拥有数据库背景的读者,这张图看起来类似于关 系数据库管理系统(RDBMS)的逻辑/物理计划和成本模型/基于成本的 优化。其意义在于,相对立即处理查询来说,Spark引擎的Catalyst优化器 编译并优化了逻辑计划,而且还有一个能够确保生成最有效的物理计划 的成本优化器。如下图:
DataFrame和Catalyst优化器(以及Tungsten项目)的意义是在和非 优化的RDD查询比较时增加PySpark查询的性能。如下图所示,引入 DataFrame之前,Python查询速度普遍比使用RDD的Scala查询慢(后者 快两倍)。通常情况下,这种查询性能的降低源于Python和JVM之间的 通信开销:
利用DataFrame,PySpark往往明显加快,但也有一些例外。最典型的是Python UDF的使用,导致在Python和 Java虚拟机之间的往返通信。请注意,这将是最坏的情况,如果计算基 于RDD来做,情况将会是相似的,因此在使用dataframe的时候尽量使用其内置的方法,如果迫不得已需要处理复杂的逻辑,要不然就是放弃一部分速度使用python来写函数逻辑,要不然就是使用scala来写内部逻辑,后者需要对scala有较好的掌握基础;
总之,Python DataFrame和SQL、Scala DataFrame以及R DataFrame 都能够利用Catalyst优化器(按照以下更新的图)
Spark Sql体系
Catalyst优化器
Spark SQL是Apache Spark最具技术性的组件之一,因为它支持SQL 查询和DataFrame API(spark dataframe也很容易和spark sql查询语句进行交互)。Spark SQL的核心是Catalyst优化器。优化器基于函数式编程结构,并且旨在实现两个目的:简化向Spark SQL添加新的 优化技术和特性的条件,并允许外部开发人员扩展优化器(例如,添加 数据源特定规则,支持新的数据类型等等):
Tungsten
Tungsten(钨丝)是Apache Spark执行引擎项目的代号。该项目的重点是改进Spark算法,使它们更有效地使用内存和CPU,使现代硬件的性能发挥到极致。 该项目的工作重点包括:
·显式管理内存,以消除JVM对象模型和垃圾回收的开销。
·设计利用内存层次结构的算法和数据结构。
·在运行时生成代码,以便应用程序可以利用现代编译器并优化 CPU。
·消除虚拟函数调度,以减少多个CPU调用。
·利用初级编程(例如,将即时数据加载到CPU寄存器),以加速 内存访问并优化Spark的引擎,以有效地编译和执行简单循环;
总结
SparkSQL体系结构如上图所示,整体由上到下分为三层:编程模型层、执行任务优化层以及任务执行引擎层,其中SparkSQL编程模型可以分为SQL和DataFrame两种;执行计划优化又称为Catalyst,该模块负责将SQL语句解析成AST(逻辑执行计划),并对原始逻辑执行计划进行优化,优化规则分为基于规则的优化策略和基于代价的优化策略两种,最终输出优化后的物理执行计划;任务执行引擎就是Spark内核,负责根据物理执行计划生成DAG,在任务调度系统的管理下分解为任务集并分发到集群节点上加载数据运行,Tungsten基于对内存和CPU的性能优化,使得Spark能够更好地利用当前硬件条件提升性能;
说到计算模型,批处理计算从最初提出一直到现在,一共经历了两次大的变革,第一次变革是从MR编程模式到RDD编程模型,第二次则是从RDD编程模式进化到DataFrame模式。
第一次变革:MR编程模型 -> RDD编程模型
和MR计算模型相比,RDD计算模型有很多改进:
可以支持更多的算子,比如filter算子、sum算子等,不再像MR只支持map和reduce两种
更加灵活的存储机制,RDD可以支持本地硬盘存储、缓存存储以及混合存储三种模式,用户可以进行选择。而MR目前只支持HDFS存储一种模式。很显然,HDFS存储需要将中间数据进行存储,而RDD则不需要,这是RDD编程模型效率高的一个重要原因之一。
RDD模型带来了更细粒度的任务并发,不再像MR那样每次起个任务就要起个JVM进程;另外,RDD模型带来了另一个利好是很好的容错性,一个任务即使中间断掉了,也不需要从头再来一次。
延迟计算机制一方面可以使得同一个stage内的操作可以合并到一起落在一块数据上,而不再是所有数据先执行a操作、再扫描一遍执行b操作,太浪费时间。另一方面给执行路径优化留下了很灵活的操作空间;
所有这些改进使得RDD编程模型相比MR编程模型,性能可以有10~100倍的提升!然而,RDD计算模型就很完美吗?要知道,用户手写的RDD程序基本或多或少都会有些问题,性能也肯定不会是最优的。如果没有一个高手指点或者优化,性能依然有很大的优化潜力。这就是促成了第二次变革,从RDD编程模型进化到DataFrame编程模型。
Spark 1.6中引入的Spark Dataset旨在提供一个API,允许用户轻松地 表达域对象的转换,同时还提供了具有强大性能和优点的Spark SQL执行引擎。
DataFrame和Dataset API的统一使创建向后兼容的重大改变成为可 能。这是Apache Spark 2.0成为主要版本(相对1.x这种重大改变很少的 次要版本而言)的主要原因之一。从下图中可以看出,DataFrame和 Dataset都属于新的Dataset API,作为Apache Spark 2.0的一部分被引入进来:
如前所述,Dataset API提供了一种类型安全的面向对象的编程接 口。通过将表达式和数据字段暴露给查询计划器和Project Tungsten的快 速内存编码,Dataset可以利用Catalyst优化器。但是现在DataFrame和 Dataset已统一为Apache Spark 2.0的一部分,DataFrame现在是未类型化 的Dataset API的一个别名。进一步来说:
本文转自 知乎,原文链接:https://zhuanlan.zhihu.com/p/396717070,如需转载请自行联系原作者