当前位置:首页 » 《关注互联网》 » 正文

大数据-90 Spark 集群 RDD 编程-高阶 RDD容错机制、RDD的分区、自定义分区器(Scala编写)、RDD创建方式

0 人参与  2024年09月25日 12:01  分类 : 《关注互联网》  评论

点击全文阅读


点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis (已更完)Kafka(已更完)Spark(正在更新!)

章节内容

上节完成的内容如下:

Spark RDD的依赖关系重回 WordCountRDD 持久化RDD 缓存

在这里插入图片描述

RDD容错机制

基本概念

涉及到的算子:checkpoint,也是Transformation

Spark中对于数据的保存除了持久化操作外,还提供了检查点的机制检查点本质是通过RDD写入高可靠的磁盘,主要目的是为了容错。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。Lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销cache和checkpoint是有显著区别的,缓存把RDD计算出来然后放到内存中,但RDD的依赖链不能丢掉,当某个点某个Executor宕机了,上面cache的RDD就会丢掉,需要通过依赖链重新计算。不同的是,checkpoint是把RDD保存在HDFS中,是多副本的可靠存储,此时依赖链可以丢弃,所以斩断了依赖链。

适合场景

DAG中的Lineage过长,如果重新计算,开销会很大在宽依赖上做checkpoint获得的收益更大

启动Shell

# 启动 spark-shellspark-shell --master local[*]

checkpoint

// 设置检查点目录sc.setCheckpointDir("/tmp/checkpoint")val rdd1 = sc.parallelize(1 to 1000)val rdd2 = rdd1.map(_*2)rdd2.checkpoint// checkpoint是lazy操作rdd2.isCheckpointed

可以发现,返回结果是False
在这里插入图片描述

RDD 依赖关系1

checkpoint之前的rdd依赖关系

rdd2.dependencies(0).rddrdd2.dependencies(0).rdd.collect

我们可以观察到,依赖关系是有的,关系到之前的 rdd1 的数据了:
在这里插入图片描述

触发checkpoint

我们可以通过执行 Action 的方式,来触发 checkpoint
执行一次action,触发checkpoint的执行

rdd2.countrdd2.isCheckpointed

此时观察,可以发现 checkpoint 已经是 True 了:
在这里插入图片描述

RDD依赖关系2

我们再次观察RDD的依赖关系:
再次查看RDD的依赖关系。可以看到checkpoint后,RDD的lineage被截断,变成从checkpointRDD开始

rdd2.dependencies(0).rddrdd2.dependencies(0).rdd.collect

此时观察到,已经不是最开始的 rdd1 了:
在这里插入图片描述

查看checkpoint

我们可以查看对应的保存的文件,查看RDD所依赖的checkpoint文件

rdd2.getCheckpointFile
运行的结果如下图:
在这里插入图片描述

RDD的分区

基本概念

spark.default.paralleism: 默认的并发数 2

本地模式

# 此时 spark.default.paralleism 为 Nspark-shell --master local[N]# 此时 spark.default.paralleism 为 1spark-shell --master local

伪分布式

x为本机上启动的Executor数y为每个Executor使用的core数z为每个Executor使用的内存spark.default.paralleism 为 x * y
spark-shell --master local-cluster[x,y,z]

分布式模式

spark.default.paralleism = max(应用程序持有Executor的core总数, 2)

创建RDD方式

集合创建

简单的说,RDD分区数等于cores总数

val rdd1 = sc.paralleize(1 to 100)rdd.getNumPartitions

textFile创建

如果没有指定分区数:

本地文件: rdd的分区数 = max(本地文件分片数,sc.defaultMinPartitions)HDFS文件:rdd的分区数 = max(HDFS文件block数,sc.defaultMinPartitions)

需要额外注意的是:

本地文件分片数 = 本地文件大小 / 32M读取 HDFS 文件,同时指定了分区数 < HDFS文件的Block数,指定的数将不会生效
val rdd = sc.textFile("data/1.txt")rdd.getNumPartitions

RDD分区器

判断分区器

以下RDD分别是否有分区器,是什么类型的分区器

val rdd1 = sc.textFile("/wcinput/wc.txt")rdd1.partitionerval rdd2 = sc.flatMap(_.split("\\s+"))rdd2.partitionerval rdd3 = rdd2.map((_, 1))rdd3.partitionerval rdd4 = rdd3.reduceByKey(_ + _)rdd4.partitionerval rdd5 = rdd4.sortByKey()rdd5.partitioner

分区器作用与分类

在PairRDD(key,value)中,很多操作都是基于Key的,系统会按照Key对数据进行重组,如 GroupByKey
数据重组需要规则,最常见的就是基于Hash的分区,此外还有一种复杂的基于抽样Range分区方法:
在这里插入图片描述

HashPartitioner

最简单、最常用,也是默认提供的分区器。
对于给定的Key,计算HashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个Key所属的分区ID。
该分区方法可以保证Key相同的数据出现在同一个分区中。
用户可以通过 partitionBy主动使用分区器,通过 partitions参数指定想要分区的数量。

