设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 手机 数据 公司
当前位置: 首页 > 服务器 > 安全 > 正文

Spark的RDD原理以及2.0特性的介绍(2)

发布时间:2021-01-05 02:09 所属栏目:53 来源:网络整理
导读:函数定义 f: (TaskContext,Int,Iterator[T]) = Iterator[U] 2. 获取分片信息 protected def getPartitions: Array[Partition]? 即这个操作的数据划分为多少个分 区.跟 mapreduce 里的 map 上的 split 类似的. 3. 获

函数定义

f: (TaskContext,Int,Iterator[T]) => Iterator[U]

2. 获取分片信息

protected def getPartitions: Array[Partition]?

即这个操作的数据划分为多少个分 区.跟 mapreduce 里的 map 上的 split 类似的.

3. 获取父 RDD 的依赖关系

protected def getDependencies:?Seq[Dependency[_]]?

依赖分二种:如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若依赖于多个 Child RDD 分区,则称之为 wide dependency.不同的操作根据其特性,可能会产生不同的依赖??.如下图所示

map 操作前后二个 RDD 操作之间的分区是一对一的关系,故产生 narrow dependency,而 join 操作的分区分别对应于它的二个子操作相对应的分区,故产生 wide dependency.当最后要生成具体的 task 运行时,就需要利用这个依赖关系也生成 Stage 的 DAG 图.

4. 获取该操作对应数据的存放位置信息,主要是针对 HDFS 这类有数据源的 RDD.

protected def getPreferredLocations(split: Partition): Seq[String]

Spark 的执行模式

Spark 的执行模式有 local、Yarn、Standalone、Mesos 四类.后面三个分别有 cluster 和 client 二种.client 和 cluster 的区别就是指 Driver 是在程序提交客户端还是在集群的 AM 上. 比如常见的 Yarn-cluster 模式如下图所示:

一般来说,运行简单测试或 UT 用的是 local 模式运行,其实就是用多线程模似分布式执行. 如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署.

当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos.

Spark 2.0 的特性

Unifying DataFrames and Datasets in Scala/Java

DataFrame? 和 Dataset? 的功能是什么?

它们都是提供给用户使用,包括各类操作接口的 API.1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是将二者统一,即保留 Dataset,而把 DataFrame 定义为 Dataset[Row],即是 Dataset 里的元素对象为 Row 的一种(SPARK-13485).

在参考资料? 中有介绍 DataFrame,它就是提供了一系列操作 API,与 RDD API 相比较,DataFrame 里操作的数据都是带有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 带来的性能提升,比如 code generation 以及 Tungsten??等.执行过程如下图所示

但是 DataFrame 出来后发现有些情况下 RDD 可以表达的逻辑用 DataFrame 无法表达.比如 要对 group by 或 join 后的结果用自定义的函数,可能用 SQL 是无法表达的.如下代码:

case class ClassData(a: String,b: Int)

case class ClassNullableData(a: String,b: Integer)

val ds = Seq(ClassData(“a”,1),ClassData(“a”,2)).toDS()

val agged = ds.groupByKey(d => ClassNullableData(d.a,null))

.mapGroups {

case (key,values) => key.a + values.map(_.b).sum

}

中间处理过程的数据是自定义的类型,并且 groupby 后的聚合逻辑也是自定义的,故用 SQL?比较难以表达,所以提出了 Dataset API.Dataset API 扩展 DataFrame API 支持静态类型和运行已经存在的 Scala 或 Java 语言的用户自定义函数.同时 Dataset 也能享受 Spark SQL 里所有性能 带来的提升.

那么后面发现 Dataset 是包含了 DataFrame 的功能,这样二者就出现了很大的冗余,故在 2.0 时将二者统一,保留 Dataset API,把 DataFrame 表示为 Dataset[Row],即 Dataset 的子集.

因此我们在使用 API 时,优先选择 DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API 也会一直保留着.后续 Spark 上层的库将全部会用 DataFrame,比如 MLlib、Streaming、Graphx 等.

Whole-stage code generation

在参考资料 9 中有几个例子的代码比较,我们看其中一个例子:

elect count(*) from store_sales where ss_item_sk = 1000

那么在翻译成计算引擎的执行计划如下图:

而通常物理计划的代码是这样实现的:

class Filter {

def next(): Boolean = {

var found = false

while (!found && child.next()) {

found = predicate(child.fetch())

}

return found

}

def fetch(): InternalRow = {

child.fetch()

}…

}

(编辑:ASP站长网)

网友评论
推荐文章
    热点阅读