Complex Query Tuning
Complex Query Execution Model
Analytical queries can be divided into simple queries and complex queries. Simple queries typically involve single-table retrieval and aggregation, or joins between a large table and a small table, and they respond quickly. Complex queries involve large tables, multi-table joins, and complex logical processing, and they often respond slowly and consume substantial resources. ByConity's design is optimized for complex queries.
Simple queries can adopt a two-stage execution model, with the partial stage executed on the compute nodes and the final stage executed on the service nodes. For complex queries that involve multiple aggregations or joins, however, the two-stage model is far less flexible, which makes query optimization difficult. To better support distributed queries and to execute the plans generated by the optimizer, we have introduced a complex query execution model that supports multiple rounds of distributed execution.
Diagram of Complex Query Execution Model
The execution flow of a complex query is as follows:
- The Query SQL String is parsed into an AST (Abstract Syntax Tree) by the parser.
- The AST is rewritten and optimized to generate an execution plan; when the optimizer is enabled, the optimizer generates this plan.
- The execution plan is divided into multiple PlanSegments.
- A PlanSegment represents an execution fragment in the distributed execution process. It includes:
  - The required AST fragment, or a logical execution plan fragment composed of PlanNodes.
  - Information about the nodes on which the PlanSegment executes.
  - Upstream and downstream information for the PlanSegment, including the input stream received from upstream and the input stream that the downstream requires.
- The engine's scheduler constructs a DAG (Directed Acyclic Graph) from these PlanSegments and topologically sorts them before distributing them to all nodes in the cluster.
- Each node receives its PlanSegment and begins executing it.
- PlanSegments containing data sources start reading the data and distributing it to downstream nodes according to shuffle rules.
- PlanSegments with exchange inputs wait for upstream data and continue shuffling the data to various nodes if necessary.
- After multiple stages are completed, the results are returned to the server.
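As an illustration of the flow above, the query below joins two tables and aggregates the result. The table and column names are hypothetical, and the segment breakdown in the comments is only one plausible division under those assumptions, not necessarily the plan ByConity produces.

```sql
-- Hypothetical tables: example_db.orders and example_db.customers (both large).
SELECT c.region, sum(o.amount) AS total_amount
FROM example_db.orders AS o
INNER JOIN example_db.customers AS c ON o.customer_id = c.customer_id
GROUP BY c.region
SETTINGS enable_optimizer = 1;

-- One plausible division into PlanSegments (illustrative only):
--   Segment A: scan orders, shuffle rows by customer_id.
--   Segment B: scan customers, shuffle rows by customer_id.
--   Segment C: receive A and B through exchange inputs, join them, shuffle by region.
--   Segment D: receive C, aggregate by region, return the results to the server.
```

Segments A and B contain data sources, so they start reading immediately; C and D have exchange inputs and wait for upstream data.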
How to Enable Query-Level Knobs
Enabling the optimizer automatically triggers the complex query execution model. To enable it, set enable_optimizer=1 or set dialect_type='ANSI'.
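A minimal sketch of both options, applied per query through the SETTINGS keyword described under Tuning Parameters. The table example_db.orders is hypothetical, and passing dialect_type through the SETTINGS clause is an assumption based on how other query-level parameters are set.

```sql
-- Option 1: enable the optimizer directly for this query.
SELECT count(*) FROM example_db.orders SETTINGS enable_optimizer = 1;

-- Option 2: use the ANSI dialect for this query, which also enables the optimizer.
SELECT count(*) FROM example_db.orders SETTINGS dialect_type = 'ANSI';
```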
- Ensuring the Existence of Statistics
Without statistics, the generated query plan may not be optimal. Use show stats [<db_name>.]<table_name> to check the statistics.
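For example, assuming a hypothetical table orders in a database example_db, the check might look like this:

```sql
-- Check statistics for a table, qualified with its database.
SHOW STATS example_db.orders;

-- The database prefix is optional; this form targets the current database.
SHOW STATS orders;
```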
- Analyzing the Execution Time of Each Step
Prefixing a SQL statement with explain analyze displays the execution time of each step.
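A sketch of how this might be applied to a join-and-aggregate query. The tables and columns are hypothetical; only the explain analyze prefix comes from the text above.

```sql
-- Prefix the statement to be profiled with EXPLAIN ANALYZE.
EXPLAIN ANALYZE
SELECT c.region, sum(o.amount) AS total_amount
FROM example_db.orders AS o
INNER JOIN example_db.customers AS c ON o.customer_id = c.customer_id
GROUP BY c.region;
```

Comparing the per-step times before and after changing a parameter is a simple way to judge whether the change helped.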
- Tuning Parameters
ByteHouse supports manual parameter tuning at the query level, so parameters can be set separately for different queries. Parameters related to the exchange operator can be set in the same way. Specify parameter values for a query with the SETTINGS keyword, as follows:
SELECT 1 SETTINGS TEST_KNOB=1;
When this SQL statement is executed, the parameter TEST_KNOB will be forcibly set to 1. Note that setting parameters in this way does not affect the parameter values of other queries.
Some query-level parameters that have an important impact on performance are listed below.
- The exchange operator is responsible for data transmission between PlanSegments.
exchange_source_pipeline_threads
affects the total number of threads used for pipeline execution, especially for non-source pipelines (those that do not read data directly from storage). The default is the number of CPU cores. There is no single recommended value; try halving or doubling it and observe the results. If the query has a high memory footprint, reducing this value may help.
exchange_timeout_ms
is the exchange timeout in milliseconds, with a default of 100 seconds (100000 ms). If exchange-related timeout errors occur, consider increasing this value.
exchange_unordered_output_parallel_size
affects how concurrently Exchange can output data, with a default value of 8. Generally, there is no need to adjust this value. However, if exchange_source_pipeline_threads is increased significantly, it may help to increase exchange_unordered_output_parallel_size as well to raise upstream output capability.
exchange_enable_block_compress
determines whether to enable Exchange compression, which is enabled by default. If the CPU bottleneck is more significant than the network bottleneck, disabling this option may be worth considering.
exchange_parallel_size
determines how many downstream partitions single-partition data is shuffled into, with a default value of 1. Generally, there is no need to adjust this value. Only when the exchange is required to be ordered is it necessary to increase the number of partitions through exchange_parallel_size, which raises the concurrent processing capability of downstream operators.
exchange_local_receiver_queue_size
is the queue size of the local exchange receiver. The receiver implements soft flow control through asynchronous operators and synchronized operator states: it sends two signals, idle and backlog, to the sender, and the sender lowers the transmission priority of a backlogged stream and raises the priority of an idle stream.
exchange_remote_receiver_queue_size
is the queue size of the remote exchange receiver.
exchange_buffer_send_threshold_in_bytes
sets the minimum number of bytes that must accumulate before the exchange flushes its send buffer, which determines the backlog threshold size of the receiver.
exchange_buffer_send_threshold_in_row
sets the minimum number of rows that must accumulate before the exchange flushes its send buffer, which determines the backlog threshold size of the receiver.
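The sketch below shows how several exchange parameters might be combined on one query. The table, the 16-core machine, and every value are illustrative assumptions rather than recommendations, and the comma-separated list after SETTINGS is assumed to follow the usual ClickHouse-style syntax.

```sql
-- Hypothetical query on a 16-core node that hit exchange timeouts
-- and appears CPU-bound rather than network-bound.
SELECT customer_id, sum(amount) AS total_amount
FROM example_db.orders
GROUP BY customer_id
SETTINGS
    enable_optimizer = 1,
    exchange_timeout_ms = 200000,            -- doubled from the 100-second default
    exchange_source_pipeline_threads = 8,    -- half of the assumed 16 cores
    exchange_enable_block_compress = 0;      -- trade network volume for CPU time
```

Because these values apply only to this statement, other queries keep their own settings.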
- There are some parameters that can take effect for the entire database as well as for a single query.
max_threads
limits the maximum number of threads to execute the request. By default, it is determined automatically.
max_block_size
limits the maximum block size for reading. By default, it is 65536.
- Routing-related parameters are usually used to determine the storage and access paths of data in distributed systems.
group_by_two_level_threshold
is the number of keys at which two-level aggregation starts.
group_by_two_level_threshold_bytes
is the size of the aggregation state, in bytes, at which two-level aggregation starts to be used.
- A runtime filter dynamically generates filter conditions at run time for certain join queries, which reduces the amount of data scanned, avoids unnecessary I/O and network transmission, and thereby accelerates the query. The following parameters relate to runtime filters.
runtime_filter_min_filter_rows
sets the minimum number of rows required to enable the runtime filter.
runtime_filter_bloom_build_threshold
sets the threshold on the right table for building a bloom filter.
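As a sketch of the kind of join where a runtime filter can help, the query below joins a large table to a selectively filtered one, so the surviving join keys from the right side can prune the scan of the left side. The tables, columns, and parameter values are hypothetical and are not the defaults.

```sql
-- Hypothetical: orders is large; the filter on customers keeps few rows.
SELECT o.order_id, o.amount
FROM example_db.orders AS o
INNER JOIN example_db.customers AS c ON o.customer_id = c.customer_id
WHERE c.region = 'EU'
SETTINGS
    enable_optimizer = 1,
    runtime_filter_min_filter_rows = 10000,          -- illustrative value
    runtime_filter_bloom_build_threshold = 1000000;  -- illustrative value
```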