xChar

一些笔记的归档。

Hadoop CommitProtocol

hadoop commitProtocol(全称HadoopMapReduceCommitProtocol)是一套用于用于提交文件到文件系统的规则。是hadoop抽象出来的,为了实现存算分离,兼容不同底层文件系统的机制。

这里我们假定:job为某种需要并行计算的查询作业;在大数据计算框架中,这种作业往往由多个task组成,以DAG的方式触发执行;job由调度程序JobManager调起,并将task分发到各个executor中执行作业,最后聚合并返回最终结果。

那么我们可以将commitProtocol的基本执行逻辑简化成如下方法的集合:

  1. setupJob(): 在执行程序初始化job,例如创建必要的临时目录等
  2. setupTask(): 在executor初始化task,创建临时目录等
  3. needTaskCommit(): 是否需要提交task
  4. commitTask(): 提交task,将task产生的临时输出提交到最终的输出路径
  5. abortTask(): 当需要终止task时,需要清理task产生的临时文件
  6. commitJob(): 当确保所有task提交完成后,将提交整个job的最终结果
  7. abortJob(): 终止并清理job

image (1)

commitProtocol 相当于是指导Mapreduce作业具体运行在存储系统上的一套标准。考虑到在MapReduce作业一般都是大规模的耗时作业,其中难免会存在各种异常导致部分task失败。因此commitProtocol的实现也需要格外考虑数据正确性和事务一致性。

FileOutputCommitter

和HadoopCommitProtol专注于作业整体的提交流程不同,committer专注于具体的commit操作(taskCommit和jobCommit)。在hadoop原生实现中,主要实现在FileOutputCommitter中,并针对性能和隔离性实现了两种算法

commitV1

在commitTask时,数据文件会从task临时目录rename到job临时目录中,并在commitJob时,从job临时目录rename到正式目录。由于数据在job完成前保持良好的隔离性,失败作业可快速从上次临时目录中恢复数据。

commitV2

在commitTask时,数据文件会从task临时目录rename到job的正式目录,从而在commitJob时只需要确认数据完整并进行成功标记即可。但是由于commiTask阶段的rename操作对外可见,因此隔离性不佳,当作业失败时,也无法快速恢复。

Spark CommitProtocol

commitProtocol在Hadoop的经典实现是yarn 和spark上。由于现在基本上都从MR作业转向了spark作业,所以这里也只介绍spark的commitProtocol协议。

spark的commitProtocol实际上在hadoopCommitProtocol基础上进行了封装,使其适配spark的计算执行模型,同时补齐了hadoopCommitProtocol的部分缺陷,提升了扩展性。

除此之外,因为继承了hadoopCommitProtocol, spark自然也使用FileOutputCommitter用于写入文件。如果想要接入其他committer,也可以通过继承并重写commitProtocol的方式去实现,或者显式指定committer,这在spark中均有配置实现。

image-20231014130948900

其与hadoopComitProtocol的差异在于:

  1. 确保了task attempt 进行commitTask时的正确性(OutputCommitCoordinator)

spark 引入了OutputCommitCoodinator 跟踪每个stage里 task attempt的状态,如果某个task attempt成功,后续的所有attempt 发起的commitTask都会被拒绝。通过这种方式保证commitTask执行的正确性

image (2)

  1. 支持hive外表数据读写和动态分区重写

在hive表查询中,数据可能写到内部表,也可能写到外部表。外部表的数据往往存储在不同的位置。为了支持这种需要输出数据到不同路径的操作,spark commitProtocol允许记录文件的绝对路径newTaskTempFileAbsPath().。在读取文件时将文件的绝对路径保存下来,并在写入时获取绝对路径写入。

动态分区重写指的是spark 在写入数据得时候允许只重写指定分区的数据。避免重写整张表,增大开销。
什么是动态分区:

若sql中未指定分区字段的具体值,使得该分区可以在计算中自动推断出,即为动态分区。

例子:

insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};

