提交 4c9fb563 编写于 作者: A Alexey Milovidov

Return single row when aggregate without key and no data [#METR-22072].

上级 e387d942
......@@ -767,39 +767,43 @@ using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
*/
/** Агрегирует источник блоков.
/** Aggregates stream of blocks.
*/
class Aggregator
{
public:
struct Params
{
/// Что считать.
/// What to calculate.
Names key_names;
ColumnNumbers keys; /// Номера столбцов - вычисляются позже.
ColumnNumbers keys; /// Column numbers calculated later.
AggregateDescriptions aggregates;
size_t keys_size;
size_t aggregates_size;
/// Настройки приближённого вычисления GROUP BY.
const bool overflow_row; /// Нужно ли класть в AggregatedDataVariants::without_key агрегаты для ключей, не попавших в max_rows_to_group_by.
/// Settings of approximate calculation of GROUP BY.
/// Should accumulate aggregates for keys that wasn't in first 'max_rows_to_group_by' into AggregatedDataVariants::without_key.
const bool overflow_row;
const size_t max_rows_to_group_by;
const OverflowMode group_by_overflow_mode;
/// Для динамической компиляции.
/// Should return empty result instead single row for queries like SELECT count() FROM empty_table.
bool empty_result_for_empty_data = false;
/// For runtime compilation.
Compiler * compiler;
const UInt32 min_count_to_compile;
/// Настройки двухуровневой агрегации (используется для большого количества ключей).
/** При каком количестве ключей или размере состояния агрегации в байтах,
* начинает использоваться двухуровневая агрегация. Достаточно срабатывания хотя бы одного из порогов.
* 0 - соответствующий порог не задан.
/// Settings for two-level aggregation (used in case of large amount of values of keys).
/** At what number of keys OR size of aggregation state in bytes,
* start to use two-level aggregation.
* 0 means threshold is not set.
*/
const size_t group_by_two_level_threshold;
const size_t group_by_two_level_threshold_bytes;
/// Настройки для сброса временных данных в файловую систему (внешняя агрегация).
const size_t max_bytes_before_external_group_by; /// 0 - не использовать внешнюю агрегацию.
/// Settings for storing temporary data in filesystem (aggregation in external memory).
const size_t max_bytes_before_external_group_by; /// 0 - don't use aggregation in external memory.
const std::string tmp_path;
Params(
......@@ -807,9 +811,11 @@ public:
bool overflow_row_, size_t max_rows_to_group_by_, OverflowMode group_by_overflow_mode_,
Compiler * compiler_, UInt32 min_count_to_compile_,
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_, const std::string & tmp_path_)
size_t max_bytes_before_external_group_by_, const std::string & tmp_path_,
bool empty_result_for_empty_data_)
: key_names(key_names_), aggregates(aggregates_), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
empty_result_for_empty_data(empty_result_for_empty_data_),
compiler(compiler_), min_count_to_compile(min_count_to_compile_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_), tmp_path(tmp_path_)
......@@ -819,11 +825,11 @@ public:
keys_size = key_names.size();
}
/// Только параметры, имеющие значение при мердже.
Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_)
: Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "") {}
/// Only params meaningful for merging states.
Params(const Names & key_names_, const AggregateDescriptions & aggregates_, bool overflow_row_, bool empty_result_for_empty_data_)
: Params(key_names_, aggregates_, overflow_row_, 0, OverflowMode::THROW, nullptr, 0, 0, 0, 0, "", empty_result_for_empty_data_) {}
/// Вычислить номера столбцов в keys и aggregates.
/// Calculate column numbers from its names into 'keys' and 'aggregates'.
void calculateColumnNumbers(const Block & block);
};
......
......@@ -217,6 +217,9 @@ struct Settings
\
/** What aggregate function to use for implementation of count(DISTINCT ...) */ \
M(SettingString, count_distinct_implementation, "uniq") \
\
/** When aggregating without keys (without GROUP BY), return empty result for empty data instead of single row with default values. */ \
M(SettingBool, return_empty_result_when_aggregating_empty_data_without_keys, 0) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;
......
......@@ -57,10 +57,13 @@ Block AggregatingBlockInputStream::readImpl()
}
Block res;
std::cerr << (isCancelled() || !impl) << ", " << impl->getName() << "\n";
if (isCancelled() || !impl)
return res;
return impl->read();
res = impl->read();
std::cerr << res.dumpStructure() << "\n";
return res;
}
......
......@@ -89,7 +89,7 @@ int main(int argc, char ** argv)
sample.insert(std::move(col));
}
Aggregator::Params params(key_column_names, aggregate_descriptions, false);
Aggregator::Params params(key_column_names, aggregate_descriptions, false, false);
BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(block);
stream = std::make_shared<AggregatingBlockInputStream>(stream, params, true);
......
......@@ -14,6 +14,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
......@@ -1248,6 +1249,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
}
static Block createResultWithDefaultValues(const Block & sample)
{
Block res = sample.cloneEmpty();
std::cerr << res.dumpStructure() << ", " << sample.dumpStructure() << "\n";
/// Insert default value for each column.
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
res.unsafeGetByPosition(i).column->insertDefault();
return res;
}
BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, bool final, size_t max_threads) const
{
if (isCancelled())
......@@ -1261,7 +1277,16 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
/// В какой структуре данных агрегированы данные?
if (data_variants.empty())
{
/// If aggregate without keys, in case of empty data, we must return result with one row contains default states of aggregate functions.
/// For example, "SELECT count() FROM empty_table" should return one row with 0 value.
if (!params.empty_result_for_empty_data && params.keys_size == 0)
{
blocks.push_back(createResultWithDefaultValues(sample));
}
return blocks;
}
std::unique_ptr<ThreadPool> thread_pool;
if (max_threads > 1 && data_variants.sizeWithoutOverflowRow() > 100000 /// TODO Сделать настраиваемый порог.
......@@ -1684,7 +1709,18 @@ std::unique_ptr<IBlockInputStream> Aggregator::mergeAndConvertToBlocks(
non_empty_data.push_back(data);
if (non_empty_data.empty())
return std::make_unique<NullBlockInputStream>();
{
std::cerr << params.empty_result_for_empty_data << ", " << params.keys_size << "\n";
/// If aggregate without keys, in case of empty data, we must return result with one row contains default states of aggregate functions.
/// For example, "SELECT count() FROM empty_table" should return one row with 0 value.
if (!params.empty_result_for_empty_data && params.keys_size == 0)
{
return std::make_unique<OneBlockInputStream>(createResultWithDefaultValues(sample));
}
else
return std::make_unique<NullBlockInputStream>();
}
if (non_empty_data.size() > 1)
{
......
......@@ -892,7 +892,8 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
settings.compile ? &context.getCompiler() : nullptr, settings.min_count_to_compile,
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath());
settings.limits.max_bytes_before_external_group_by, context.getTemporaryPath(),
settings.return_empty_result_when_aggregating_empty_data_without_keys);
/// Если источников несколько, то выполняем параллельную агрегацию
if (streams.size() > 1)
......@@ -946,7 +947,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
* но при этом может работать медленнее.
*/
Aggregator::Params params(key_names, aggregates, overflow_row);
Aggregator::Params params(key_names, aggregates, overflow_row, settings.return_empty_result_when_aggregating_empty_data_without_keys);
if (!settings.distributed_aggregation_memory_efficient)
{
......
......@@ -73,7 +73,7 @@ int main(int argc, char ** argv)
DataTypes empty_list_of_types;
aggregate_descriptions[0].function = factory.get("count", empty_list_of_types);
Aggregator::Params params(key_column_names, aggregate_descriptions, false);
Aggregator::Params params(key_column_names, aggregate_descriptions, false, false);
Aggregator aggregator(params);
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册