bladedragon

Troubleshooting Spark Job Long Tail Issues and Considerations for Optimizing Small Files

Recently, while testing Spark on k8s, I ran into some performance issues. This post records the troubleshooting process and reviews the case.

Case Reproduction

Our underlying cluster is AWS EKS, and we build a traditional Hadoop YARN + Spark environment on top of it. In simple terms, we run specific YARN components, such as the ResourceManager and NodeManager, as pods in k8s while leaving the upper-level scheduling logic unchanged, which creates a two-level scheduling system. The implementation and scheduling logic of this system are not relevant to the performance case discussed here. It is enough to know that this is a traditional YARN Spark environment running on k8s.
The task I used is a small task from our production environment, with an expected runtime of 5 minutes, a medium amount of input data, and an output table of about 2,000 rows. The production environment is an AWS EMR on EC2 cluster, which can be understood as a traditional container host cluster. After migrating this task to the EKS cluster, the runtime grew to 43 minutes, so I looked at its history UI:

IMG_export_20230415_163937706

IMG_export_20230415_163947995

The SQL stages take about as long as in the production environment, only a few minutes each. However, the whole Spark job took 43 minutes to complete, so the time is being lost after the stages finish: a long tail at the end of the job.

Preliminary Analysis

Since the task uses the same SQL as in production and reads and writes exactly the same amount of data, business logic can be ruled out as the cause.
The driver logs showed almost nothing useful after the last stage ended, and all tasks had already finished. Executor utilization across the cluster was also very low.
Something still had to be consuming the time, presumably a thread running in the driver, so I took a thread dump of the Spark driver.

The thread dump showed the real cause: the driver was stuck in S3 rename operations.

Performance Issues Caused by s3 Rename Operation

First, let's talk about rename. To keep output consistent, Spark does not write results directly to their final location. Each task writes its data to temporary files, and when the task completes, those temporary files are renamed to committed files. When the job completes, all files in the job's temporary directory are renamed to their final location.
The directory format is roughly as shown in the figure:

image

Through FileFormatWriter, the driver selects the appropriate output committer and starts the write job. The committer determines how the job and its tasks are committed. The commit process is shown in the following figure:

image

The rename operations occur while the tasks and the job are committed. The exact rename behavior depends on the committer's strategy, which is covered in more detail later.

Why does the rename operation on S3 cause performance issues?
S3, like most object storage systems, has no real directories: a "directory" is just a shared key prefix, and there is no native rename. Renaming a directory on S3 therefore requires a list-copy-delete sequence over every object under it, which is far more expensive than a rename in a file system such as HDFS, where it is a metadata operation. Spark jobs can generate a large number of small files. Renaming tens of thousands of small files is already a noticeable cost on HDFS, let alone on S3.
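The cost difference can be illustrated with a small in-memory model. This is only a sketch: `FlatObjectStore` and its request counter are made up for illustration and do not call any real S3 API. A real client would also page its listings and could batch deletes, so the exact numbers differ, but the cost still grows with the number of objects.

```python
class FlatObjectStore:
    """In-memory stand-in for an object store: a flat map from key to bytes."""

    def __init__(self):
        self.objects = {}
        self.requests = 0  # number of simulated API calls

    def put(self, key, data):
        self.requests += 1
        self.objects[key] = data

    def list_prefix(self, prefix):
        self.requests += 1
        return sorted(k for k in self.objects if k.startswith(prefix))

    def copy(self, src, dst):
        self.requests += 1
        self.objects[dst] = self.objects[src]

    def delete(self, key):
        self.requests += 1
        del self.objects[key]

    def rename_prefix(self, src_prefix, dst_prefix):
        """Emulate a directory rename: list, then copy and delete every object."""
        for key in self.list_prefix(src_prefix):
            self.copy(key, dst_prefix + key[len(src_prefix):])
            self.delete(key)


store = FlatObjectStore()
for i in range(1000):
    store.put(f"table/_temporary/0/part-{i:05d}", b"row")

store.requests = 0  # count only the rename itself
store.rename_prefix("table/_temporary/0/", "table/")
print(store.requests)  # 1 list + 1000 copies + 1000 deletes = 2001
```

On HDFS the same directory rename is a single metadata operation, regardless of how many files the directory holds.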

Spark's File Commit Protocol

Before discussing optimizations, let's review the relevant part of Spark's file commit process. As the previous figure shows, when committing a job Spark actually delegates to a Hadoop committer, which applies a specific commit strategy. The committer mainly has to solve the following problems:

  1. Handling data consistency issues caused by failed file writes and retries
  2. Ensuring data correctness when multiple tasks write to the same file during speculative execution
  3. Improving the efficiency of massive file read and merge operations

Currently, Hadoop's FileOutputCommitter provides two commit algorithms, which can be switched through mapreduce.fileoutputcommitter.algorithm.version.
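In a Spark job, Hadoop properties like this one are usually passed with the `spark.hadoop.` prefix. A minimal sketch of building the corresponding spark-submit arguments follows. The helper name `committer_conf_args` and the script name `job.py` are hypothetical.

```python
def committer_conf_args(algorithm_version):
    """Return spark-submit arguments selecting the FileOutputCommitter algorithm."""
    if algorithm_version not in (1, 2):
        raise ValueError("algorithm version must be 1 or 2")
    # The spark.hadoop. prefix forwards the property to the Hadoop configuration.
    key = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"
    return ["--conf", f"{key}={algorithm_version}"]


# job.py is a placeholder for the actual application.
command = ["spark-submit"] + committer_conf_args(2) + ["job.py"]
print(" ".join(command))
```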

FileOutputCommitter V1

Commit Process

  1. First, the TaskAttempt writes its data to a temporary directory: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  2. Once the task has finished writing and is allowed to commit, commitTask moves that directory to the committed task directory: ${target_dir}/_temporary/${appAttempt}/${taskAttempt}
  3. When all tasks are completed, commitJob moves all files and directories under ${target_dir}/_temporary to the final ${target_dir} directory and adds a _SUCCESS marker file there to indicate a successful commit.
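The three steps above can be sketched as a simplified simulation on a local directory. This is not Hadoop's implementation: the function name, the attempt naming scheme, and the rename counters are assumptions made for illustration.

```python
import os
import shutil
import tempfile


def v1_write_and_commit(target_dir, num_tasks, app_attempt=0):
    """Simulate FileOutputCommitter V1 on a local directory; return rename counts."""
    job_dir = os.path.join(target_dir, "_temporary", str(app_attempt))
    os.makedirs(job_dir, exist_ok=True)
    renames = {"commitTask": 0, "commitJob": 0}

    for task in range(num_tasks):
        attempt = f"attempt_{task}_0"
        # Step 1: the task attempt writes into its own temporary directory.
        attempt_dir = os.path.join(job_dir, "_temporary", attempt)
        os.makedirs(attempt_dir)
        with open(os.path.join(attempt_dir, f"part-{task:05d}"), "w") as f:
            f.write("data\n")
        # Step 2: commitTask renames the attempt directory one level up.
        os.rename(attempt_dir, os.path.join(job_dir, attempt))
        renames["commitTask"] += 1

    # Step 3: commitJob moves every committed file into the target directory,
    # one after another, then cleans up and writes the marker file.
    for attempt in sorted(os.listdir(job_dir)):
        if attempt == "_temporary":
            continue
        committed = os.path.join(job_dir, attempt)
        for name in os.listdir(committed):
            os.rename(os.path.join(committed, name), os.path.join(target_dir, name))
            renames["commitJob"] += 1
    shutil.rmtree(os.path.join(target_dir, "_temporary"))
    open(os.path.join(target_dir, "_SUCCESS"), "w").close()
    return renames


with tempfile.TemporaryDirectory() as target:
    counts = v1_write_and_commit(target, num_tasks=4)
    listing = sorted(os.listdir(target))
print(counts)
print(listing)
```

Every output file is renamed twice, and the second round runs sequentially in one place. On a local disk that is cheap, but on an object store each of those renames becomes a copy plus a delete.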

Data Consistency Issues

  1. During the TaskAttempt write stage, if a task attempt fails and is retried, only the files under /_temporary/ in its ${taskAttempt} directory need to be rewritten. Task directories that were already committed are left untouched.
  2. If the application is retried, previously committed data can be recovered directly by renaming the committed task directories under the old ${appAttempt} directory into the current ${appAttempt} directory.
  3. Since there are two renames, V1 is effectively a two-phase commit. Data is consistent before and after each rename; the only window for inconsistency is while a rename is in progress.

Performance Issues
V1's strong consistency comes at a price: the two rounds of renames can be slow, especially in the commitJob stage. Because commitJob is executed by the driver serially in a single thread, it can take a very long time when a large number of files need to be renamed.

FileOutputCommitter V2

Commit Process

  1. First, the TaskAttempt writes its data to a temporary directory: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  2. Once the task has finished writing and is allowed to commit, commitTask moves the files directly into ${target_dir}. Note that this is a direct move to the final directory.
  3. When all tasks are completed, all data is already in the final directory, so commitJob only has to add the _SUCCESS marker file to indicate a successful commit.
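A matching local simulation of these steps shows both the cheaper job commit and the exposure it creates. Again, this is a sketch under assumptions: `v2_run`, `visible_files`, and the `fail_at_task` switch are invented for illustration and are not part of Hadoop.

```python
import os
import tempfile


def visible_files(target_dir):
    """List what a reader of the target directory would see."""
    return sorted(n for n in os.listdir(target_dir) if n != "_temporary")


def v2_run(target_dir, num_tasks, fail_at_task=None, app_attempt=0):
    """Simulate FileOutputCommitter V2; return files visible in target_dir."""
    tmp_root = os.path.join(target_dir, "_temporary", str(app_attempt), "_temporary")
    for task in range(num_tasks):
        if task == fail_at_task:
            # The job dies here: earlier tasks have already published their files.
            return visible_files(target_dir)
        attempt_dir = os.path.join(tmp_root, f"attempt_{task}_0")
        os.makedirs(attempt_dir)
        name = f"part-{task:05d}"
        with open(os.path.join(attempt_dir, name), "w") as f:
            f.write("data\n")
        # commitTask: move the file straight into the target directory.
        os.rename(os.path.join(attempt_dir, name), os.path.join(target_dir, name))
    # commitJob: only the marker file is left to write.
    open(os.path.join(target_dir, "_SUCCESS"), "w").close()
    return visible_files(target_dir)


with tempfile.TemporaryDirectory() as ok_dir, tempfile.TemporaryDirectory() as bad_dir:
    complete = v2_run(ok_dir, num_tasks=3)
    partial = v2_run(bad_dir, num_tasks=3, fail_at_task=2)
print(complete)  # ['_SUCCESS', 'part-00000', 'part-00001', 'part-00002']
print(partial)   # ['part-00000', 'part-00001']
```

In the failed run, two part files are already visible in the target directory with no _SUCCESS marker next to them.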

Data Consistency Issues

  1. During the TaskAttempt write stage, if a task fails and needs to be retried, part of its data may have already been moved to the final directory, leaving dirty data behind.
  2. If the application is retried, previously committed data already sits in the final directory, so no additional rename is needed and the retry can simply continue with the remaining data. However, the data committed earlier is not guaranteed to be correct and may include dirty data.
  3. V2 therefore sacrifices some data consistency and settles for eventual consistency. With no intermediate step to guarantee correctness, the _SUCCESS marker is the only way to tell whether the output is complete. This causes a further problem: for long-running tasks, dirty data may never be cleaned up properly, which adds storage overhead.

Performance Issues
V2 adopts eventual consistency in order to cut the time V1 spends on rename operations. Compared with V1, V2 needs only one rename, into the final directory, after each task completes, and those renames run in parallel in the task threads, which greatly reduces the execution time.

Small File Optimization

The committer's algorithms offer a trade-off between consistency and performance, and each has its pros and cons. In practice, though, everyone would like to have both.

Besides optimizing the rename stage itself, an effective approach is to go after the real performance killer: small files.

Existing Optimizations in Spark:

Spark has a built-in optimization for small files that applies when files are assigned to partitions for reading:

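  • spark.files.openCostInBytes: the estimated cost of opening a file, expressed as the number of bytes that could be scanned in the same time. It is used when packing multiple files into one partition. Setting it higher puts fewer small files into each partition, so partitions of small files finish faster.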
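The effect of the open cost can be sketched with a simplified packing routine, loosely modeled on how Spark packs files into read partitions. It is not Spark's code: the function name is hypothetical, and the real implementation also derives the partition size limit from the data volume and available cores.

```python
MIB = 1024 * 1024


def pack_files(file_sizes, max_partition_bytes, open_cost_bytes):
    """Greedily pack files into partitions, charging open_cost_bytes per file."""
    partitions, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition when the next file would overflow it.
        if current and current_size + size > max_partition_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(size)
        # Each file counts as its own size plus the estimated open cost.
        current_size += size + open_cost_bytes
    if current:
        partitions.append(current)
    return partitions


small_files = [1 * MIB] * 100  # one hundred 1 MiB files
for cost in (0, 4 * MIB, 16 * MIB):
    count = len(pack_files(small_files, 128 * MIB, cost))
    print(f"open cost {cost // MIB:>2} MiB -> {count} partitions")
```

With no open cost, all one hundred files land in a single partition. Raising the cost spreads them over more partitions, so the small files are read with more parallelism.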

On the business side, the general idea is to reduce the number of output partitions so that small files are merged into larger ones:

  • Use coalesce or repartition to merge partitions.
  • Reduce the use of dynamic partitioning, or use DISTRIBUTE BY to control the number of reducers.
  • Make more use of partitioned tables to reduce the number of partitions generated during queries.
  • Use more advanced file compression formats to improve the performance of small file processing.
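For the first item, one common way to pick the partition count is to divide the expected output size by a target file size. The helper below is a hypothetical sketch, and the 128 MiB target is an assumed value, not a recommendation from this case.

```python
import math

MIB = 1024 * 1024


def target_partitions(total_bytes, target_file_bytes=128 * MIB, max_partitions=None):
    """Estimate how many output partitions give files near target_file_bytes."""
    n = max(1, math.ceil(total_bytes / target_file_bytes))
    if max_partitions is not None:
        n = min(n, max_partitions)
    return n


print(target_partitions(3 * MIB))          # tiny output: a single file is enough
print(target_partitions(10 * 1024 * MIB))  # 10 GiB of output
```

The result can then be passed to coalesce or repartition on the DataFrame before the write.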

AWS's Special Optimization:

Since we use AWS EMR in production, we are also somewhat familiar with AWS's measures for optimizing small files on S3.

  1. Multipart upload: S3 read and write performance is improved by transferring the parts of a file concurrently. The EMRFS S3-optimized committer and the open-source S3A committers are built on this. Note that these committers use the FileOutputCommitter V2 method by default, so the issues that exist in V2 also exist in them.
  2. HDFS acceleration: since a file system performs much better for operations like rename, EMR can do the renaming on a file system first and commit to S3 afterwards. Before files are submitted to S3, they are written to an HDFS-like file system for rename or file merge operations and only then uploaded to S3. This brings significant performance benefits compared with reading and writing purely on S3. The downside is the higher cost of maintaining a separate file system.
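The multipart upload idea from the first item can be sketched as follows. The code does not talk to S3: `upload_part` is a hypothetical callback standing in for the real upload call, and joining the parts at the end stands in for the step that completes the upload.

```python
from concurrent.futures import ThreadPoolExecutor

# S3 requires every part except the last to be at least 5 MiB.
PART_SIZE = 5 * 1024 * 1024


def split_parts(data, part_size=PART_SIZE):
    """Split data into (part_number, chunk) pairs; part numbers start at 1."""
    return [
        (offset // part_size + 1, data[offset:offset + part_size])
        for offset in range(0, len(data), part_size)
    ]


def multipart_upload(data, upload_part, max_workers=4):
    """Upload parts concurrently, then reassemble them in part order."""
    parts = split_parts(data)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        uploaded = list(pool.map(lambda part: upload_part(*part), parts))
    # Completing the upload: the object becomes whole only at this point.
    return b"".join(chunk for _, chunk in sorted(uploaded))


def fake_upload_part(part_number, chunk):
    """Stand-in for a network call; simply echoes the part back."""
    return part_number, chunk


payload = bytes(12 * 1024 * 1024)  # 12 MiB of zero bytes
part_sizes = [len(chunk) for _, chunk in split_parts(payload)]
result = multipart_upload(payload, fake_upload_part)
print(part_sizes, result == payload)
```

Because the object only appears once the upload is completed, a committer can upload parts during the task and defer the completion step to commit time, which avoids a rename.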

Other Optimization Ideas:

Our team has also built an optimization that merges small files. The basic idea is to start a new job at the end of the original job to merge small files, plugged in by extending SQLHadoopMapReduceCommitProtocol.
The merge works as follows: after commitTask, collect the partition information of the written data, group the files and merge them, and finally, during commitJob, move the merged files directly to the final directory. The basic idea is shown in the following figure:

application-and-practice-of-spark-small-file-mergi1
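The grouping step of such a merge could look roughly like the sketch below. It is not our team's actual implementation: the function name, the tuple layout, the partition names, and the size threshold are all assumptions made for illustration.

```python
from collections import defaultdict


def plan_merges(files, target_bytes):
    """Group files per table partition so each group stays within target_bytes.

    files is an iterable of (partition, path, size) tuples.
    Returns {partition: [[path, ...], ...]}, one inner list per merged file.
    """
    by_partition = defaultdict(list)
    for partition, path, size in files:
        by_partition[partition].append((path, size))

    plan = {}
    for partition, entries in by_partition.items():
        groups, current, current_size = [], [], 0
        for path, size in sorted(entries):
            # Start a new group when adding this file would exceed the target.
            if current and current_size + size > target_bytes:
                groups.append(current)
                current, current_size = [], 0
            current.append(path)
            current_size += size
        if current:
            groups.append(current)
        plan[partition] = groups
    return plan


# Hypothetical committed files: (partition, path, size in MiB).
committed = [
    ("dt=2023-04-15", "part-0", 40),
    ("dt=2023-04-15", "part-1", 50),
    ("dt=2023-04-15", "part-2", 30),
    ("dt=2023-04-16", "part-3", 10),
]
merge_plan = plan_merges(committed, target_bytes=100)
print(merge_plan)
```

Each inner list would become one merged output file, and only the merged files would be moved to the final directory during commitJob.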

The advantages of merging small files in this way are:

  • This feature is plug-and-play and minimally invasive to the original code.
  • It has obvious advantages in scenarios with a large number of small files.

Disadvantages:

  • A new job is started for the merge, which adds two extra stages at the end of the task. This introduces more tasks and some extra runtime.

Conclusion

With this feature enabled, I reran the task, and the final runtime was significantly reduced:

IMG_export_20230416_174701801

Of course, the optimization is not finished yet. The overall runtime on the EKS cluster is still higher than that of the original EMR task. I will share the deeper troubleshooting of that issue in a later post when time permits.
