最新Spark总结.docx
- 文档编号:10666368
- 上传时间:2023-05-27
- 格式:DOCX
- 页数:30
- 大小:1.21MB
最新Spark总结.docx
《最新Spark总结.docx》由会员分享,可在线阅读,更多相关《最新Spark总结.docx(30页珍藏版)》请在冰点文库上搜索。
最新Spark总结
Spark
1基本概念
1.1什么是Spark
Spark是一种计算框架,是与mapreduce不一样的计算框架。
他与Hadoopmapreduce相比具有以下优势:
1)Spark通过将中间结果缓存在内存,而不是磁盘,因此很适合于多阶段的作业,如需多次迭代的机器学习。
而mapreduce则将中间结果每次都存到磁盘,速度下降很多。
2)Spark在通信方面采用Akaa框架的(角色)Actor模型,并通过线程池复用线程来避免进程或线程启动和切换开销。
而Hadoopmapreduce最初的设计是为了离线批量计算大文件,运行都是好几个小时,所以作业调度中秒级的开销根本没考虑和优化。
3)Spark任务在Shuffle中不是所有情景都需要排序。
而MapReduce在数据Shuffle之前花费了大量的时间来排序。
总之,Spark在速度上要比mapreduce快很多。
在流式计算方面,Spark流计算与Storm相比,速度不及Storm。
Storm可以达到毫秒级响应,而Spark只能达到秒级。
但是Spark流计算更适合于计算较复杂的应用,特别是需要流数据与历史数据结合的计算。
而Storm只能完成简单的计算,如广告点击率等。
此外,Spark的吞吐量要远高于Storm。
1.4Mesos
Mesos是一个资源管理框架一,提供类似于YARN的功能。
YARN是Hadoop中的一个资源管理框架
1.5Tachyon
Tachyon(读:
忒ki样)是一个分布式内存文件系统,可以理解为内存中的HDFS。
为了提供更高的性能,将数据存储剥离JavaHeap。
用户可以基于Tachyon实现RDD或者文件的跨应用共享,并提供高容错机制,保证数据可靠性。
1.6Zookeeper
用于解决分布式系统中一致性问题。
1.7大数据
大数据很难有一个明确定义。
但是他这样几个特点,即大容量、繁杂、高价值、快速,也就是4个V。
它与海量数据相比,我认为很难有一个界定,更多的是一种商业性的口号、名称,它与海量数据有很多交叉的地方,只是为了适应新的网络化世界,而提出这种大数据概念。
3弹性分布式数据集RDD
3.1RDD基本概念
(1)RDD的两种创建方式
1)外部文件创建,如HDFS、本地文件。
2)RDD转换得到新的RDD。
(2)RDD的两种操作算子
对于RDD可以有两种计算操作算子:
Transformation(变换)与Action(行动)。
只有行动(Action)算子才会触发作业(Job)提交。
(3)RDD的重要内部属性
1)分区列表。
2)计算每个分片的函数。
3)对父RDD的依赖列表。
4)对Key-Value对数据类型RDD的分区器,控制分区策略和分区数。
5)每个数据分区的地址列表(如HDFS上的数据块的地址)。
(4)窄依赖和宽依赖
窄依赖指,子RDD分区只由一个或多个父RDD中的一个分区转换而来。
map操作就是一个父RDD的一个分区,union操作就是两个父RDD的一个分区。
宽依赖指,子RDD的分区由父RDD的所有分区转换而来,即经过过shuffle操作。
,如如reduceByKey,groupByKey等。
3.2RDD与分布式共享内存的异同
分布式共享内存(DistributedSharedMemory,DSM)是一种通用的内存数据抽象,这种通用性同时也使其在商用集群上实现有效的容错性和一致性更加困难。
区别:
表3-1 RDD与DSM的对比
对比项目
RDD
DSM
读
批量或细粒度读操作
细粒度读操作
写
批量转换操作
细粒度转换操作
一致性
不重要(RDD是不可更改的)
取决于应用程序或运行时
容错性
细粒度,低开销使用lineage(血统)
需要检查点操作和程序回滚
落后任务的处理
任务备份,重新调度执行
很难处理
任务安排
基于数据存放的位置自动实现
取决于应用程序
此外,RDD对于扫描类型操作,如果内存不足以缓存整个RDD,就进行部分缓存,将内存容纳不下的分区存储到磁盘上。
3.4 Spark算子分类及功能
3.4.1.Saprk算子的作用
图3-3描述了Spark的输入、运行转换、输出。
在运行转换中通过算子对RDD进行转
换。
算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
图3-3 Spark算子和数据空间
1)输入:
在Spark程序运行中,数据从外部数据空间(如分布式存储:
textFile读取
HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
2)运行:
在Spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。
如果数据需
要复用,可以通过Cache算子,将数据缓存到内存。
3)输出:
程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返
回Scalaint型数据)。
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。
Spark将常用的大数据操作都转化成为RDD的子类。
3.4.2算子分类
算子大致可以分为三大类。
1)Value型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
2)Key-Value型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。
3)Action算子,这类算子会触发SparkContext提交作业Job。
3.4.3Value型Transformation算子
处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型。
1)输入分区与输出分区一对一型。
2)多对一型。
3)多对多型。
4)输出分区为输入分区子集型。
5)还有一种特殊的输入与输出分区一对一的算子类型:
Cache型。
Cache算子对RDD
分区进行缓存。
·输入分区与输出分区一对一型
(1)map
数据对数据直接转换(通过一种函数)。
图3-4 map算子对RDD转换
(2)flatMap
数据对数据直接转换(通过一种函数),与map的区别在于RDD中的集合内元素会合并为一个集合
图3-5 flapMap算子对RDD转换
(3)mapPartitions
对分区内的元素按一定函数方式进行转换。
mapPartitions是map的一个变种。
map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
它的函数定义为:
defmapPartitions[U:
ClassTag](f:
Iterator[T]=>Iterator[U],preservesPartitioning:
Boolean=false):
RDD[U]
f即为输入函数,它处理每个分区里面的内容。
每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。
最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
图3-6 mapPartitions算子对RDD转换
(4)glom(一瞥)
将分区内的集合元素形成数组,以进行一瞥的查看。
图3-7 glom算子对RDD转换
·输入分区与输出分区多对一型
(1)union
使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同,并不进行去重操作,保存所有元素。
如果想去重,可以使用distinct()。
++符号相当于union函数操作。
图3-8 union算子对RDD转换
(2)cartesian
对两个RDD内的所有元素进行笛卡尔积操作。
操作后,内部实现返回CartesianRDD.
图3-9 cartesian算子对RDD转换
·输入分区与输出分区多对多型
groupBy:
将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key相同的元素合为一组。
图3-10 groupBy算子对RDD转换
·输出分区为输入分区子集型
(1)filter
filter的功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD
中保留,返回为false的将过滤掉。
内部实现相当于生成FilteredRDD(this,sc.clean(f))。
图3-11 filter算子对RDD转换
(2)distinct
distinct将RDD中的元素进行去重操作.
图3-12 distinct去重算子对RDD转换
(3)subtract
subtract相当于进行集合的差操作,结果为RDD1中出现的元素而RDD2中不出现。
(4)sample
sample将RDD这个集合内的元素进行采样,获取所有元素的子集。
用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。
内部实现是生成SampledRDD(withReplacement,fraction,seed)。
图3-14 sample算子对RDD转换
(5)takeSample
takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),返回结果的集合为单机的数组。
图3-15 takeSample算子对RDD转换
·Cache型
(1)cache
cache将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)函数的功能。
图3-16 cache算子对RDD转换
(2)persist
persist函数对RDD进行缓存操作。
数据缓存在哪里由StorageLevel枚举类型确定。
有以下几种类型的组合:
DISK代表磁盘,MEMORY代表内存,SER代表数据是否进行序列化存储。
3.3.4 Key-Value型Transformation算子
Key-Value型算子又大致可以分为3种类型:
输入分区与输出分区一对一、聚集和连接操作。
·输入分区与输出分区一对一
mapValues:
针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行
处理。
图3-19 mapValues算子RDD对转换
·对单个RDD或两个RDD聚集
(1)单个RDD聚集
1)combineByKey
图3-20 comBineByKey算子对RDD转换
2)reduceByKey。
reduceByKey是更简单的一种情况,只是两个值合并成一个值,所以createCombiner
很简单,就是直接返回v.
图3-21 reduceByKey算子对RDD转换
3)partitionBy。
partitionBy函数对RDD进行分区操作。
如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。
通过新的分区策略将原来在不同分区的V1、V2数据都合并到了一个分区。
图3-22 partitionBy算子对RDD转换
(2)对两个RDD进行聚集
cogroup函数将两个RDD进行协同划分.下图将RDD1中的数据(U1,1)、(U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),
(2)))。
图3-23 Cogroup算子对RDD转换
·连接
(1)join
join对两个需要连接的RDD进行cogroup函数操作。
cogroup操作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作,返回的结果再展平,对应Key下的所有元组形成一个集合,最后返回RDD[(K,(V,W))]。
图3-24 join算子对RDD转换
(2)leftOutJoin和rightOutJoin
LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一
侧的RDD元素是否为空,如果为空,则填充为空。
如果不为空,则将数据进行连接运算,并返回结果。
3.3.5 Actions算子
Actions算子通过SparkContext执行提交作业的runJob操作,触发了RDDDAG(有向无环图)执行。
根据Action算子的输出空间将Action算子进行分类:
无输出、HDFS、Scala集合和数据类型。
1.无输出
(1)foreach
对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。
图3-25 foreach算子对RDD转换
2.HDFS
(1)saveAsTextFile
函数将数据输出,存储到HDFS的指定目录。
图3-26 saveAsHadoopFile算子对RDD转换
(2)saveAsObjectFile
saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。
图3-27 saveAsObjectFile算子对RDD转换
3.Scala集合和数据类型
(1)collect
collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一个单机的scalaArray数组。
在这个数组上运用scala的函数式操作。
图3-28 Collect算子对RDD转换
(2)collectAsMap
collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。
对于重复K的RDD元素,后面的元素覆盖前面的元素。
图3-29 collectAsMap算子对RDD转换
(3)reduceByKeyLocally
实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。
(4)lookup
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。
图3-30 lookup对RDD转换
(5)count
count返回整个RDD的元素个数。
图3-31 count对RDD转换
(6)top
top可返回最大的k个元素。
例:
result.top(10)(Ordering.by[(String,Int),Int](_._2))
(String,Int),Int:
输入类型,排序类型
(7)takeOrdered
取最小的几个数,与top相反。
sc.parallelize(Seq(2,3,4,5,6)).takeOrdered
(2)
returnsArray(2,3)
(8)reduce
reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。
reduceLeft先对两个元素
reduce函数内的方法f是由用户自定义得到。
(9)fold
fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。
图3-33 fold算子对RDD转换
(10)aggregate
aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。
aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。
而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。
3.3.6广播(broadcast)变量
广播(broadcast)变量,相当于Hadoop的distributedcache,其广泛用于广播MapSideJoin中的小表,以及广播大变量等场景。
这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。
Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。
相比Hadoop的distributedcache,广播的内容可以跨作业共享。
Broadcast的底层实现采用了BT。
3.3.7accumulator变量
accumulator变量:
允许做全局累加操作,广泛应用于记录当前运行指标的场合。
4 Spark工作机制
图 Spark架构图
4.1应用程序执行流程
应用程序的执行流程为:
1)写好的应用程序,打包成jar文件。
然后通过客户端上传到集群。
根据Driver的配置模式,要么运行在客户端,要么由master指定worker启动driver进程,并对整个应用程序进行监控和管理。
接着,配置一些上下文环境。
然后顺序执行代码。
2)RDD的算子包括两大类:
一是转换算子,二是行动算子。
只有Action算子才会触发Job的提交,也就是说,Spark采用的是惰性机制,在碰到行动算子的时候,才提交作业。
接着生成RDD有向无环图DAG,由DAG调度器DAGScheduler转化为阶段StageDAG,每个阶段Stage中产生相应的任务Task集合,任务调度器TaskScheduler将任务分发到worker上的Executor执行。
每个任务对应一个数据块,使用用户定义的函数处理数据块。
如图:
4.1.1 应用提交与执行方式
Driver配置(deploy-mode)模式包含以下两种方式。
·Driver进程运行在客户端,对应用进行管理监控。
(为默认项)
·主节点指定某个Worker节点启动Driver,对应用进行监控管理。
图4-4 SparkDriver位于Client图4-5 SparkDriver位于Worker节点的应用提交与执行
4.2 Spark任务调度模式
Spark有多种运行模式,如单机(Local)模式、Standalone模式、YARN模式、Mesos模式。
4.2.1 Spark应用程序之间的调度
一个Executor在一个时间段内只能给一个应用使用。
4.2.2 作业调度
不同线程提交的作业Job可以并行运行。
一个作业分为多个Stage。
整个RDDDAG为一个Job。
action算子中的本质是调用Spark上下文(SparkContext)中的runJob提交了Job。
作业的调度主要有FIFO和FAIR两种模式。
FIFO模式
FIFO(先进先出)。
fair模式
在fair共享模式调度下,多个作业以轮询(roundrobin)方式为分配资源。
考虑到长任务和短任务问题,这样长任务在前,短任务在后,短任务也可以获得不错的响应时间。
4.2.3 阶段(Stage)调度
Action算子触发作业的提交,并形成RDDDAG。
DAGScheduler(调度器)负责将RDDDAG转化为Stage(阶段)DAG。
Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage并执行,如果提交的Stage仍有未完成的父母Stage,则Stage需要等待其父Stage执行完才能执行。
waitingStages中记录仍有未执行的父母Stage,防止过早执行。
runningStages中保存正在执行的Stage,防止重复执行。
failedStages中保存执行失败的Stage,需要重新执行,这里的设计是出于容错的考虑。
4.2.4 任务(Task)调度
一个应用只有一个任务调度器(TaskScheduler)。
所有TaskSetManager都是由这个TaskScheduler调度。
一个Stage对也只有一个TaskSetManager。
TaskSetManager通过一定次序放入调度池pool中。
在调度池中,这些TaskSetMananger又会根据JobID排序,先提交的Job的TaskSetManager优先调度,然后一个Job内的TaskSetManagerID小的先调度。
在执行任务时,任务分配规则:
按照“尽量将任务分配到数据块所存储的位置”原则分配任务。
数据块的存储位置请见4.3.3节。
执行地点的选取:
1)如果是调用过cache()方法的RDD,则读取内存缓存中分区的数据。
2)如果在磁盘中,通常最开始的RDD会有相应信息,例如,从HDFS上读取的数据,HDFS分区就是最好的执行地点。
3)如果不是上面两种情况,将遍历RDDDAG获取第一个窄依赖的父亲RDD对应分区的执行地点。
4.3 SparkI/O机制
4.3.1 序列化
序列化是将对象转换为字节流,本质上可以理解为将链表存储的非连续空间的数据存储转化为连续空间存储的数组中。
这样就可以将数据进行流式传输或者块存储。
4.3.2 压缩
当大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,数据适合进行压缩。
数组或者对象序列化后的数据块可以考虑压缩。
所以序列化后的数据可以压缩,使数据紧缩,减少空间开销。
Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户可以根据具体需求选择压缩方式。
4.3.3 Spark存储系统
可以从以下几个维度理解整个存储系统:
类接口、数据读写流程和数据通信。
(1)类接口。
所有外部类都通过块管理器接口(BlockManager)对存储模块(storage)进行操作。
(2)数据读写流程
数据存储分为3个层次:
内存、本地磁盘和远程磁盘。
在diskManager中,存储块ID(blockId)和文件路径映射。
·数据读取流程
在RDD类中,通过compute方法调用迭代器(iterator)读取某个分区(Partition)的数据。
分区是逻辑概念。
一个分区对应物理上的一个块(block)。
一个Executor负责若干个分区。
查看数据存储位置的优先级是:
1)内存;
2)Tachyon;
3)本地磁盘;
4)远程磁盘
在获取远程数据时,先得到远程数据路径,然后通过块管理器工作机创建通信管理器,并从远程读取数据。
·数据写入流程
数据写入流程主要分为以下几个步骤。
1)在RDD类中,通过调用comp
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- 最新 Spark 总结