提交 c2929666 编写于 作者: A Alexey Milovidov

Fixed error [#METR-21877].

上级 77f178e0
......@@ -268,6 +268,7 @@ add_library (dbms
include/DB/DataStreams/verbosePrintString.h
include/DB/DataStreams/SquashingTransform.h
include/DB/DataStreams/SquashingBlockInputStream.h
include/DB/DataStreams/SquashingBlockOutputStream.h
include/DB/DataTypes/IDataType.h
include/DB/DataTypes/IDataTypeDummy.h
include/DB/DataTypes/DataTypeSet.h
......@@ -748,6 +749,7 @@ add_library (dbms
src/DataStreams/verbosePrintString.cpp
src/DataStreams/SquashingTransform.cpp
src/DataStreams/SquashingBlockInputStream.cpp
src/DataStreams/SquashingBlockOutputStream.cpp
src/DataTypes/DataTypeString.cpp
src/DataTypes/DataTypeFixedString.cpp
......
......@@ -7,22 +7,11 @@
namespace DB
{
/** Merging consequtive blocks of stream to specified minimum size.
*
* (But if one of input blocks has already at least specified size,
* then don't merge it with neighbours, even if neighbours are small.)
*
* Used to prepare blocks to adequate size for INSERT queries,
* because such storages as Memory, StripeLog, Log, TinyLog...
* store or compress data in blocks exactly as passed to it,
* and blocks of small size are not efficient.
*
* Order of data is kept.
/** Merging consecutive blocks of stream to specified minimum size.
*/
class SquashingBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingBlockInputStream(BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes);
String getName() const override { return "Squashing"; }
......@@ -39,6 +28,7 @@ protected:
private:
SquashingTransform transform;
bool all_read = false;
};
}
#pragma once
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/DataStreams/SquashingTransform.h>
namespace DB
{
/** Merging consecutive blocks of stream to specified minimum size.
*
* Output-side wrapper: blocks passed to write() are handed to a SquashingTransform,
* and only the blocks that the transform reports as ready are forwarded to the wrapped output stream.
* Whatever is still accumulated is pushed out by flush() or writeSuffix().
*/
class SquashingBlockOutputStream : public IBlockOutputStream
{
public:
/// dst is the stream that receives squashed blocks; the two thresholds are passed to SquashingTransform as is.
SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes);
/// Passes a copy of the block to the transform; forwards the result to the output only if it is ready.
void write(const Block & block) override;
/// Pushes out the accumulated remainder (see finalize), then flushes the wrapped stream.
void flush() override;
/// Forwarded to the wrapped stream unchanged.
void writePrefix() override;
/// Pushes out the accumulated remainder (see finalize), then writes the suffix of the wrapped stream.
void writeSuffix() override;
private:
/// The wrapped stream that receives squashed blocks.
BlockOutputStreamPtr output;
/// Accumulates incoming blocks and decides when a squashed block is ready.
SquashingTransform transform;
/// Set by finalize(); while it is true, further finalize() calls do nothing.
bool all_written = false;
/// Passes an empty block to the transform and writes the returned remainder, if it is non-empty.
void finalize();
};
}
......@@ -4,11 +4,26 @@
namespace DB
{
/** Merging consecutive passed blocks to specified minimum size.
*
* (But if one of input blocks has already at least specified size,
* then don't merge it with neighbours, even if neighbours are small.)
*
* Used to prepare blocks to adequate size for INSERT queries,
* because such storages as Memory, StripeLog, Log, TinyLog...
* store or compress data in blocks exactly as passed to it,
* and blocks of small size are not efficient.
*
* Order of data is kept.
*/
class SquashingTransform
{
public:
/// Conditions on rows and bytes are OR-ed. If one of them is zero, then corresponding condition is ignored.
SquashingTransform(size_t min_block_size_rows, size_t min_block_size_bytes);
/// When not ready, you need to pass more blocks to add function.
struct Result
{
bool ready = false;
......@@ -18,6 +33,9 @@ public:
Result(Block && block_) : ready(true), block(std::move(block_)) {}
};
/** Adds the next block and possibly returns a squashed block.
* At the end, you need to pass an empty block. As the result for that last (empty) block, you will get the final Result with ready = true.
*/
Result add(Block && block);
private:
......@@ -25,7 +43,6 @@ private:
size_t min_block_size_bytes;
Block accumulated_block;
bool all_read = false;
void append(Block && block);
......
......@@ -13,9 +13,16 @@ SquashingBlockInputStream::SquashingBlockInputStream(BlockInputStreamPtr & src,
Block SquashingBlockInputStream::readImpl()
{
if (all_read)
return {};
while (true)
{
SquashingTransform::Result result = transform.add(children[0]->read());
Block block = children[0]->read();
if (!block)
all_read = true;
SquashingTransform::Result result = transform.add(std::move(block));
if (result.ready)
return result.block;
}
......
#include <DB/DataStreams/SquashingBlockOutputStream.h>
namespace DB
{
/// Remembers the destination stream and configures the squashing transform with the given thresholds.
/// Nothing is written to the destination at construction time.
SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr & dst, size_t min_block_size_rows, size_t min_block_size_bytes)
: output(dst), transform(min_block_size_rows, min_block_size_bytes)
{
}
void SquashingBlockOutputStream::write(const Block & block)
{
SquashingTransform::Result result = transform.add(Block(block));
if (result.ready)
output->write(result.block);
}
void SquashingBlockOutputStream::finalize()
{
if (all_written)
return;
all_written = true;
SquashingTransform::Result result = transform.add({});
if (result.ready && result.block)
output->write(result.block);
}
/// Pushes out whatever the transform still holds (via finalize), then asks the wrapped stream to flush.
void SquashingBlockOutputStream::flush()
{
/// The remainder must reach the wrapped stream before it is flushed.
finalize();
output->flush();
}
/// Forwards the call to the wrapped stream; the squashing transform is not involved.
void SquashingBlockOutputStream::writePrefix()
{
output->writePrefix();
}
/// Pushes out whatever the transform still holds (via finalize), then writes the suffix of the wrapped stream.
void SquashingBlockOutputStream::writeSuffix()
{
/// The remainder must be written before the wrapped stream receives its suffix.
finalize();
output->writeSuffix();
}
}
......@@ -12,14 +12,8 @@ SquashingTransform::SquashingTransform(size_t min_block_size_rows, size_t min_bl
SquashingTransform::Result SquashingTransform::add(Block && block)
{
if (all_read)
return true;
if (!block)
{
all_read = true;
return Result(std::move(accumulated_block));
}
/// The block just read is already large enough on its own.
if (isEnoughSize(block.rowsInFirstColumn(), block.bytes()))
......
......@@ -5,7 +5,7 @@
#include <DB/DataStreams/AddingDefaultBlockOutputStream.h>
#include <DB/DataStreams/PushingToViewsBlockOutputStream.h>
#include <DB/DataStreams/NullAndDoCopyBlockInputStream.h>
#include <DB/DataStreams/SquashingBlockInputStream.h>
#include <DB/DataStreams/SquashingBlockOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Parsers/ASTInsertQuery.h>
......@@ -79,14 +79,21 @@ BlockIO InterpreterInsertQuery::execute()
NamesAndTypesListPtr required_columns = std::make_shared<NamesAndTypesList>(table->getColumnsList());
/// Создаем кортеж из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out =
std::make_shared<ProhibitColumnsBlockOutputStream>(
std::make_shared<AddingDefaultBlockOutputStream>(
std::make_shared<MaterializingBlockOutputStream>(
std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, context, query_ptr)),
required_columns, table->column_defaults, context, static_cast<bool>(context.getSettingsRef().strict_insert_defaults)),
table->materialized_columns);
/// Создаем конвейер из нескольких стримов, в которые будем писать данные.
BlockOutputStreamPtr out;
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, context, query_ptr);
out = std::make_shared<MaterializingBlockOutputStream>(out);
out = std::make_shared<AddingDefaultBlockOutputStream>(out,
required_columns, table->column_defaults, context, static_cast<bool>(context.getSettingsRef().strict_insert_defaults));
out = std::make_shared<ProhibitColumnsBlockOutputStream>(out, table->materialized_columns);
out = std::make_shared<SquashingBlockOutputStream>(out,
context.getSettingsRef().min_insert_block_size_rows,
context.getSettingsRef().min_insert_block_size_bytes);
BlockIO res;
res.out_sample = getSampleBlock();
......@@ -101,10 +108,6 @@ BlockIO InterpreterInsertQuery::execute()
InterpreterSelectQuery interpreter_select{query.select, context};
BlockInputStreamPtr in = interpreter_select.execute().in;
in = std::make_shared<SquashingBlockInputStream>(in,
context.getSettingsRef().min_insert_block_size_rows,
context.getSettingsRef().min_insert_block_size_bytes);
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in, out);
res.in_sample = interpreter_select.getSampleBlock();
}
......
......@@ -18,7 +18,6 @@
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/SquashingBlockInputStream.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/Quota.h>
......
......@@ -18,26 +18,26 @@ namespace DB
{
/// Состояние обработки запроса.
/// State of query processing.
struct QueryState
{
/// Идентификатор запроса.
/// Identifier of the query.
String query_id;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
Protocol::Compression::Enum compression = Protocol::Compression::Disable;
/// Откуда читать данные для INSERT-а.
/// From where to read data for INSERT.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
/// Куда писать возвращаемые данные.
/// Where to write result data.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
/// Текст запроса.
/// Query text.
String query;
/// Потоки блоков, с помощью которых выполнять запрос.
/// Streams of blocks, that are processing the query.
BlockIO io;
/// Отменен ли запрос
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册