一些笔记的归档。
hadoop commitProtocol(全称HadoopMapReduceCommitProtocol)是一套用于用于提交文件到文件系统的规则。是hadoop抽象出来的,为了实现存算分离,兼容不同底层文件系统的机制。
这里我们假定:job为某种需要并行计算的查询作业;在大数据计算框架中,这种作业往往由多个task组成,以DAG的方式触发执行;job由调度程序JobManager调起,并将task分发到各个executor中执行作业,最后聚合并返回最终结果。
那么我们可以将commitProtocol的基本执行逻辑简化成如下方法的集合:
setupJob()
: 在执行程序初始化job,例如创建必要的临时目录等setupTask()
: 在executor初始化task,创建临时目录等needTaskCommit()
: 是否需要提交taskcommitTask()
: 提交task,将task产生的临时输出提交到最终的输出路径abortTask()
: 当需要终止task时,需要清理task产生的临时文件commitJob()
: 当确保所有task提交完成后,将提交整个job的最终结果abortJob()
: 终止并清理jobcommitProtocol 相当于是指导Mapreduce作业具体运行在存储系统上的一套标准。考虑到在MapReduce作业一般都是大规模的耗时作业,其中难免会存在各种异常导致部分task失败。因此commitProtocol的实现也需要格外考虑数据正确性和事务一致性。
和HadoopCommitProtol专注于作业整体的提交流程不同,committer专注于具体的commit操作(taskCommit和jobCommit)。在hadoop原生实现中,主要实现在FileOutputCommitter中,并针对性能和隔离性实现了两种算法
在commitTask时,数据文件会从task临时目录rename到job临时目录中,并在commitJob时,从job临时目录rename到正式目录。由于数据在job完成前保持良好的隔离性,失败作业可快速从上次临时目录中恢复数据。
在commitTask时,数据文件会从task临时目录rename到job的正式目录,从而在commitJob时只需要确认数据完整并进行成功标记即可。但是由于commiTask阶段的rename操作对外可见,因此隔离性不佳,当作业失败时,也无法快速恢复。
commitProtocol在Hadoop的经典实现是yarn 和spark上。由于现在基本上都从MR作业转向了spark作业,所以这里也只介绍spark的commitProtocol协议。
spark的commitProtocol实际上在hadoopCommitProtocol基础上进行了封装,使其适配spark的计算执行模型,同时补齐了hadoopCommitProtocol的部分缺陷,提升了扩展性。
除此之外,因为继承了hadoopCommitProtocol, spark自然也使用FileOutputCommitter用于写入文件。如果想要接入其他committer,也可以通过继承并重写commitProtocol的方式去实现,或者显式指定committer,这在spark中均有配置实现。
其与hadoopComitProtocol的差异在于:
spark 引入了OutputCommitCoodinator 跟踪每个stage里 task attempt的状态,如果某个task attempt成功,后续的所有attempt 发起的commitTask都会被拒绝。通过这种方式保证commitTask执行的正确性
在hive表查询中,数据可能写到内部表,也可能写到外部表。外部表的数据往往存储在不同的位置。为了支持这种需要输出数据到不同路径的操作,spark commitProtocol允许记录文件的绝对路径newTaskTempFileAbsPath().。在读取文件时将文件的绝对路径保存下来,并在写入时获取绝对路径写入。
动态分区重写指的是spark 在写入数据得时候允许只重写指定分区的数据。避免重写整张表,增大开销。
什么是动态分区:
若sql中未指定分区字段的具体值,使得该分区可以在计算中自动推断出,即为动态分区。
例子:
insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};
开启动态分区重写的条件:
spark.sql.sources.partitionOverwriteMode=dynamic
SaveMode=overwrite
动态分区重写的基本逻辑
参考issue:[SPARK-35106]
基于之前一篇文章, 当我们通过Hadoop MapReduce或者Spark将数据写入到S3时,就会存在相当严重的的性能问题。
在s3中,只提供了以下六种基础操作,所以其他操作只是这几种操作的衍生。例如rename =list + copy + delete
.这导致rename操作是非原子性的。传统文件rename操作需要在s3上实现CREATE + DELETE操作,目录的rename则还需要额外LIST 所有文件的操作,这导致rename操作不是原子的,而且随着文件数的增加,rename速度会越来越慢。
操作名 | 解释 |
---|---|
PUT | 对象的原子写操作 |
GET | 获取对象 |
HEAD | 获取对象的元数据 |
LIST | 基于前缀列出所有对象 |
COPY | 复制单个对象 |
DELETE | 删除对象 |
而rename在传统的commit算法中是不可或缺的一环,这将直接导致commit算法的性能和正确性受到了挑战。
另外,在spark中也存在staging操作。staging是spark作业中重要的步骤,在动态分区重写的时候,必须要通过staging来保障数据的一致性。但是staging中同样存在文件的rename操作,在s3的场景,这样的操作会带来很大的开销。
随着在s3上处理数据规模的增大,除了上述提到了6种基础操作,AWS还额外提供了两种操作用来应对大规模的数据传输。
简单介绍一下Multipart Upload机制(后面简称MPU):
multiPartUpload主要分为三个阶段
参考 aws 官方文档
在上传过程中,已上传的块在s3中是不可见的,但是会占用实际存储。只有当s3最后POST操作完成后,s3的manifest才会写入该文件,该文件才会在s3上可见。
因此。hadoop 的开发人员充分利用了该机制,设计了两种S3A Committer: Staging Committer 和Magic Committer
在stagingCommitter上,数据在taskAttempt时被加载到本地进行计算,并在commitTask的时候上传到s3上,此时文件被直接MPU到最终的job目录上,但是文件信息被上传到HDFS上进行 传统的FileOutputCommitter的V1算法操作。commitJob阶段,最终在hdfs上完成最后的rename后,执行最终POST操作,上传的文件在s3上才最终可见
在magicCommitter上,数据从一开始就允许直接写到s3上,不需要落到本地。magicCommitter会在job目录下创建一个_magic目录,taskAttempt最后输出的文件都已magic文件流的形式MPU到这个目录下。注意此时并没有完成MPU,该次MPU的manifest会写到文件同名的一个.pending文件中(同样在_magic目录下)。在commitTask时,由于文件数据已经上
传到s3上,只需要加载taskAttempt路径下所有.pending文件,聚合成.pendingset文件上传。在commitJob阶段,则是获取所有.pendingset文件,这里就已经获取到所有需要commit的文件的manifest,因此依次完成最终POST操作即可。
更多详细内容,可以去阅读Hadoop中committer实现的源码来深入学习。
总的来说,S3A Committer 都是通过延迟文件最后提交操作来避免rename操作。基于当前s3 的版本,两种committer基本都能解决性能问题。不过,hadoop对committer的实现更多考虑了通用性,而针对部分特化的业务场景,则没有给到足够的支持。(简单讲就是还无法做到开箱即用)例如在spark的动态分区重写机制以及写入hive外部表的机制,仍然需要自己实现。