xChar

什么是velox

velox是一个基于C++编写的开源数据库执行加速工具库。它的优势是利用native方法来优化计算,深度利用系统级的优化手段例如向量化技术来加速执行计划。因此,也可以将其称为是一个native compute engine。将velox集成到传统的数据计算引擎中在不少通用场景中可以显著提升计算性能,提高查询效率,不过将其引入生产依然还会有一定的问题。尽管有类似gluten这样的开源项目可以快速将velox集成进spark,从而简化适配传统大数据计算架构的流程,但依然会存在一些适配成本;优化也并非百分百覆盖所有查询条件,因此建议针对自己的使用场景经过较多性能验证和测试评估后再决定是否使用。

列式shuffle

在我的使用场景中,主要是将velox结合到spark的计算流程中,其中一个比较大的适配点是shuffle。因为在velox中的数据结构是列式的,这和传统以行为单位处理shuffle数据的spark有着比较大的区别。如果使用原生shuffle,会存在一个行列转换的开销。如果使用RSS(Remote Shuffle Service),性能的差距会进一步增大,这在大shuffle的任务场景就几乎不可接受。因此我们需要一个原生的列式shuffle来解决这类问题。

在开源项目中gluten中,为velox更好的集成到spark做了很多优化,包括实现了原生的列式shuffle。同时也接入了apache celeborn实现了对RSS的适配。下面就主要介绍一下gluten在这两个场景下的列式shuffle的实现。

Local Shuffle

我们关注shuffle主要就是关注shuffle的writer和reader,其中writer是改造的重点,因为这涉及到如何处理上游准备map的数据。

ColumnarShuffleWriter

如何实现列式shuffle,本质上是如何解决行列转换。因此我们需要对上游的列式数据进行加工处理,再进行shuffle流程。
velox的列式数据结构为ColumnarBatch ,其中包含了所有等待处理的数据。其结构类似下图:

image

对该数据结构处理需要考虑这几个问题:

  1. 如何将行数据和列数据对应起来?
  2. 遇到大数据量如何存储(防止OOM)
  3. 如何保证性能?

第一个问题其实也是最基础的问题。我们知道在spark中总共有三种ShuffleWriter,gluten的列式shuffle writer:ColumnarShuffleWriter主要是基于HashBasedShuffleWriter改造。对于行列关系的映射也主要是基于hash映射。这样做的好处是实现简单,同时避免sort防止大量随机读写影响性能,但是HashBasedShuffleWriter也会产生大量文件。另外对内存的开销也是一个问题。不过gluten也在这个writer上融入了SortBasedShuffleWriter的设计,这点我们在后面的流程中可以感受到。

第二个问题,gluten会在内存中对ColumnBatch切分,每次只处理一部分的数据,这样不仅可以在内存中处理更多的数据,也可以在RSS的场景中降低网络传输带来的损耗。

第三个问题,gluten在很多细节进行了优化。例如,利用arrow来管理内存,对内存进行复用,从而降低OOM的风险,减少spill的概率;充分利用CPU缓存;避免随机写等等。

下面展示ColumnarShuffleWriter的整体流程设计:

列式shuffle

注意,这里基本所有的流程都在velox中实现。

  1. 获取每个record映射成columnBatch
  2. 根据分区器,计算分区和row的对应关系
  3. 构建映射表,完成partition id -> rowid的映射关系
  4. 预分配内存用于多次装载数据
  5. 调用若干split function, 切分数据装进缓存
  6. 如果内存不足,将数据spill到文件
  7. 最后完成write,merge 内存数据和spill文件,形成final file

构建partition2Row的关系

其中比较重要的是构建了这两个数组: partition2RowOffset 和 rowOffset2RowId
这里保存了partition、column和row之间的映射关系,并确定有效的partition(如果partition没有数据进
来,后面就不会给预分配内存)

split function

split阶段会遍历split函数,负责将ColumnarBatch转成的rowVector根据partition 切出来放进每个
partition预分配好的内存,当内存达到溢出要求后,会将内存中的数据spill出来。

rowVector的格式样例如图
image

  • split function主要包含四种函数,分别处理不同类型的column
  • splitFixedWidthValueBuffer 将固定位宽的列切分,一般是column type(例如int,boolean) splitValidityBuffer 将有效的字节值切分,处理null值
  • splitBinaryArray 将二进制队列的数据切分
  • splitComplexType 将复杂类型(struct, map, list)切分

切分前会首先初始化内存Buffer:preAllocPartitionBuffer,确保切分后的数据能完整装进内存
因为切分的数据会被多次遍历,因此在实际场景应当控制每次切分的数据大小尽可能装入CPUL1/L2 缓存,这样可以达到一个比较好的性能

preAllocBuffer

gluten基于arrow来实现内存管理和数据交换。

基于前面切分后的数据,每个partition会预分配一块可重用,可resize的内存用于缓存切分数据

预分配的大小计算公式:

#预分配大小
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
#缓存中最大可以存放的rows num(newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4

buffe是一块动态伸缩的内存,利用arrow做了精细化的控制:通常触发伸缩的条件(实际内存伸缩的情况会更多)是、

#默认伸缩因子THRESHOLD是0.25
newSize < (1 - THRESHOLD) || newSize > (1 + THRESHOLD)

其完整的内存伸缩情况如下图:
image
其中evict and free data就会把内存的数据spill出去,每次spill的数据会通过arrow 拷贝输出,初始大小是4096,如果不够会自行扩大

spill and merge

ColumnarShuffleWriter的设计虽然是hashBased shuffle,但是gluten在基础上融入了一些unsafe sort的设计思路。
在本地spill的场景中,每次当使用内存达到阈值或者OOM时,会将内存中的数据压缩并多次溢写到
磁盘文件中;当经过多轮columnarBatch处理,最后会把所有溢写文件合并成一个文件

  1. 每个spill文件中数据按照partition id的顺序写入
  2. 合并时会按照partition id 遍历所有文件,实现每个partition数据的合并

ColumnarShuffleReader

reader端的设计其实比较简单,基本可以复用原生的ShuffleReader,因为从map端拉取的数据基本没变,主要需要将拉取的数据转换成ColumnarBatch供下游继续使用。这里只需要重写反序列化器即可实现。

Remote Shuffle

remote columnar shuffle的设计思路和本地shuffle基本相同,主要问题在于如何将数据正确推送到RSS。
在gluten适配celeborn的设计中,重新实现了writer和reader。其基本思路和本地shuffle类似,通过native engine来实现columnarBatch的切分和push,在reader端通过实现反序列化器来获取columnBatch。

image

  1. 获取每个record映射成columnBatch
  2. 根据分区器,计算分区和row的对应关系
  3. 构建映射表,完成partition id -> rowid的映射关系
  4. 预分配内存用于多次装载数据
  5. 调用若干split function, 切分数据装进缓存
  6. 超过缓存上限溢写到celeborn

前面基本几点在Local Shuffle环节均已介绍,推送数据则是通过JNI将celeborn client传递到native engine,负责将溢写的数据push到celeborn,然后在celeborn上完成数据的merge。

Loading comments...