默认情况下的分区情况是:

val rdd1 = sc.makeRDD(1 to 100).map((_, 1))rdd1.getNumPartitions

执行结果如下图所示:
在这里插入图片描述
执行结果如下图所示,分区已经让我们手动控制成10个了:

val rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(10))rdd2.getNumPartitionsrdd2.glom.collect.foreach(x => println(x.toBuffer))

RangePartitioner

简单来说就是将一定范围内的数映射到某个分区内,在实现中,分界的算法尤为重要,用到了水塘抽样算法。sortByKey会使用RangePartitioner。
在这里插入图片描述
进行代码的测试:

val rdd3 = rdd1.partitionBy(new org.apache.spark.RangePartitioner(10, rdd1))rdd3.glom.collect.foreach(x => println(x.toBuffer))

执行结果如下图所示:
在这里插入图片描述
但是现在的问题是:在执行分区之前其实并不知道数据的分布情况,如果想知道数据的分区就需要对数据进行采样。

Spark中的RangePartitioner在对数据采样的过程中使用了 “水塘采样法”水塘采样法是:在包含N个项目的集合S中选取K个样本,其中N为1或者很大的未知的数量,尤其适用于不能把所有N个项目都存放到主内存的情况。在采样过程中执行了 collect() 操作,引发了 Action 操作。

自定义分区器

Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。
我们需要实现自定义分区器,按照以下的规则进行分区:

分区 0 < 100100 <= 分区1 < 200200 <= 分区2 < 300300 <= 分区3 < 400…900 <= 分区9 < 1000

编写代码

package icu.wzkimport org.apache.spark.rdd.RDDimport org.apache.spark.{Partitioner, SparkConf, SparkContext}import scala.collection.immutableclass MyPartitioner(n: Int) extends Partitioner {  override def numPartitions: Int = n  override def getPartition(key: Any): Int = {    val k = key.toString.toInt    k / 100  }}object UserDefinedPartitioner {  def main(args: Array[String]): Unit = {    val conf = new SparkConf()      .setAppName("UserDefinedPartitioner")      .setMaster("local[*]")    val sc = new SparkContext(conf)    sc.setLogLevel("WARN")    val random = scala.util.Random    val arr: immutable.IndexedSeq[Int] = (1 to  100)      .map(idx => random.nextInt(1000))    val rdd1: RDD[(Int, Int)] = sc.makeRDD(arr).map((_, 1))    rdd1.glom.collect.foreach(x => println(x.toBuffer))    println("=========================================")    val rdd2 = rdd1.partitionBy(new MyPartitioner(10))    rdd2.glom.collect().foreach(x => println(x.toBuffer))        sc.stop()      }}

打包上传

这里之前已经重复过多次,就跳过了

mvn clean package

运行测试

spark-submit --master local[*] --class icu.wzk.UserDefinedPartitioner spark-wordcount-1.0-SNAPSHOT.jar

可以看到如下的运行结果:
在这里插入图片描述


点击全文阅读


本文链接:http://zhangshiyu.com/post/164373.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 祖母寿宴,侯府冒牌嫡女被打脸了(沈屿安秦秀婉)阅读 -
  • 《雕花锦年,昭都旧梦》(裴辞鹤昭都)完结版小说全文免费阅读_最新热门小说《雕花锦年,昭都旧梦》(裴辞鹤昭都) -
  • 郊区41号(许洛竹王云云)完整版免费阅读_最新全本小说郊区41号(许洛竹王云云) -
  • 负我情深几许(白诗茵陆司宴)完结版小说阅读_最热门小说排行榜负我情深几许白诗茵陆司宴 -
  • 九胞胎孕妇赖上我萱萱蓉蓉免费阅读全文_免费小说在线看九胞胎孕妇赖上我萱萱蓉蓉 -
  • 为保白月光,侯爷拿我抵了债(谢景安花田)小说完结版_完结版小说全文免费阅读为保白月光,侯爷拿我抵了债谢景安花田 -
  • 陆望程映川上官硕《我的阿爹是带攻略系统的替身》最新章节阅读_(我的阿爹是带攻略系统的替身)全章节免费在线阅读陆望程映川上官硕
  • 郑雅琴魏旭明免费阅读_郑雅琴魏旭明小说全文阅读笔趣阁
  • 头条热门小说《乔书意贺宴临(乔书意贺宴临)》乔书意贺宴临(全集完整小说大结局)全文阅读笔趣阁
  • 完结好看小说跨年夜,老婆初恋送儿子故意出车祸_沈月柔林瀚枫完结的小说免费阅读推荐
  • 热推《郑雅琴魏旭明》郑雅琴魏旭明~小说全文阅读~完本【已完结】笔趣阁
  • 《你的遗憾与我无关》宋怀川冯洛洛无弹窗小说免费阅读_免费小说大全《你的遗憾与我无关》宋怀川冯洛洛 -

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1