未验证 提交 30ece2c2 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #22510 from vdimir/merge-join-lc-bug-fix

Convert right block to full in MergeJoin
...@@ -76,6 +76,7 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column, ...@@ -76,6 +76,7 @@ int nullableCompareAt(const IColumn & left_column, const IColumn & right_column,
return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint); return left_column.compareAt(lhs_pos, rhs_pos, right_column, null_direction_hint);
} }
/// Get first and last row from sorted block
Block extractMinMax(const Block & block, const Block & keys) Block extractMinMax(const Block & block, const Block & keys)
{ {
if (block.rows() == 0) if (block.rows() == 0)
...@@ -86,7 +87,7 @@ Block extractMinMax(const Block & block, const Block & keys) ...@@ -86,7 +87,7 @@ Block extractMinMax(const Block & block, const Block & keys)
for (size_t i = 0; i < columns.size(); ++i) for (size_t i = 0; i < columns.size(); ++i)
{ {
const auto & src_column = block.getByName(keys.getByPosition(i).name); const auto & src_column = block.getByName(min_max.getByPosition(i).name);
columns[i]->insertFrom(*src_column.column, 0); columns[i]->insertFrom(*src_column.column, 0);
columns[i]->insertFrom(*src_column.column, block.rows() - 1); columns[i]->insertFrom(*src_column.column, block.rows() - 1);
...@@ -465,6 +466,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right ...@@ -465,6 +466,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
table_join->splitAdditionalColumns(right_sample_block, right_table_keys, right_columns_to_add); table_join->splitAdditionalColumns(right_sample_block, right_table_keys, right_columns_to_add);
JoinCommon::removeLowCardinalityInplace(right_table_keys); JoinCommon::removeLowCardinalityInplace(right_table_keys);
JoinCommon::removeLowCardinalityInplace(right_sample_block, table_join->keyNamesRight());
const NameSet required_right_keys = table_join->requiredRightKeys(); const NameSet required_right_keys = table_join->requiredRightKeys();
for (const auto & column : right_table_keys) for (const auto & column : right_table_keys)
...@@ -485,6 +487,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right ...@@ -485,6 +487,7 @@ MergeJoin::MergeJoin(std::shared_ptr<TableJoin> table_join_, const Block & right
left_blocks_buffer = std::make_shared<SortedBlocksBuffer>(left_sort_description, max_bytes); left_blocks_buffer = std::make_shared<SortedBlocksBuffer>(left_sort_description, max_bytes);
} }
/// Has to be called even if totals are empty
void MergeJoin::setTotals(const Block & totals_block) void MergeJoin::setTotals(const Block & totals_block)
{ {
totals = totals_block; totals = totals_block;
......
...@@ -76,12 +76,15 @@ private: ...@@ -76,12 +76,15 @@ private:
Block right_table_keys; Block right_table_keys;
Block right_columns_to_add; Block right_columns_to_add;
SortedBlocksWriter::Blocks right_blocks; SortedBlocksWriter::Blocks right_blocks;
/// Each block stores first and last row from corresponding sorted block on disk
Blocks min_max_right_blocks; Blocks min_max_right_blocks;
std::shared_ptr<SortedBlocksBuffer> left_blocks_buffer; std::shared_ptr<SortedBlocksBuffer> left_blocks_buffer;
std::shared_ptr<RowBitmaps> used_rows_bitmap; std::shared_ptr<RowBitmaps> used_rows_bitmap;
mutable std::unique_ptr<Cache> cached_right_blocks; mutable std::unique_ptr<Cache> cached_right_blocks;
std::vector<std::shared_ptr<Block>> loaded_right_blocks; std::vector<std::shared_ptr<Block>> loaded_right_blocks;
std::unique_ptr<SortedBlocksWriter> disk_writer; std::unique_ptr<SortedBlocksWriter> disk_writer;
/// Set of files with sorted blocks
SortedBlocksWriter::SortedFiles flushed_right_blocks; SortedBlocksWriter::SortedFiles flushed_right_blocks;
Block totals; Block totals;
std::atomic<bool> is_in_memory{true}; std::atomic<bool> is_in_memory{true};
......
-- Regression script for joins on LowCardinality(String) keys when the
-- 'partial_merge' join algorithm is selected. Each SELECT prints the result of
-- `1025 == count(n)`, i.e. 1 when no left-side row was lost by the LEFT JOIN.

-- Select the partial merge join implementation for every query below.
SET join_algorithm = 'partial_merge';
-- Very small memory budget for the join.
-- NOTE(review): presumably this is what forces the right-hand blocks out of
-- memory and onto disk, which is the code path under test -- confirm.
SET max_bytes_in_join = '100';
-- Two tables with the same single column `n`: one stored as
-- LowCardinality(String), the other as a plain String.
CREATE TABLE foo_lc (n LowCardinality(String)) ENGINE = Memory;
CREATE TABLE foo (n String) ENGINE = Memory;
-- Fill both tables with the same 1025 distinct values: the string forms of the
-- first 1025 rows of system.numbers.
INSERT INTO foo SELECT toString(number) AS n FROM system.numbers LIMIT 1025;
INSERT INTO foo_lc SELECT toString(number) AS n FROM system.numbers LIMIT 1025;
-- LowCardinality on both sides of the join.
SELECT 1025 == count(n) FROM foo_lc AS t1 ANY LEFT JOIN foo_lc AS t2 ON t1.n == t2.n;
-- Plain String on the left, LowCardinality on the right.
SELECT 1025 == count(n) FROM foo AS t1 ANY LEFT JOIN foo_lc AS t2 ON t1.n == t2.n;
-- LowCardinality on the left, plain String on the right.
SELECT 1025 == count(n) FROM foo_lc AS t1 ANY LEFT JOIN foo AS t2 ON t1.n == t2.n;
-- Same as the first query but with ALL strictness; the keys are unique in both
-- tables, so the expected row count is still 1025.
SELECT 1025 == count(n) FROM foo_lc AS t1 ALL LEFT JOIN foo_lc AS t2 ON t1.n == t2.n;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册