Introduction to Columnar Shuffle Based on Velox

What is Velox#

Velox is an open-source database execution acceleration toolkit written in C++. Its strength lies in optimizing computation with native code and system-level techniques such as vectorization to speed up execution plans, which is why it is also called a native compute engine. Integrating Velox into a traditional data computing engine can significantly improve computational performance and query efficiency in many common scenarios. Bringing it into production still carries adaptation costs, however. Open-source projects such as Gluten can quickly integrate Velox into Spark and simplify the adaptation to traditional big data computing architectures, but some adaptation work remains, and the optimizations do not cover every query pattern. It is therefore recommended to run thorough performance verification and evaluation against your own use cases before adopting it.

Columnar Shuffle#

In my use case, the main integration point between Velox and the Spark computing process is shuffle. Velox's data structures are columnar, while Spark processes shuffle data row by row. Using Spark's built-in shuffle therefore incurs a row-column conversion cost, and with a Remote Shuffle Service (RSS) the performance loss grows even further, which is almost unacceptable in scenarios with large shuffles. We therefore need a native columnar shuffle to solve this problem.

The open-source project Gluten makes many optimizations to integrate Velox into Spark more smoothly, including a native columnar shuffle implementation, and it also integrates with Apache Celeborn to support RSS. The following sections introduce Gluten's columnar shuffle implementation in these two scenarios.

Local Shuffle#

We focus on the shuffle writer and reader. The writer is where most of the rework happens, since it determines how to handle the data produced by the upstream map tasks.

ColumnarShuffleWriter#

Implementing a columnar shuffle essentially comes down to solving the row-column conversion: the upstream columnar data must be processed before the shuffle can proceed.
The columnar data structure used with Velox is the ColumnarBatch, which holds all the data waiting to be processed. Its structure is similar to the following figure:

[Figure: ColumnarBatch structure]
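
To make the layout concrete, here is a rough conceptual sketch of such a batch in C++ (illustrative structs only, not the actual Velox or Gluten classes): each column owns a values buffer plus a validity bitmap, and the batch is a collection of equally sized columns.

#include <cstdint>
#include <string>
#include <vector>

// Illustrative stand-in for the real ColumnarBatch / RowVector classes.
struct ColumnVector {
  std::vector<uint8_t> validity;  // null bitmap: marks which rows actually hold a value
  std::vector<uint8_t> values;    // fixed-width values, or offsets + bytes for strings
};

struct ColumnarBatch {
  int32_t numRows = 0;
  std::vector<std::string> columnNames;  // one name per column
  std::vector<ColumnVector> columns;     // every column has numRows logical entries
};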

There are several issues to consider when processing this data structure:

  1. How to associate row data with column data?
  2. How to store large amounts of data (to prevent OOM)?
  3. How to ensure performance?

The first issue is the most fundamental. Spark has three types of ShuffleWriter, and Gluten's columnar shuffle writer, ColumnarShuffleWriter, is mainly a transformation of HashBasedShuffleWriter, so the row-to-partition mapping is hash based as well. The advantage of this approach is that it is simple to implement and avoids sorting, preventing a large number of random reads and writes from hurting performance. On the other hand, a hash-based writer generates a large number of files, and its memory overhead is also a concern. Gluten therefore also folds some of the SortBasedShuffleWriter design into this writer, which will become apparent in the process described below.
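
As a minimal illustration of that hash mapping (the real partitioner hashes the shuffle key columns of the batch; here a single integer key column and the function name are stand-ins):

#include <cstdint>
#include <functional>
#include <vector>

// One partition id per row: hash the key and take it modulo the partition count,
// the same idea Spark's hash partitioning uses.
std::vector<uint32_t> computeRow2Partition(const std::vector<int64_t>& keys,
                                           uint32_t numPartitions) {
  std::vector<uint32_t> row2Partition(keys.size());
  for (size_t row = 0; row < keys.size(); ++row) {
    row2Partition[row] =
        static_cast<uint32_t>(std::hash<int64_t>{}(keys[row]) % numPartitions);
  }
  return row2Partition;
}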

For the second issue, Gluten splits the ColumnarBatch in memory and processes only a portion of the data at a time. This allows more data to be handled within the available memory and reduces the impact of network transmission in the RSS scenario.

For the third issue, Gluten optimizes many details: it uses Arrow to manage and reuse memory, which lowers the risk of OOM and the probability of spilling, and it makes full use of the CPU cache while avoiding random writes.

The overall process design of ColumnarShuffleWriter is shown below:

[Figure: ColumnarShuffleWriter process overview]

Note that almost all of these steps are implemented in Velox.

  1. Take each incoming record, which maps to a ColumnarBatch.
  2. Calculate the correspondence between partitions and rows based on the partitioner.
  3. Build a mapping table to complete the mapping relationship between partition id and row id.
  4. Pre-allocate memory that will be reused across multiple rounds of loading data.
  5. Call several split functions to split and load data into the cache.
  6. If there is not enough memory, spill the data to a file.
  7. Finally, complete the write operation and merge the data in memory and the spilled files to form the final file.
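
A condensed, hypothetical outline of one pass over a batch may help tie these steps together (all helper names are invented for illustration and are not Gluten's actual API):

#include <cstdint>
#include <vector>

struct ColumnarBatch;  // columnar input produced by the upstream operator

// Step 2: one partition id per row, produced by the partitioner.
std::vector<uint32_t> computeRow2Partition(const ColumnarBatch& batch);
// Step 3: build partition2RowOffset / rowOffset2RowId (see the next subsection).
void buildPartition2Row(const std::vector<uint32_t>& row2Partition);
// Step 4: pre-allocate or reuse each active partition's buffer.
void preAllocPartitionBuffers(const ColumnarBatch& batch);
// Step 5: the split functions copy the column data into the partition buffers.
void splitRowVector(const ColumnarBatch& batch);
// Step 6: compress the cached buffers and spill them to a local file.
void evictAndSpill();
bool memoryExceeded();

void processBatch(const ColumnarBatch& batch) {
  auto row2Partition = computeRow2Partition(batch);  // step 2
  buildPartition2Row(row2Partition);                 // step 3
  preAllocPartitionBuffers(batch);                   // step 4
  splitRowVector(batch);                             // step 5
  if (memoryExceeded()) {
    evictAndSpill();                                 // step 6
  }
  // Step 7 runs once at the end: merge the cached data with all spill files.
}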

Building the partition2Row relationship#

One of the important steps is building two arrays, partition2RowOffset and rowOffset2RowId. Together they store the mapping between partitions, columns, and rows, and they determine which partitions are valid (if a partition receives no data, no memory will be pre-allocated for it later).
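
A small sketch of how such a mapping can be built (variable names follow the text; the actual Gluten code differs): given one partition id per row, partition2RowOffset is a prefix sum over per-partition row counts, and rowOffset2RowId scatters every row id into its partition's contiguous range.

#include <cstdint>
#include <vector>

void buildPartition2Row(const std::vector<uint32_t>& row2Partition,   // partition id of each row
                        uint32_t numPartitions,
                        std::vector<uint32_t>& partition2RowOffset,   // size numPartitions + 1
                        std::vector<uint32_t>& rowOffset2RowId) {     // size numRows
  const uint32_t numRows = static_cast<uint32_t>(row2Partition.size());

  // Count rows per partition; a partition with zero rows gets no pre-allocated buffer later.
  std::vector<uint32_t> partitionRowCount(numPartitions, 0);
  for (uint32_t pid : row2Partition) partitionRowCount[pid]++;

  // Prefix sum: rows of partition p occupy [partition2RowOffset[p], partition2RowOffset[p + 1]).
  partition2RowOffset.assign(numPartitions + 1, 0);
  for (uint32_t p = 0; p < numPartitions; ++p)
    partition2RowOffset[p + 1] = partition2RowOffset[p] + partitionRowCount[p];

  // Scatter every row id into its partition's slot range.
  rowOffset2RowId.assign(numRows, 0);
  std::vector<uint32_t> cursor(partition2RowOffset.begin(), partition2RowOffset.end() - 1);
  for (uint32_t row = 0; row < numRows; ++row)
    rowOffset2RowId[cursor[row2Partition[row]]++] = row;
}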

Split function#

The split phase iterates over the split functions, which are responsible for splitting the rowVector obtained from the ColumnarBatch into each partition's pre-allocated memory. When memory usage reaches the spill threshold, the data in memory is spilled.

The format of the rowVector is as follows:
[Figure: rowVector layout]

The split phase mainly involves four functions, each handling a different kind of column:

  • splitFixedWidthValueBuffer splits fixed-width columns (such as int and boolean).
  • splitValidityBuffer splits the validity bytes used to track null values.
  • splitBinaryArray splits variable-length binary data.
  • splitComplexType splits complex types (struct, map, list).
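
As an example of the simplest case, a hypothetical fixed-width split could look like the following (names mirror the text; the real implementation also tracks write offsets into reused buffers and keeps the copies SIMD friendly):

#include <cstdint>
#include <vector>

// Copy each row's fixed-width value into the buffer of its target partition. Rows of a
// partition are contiguous in rowOffset2RowId, so every partition buffer is written sequentially.
template <typename T>
void splitFixedWidthColumn(const T* srcValues,                               // one column of the rowVector
                           const std::vector<uint32_t>& partition2RowOffset, // size numPartitions + 1
                           const std::vector<uint32_t>& rowOffset2RowId,
                           std::vector<std::vector<T>>& partitionBuffers) {  // reserved per partition
  const uint32_t numPartitions = static_cast<uint32_t>(partitionBuffers.size());
  for (uint32_t pid = 0; pid < numPartitions; ++pid) {
    for (uint32_t pos = partition2RowOffset[pid]; pos < partition2RowOffset[pid + 1]; ++pos) {
      partitionBuffers[pid].push_back(srcValues[rowOffset2RowId[pos]]);
    }
  }
}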

Before splitting, the memory buffer preAllocPartitionBuffer is initialized so that the split data can be loaded entirely into memory. Because the split data is traversed many times, each split should be kept small enough to fit into the CPU L1/L2 cache as much as possible, which yields better performance.

preAllocBuffer#

Gluten uses Arrow for memory management and data exchange.

Based on the split data mentioned earlier, each partition will pre-allocate a reusable and resizable memory buffer to cache the split data.

The formula for calculating the pre-allocated size is as follows:

# Pre-allocated size
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
# The maximum number of rows that can be stored in the cache (newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4

The buffer is a dynamically resizable memory block, with Arrow providing fine-grained control. The condition that triggers a resize (the actual resizing logic covers more cases) is:

# The default scaling factor THRESHOLD is 0.25; currentSize is the buffer's current row capacity
newSize < currentSize * (1 - THRESHOLD) || newSize > currentSize * (1 + THRESHOLD)

The complete memory scaling situation is as follows:
[Figure: partition buffer resize and evict flow]
The "evict and free data" will spill the data in memory. Each spilled data will be copied and output through Arrow. The initial size is 4096, and it will be increased if it is not enough.

Spill and merge#

Although ColumnarShuffleWriter is designed as a hash-based shuffle, Gluten layers some of the unsafe-sort design ideas on top of it.
In the local spill scenario, when memory usage reaches the threshold or an OOM occurs, the in-memory data is compressed and spilled to disk files, possibly several times. After all ColumnarBatches have been processed, the spilled files are merged into a single file.

  1. The data in each spill file is written in the order of partition id.
  2. When merging, all files will be traversed according to the partition id to merge the data of each partition.
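
A simplified sketch of that merge, with file handling condensed and all names assumed: because every spill file is laid out in partition-id order, the merge walks the partitions once and, for each partition, concatenates the in-memory data followed by that partition's segment from each spill file.

#include <fstream>
#include <string>
#include <vector>

void mergeSpills(const std::vector<std::string>& spillFiles,
                 const std::vector<std::vector<char>>& inMemoryPartitions,      // cached data per partition
                 const std::vector<std::vector<int64_t>>& spillPartitionSizes,  // [file][partition] -> bytes
                 const std::string& finalFile) {
  std::ofstream out(finalFile, std::ios::binary);
  std::vector<std::ifstream> ins;
  for (const auto& f : spillFiles) ins.emplace_back(f, std::ios::binary);

  const size_t numPartitions = inMemoryPartitions.size();
  for (size_t p = 0; p < numPartitions; ++p) {
    // First the data still cached in memory for this partition...
    out.write(inMemoryPartitions[p].data(), inMemoryPartitions[p].size());
    // ...then this partition's segment from each spill file; the files are read sequentially
    // because they were written in partition-id order.
    for (size_t f = 0; f < ins.size(); ++f) {
      std::vector<char> buf(static_cast<size_t>(spillPartitionSizes[f][p]));
      if (buf.empty()) continue;
      ins[f].read(buf.data(), static_cast<std::streamsize>(buf.size()));
      out.write(buf.data(), static_cast<std::streamsize>(buf.size()));
    }
  }
}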

ColumnarShuffleReader#

The reader side is relatively simple: Spark's existing ShuffleReader can mostly be reused, because the data pulled from the map side has not changed much. The main task is to convert the fetched data into ColumnarBatches for downstream operators, which only requires rewriting the deserializer.

Remote Shuffle#

The design of the remote columnar shuffle is basically the same as the local one; the main problem is how to push the data to the RSS correctly. In Gluten's adaptation to Celeborn, the writer and reader are re-implemented. The basic idea mirrors the local shuffle: the native engine splits and pushes the ColumnarBatch, and a custom deserializer on the reader side reconstructs ColumnarBatches.

[Figure: columnar shuffle with Celeborn]

  1. Take each incoming record, which maps to a ColumnarBatch.
  2. Calculate the correspondence between partitions and rows based on the partitioner.
  3. Build a mapping table to complete the mapping relationship between partition id and row id.
  4. Pre-allocate memory that will be reused across multiple rounds of loading data.
  5. Call several split functions to split and load data into the cache.
  6. Spill the data to Celeborn when the cache exceeds the limit.

The earlier steps were already introduced in the Local Shuffle section. The data reaches Celeborn by passing the Celeborn client into the native engine through JNI: the native engine pushes the spilled data to Celeborn, and the merge of each partition's data is then completed on Celeborn.
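
Conceptually, the native side's push through JNI could look like the sketch below (the wrapper class and its push method are assumptions for illustration, not Gluten's or Celeborn's actual interfaces):

#include <jni.h>
#include <cstdint>

// Push one spilled (already compressed) partition block to Celeborn via a JVM-side client wrapper.
int pushPartitionToCeleborn(JNIEnv* env,
                            jobject celebornPusher,  // JVM object wrapping the Celeborn shuffle client
                            jint partitionId,
                            const uint8_t* data,
                            int32_t length) {
  // Copy the native bytes into a Java byte[] the client can push.
  jbyteArray array = env->NewByteArray(length);
  env->SetByteArrayRegion(array, 0, length, reinterpret_cast<const jbyte*>(data));

  // Assumed JVM-side method: int push(int partitionId, byte[] data); in practice the
  // method id would be resolved once and cached.
  jclass cls = env->GetObjectClass(celebornPusher);
  jmethodID pushMethod = env->GetMethodID(cls, "push", "(I[B)I");
  jint bytesWritten = env->CallIntMethod(celebornPusher, pushMethod, partitionId, array);

  env->DeleteLocalRef(array);
  return bytesWritten;
}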