开启动态分区重写的条件:

  1. spark.sql.sources.partitionOverwriteMode=dynamic
  2. SaveMode=overwrite
  3. sql中存在动态分区的字段

动态分区重写的基本逻辑

  1. 如果开启动态分区重写,spark会将job目录下生成的临时数据文件生成在staging目录下
  2. 针对外部表的数据路径,在每次rename 绝对路径的分区文件前,需要先清理并重建其父级目录避免rename时不存在(overwrite)
  3. 针对内部表的数据路径,由于commitJob后路径仍然在staging目录下,所以需要将所有文件和目录从staging目录rename到正式目录

参考issue:[SPARK-35106]

存在的问题

基于之前一篇文章, 当我们通过Hadoop MapReduce或者Spark将数据写入到S3时,就会存在相当严重的的性能问题。

在s3中,只提供了以下六种基础操作,所以其他操作只是这几种操作的衍生。例如rename =list + copy + delete.这导致rename操作是非原子性的。传统文件rename操作需要在s3上实现CREATE + DELETE操作,目录的rename则还需要额外LIST 所有文件的操作,这导致rename操作不是原子的,而且随着文件数的增加,rename速度会越来越慢。

操作名解释
PUT对象的原子写操作
GET获取对象
HEAD获取对象的元数据
LIST基于前缀列出所有对象
COPY复制单个对象
DELETE删除对象

而rename在传统的commit算法中是不可或缺的一环,这将直接导致commit算法的性能和正确性受到了挑战。

另外,在spark中也存在staging操作。staging是spark作业中重要的步骤,在动态分区重写的时候,必须要通过staging来保障数据的一致性。但是staging中同样存在文件的rename操作,在s3的场景,这样的操作会带来很大的开销。

S3A Committer

随着在s3上处理数据规模的增大,除了上述提到了6种基础操作,AWS还额外提供了两种操作用来应对大规模的数据传输。

  • Bulk Delete
  • Multipart Upload
    后者将是解决s3 在提交文件性能问题上的关键。

简单介绍一下Multipart Upload机制(后面简称MPU):
multiPartUpload主要分为三个阶段

  1. 初始化
  2. 上传块
  3. 完成最终POST操作

参考 aws 官方文档

在上传过程中,已上传的块在s3中是不可见的,但是会占用实际存储。只有当s3最后POST操作完成后,s3的manifest才会写入该文件,该文件才会在s3上可见。

因此。hadoop 的开发人员充分利用了该机制,设计了两种S3A Committer: Staging CommitterMagic Committer

Staging Committer

在stagingCommitter上,数据在taskAttempt时被加载到本地进行计算,并在commitTask的时候上传到s3上,此时文件被直接MPU到最终的job目录上,但是文件信息被上传到HDFS上进行 传统的FileOutputCommitter的V1算法操作。commitJob阶段,最终在hdfs上完成最后的rename后,执行最终POST操作,上传的文件在s3上才最终可见

Magic Committer

在magicCommitter上,数据从一开始就允许直接写到s3上,不需要落到本地。magicCommitter会在job目录下创建一个_magic目录,taskAttempt最后输出的文件都已magic文件流的形式MPU到这个目录下。注意此时并没有完成MPU,该次MPU的manifest会写到文件同名的一个.pending文件中(同样在_magic目录下)。在commitTask时,由于文件数据已经上
传到s3上,只需要加载taskAttempt路径下所有.pending文件,聚合成.pendingset文件上传。在commitJob阶段,则是获取所有.pendingset文件,这里就已经获取到所有需要commit的文件的manifest,因此依次完成最终POST操作即可。


更多详细内容,可以去阅读Hadoop中committer实现的源码来深入学习。

总的来说,S3A Committer 都是通过延迟文件最后提交操作来避免rename操作。基于当前s3 的版本,两种committer基本都能解决性能问题。不过,hadoop对committer的实现更多考虑了通用性,而针对部分特化的业务场景,则没有给到足够的支持。(简单讲就是还无法做到开箱即用)例如在spark的动态分区重写机制以及写入hive外部表的机制,仍然需要自己实现。

Loading comments...