提交 27c475f7 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

1. Make storage and cluster using same name space. (#2014)

2. Zookeeper implementation of the cluster module using name space to support using same zookeeper cluster by different OAP clusters.
上级 3f2fb8db
......@@ -36,7 +36,7 @@ Setting fragment example
```yaml
storage:
elasticsearch:
# nameSpace: ${SW_STORAGE_ES_NAMESPACE:""}
# nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.util.StringUtils;
......@@ -25,6 +26,8 @@ import org.apache.skywalking.oap.server.library.util.StringUtils;
* @author peng-yongsheng
*/
class ClusterModuleZookeeperConfig extends ModuleConfig {
@Setter @Getter private String nameSpace;
private String hostPort;
private int baseSleepTimeMs;
private int maxRetries;
......
......@@ -19,21 +19,13 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.curator.x.discovery.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
* Use Zookeeper to manage all instances in SkyWalking cluster.
......@@ -71,8 +63,10 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(), config.getMaxRetries());
client = CuratorFrameworkFactory.newClient(config.getHostPort(), retryPolicy);
String path = BASE_PATH + (StringUtil.isEmpty(config.getNameSpace()) ? "" : "/" + config.getNameSpace());
serviceDiscovery = ServiceDiscoveryBuilder.builder(RemoteInstance.class).client(client)
.basePath(BASE_PATH)
.basePath(path)
.watchInstances(true)
.serializer(new SWInstanceSerializer()).build();
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
*/
@Getter
public class CoreModuleConfig extends ModuleConfig {
@Setter private String nameSpace;
@Setter private String restHost;
@Setter private int restPort;
@Setter private String restContextPath;
......
......@@ -22,60 +22,26 @@ import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -101,7 +67,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
receiver = new SourceReceiverImpl();
this.receiver = new SourceReceiverImpl();
}
@Override public String name() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client;
/**
* @author peng-yongsheng
*/
public class NameSpace {
private String nameSpace = "";
public String getNameSpace() {
return nameSpace;
}
public void setNameSpace(String nameSpace) {
this.nameSpace = nameSpace;
}
}
......@@ -25,7 +25,6 @@ import org.apache.http.*;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.*;
import org.elasticsearch.action.admin.indices.create.*;
import org.elasticsearch.action.admin.indices.delete.*;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
......@@ -51,10 +50,10 @@ public class ElasticSearchClient implements Client {
private static final String TYPE = "type";
private final String clusterNodes;
private final NameSpace namespace;
private final String namespace;
private RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
public ElasticSearchClient(String clusterNodes, String namespace) {
this.clusterNodes = clusterNodes;
this.namespace = namespace;
}
......@@ -182,8 +181,8 @@ public class ElasticSearchClient implements Client {
}
private String formatIndexName(String indexName) {
if (Objects.nonNull(namespace) && StringUtils.isNotEmpty(namespace.getNameSpace())) {
return namespace.getNameSpace() + "_" + indexName;
if (StringUtils.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
}
return indexName;
}
......
......@@ -51,7 +51,7 @@ storage:
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
user: ${SW_STORAGE_H2_USER:sa}
# elasticsearch:
# # nameSpace: ${SW_STORAGE_ES_NAMESPACE:""}
# # nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
......
......@@ -16,9 +16,10 @@
cluster:
standalone:
# Please check your ZooKeeper is 3.5+, However, it is also compatible with ZooKeeper 3.4.x. Replace the ZooKeeper 3.5+
# library the oap-libs folder with your ZooKeeper 3.4.x library.
# Please check your ZooKeeper is 3.5+, However, it is also compatible with ZooKeeper 3.4.x. Replace the ZooKeeper 3.5+
# library the oap-libs folder with your ZooKeeper 3.4.x library.
# zookeeper:
# nameSpace: ${SW_NAMESPACE:""}
# hostPort: ${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}
# #Retry Policy
# baseSleepTimeMs: ${SW_CLUSTER_ZK_SLEEP_TIME:1000} # initial amount of time to wait between retries
......@@ -47,8 +48,8 @@ core:
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
elasticsearch:
# nameSpace: ${SW_STORAGE_ES_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:10.67.27.213:9200}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
......
......@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
......@@ -38,13 +37,11 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
private final StorageModuleElasticsearchConfig config;
private final NameSpace nameSpace;
private ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
this.config = new StorageModuleElasticsearchConfig();
this.nameSpace = new NameSpace();
}
@Override
......@@ -64,7 +61,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), nameSpace);
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getNameSpace());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
......@@ -87,7 +84,6 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public void start() throws ModuleStartException {
try {
nameSpace.setNameSpace(config.getNameSpace());
elasticSearchClient.connect();
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册