想通过配置调用agg方法, 不知如何构造spark.sql.Column, 所以研究了下, 记录下:
agg方法:
def agg(expr: Column, exprs: Column*): DataFrame = {
toDF((expr +: exprs).map {
case typed: TypedColumn[_, ……继续阅读 »
wangting
3年前 (2020-11-06) 471浏览
6个赞
Dependency
Spark将RDD之间的依赖分为窄依赖和宽依赖,源码中对应的定义分别为NarrowDependency和ShuffleDependency。
其中NarrowDependency又有两种OneToOneDependency和RangeDependency两种实现。
NarrowDependency
OneToOneDependency……继续阅读 »
wangting
4年前 (2019-11-18) 875浏览
4个赞
相关参数介绍:
参数
默认值
说明
spark.sql.output.codec
none
使用的序列化方式,如snappy,lzo等,默认为不压缩。
spark.sql.output.coalesceNum
200
写入到指定目录的数据合并后文件的个数,仅当spark.sql.output.merge设为true时生效。
……继续阅读 »
wangting
4年前 (2019-11-06) 1277浏览
4个赞
通过GeneratedBlockHandler实现对数据的存储和确认逻辑。
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorList……继续阅读 »
wangting
4年前 (2019-11-01) 867浏览
3个赞
Feature:Spark支持GBK文件读取功能
需求分析
Spark默认的Text输入格式处理同Hadoop相同,均默认以UTF-8编码处理。
如果源本件本身是其他格式编码的,如GBK,在Spark处理后读出就会是乱码。
例如一张有关餐馆信息的数据可能是:
4504812|聚品面馆
5623102|金米粒抓饭
5623112|湘下茶饭
……继续阅读 »
wangting
4年前 (2019-11-01) 1292浏览
3个赞
调度
调度相关配置说明详见下表:
配置项
默认值
说明
spark.scheduler.mode
FIFO
提交到同一个SparkContext的job的调度策略,有FIFO和FAIR两种。
Spark SQL
SQL组件相关配置说明详见下表:
配置项
默认值
……继续阅读 »
wangting
4年前 (2019-10-29) 1019浏览
4个赞
Dataset是1.6版本引入的新的实验接口, 整合了RDD的优势(强类型, 支持lambda方法)和Spark SQL执行引擎的各种优化. Dataset可以由JVM对象来构造并且使用transformation来变换(map, flatMap, filter等等). Dataset本身是DataFrame API的一种扩展, 它提供了类型安全, 面向对象……继续阅读 »
wangting
4年前 (2019-10-17) 1101浏览
3个赞
如果是读取hdfs的文件,一般来说,partition的数量等于文件的数量。
如果单个文件的大小大于hdfs的分块大小,partition的数量就等于 “文件大小/分块大小”。
同时,也可以使用rdd的repartition方法重新划分partition。
另外,在使用聚合函数比如 reducebykey, groupbykey,可以通过指定partitio……继续阅读 »
wangting
4年前 (2019-10-14) 1048浏览
6个赞
Spark 1.4执行SQL查询过程中发现driver端Spark作业进程占用内存过大
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
5109 liuxinmi 20 0 9152m 8.5g 18m S 99.9 13.5 7:04.13 java
15632 liuxinmi 20 0……继续阅读 »
wangting
4年前 (2019-10-12) 790浏览
4个赞
报错信息如下:
org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 6:0 was 12518780 bytes which exceeds spark.akka.frameSize (10485760 bytes).
Consider usin……继续阅读 »
wangting
4年前 (2019-10-12) 1230浏览
4个赞
执行类似 xxx.write.format(“parquet”).save(‘xxxx’) 的操作。
虽然UI上显示completed
但是后台会hold主进程 一段时间,有时候几分钟,有时候十几分钟
最后hang住的原因是hadoop2.x commit job是在driver端单线程做的,当结……继续阅读 »
wangting
4年前 (2019-10-12) 905浏览
4个赞
对query按PV排序的时候, 特别慢, 原因是query往往很长尾, 排序的时候分桶不均匀, 排序时候在PV后面加一个随机数可以解决。
queryList.sortBy(x => (x.pv, Random.nextInt), ascending = false)
……继续阅读 »
wangting
5年前 (2019-09-16) 2261浏览
4个赞
上篇文章讲到DAGScheduler会把job划分为多个Stage,每个Stage中都会创建一批Task,然后把Task封装为TaskSet提交到TaskScheduler。
这里我们来一起看下TaskScheduler是如何把Task分配到应用程序的Executor上去执行。
重点是这里的task分配算法。
如下图是DagScheduler中把TaskSe……继续阅读 »
wangting
5年前 (2019-07-13) 1467浏览
5个赞
DAGScheduler的主要作用有2个:
一、把job划分成多个Stage(Stage内部并行运行,整个作业按照Stage的顺序依次执行)
二、提交任务
以下分别介绍下DAGScheduler是如何做这2件事情的,然后再跟源码看下DAGScheduler的实现。
一、如何把Job划分成多个Stage
1) 回顾下宽依赖和窄依赖
窄依赖:父RDD的每个分……继续阅读 »
wangting
5年前 (2019-06-24) 1472浏览
5个赞
Master作为Spark Standalone模式中的核心,如果Master出现异常,则整个集群的运行情况和资源都无法进行管理,整个集群将处于无法工作的状态。
Spark在设计的时候考虑到了这种情况,Master可以起一个或者多个Standby Master,当Master出现异常的时候,Standy Master 将根据一定规则确定一个接管Master。……继续阅读 »
wangting
5年前 (2019-06-14) 1026浏览
4个赞
SparkContext是整个spark程序通往集群的唯一通道,他是程序的起点,也是程序的终点。
我们的每一个spark个程序都需要先创建SparkContext,接着调用SparkContext的方法, 比如说 sc.textFile(filepath),程序最后也会调用sc.stop()来退出。
让我们来一起看下SparkContext里面到底是如何实现……继续阅读 »
wangting
5年前 (2019-06-07) 2080浏览
4个赞
之前在 大话Spark(2)里讲过Spark Yarn-Client的运行模式,有同学反馈与Cluster模式没有对比, 这里我重新整理了三张图分别看下Standalone,Yarn-Client 和 Yarn-Cluster的运行流程。
1、独立(Standalone)运行模式

独立运行模式是Spark自身实现的资源调度框架,由客户端、Master节点……继续阅读 »
wangting
5年前 (2019-06-05) 1369浏览
3个赞
Shuffle本意是 混洗, 洗牌的意思, 在MapReduce过程中需要各节点上同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则聚集到一起的过程成为Shuffle.
在Hadoop的MapReduce框架中, Shuffle是连接Map和Reduce之间的桥梁, Map的数据要用到Reduce中必须经过Shuffle这个环节. 由……继续阅读 »
wangting
5年前 (2019-05-25) 5476浏览
2个赞
本文以WordCount为例, 画图说明spark程序的执行过程
WordCount就是统计一段数据中每个单词出现的次数,
例如hello spark hello you 这段文本中hello出现2次, spark出现1次, you出现1次.
先上完整代码:
object WordCount {
def main(args: Array[String])……继续阅读 »
wangting
5年前 (2019-05-24) 2080浏览
3个赞
Spark On Yarn 有两种运行模式:
Yarn – Cluster
Yarn – Client
他们的主要区别是:
Cluster: Spark的Driver在App Master主进程内运行, 该进程由集群上的YARN管理, 客户端可以在启动App Master后退出.
Client: Driver在提交作业的Clien……继续阅读 »
wangting
5年前 (2019-05-21) 3263浏览
4个赞