未验证 提交 536b7d23 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support connectTimeout and socketTimeout settings (#7214)

上级 0975809e
......@@ -61,6 +61,7 @@ Release Notes.
* Performance: enhance persistent session mechanism, about differentiating cache timeout for different dimensionality
metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min.
* Performance: Add L1 aggregation flush period, which reduce the CPU load and help young GC.
* Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.
#### UI
......
......@@ -81,6 +81,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - |
| - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost|
| - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP|
| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500|
| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 30000|
| - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - |
| - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - |
| - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - |
......@@ -104,6 +106,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - |
| - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost|
| - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP|
| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500|
| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 30000|
| - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - |
| - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - |
| - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - |
......
......@@ -120,6 +120,8 @@ storage:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:500}
socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:30000}
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
......@@ -146,6 +148,8 @@ storage:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:500}
socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:30000}
trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
trustStorePass: ${SW_STORAGE_ES_SSL_JKS_PASS:""}
dayStep: ${SW_STORAGE_DAY_STEP:1} # Represent the number of days in the one minute/hour/day index.
......
......@@ -119,6 +119,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
protected volatile RestHighLevelClient client;
protected DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
protected final ReentrantLock connectLock = new ReentrantLock();
private final int connectTimeout;
private final int socketTimeout;
public ElasticSearchClient(String clusterNodes,
String protocol,
......@@ -126,7 +128,9 @@ public class ElasticSearchClient implements Client, HealthCheckable {
String trustStorePass,
String user,
String password,
List<IndexNameConverter> indexNameConverters) {
List<IndexNameConverter> indexNameConverters,
int connectTimeout,
int socketTimeout) {
this.clusterNodes = clusterNodes;
this.protocol = protocol;
this.user = user;
......@@ -134,6 +138,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
this.indexNameConverters = indexNameConverters;
this.trustStorePath = trustStorePath;
this.trustStorePass = trustStorePass;
this.connectTimeout = connectTimeout;
this.socketTimeout = socketTimeout;
}
@Override
......@@ -183,6 +189,11 @@ public class ElasticSearchClient implements Client, HealthCheckable {
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
builder.setRequestConfigCallback(
requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout)
);
return new RestHighLevelClient(builder);
}
......
......@@ -69,7 +69,7 @@ public class ITElasticSearchClient {
final String esAddress = System.getProperty("elastic.search.address");
final String esProtocol = System.getProperty("elastic.search.protocol");
client = new ElasticSearchClient(esAddress, esProtocol, "", "", "test", "test",
indexNameConverters(namespace)
indexNameConverters(namespace), 500, 6000
);
client.connect();
}
......
......@@ -29,6 +29,18 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private String nameSpace;
private String clusterNodes;
String protocol = "http";
/**
* Connect timeout of ElasticSearch client.
*
* @since 8.7.0
*/
private int connectTimeout = 500;
/**
* Socket timeout of ElasticSearch client.
*
* @since 8.7.0
*/
private int socketTimeout = 30000;
/**
* Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES
* storage creates new indexes in every day.
......
......@@ -163,11 +163,11 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
elasticSearchClient = new ElasticSearchClient(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace())
indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout()
);
this.registerServiceImplementation(
IBatchDAO.class,
new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests())
);
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
......
......@@ -161,7 +161,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
elasticSearch7Client = new ElasticSearch7Client(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace())
indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout()
);
this.registerServiceImplementation(
IBatchDAO.class,
......
......@@ -78,9 +78,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
/**
*
*/
@Slf4j
public class ElasticSearch7Client extends ElasticSearchClient {
public ElasticSearch7Client(final String clusterNodes,
......@@ -89,10 +86,12 @@ public class ElasticSearch7Client extends ElasticSearchClient {
final String trustStorePass,
final String user,
final String password,
List<IndexNameConverter> indexNameConverters) {
List<IndexNameConverter> indexNameConverters,
int connectTimeout,
int socketTimeout) {
super(
clusterNodes, protocol, trustStorePath, trustStorePass, user, password,
indexNameConverters
indexNameConverters, connectTimeout, socketTimeout
);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册