文章目录
Spark内存计算框架Spark CoreSpark读写HBase1. 通过newAPIHadoopRDD实现2. 通过spark on hbase实现 Spark的序列化与反序列化1. transformation操作为什么需要序列化2. spark的任务序列化异常3. spark中解决序列化的方法4. kyro序列化 数据倾斜原理和现象分析1. 数据倾斜概述2. 数据倾斜发生时的现象3. 数据倾斜发生的原理4. 数据倾斜如何定位原因5. 数据倾斜原因总结6. 数据倾斜的后果 spark中数据倾斜的解决方案1. 解决方案一:使用Hive ETL预处理数据2. 解决方案二:过滤少数导致倾斜的key3. 解决方案三:提高shuffle操作的并行度(效果差)4. 解决方案四:两阶段聚合(局部聚合+全局聚合)5. 解决方案五:将reduce join转为map join6. 解决方案六:采样倾斜key并分拆join操作7. 解决方案七:使用随机前缀和扩容RDD进行join
Spark内存计算框架
Spark Core
Spark读写HBase
我们可以通过 Spark 整合 HBase,实现通过 Spark 来读取 HBase 的数据。数据准备:创建 HBase 表,并插入数据:create 'spark_hbase','info'put 'spark_hbase','0001','info:name','tangseng'put 'spark_hbase','0001','info:age','30'put 'spark_hbase','0001','info:sex','0'put 'spark_hbase','0001','info:addr','beijing'put 'spark_hbase','0002','info:name','sunwukong'put 'spark_hbase','0002','info:age','508'put 'spark_hbase','0002','info:sex','0'put 'spark_hbase','0002','info:addr','shanghai'put 'spark_hbase','0003','info:name','zhubajie'put 'spark_hbase','0003','info:age','715'put 'spark_hbase','0003','info:sex','0'put 'spark_hbase','0003','info:addr','shenzhen'put 'spark_hbase','0004','info:name','bailongma'put 'spark_hbase','0004','info:age','1256'put 'spark_hbase','0004','info:sex','0'put 'spark_hbase','0004','info:addr','donghai'put 'spark_hbase','0005','info:name','shaheshang'put 'spark_hbase','0005','info:age','1008'put 'spark_hbase','0005','info:sex','0'put 'spark_hbase','0005','info:addr','tiangong'
创建输出 HBase 数据 create 'spark_hbase_out','info'
1. 通过newAPIHadoopRDD实现
添加 pom.xml 依赖<repositories> <!-- spark on hbase是cloudera提供的,所以这个地方添加了cdh仓库地址 --> <repository> <id>cloudera</id> <!-- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>--> <url>https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark</url> </repository></repositories><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.3</version></dependency><!-- hadoop-core 2009、7月更名为hadoop-common --><dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId><!-- <version>2.6.0-mr1-cdh5.14.2</version>--> <version>3.1.4</version></dependency><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId><!-- <version>1.2.0-cdh5.14.2</version>--> <version>2.2.6</version></dependency><!-- <dependency>--><!-- <groupId>org.apache.hbase</groupId>--><!-- <artifactId>hbase-common</artifactId>--><!-- <version>1.2.0-cdh5.14.2</version>--><!--<!– <version>2.2.6</version>–>--><!-- </dependency>--><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId><!-- <version>1.2.0-cdh5.14.2</version>--> <version>2.2.6</version></dependency><!-- 添加此依赖,解决spark hbase集成,低版本时报Base64找不到的问题 --><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-mapreduce</artifactId> <version>2.2.6</version></dependency><!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 --><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-spark</artifactId><!-- <version>1.2.0-cdh5.14.2</version>--> <version>2.1.0-cdh6.2.0</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.3.3</version></dependency>
代码实现: object Case09_SparkWithHBase { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]") val sc = new SparkContext(conf) // 1. 创建HBase的环境参数 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181"); hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase")// // 2. 设置过滤器,还可以设置起始和结束rowkey// val scan = new Scan// scan.setFilter(new RandomRowFilter(0.5f))// // 设置scan对象,让filter生效(序列化)// hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)) // 3. 读取HBase数据,生成RDD val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) resultRDD.foreach(x => { // 查询出来的结果集存在 (ImmutableBytesWritable, Result)第二个元素 val result = x._2 // 获取行键 val rowKey = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))) val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))) println(rowKey + ":" + name + ":" + age) }) // 4. 向HBase表写入数据 hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "spark_hbase_out") val job = Job.getInstance(hbaseConf) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // 5. 封装输出结果 resultRDD: RDD[(ImmutableBytesWritable, Result)] val outRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.mapPartitions(eachPartition => { eachPartition.map(keyAndEachResult => { val result = keyAndEachResult._2 val rowKey = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))) val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))) val put = new Put(Bytes.toBytes(rowKey)) val immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey)) put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)) put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age)) // 向 HBase 插入数据,需要 rowKey 和 put 对象 (immutableBytesWritable, put) }) }) // 6. 调用API Output the RDD to any Hadoop-supported storage system with new Hadoop API outRDD.saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop() }}
2. 通过spark on hbase实现
参考资料: https://github.com/cloudera-labs/SparkOnHBase/blob/cdh5-0.0.2/src/main/scala/com/cloudera/spark/hbase/HBaseContext.scalahttps://issues.apache.org/jira/browse/HBASE-13992https://github.com/cloudera-labs/SparkOnHBase 优势: 无缝的使用 HBase Connection和 Kerberos 无缝集成通过 get 或 scan 直接生成 RDD利用 RDD 支持 HBase 的任何组合操作为通用操作提供简单的方法,同时通过 API 允许不受限制的未知高级操作支持 Java 和 Scala为 Spark 和 Spark Streaming 提供相似的 API 由于 hbaseContext 是一个只依赖 hadoop、hbase、spark 的 jar 包的工具类,因此可以拿过来直接用添加依赖包:<!-- 为了让它与apache hbase 2.2.2兼容,使用cdh6.2版本的;具体dependency可以去https://mvnrepository.com/查找 --><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-spark</artifactId> <version>2.1.0-cdh6.2.0</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.3.3</version></dependency>
代码实现: object Case10_SparkOnHBase { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]") val sc = new SparkContext(conf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181"); hbaseConf.set(TableInputFormat.INPUT_TABLE, "spark_hbase") val hbaseContext = new HBaseContext(sc, hbaseConf) val scan = new Scan() val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan) hbaseRDD.map(eachResult => { val result: Result = eachResult._2 val rowKey = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))) val age = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))) println(rowKey + ":" + name + ":" + age) }).foreach(println) sc.stop() }}
Spark的序列化与反序列化
1. transformation操作为什么需要序列化
Spark 是分布式执行引擎,其核心抽象是弹性分布式数据集 RDD,其代表了分布在不同节点的数据。Spark 的计算是在 Executor 上分布式执行的,故用户开发的关于 RDD 的 map、flatMap、reduceByKey 等 transformation 操作(闭包)有如下执行过程: ① 代码中对象在 Driver 本地序列化;② 对象序列化后传输到远程 Executor 节点;③ 远程 Executor 节点反序列化对象;④ 最终远程节点执行。 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。2. spark的任务序列化异常
在编写 spark 程序中,由于在 map、foreachPartition 等算子内部使用了外部定义的变量和函数,从而引发 Task 未序列化问题。然而 spark 算子在计算过程中使用外部变量在许多情形下确实在所难免: 比如在 filter 算子根据外部指定的条件进行过滤;map根据相应的配置进行变换。 经常会出现“org.apache.spark.SparkException: Task not serializable”这个错误 其原因就在于这些算子使用了外部的变量,但是这个变量不能序列化。当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段不支持序列化,仍然会导致整个类序列化时出现问题,最终导致出现 Task 未序列化问题。3. spark中解决序列化的方法
如果函数中使用了该类对象,该类要实现序列化:类 extends Serializable。如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化。对于不能序列化的成员变量使用==“@transient”==标注,告诉编译器不需要序列化。也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。可以把对象的创建直接在该函数中构建这样避免需要序列化。4. kyro序列化
在分布式应用中,经常会进行IO操作,传递对象,而网络传输过程中就必须要序列化。Java序列化可以序列化任何类,比较灵活,但是相当慢,并且序列化后对象的提交也比较大。Spark 出于性能考虑,在 2.0 以后,开始支持 kryo 序列化机制,速度是 Serializable 的 10 倍以上,当 RDD 在 Shuffle 数据的时候,简单数据类型,简单数据类型数组,字符串类型已经使用 kryo 来序列化。也可以通过 kyro 对我们需要序列化的对象,进行序列化标价val conf = new SparkConf().setMaster(...).setAppName(...)conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))val sc = new SparkContext(conf)
举个例子: case class MySearcher(val query: String) { def getMatchRddByQuery(rdd: RDD[String]): RDD[String] = { rdd.filter(x => x.contains(query)) }}object Case11_Kyro { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]") // 替换默认序列化机制 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用kryo序列化自定义类 .registerKryoClasses(Array(classOf[MySearcher])) val sc = new SparkContext(conf) val rdd1: RDD[String] = sc.parallelize(Array("hadoop yarn", "hadoop hdfs", "c")) val rdd2: RDD[String] = MySearcher("hadoop").getMatchRddByQuery(rdd1) rdd2.foreach(println) }}
数据倾斜原理和现象分析
1. 数据倾斜概述
有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时 Spark 作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证 Spark 作业的性能。2. 数据倾斜发生时的现象
① 绝大多数task执行得都非常快,但个别 task 执行极慢 你的大部分的 task,都执行的特别快,很快就执行完了,剩下几个 task,执行的特别特别慢;前面的 task,一般 10s 可以执行完5个,最后发现某个task,要执行 1 个小时、2 个小时才能执行完一个 task;这个时候就出现数据倾斜了。这种方式还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。 ② 绝大数 task 执行很快,有的 task 直接报OOM (Jvm Out Of Memory) 异常 运行的时候,其他 task 都很快执行完了,也没什么特别的问题;但是有的 task,就是会突然间报了一个 OOM,内存溢出了,task failed、task lost、resubmitting task等日志异常信息。反复执行几次某个 task 就是跑不通,最后就挂掉。某个 task 就直接 OOM,那么基本上也是因为数据倾斜了,task 分配的数量实在是太大了!!!所以内存放不下,然后你的 task 每处理一条数据,还要创建大量的对象。内存爆掉了。3. 数据倾斜发生的原理
如上图所示:在进行任务计算 shuffle 操作的时候,第一个 task 和第二个 task 各分配到了 1 万条数据;需要 5 分钟计算完毕;第三个 task要 98万 条数据,98 * 5 = 490分钟 = 8个小时;本来另外两个 task 很快就运行完毕了(5分钟),第三个task数据量比较大,要 8 个小时才能运行完,就导致整个 spark 作业,也得 8 个小时才能运行完。最终导致整个 spark 任务计算特别慢。4. 数据倾斜如何定位原因
主要是根据log日志信息去定位: 数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。因为某个或者某些 key 对应的数据,远远的高于其他的key。 分析定位逻辑: 由于代码中有大量的 shuffle 操作,一个 job 会划分成很多个 stage首先要看的,就是数据倾斜发生在第几个 stage 中。可以在任务运行的过程中,观察任务的 UI 界面,可以观察到每一个 stage 中运行的 task 的数据量,从而进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。比如下图中,倒数第三列显示了每个 task 的运行时间。明显可以看到,有的 task 运行特别快,只需要几秒钟就可以运行完;而有的 task 运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个 task 处理的数据量,明显可以看到,运行时间特别短的 task 只需要处理几百 KB 的数据即可,而运行时间特别长的 task 需要处理几千 KB 的数据,处理的数据量差了 10 倍。此时更加能够确定是发生了数据倾斜。 某个task莫名其妙内存溢出的情况 这种情况下去定位出问题的代码就比较容易了。建议直接看 yarn-client 模式下本地 log 的异常栈,或者是通过 YARN 查看 yarn-cluster 模式下的 log 中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有 shuffle 类算子,此时很可能就是这个算子导致了数据倾斜。但是需要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的 bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。 查看导致数据倾斜的key的数据分布情况 知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的 key 分布与不同的 shuffle 算子组合起来的各种情况,可能需要选择不同的技术方案来解决。此时根据你执行操作的情况不同,可以有很多种查看key分布的方式: ① 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。② 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如RDD.countByKey()。 然后对统计出来的各个key出现的次数,collect/take 到客户端打印一下,就可以看到 key 的分布情况。举例来说,对于上面所说的单词计数程序,如果确定了是 stage1 的 reduceByKey 算子导致了数据倾斜,那么就应该看看进行reduceByKey 操作的 RDD 中的 key 分布情况,在这个例子中指的就是 pairs RDD。如下示例,我们可以先对 pairs 采样 10% 的样本数据,然后使用 countByKey 算子统计出每个 key 出现的次数,最后在客户端遍历和打印样本数据中各个 key的出现次数。val sampledPairs = pairs.sample(false, 0.1)val sampledWordCounts = sampledPairs.countByKey()sampledWordCounts.foreach(println(_))
5. 数据倾斜原因总结
数据本身问题 ① key 本身分布不均衡(包括大量的key为空)② key 的设置不合理 spark使用不当的问题 ① shuffle 时的并发度不够② 计算方式有误6. 数据倾斜的后果
spark 中的 stage 的执行时间受限于最后那个执行完成的 task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。过多的数据在同一个task中运行,将会把 executor 内存撑爆,导致 OOM 内存溢出。spark中数据倾斜的解决方案
1. 解决方案一:使用Hive ETL预处理数据
适用场景:导致数据倾斜的是 Hive 表。如果该 Hive 表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用 Spark 对 Hive 表执行某个分析操作,那么比较适合使用这种技术方案。实现思路:此时可以评估一下,是否可以通过 Hive 来进行数据预处理(即通过 Hive ETL 预先对数据按照 key 进行聚合,或者是预先和其他表进行 join),然后在 Spark 作业中针对的数据源就不是原来的 Hive 表了,而是预处理后的 Hive 表。此时由于数据已经预先进行过聚合或 join 操作了,那么在 Spark 作业中也就不需要使用原先的 shuffle 类算子执行这类操作了。实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在 Spark 中执行 shuffle 类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以 Hive ETL 中进行 group by 或者 join 等 shuffle 操作时,还是会出现数据倾斜,导致 Hive ETL 的速度很慢。我们只是把数据倾斜的发生提前到了 Hive ETL 中,避免 Spark 程序发生数据倾斜而已。优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark 作业的性能会大幅度提升。缺点:治标不治本,Hive ETL 中还是会发生数据倾斜。实践经验:在一些 Java 系统与 Spark 结合使用的项目中,会出现 Java 代码频繁调用 Spark 作业的场景,而且对 Spark 作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的 Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次 Java 调用 Spark 作业时,执行速度都会很快,能够提供更好的用户体验。项目经验:有一个交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过 Java Web 系统提交数据分析统计任务,后端通过 Java 提交 Spark 作业进行数据分析统计。要求 Spark 作业速度必须要快,尽量在 10 分钟以内,否则速度太慢,用户体验会很差。所以我们将有些 Spark 作业的 shuffle 操作提前到了 Hive ETL 中,从而让 Spark 直接使用预处理的 Hive 中间表,尽可能地减少 Spark 的 shuffle 操作,大幅度提升了性能,将部分作业的性能提升了 6 倍以上。2. 解决方案二:过滤少数导致倾斜的key
适用场景:如果发现导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如 99% 的 key 就对应 10 条数据,但是只有一个 key 对应了 100 万数据,从而导致了数据倾斜。实现思路:如果我们判断那少数几个数据量特别多的 key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。 比如,在 Spark SQL 中可以使用 where 子句过滤掉这些 key 或者在 Spark Core 中对 RDD 执行 filter 算子过滤掉这些 key。如果需要每次作业执行时,动态判定哪些 key 的数据量最多然后再进行过滤,那么可以使用 sample 算子对 RDD 进行采样,然后计算出每个 key 的数量,取数据量最多的 key 过滤掉即可。 实现原理:将导致数据倾斜的 key 给过滤掉之后,这些 key 就不会参与计算了,自然不可能产生数据倾斜。优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。缺点:适用场景不多,大多数情况下,导致倾斜的 key 还是很多的,并不是只有少数几个。实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天 Spark 作业在运行的时候突然 OOM 了,追查之后发现,是 Hive 表中的某一个 key 在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个 key 之后,直接在程序中将那些 key 给过滤掉。3. 解决方案三:提高shuffle操作的并行度(效果差)
适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。实现思路:在对 RDD 执行 shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey(_ + _, 1000),该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即spark.sql.shuffle.partitions
,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。实现原理:增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个key,即每个 task 就处理10条数据,那么自然每个 task 的执行时间都会变短了。具体原理如下图所示。优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。 4. 解决方案四:两阶段聚合(局部聚合+全局聚合)
适用场景:对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by语句 进行分组聚合时,比较适用这种方案。实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个 key 的前缀给去掉,就会变成 (hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如 (hello, 4)。实现原理:将原本相同的 key 通过附加随机前缀的方式,变成多个不同的 key,就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合,进而解决单个 task 处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果具体原理见下图。优点:对于聚合类的 shuffle 操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将 Spark 作业的性能提升数倍以上。缺点:仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案。 案例:如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:// 1. 将原始的 key 转化为 随机值 + key (随机值 = Random.nextInt)// 2. 对数据进行 reduceByKey(func)// 3. 将随机值+key 转成 key// 4. 再对数据进行 reduceByKey(func),如果使用 groupBykey 会增大数据倾斜的概率object WordCountAggTest { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount") val sc = new SparkContext(conf) val array = Array("you you", "you you", "you you", "you you", "you you", "you you", "you you", "jump jump") val rdd = sc.parallelize(array, 8) rdd.flatMap(line => line.split(" ")) .map(word => { val prefix = (new util.Random).nextInt(3) (prefix + "_" + word, 1) }).reduceByKey(_ + _) .map(wc => { val newWord = wc._1.split("_")(1) val count = wc._2 (newWord, count) }).reduceByKey(_ + _) .foreach(wc => { println("单词:" + wc._1 + " 次数:" + wc._2) }) }}