Unverified commit 0d03b8c0, authored by Anton Popov, committed by GitHub

Merge pull request #21889 from CurtizJ/fix-aggregation-in-order-1

Fix bugs in aggregation by primary key
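For context, the code paths changed below implement aggregation in order of the primary key, enabled by the optimize_aggregation_in_order setting. A minimal query of the kind exercised here (table, data and settings are taken from the test added at the bottom of this diff) looks like this; with the optimization on, it must return the same result as the default aggregation path:

CREATE TABLE group_by_pk (k UInt64, v UInt64)
ENGINE = MergeTree ORDER BY k PARTITION BY v % 50;
INSERT INTO group_by_pk SELECT number / 100, number FROM numbers(1000);
SELECT sum(v) AS s FROM group_by_pk GROUP BY k ORDER BY s DESC LIMIT 5
SETTINGS optimize_aggregation_in_order = 1, max_block_size = 1;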
@@ -1220,29 +1220,35 @@ Block Aggregator::prepareBlockAndFill(
return res;
}
- void Aggregator::fillAggregateColumnsWithSingleKey(
- AggregatedDataVariants & data_variants,
- MutableColumns & final_aggregate_columns)
+ void Aggregator::addSingleKeyToAggregateColumns(
+ const AggregatedDataVariants & data_variants,
+ MutableColumns & aggregate_columns) const
{
- AggregatedDataWithoutKey & data = data_variants.without_key;
+ const auto & data = data_variants.without_key;
+ for (size_t i = 0; i < params.aggregates_size; ++i)
+ {
+ auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
+ column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
+ }
+ }
+ void Aggregator::addArenasToAggregateColumns(
+ const AggregatedDataVariants & data_variants,
+ MutableColumns & aggregate_columns) const
+ {
for (size_t i = 0; i < params.aggregates_size; ++i)
{
- ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*final_aggregate_columns[i]);
- for (auto & pool : data_variants.aggregates_pools)
- {
+ auto & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);
+ for (const auto & pool : data_variants.aggregates_pools)
column_aggregate_func.addArena(pool);
- }
- column_aggregate_func.getData().push_back(data + offsets_of_aggregate_states[i]);
}
- data = nullptr;
}
void Aggregator::createStatesAndFillKeyColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
Columns & key_columns,
size_t key_row,
- MutableColumns & final_key_columns)
+ MutableColumns & final_key_columns) const
{
AggregateDataPtr place = data_variants.aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(place);
......
@@ -1295,14 +1295,18 @@ protected:
AggregateFunctionInstructions & instructions,
NestedColumnsHolder & nested_columns_holder);
- void fillAggregateColumnsWithSingleKey(
- AggregatedDataVariants & data_variants,
- MutableColumns & final_aggregate_columns);
+ void addSingleKeyToAggregateColumns(
+ const AggregatedDataVariants & data_variants,
+ MutableColumns & aggregate_columns) const;
+ void addArenasToAggregateColumns(
+ const AggregatedDataVariants & data_variants,
+ MutableColumns & aggregate_columns) const;
void createStatesAndFillKeyColumnsWithSingleKey(
AggregatedDataVariants & data_variants,
Columns & key_columns, size_t key_row,
- MutableColumns & final_key_columns);
+ MutableColumns & final_key_columns) const;
};
......
@@ -24,11 +24,13 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
- SortDescription description_)
+ SortDescription description_,
+ size_t max_block_size_)
: header(header_)
, num_inputs(num_inputs_)
, params(params_)
, description(std::move(description_))
+ , max_block_size(max_block_size_)
{
/// Replace column names in description to positions.
for (auto & column_description : description)
@@ -56,6 +58,13 @@ void FinishAggregatingInOrderAlgorithm::consume(Input & input, size_t source_num
IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
{
+ if (!inputs_to_update.empty())
+ {
+ Status status(inputs_to_update.back());
+ inputs_to_update.pop_back();
+ return status;
+ }
/// Find the input with smallest last row.
std::optional<size_t> best_input;
for (size_t i = 0; i < num_inputs; ++i)
@@ -94,16 +103,30 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
states[i].to_row = (it == indices.end() ? states[i].num_rows : *it);
}
- Status status(*best_input);
- status.chunk = aggregate();
+ addToAggregation();
+ /// At least one chunk should be fully aggregated.
+ assert(!inputs_to_update.empty());
+ Status status(inputs_to_update.back());
+ inputs_to_update.pop_back();
+ /// Do not merge blocks, if there are too few rows.
+ if (accumulated_rows >= max_block_size)
+ status.chunk = aggregate();
return status;
}
Chunk FinishAggregatingInOrderAlgorithm::aggregate()
{
- BlocksList blocks;
+ auto aggregated = params->aggregator.mergeBlocks(blocks, false);
+ blocks.clear();
+ accumulated_rows = 0;
+ return {aggregated.getColumns(), aggregated.rows()};
+ }
+ void FinishAggregatingInOrderAlgorithm::addToAggregation()
+ {
for (size_t i = 0; i < num_inputs; ++i)
{
const auto & state = states[i];
@@ -112,7 +135,7 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate()
if (state.to_row - state.current_row == state.num_rows)
{
- blocks.emplace_back(header.cloneWithColumns(states[i].all_columns));
+ blocks.emplace_back(header.cloneWithColumns(state.all_columns));
}
else
{
@@ -125,10 +148,11 @@ Chunk FinishAggregatingInOrderAlgorithm::aggregate()
}
states[i].current_row = states[i].to_row;
- }
+ accumulated_rows += blocks.back().rows();
- auto aggregated = params->aggregator.mergeBlocks(blocks, false);
- return {aggregated.getColumns(), aggregated.rows()};
+ if (!states[i].isValid())
+ inputs_to_update.push_back(i);
+ }
}
}
@@ -37,7 +37,8 @@ public:
const Block & header_,
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
- SortDescription description_);
+ SortDescription description_,
+ size_t max_block_size_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@@ -45,6 +46,7 @@ public:
private:
Chunk aggregate();
+ void addToAggregation();
struct State
{
@@ -66,8 +68,13 @@ private:
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescription description;
+ size_t max_block_size;
Inputs current_inputs;
std::vector<State> states;
+ std::vector<size_t> inputs_to_update;
+ BlocksList blocks;
+ size_t accumulated_rows = 0;
};
}
@@ -16,13 +16,15 @@ public:
const Block & header,
size_t num_inputs,
AggregatingTransformParamsPtr params,
- SortDescription description)
+ SortDescription description,
+ size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
params,
- std::move(description))
+ std::move(description),
+ max_block_size)
{
}
......
@@ -100,7 +100,8 @@ void AggregatingStep::transformPipeline(QueryPipeline & pipeline, const BuildQue
pipeline.getHeader(),
pipeline.getNumStreams(),
transform_params,
- group_by_sort_description);
+ group_by_sort_description,
+ max_block_size);
pipeline.addTransform(std::move(transform));
aggregating_sorted = collector.detachProcessors(1);
......
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Core/SortCursor.h>
+ #include <ext/range.h>
namespace DB
{
@@ -58,6 +59,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
LOG_TRACE(log, "Aggregating in order");
is_consume_started = true;
}
src_rows += rows;
src_bytes += chunk.bytes();
@@ -82,58 +84,55 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
res_aggregate_columns.resize(params->params.aggregates_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
res_key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
}
for (size_t i = 0; i < params->params.aggregates_size; ++i)
{
res_aggregate_columns[i] = res_header.safeGetByPosition(i + params->params.keys_size).type->createColumn();
}
params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
+ params->aggregator.addArenasToAggregateColumns(variants, res_aggregate_columns);
++cur_block_size;
}
- ssize_t mid = 0;
- ssize_t high = 0;
- ssize_t low = -1;
/// Will split block into segments with the same key
while (key_end != rows)
{
- high = rows;
/// Find the first position of new (not current) key in current chunk
- while (high - low > 1)
- {
- mid = (low + high) / 2;
- if (!less(res_key_columns, key_columns, cur_block_size - 1, mid, group_by_description))
- low = mid;
- else
- high = mid;
- }
- key_end = high;
+ auto indices = ext::range(key_begin, rows);
+ auto it = std::upper_bound(indices.begin(), indices.end(), cur_block_size - 1,
+ [&](size_t lhs_row, size_t rhs_row)
+ {
+ return less(res_key_columns, key_columns, lhs_row, rhs_row, group_by_description);
+ });
+ key_end = (it == indices.end() ? rows : *it);
/// Add data to aggr. state if interval is not empty. Empty when haven't found current key in new block.
if (key_begin != key_end)
{
params->aggregator.executeOnIntervalWithoutKeyImpl(variants.without_key, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
}
- low = key_begin = key_end;
/// We finalize last key aggregation state if a new key found.
- if (key_begin != rows)
+ if (key_end != rows)
{
- params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
+ params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
/// If res_block_size is reached we have to stop consuming and generate the block. Save the extra rows into new chunk.
if (cur_block_size == res_block_size)
{
Columns source_columns = chunk.detachColumns();
for (auto & source_column : source_columns)
- source_column = source_column->cut(key_begin, rows - key_begin);
+ source_column = source_column->cut(key_end, rows - key_end);
- current_chunk = Chunk(source_columns, rows - key_begin);
+ current_chunk = Chunk(source_columns, rows - key_end);
+ src_rows -= current_chunk.getNumRows();
block_end_reached = true;
need_generate = true;
cur_block_size = 0;
+ variants.without_key = nullptr;
/// Arenas cannot be destroyed here, since later, in FinalizingSimpleTransform
/// there will be finalizeChunk(), but even after
/// finalizeChunk() we cannot destroy arena, since some memory
@@ -155,10 +154,13 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
}
/// We create a new state for the new key and update res_key_columns
- params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_begin, res_key_columns);
+ params->aggregator.createStatesAndFillKeyColumnsWithSingleKey(variants, key_columns, key_end, res_key_columns);
++cur_block_size;
}
+ key_begin = key_end;
}
block_end_reached = false;
}
@@ -234,7 +236,10 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
void AggregatingInOrderTransform::generate()
{
if (cur_block_size && is_consume_finished)
- params->aggregator.fillAggregateColumnsWithSingleKey(variants, res_aggregate_columns);
+ {
+ params->aggregator.addSingleKeyToAggregateColumns(variants, res_aggregate_columns);
+ variants.without_key = nullptr;
+ }
Block res = res_header.cloneEmpty();
......
94950
84950
74950
64950
54950
=======
94950
84950
74950
64950
54950
DROP TABLE IF EXISTS group_by_pk;
CREATE TABLE group_by_pk (k UInt64, v UInt64)
ENGINE = MergeTree ORDER BY k PARTITION BY v % 50;
INSERT INTO group_by_pk SELECT number / 100, number FROM numbers(1000);
SELECT sum(v) AS s FROM group_by_pk GROUP BY k ORDER BY s DESC LIMIT 5
SETTINGS optimize_aggregation_in_order = 1, max_block_size = 1;
SELECT '=======';
SELECT sum(v) AS s FROM group_by_pk GROUP BY k ORDER BY s DESC LIMIT 5
SETTINGS optimize_aggregation_in_order = 0, max_block_size = 1;
DROP TABLE IF EXISTS group_by_pk;