Unverified commit b7b55ce6, authored by alexey-milovidov, committed by GitHub

Merge pull request #20088 from nvartolomei/nv/replicated-fetches-timeouts

Replicated fetches timeouts
@@ -83,6 +83,9 @@ struct Settings;
     M(UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT, "Limit parallel fetches from endpoint (actually pool size).", 0) \
     M(UInt64, replicated_max_parallel_sends, 0, "Limit parallel sends.", 0) \
     M(UInt64, replicated_max_parallel_sends_for_table, 0, "Limit parallel sends for one table.", 0) \
+    M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \
+    M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch requests. Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \
+    M(Seconds, replicated_fetches_http_receive_timeout, 0, "HTTP receive timeout for fetch part requests. Inherited from default profile `http_receive_timeout` if not set explicitly.", 0) \
     M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
     M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
     M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
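The three new settings are table-level MergeTree settings, so fetch timeouts can be tuned per table without changing the profile-wide `http_connection_timeout`/`http_send_timeout`/`http_receive_timeout`. A minimal usage sketch (the table name `t` and the values are illustrative, not part of this commit; the `ALTER TABLE ... MODIFY SETTING` form is the same one the integration test below uses):

    ALTER TABLE t
        MODIFY SETTING replicated_fetches_http_connection_timeout = 1,
                       replicated_fetches_http_send_timeout = 1,
                       replicated_fetches_http_receive_timeout = 1;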
......
@@ -2313,7 +2313,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
         {
             String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
             ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
-            auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
+            auto timeouts = getFetchPartHTTPTimeouts(global_context);
+
             auto [user, password] = global_context.getInterserverCredentials();
             String interserver_scheme = global_context.getInterserverScheme();
@@ -3246,6 +3247,23 @@ void StorageReplicatedMergeTree::exitLeaderElection()
     leader_election = nullptr;
 }
 
+ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(const Context & context)
+{
+    auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context);
+    auto settings = getSettings();
+
+    if (settings->replicated_fetches_http_connection_timeout.changed)
+        timeouts.connection_timeout = settings->replicated_fetches_http_connection_timeout;
+
+    if (settings->replicated_fetches_http_send_timeout.changed)
+        timeouts.send_timeout = settings->replicated_fetches_http_send_timeout;
+
+    if (settings->replicated_fetches_http_receive_timeout.changed)
+        timeouts.receive_timeout = settings->replicated_fetches_http_receive_timeout;
+
+    return timeouts;
+}
+
 bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name)
 {
     auto zookeeper = getZooKeeper();
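Note that `getFetchPartHTTPTimeouts` overrides a timeout only when the corresponding setting is in the `changed` state; otherwise the profile-level defaults from `ConnectionTimeouts::getHTTPTimeouts` are kept. As a sketch (assuming a server where `system.merge_tree_settings` exposes `name`, `value`, and `changed`), server-wide overrides can be inspected with:

    SELECT name, value, changed
    FROM system.merge_tree_settings
    WHERE name LIKE 'replicated_fetches_http%';

Per-table overrides made via `ALTER TABLE ... MODIFY SETTING` show up in the `SETTINGS` clause of `SHOW CREATE TABLE` instead.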
@@ -3661,7 +3679,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
     else
     {
         address.fromString(zookeeper->get(source_replica_path + "/host"));
-        timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
+        timeouts = getFetchPartHTTPTimeouts(global_context);
+
         user_password = global_context.getInterserverCredentials();
         interserver_scheme = global_context.getInterserverScheme();
......
@@ -507,6 +507,8 @@ private:
     /// Exchange parts.
 
+    ConnectionTimeouts getFetchPartHTTPTimeouts(const Context & context);
+
     /** Returns an empty string if no one has a part.
       */
     String findReplicaHavingPart(const String & part_name, bool active);
......
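The remaining files add an integration test. The server config below sets `background_processing_pool_task_sleep_seconds_when_no_work_max` to 0.1, which (as the setting's name suggests) shortens the background pool's idle sleep so that failed fetches are retried quickly enough for the test to observe timeout exceptions without long waits.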
<yandex>
    <background_processing_pool_task_sleep_seconds_when_no_work_max>0.1</background_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>
#!/usr/bin/env python3

import random
import string
import time

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    'node1', with_zookeeper=True,
    main_configs=['configs/server.xml'])

node2 = cluster.add_instance(
    'node2', with_zookeeper=True,
    main_configs=['configs/server.xml'])


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def get_random_string(length):
    return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))


def test_no_stall(started_cluster):
    for instance in started_cluster.instances.values():
        instance.query("""
            CREATE TABLE t (key UInt64, data String)
            ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '{instance}')
            ORDER BY tuple()
            PARTITION BY key""")

    # Pause fetches on node2 until the test setup is prepared.
    node2.query("SYSTEM STOP FETCHES t")

    node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(500)".format(get_random_string(104857)))
    node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(500)".format(get_random_string(104857)))

    with PartitionManager() as pm:
        pm.add_network_delay(node1, 2000)
        node2.query("SYSTEM START FETCHES t")

        # Wait for connection timeout exceptions to confirm that the timeout is triggered.
        while True:
            conn_timeout_exceptions = int(node2.query(
                """
                SELECT count()
                FROM system.replication_queue
                WHERE last_exception LIKE '%connect timed out%'
                """))

            if conn_timeout_exceptions >= 2:
                break

            time.sleep(0.1)

        print("Connection timeouts tested!")

        # Increase the connection timeout and wait for receive timeouts instead.
        node2.query("""
            ALTER TABLE t
            MODIFY SETTING replicated_fetches_http_connection_timeout = 30,
                           replicated_fetches_http_receive_timeout = 1""")

        while True:
            timeout_exceptions = int(node2.query(
                """
                SELECT count()
                FROM system.replication_queue
                WHERE last_exception LIKE '%e.displayText() = Timeout%'
                  AND last_exception NOT LIKE '%connect timed out%'
                """).strip())

            if timeout_exceptions >= 2:
                break

            time.sleep(0.1)

    for instance in started_cluster.instances.values():
        # Workaround for DROP TABLE not finishing if it is started while the table is readonly.
        instance.query("SYSTEM RESTART REPLICA t")

        # Clean up so the data directory does not end up in the test results archive.
        instance.query("DROP TABLE t SYNC")