未验证 提交 7c8a683c 编写于 作者: 彭勇升 pengys 提交者: GitHub

gRPC client usage improve (#1946)

* Close the clients which are unreachable.
Remote client manager test case and comments.

* Test the GRPCRemote client.

* 1. Catch the throwable for onComplete method cause of it will throw exception when connection lost.
2. Check the gRPC channel state, send message when state is ready, wait 5 second when state is not ready.

Notice: gRPC channel getState with true parameter will trigger reconnect operation.

* gRPC client will reconnect to the server when network recorvered.

* Recovery application.yml

* Recovery proto module commit id.

* no message

* Fixed compile error.
上级 c6ada8c9
......@@ -19,17 +19,12 @@
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import java.util.List;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import java.util.List;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
......@@ -37,8 +32,7 @@ import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.*;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
/**
......@@ -111,18 +105,18 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
});
for (TraceSegment segment : data) {
try {
try {
for (TraceSegment segment : data) {
UpstreamSegment upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
} catch (Throwable t) {
logger.error(t, "Transform and send UpstreamSegment to collector fail.");
}
}
upstreamSegmentStreamObserver.onCompleted();
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish();
segmentUplinkedCounter += data.size();
status.wait4Finish();
segmentUplinkedCounter += data.size();
} catch (Throwable t) {
logger.error(t, "Transform and send UpstreamSegment to collector fail.");
}
} else {
segmentAbandonedCounter += data.size();
}
......
......@@ -17,7 +17,8 @@
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm</artifactId>
<groupId>org.apache.skywalking</groupId>
......@@ -237,6 +238,24 @@
<artifactId>grpc-testing</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
......
......@@ -19,25 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.slf4j.*;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to
......@@ -63,7 +52,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
}
@Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
this.port = remoteInstance.getPort();
this.port = remoteInstance.getAddress().getPort();
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
}
......@@ -99,7 +88,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery
switch (event.getType()) {
case "ADDED":
case "MODIFIED":
cache.put(event.getUid(), new RemoteInstance(event.getHost(), port, event.getUid().equals(this.uid)));
cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid))));
break;
case "DELETED":
cache.remove(event.getUid());
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.Test;
import static org.hamcrest.core.Is.is;
......@@ -29,44 +30,43 @@ public class KubernetesCoordinatorTest {
private KubernetesCoordinator coordinator;
@Test
public void assertAdded() throws InterruptedException {
PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertModified() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.3"));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3"));
}
@Test
public void assertDeleted() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(1));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
@Test
public void assertError() throws InterruptedException {
PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
coordinator = new KubernetesCoordinator(watch, () -> "1");
coordinator.registerRemote(new RemoteInstance("0.0.0.0", 8454, true));
coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true)));
watch.await();
assertThat(coordinator.queryRemoteNodes().size(), is(2));
assertThat(coordinator.queryRemoteNodes().stream().filter(RemoteInstance::isSelf).findFirst().get().getHost(), is("10.0.0.1"));
assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1"));
}
}
\ No newline at end of file
......@@ -18,11 +18,8 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import java.util.*;
import org.apache.skywalking.oap.server.core.cluster.*;
/**
* A cluster manager simulator. Work in memory only. Also return the current instance.
......@@ -35,7 +32,7 @@ public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
@Override public void registerRemote(RemoteInstance remoteInstance) {
this.remoteInstance = remoteInstance;
this.remoteInstance.setSelf(true);
this.remoteInstance.getAddress().setSelf(true);
}
@Override
......
......@@ -19,14 +19,15 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.junit.*;
public class StandaloneManagerTest {
@Test
public void test() {
StandaloneManager standaloneManager = new StandaloneManager();
RemoteInstance remote1 = new RemoteInstance("A", 100, true);
RemoteInstance remote2 = new RemoteInstance("B", 100, false);
RemoteInstance remote1 = new RemoteInstance(new Address("A", 100, true));
RemoteInstance remote2 = new RemoteInstance(new Address("B", 100, false));
standaloneManager.registerRemote(remote1);
Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.*;
import org.apache.curator.x.discovery.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.slf4j.*;
/**
......@@ -31,7 +32,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private volatile ServiceCache<RemoteInstance> serviceCache;
private volatile RemoteInstance selfInstance;
private volatile Address selfAddress;
ZookeeperCoordinator(ServiceDiscovery<RemoteInstance> serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
......@@ -44,8 +45,8 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
.name(remoteNamePath)
.id(UUID.randomUUID().toString())
.address(remoteInstance.getHost())
.port(remoteInstance.getPort())
.address(remoteInstance.getAddress().getHost())
.port(remoteInstance.getAddress().getPort())
.payload(remoteInstance)
.build();
......@@ -57,7 +58,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceCache.start();
this.selfInstance = remoteInstance;
this.selfAddress = remoteInstance.getAddress();
} catch (Exception e) {
throw new ServiceRegisterException(e.getMessage());
}
......@@ -70,10 +71,10 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
serviceInstances.forEach(serviceInstance -> {
RemoteInstance instance = serviceInstance.getPayload();
if (instance.equals(selfInstance)) {
instance.setSelf(true);
if (instance.getAddress().equals(selfAddress)) {
instance.getAddress().setSelf(true);
} else {
instance.setSelf(false);
instance.getAddress().setSelf(false);
}
remoteInstanceDetails.add(instance);
});
......
......@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.curator.test.TestingServer;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.library.module.*;
import org.junit.*;
......@@ -52,7 +53,7 @@ public class ClusterModuleZookeeperProviderTestCase {
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
RemoteInstance remoteInstance = new RemoteInstance(new Address("ProviderAHost", 1000, true));
moduleRegister.registerRemote(remoteInstance);
......@@ -63,8 +64,8 @@ public class ClusterModuleZookeeperProviderTestCase {
continue;
}
Assert.assertEquals(1, detailsList.size());
Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
Assert.assertEquals(1000, detailsList.get(0).getPort());
Assert.assertEquals("ProviderAHost", detailsList.get(0).getAddress().getHost());
Assert.assertEquals(1000, detailsList.get(0).getAddress().getPort());
}
}
......
......@@ -17,7 +17,8 @@
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
......@@ -33,6 +34,11 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-module</artifactId>
......@@ -63,6 +69,12 @@
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -22,55 +22,26 @@ import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -186,7 +157,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
PersistenceTimer.INSTANCE.start(getManager());
......
......@@ -69,11 +69,11 @@ public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements
}
@Override public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
throw new UnsupportedOperationException("Close merge data collection not support containsKey operation.");
}
@Override public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("None merge data collection not support get operation.");
throw new UnsupportedOperationException("Close merge data collection not support get operation.");
}
@Override public void put(STORAGE_DATA value) {
......
......@@ -18,44 +18,26 @@
package org.apache.skywalking.oap.server.core.cluster;
import java.util.Objects;
import lombok.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.remote.client.Address;
/**
* @author peng-yongsheng
*/
@Getter
public class RemoteInstance implements Comparable<RemoteInstance> {
@Getter private final String host;
@Getter private final int port;
@Getter @Setter private boolean isSelf = false;
private final Address address;
public RemoteInstance(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
this.isSelf = isSelf;
}
@Override public int compareTo(RemoteInstance o) {
return toString().compareTo(o.toString());
public RemoteInstance(Address address) {
this.address = address;
}
@Override public String toString() {
return host + ":" + String.valueOf(port);
return address.toString();
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
RemoteInstance instance = (RemoteInstance)o;
return port == instance.port &&
Objects.equals(host, instance.host);
}
@Override public int hashCode() {
return Objects.hash(host, port);
@Override public int compareTo(RemoteInstance o) {
return this.address.compareTo(o.getAddress());
}
}
......@@ -103,7 +103,7 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
InventoryProcess.INSTANCE.in(networkAddress);
} else {
logger.warn("Network address {} heartbeat, but not found in storage.");
logger.warn("Network getAddress {} heartbeat, but not found in storage.");
}
}
......
......@@ -81,7 +81,7 @@ public class ServiceInstanceInventoryRegister implements IServiceInstanceInvento
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
if (logger.isDebugEnabled()) {
logger.debug("get or create service instance by address id, service id: {}, address id: {}, registerTime: {}", serviceId, addressId, registerTime);
logger.debug("get or create service instance by getAddress id, service id: {}, getAddress id: {}, registerTime: {}", serviceId, addressId, registerTime);
}
int serviceInstanceId = getServiceInstanceInventoryCache().getServiceInstanceId(serviceId, addressId);
......
......@@ -25,27 +25,36 @@ import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGe
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
/**
* This class is Server-side streaming RPC implementation. It's a common service for OAP servers
* to receive message from each others.
* The stream data id is used to find the object to deserialize message.
* The next worker id is used to find the worker to process message.
*
* @author peng-yongsheng
*/
public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleManager moduleManager;
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
public RemoteServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(streamDataClassGetter)) {
streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(streamDataClassGetter)) {
streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
}
}
}
return new StreamObserver<RemoteMessage>() {
......@@ -59,8 +68,8 @@ public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBas
StreamData streamData = streamDataClass.newInstance();
streamData.deserialize(remoteData);
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
} catch (InstantiationException | IllegalAccessException e) {
logger.warn(e.getMessage());
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
/**
* @author peng-yongsheng
*/
@Getter
public class Address implements Comparable<Address> {
private final String host;
private final int port;
@Setter private boolean isSelf;
public Address(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
this.isSelf = isSelf;
}
@Override public int hashCode() {
return toString().hashCode();
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Address address = (Address)obj;
return host.equals(address.host) && port == address.port;
}
@Override public String toString() {
return host + Const.ID_SPLIT + port;
}
@Override public int compareTo(Address o) {
return this.toString().compareTo(o.toString());
}
}
......@@ -18,13 +18,13 @@
package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
......@@ -32,27 +32,82 @@ import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.*;
/**
* This is a wrapper of the gRPC client for sending message to each other OAP server.
* It contains a block queue to buffering the message and sending the message by batch.
*
* @author peng-yongsheng
*/
public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
public class GRPCRemoteClient implements RemoteClient {
private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
private final GRPCClient client;
private final DataCarrier<RemoteMessage> carrier;
private final int channelSize;
private final int bufferSize;
private final Address address;
private final StreamDataClassGetter streamDataClassGetter;
private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0);
private GRPCClient client;
private DataCarrier<RemoteMessage> carrier;
private boolean isConnect;
public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize,
public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, Address address, int channelSize,
int bufferSize) {
this.streamDataClassGetter = streamDataClassGetter;
this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
this.client.initialize();
this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
this.carrier.consume(new RemoteMessageConsumer(), 1);
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
}
@Override public void connect() {
if (!isConnect) {
this.getClient().connect();
this.getDataCarrier().consume(new RemoteMessageConsumer(), 1);
this.isConnect = true;
}
}
/**
* Get channel state by the true value of request connection.
*
* @return a channel when the state to be ready
*/
ManagedChannel getChannel() {
return getClient().getChannel();
}
GRPCClient getClient() {
if (Objects.isNull(client)) {
synchronized (GRPCRemoteClient.class) {
if (Objects.isNull(client)) {
this.client = new GRPCClient(address.getHost(), address.getPort());
}
}
}
return client;
}
RemoteServiceGrpc.RemoteServiceStub getStub() {
return RemoteServiceGrpc.newStub(getChannel());
}
DataCarrier<RemoteMessage> getDataCarrier() {
if (Objects.isNull(this.carrier)) {
synchronized (GRPCRemoteClient.class) {
if (Objects.isNull(this.carrier)) {
this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
}
}
}
return this.carrier;
}
/**
* Push stream data which need to send to another OAP server.
*
* @param nextWorkerId the id of a worker which will process this stream data.
* @param streamData the entity contains the values.
*/
@Override public void push(int nextWorkerId, StreamData streamData) {
int streamDataId = streamDataClassGetter.findIdByClass(streamData.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
......@@ -60,7 +115,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
builder.setStreamDataId(streamDataId);
builder.setRemoteData(streamData.serialize());
this.carrier.produce(builder.build());
this.getDataCarrier().produce(builder.build());
}
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
......@@ -68,12 +123,15 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
@Override public void consume(List<RemoteMessage> remoteMessages) {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
streamObserver.onNext(remoteMessage);
try {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
streamObserver.onNext(remoteMessage);
}
streamObserver.onCompleted();
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
streamObserver.onCompleted();
}
@Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
......@@ -84,9 +142,14 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
}
/**
* Create a gRPC stream observer to sending stream data, one stream observer
* could send multiple stream data by a single consume.
* The max number of concurrency allowed at the same time is 10.
*
* @return stream observer
*/
private StreamObserver<RemoteMessage> createStreamObserver() {
RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
int sleepTotalMillis = 0;
int sleepMillis = 10;
while (concurrentStreamObserverNumber.incrementAndGet() > 10) {
......@@ -105,7 +168,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
}
}
return stub.call(new StreamObserver<Empty>() {
return getStub().call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
......@@ -120,15 +183,20 @@ public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClie
});
}
@Override public int compareTo(GRPCRemoteClient o) {
return this.client.toString().compareTo(o.client.toString());
@Override public void close() {
if (Objects.nonNull(this.carrier)) {
this.carrier.shutdownConsumers();
}
if (Objects.nonNull(this.client)) {
this.client.shutdown();
}
}
public String getHost() {
return client.getHost();
@Override public Address getAddress() {
return address;
}
public int getPort() {
return client.getPort();
@Override public int compareTo(RemoteClient o) {
return address.compareTo(o.getAddress());
}
}
......@@ -23,11 +23,13 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
/**
* @author peng-yongsheng
*/
public interface RemoteClient {
public interface RemoteClient extends Comparable<RemoteClient> {
String getHost();
Address getAddress();
int getPort();
void connect();
void close();
void push(int nextWorkerId, StreamData streamData);
}
......@@ -27,37 +27,61 @@ import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
* This class manages the connections between OAP servers. There is a task schedule that will
* automatically query a server list from the cluster module. Such as Zookeeper cluster module
* or Kubernetes cluster module.
*
* @author peng-yongsheng
*/
public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleManager moduleManager;
private final ModuleDefineHolder moduleDefineHolder;
private StreamDataClassGetter streamDataClassGetter;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients;
public RemoteClientManager(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
}
public void start() {
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
this.streamDataClassGetter = moduleManager.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 5, 5, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 5, TimeUnit.SECONDS);
}
private void refresh() {
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
/**
* Query OAP server list from the cluster module and create a new connection
* for the new node. Make the OAP server orderly because of each of the server
* will send stream data to each other by hash code.
*/
void refresh() {
try {
if (Objects.isNull(clusterNodesQuery)) {
synchronized (RemoteClientManager.class) {
if (Objects.isNull(clusterNodesQuery)) {
this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
}
}
}
if (Objects.isNull(streamDataClassGetter)) {
synchronized (RemoteClientManager.class) {
if (Objects.isNull(streamDataClassGetter)) {
this.streamDataClassGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(StreamDataClassGetter.class);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Refresh remote nodes collection.");
}
List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
Collections.sort(instanceList);
......@@ -66,12 +90,11 @@ public class RemoteClientManager implements Service {
}
if (!compare(instanceList)) {
buildNewClients(instanceList);
reBuildRemoteClients(instanceList);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
public List<RemoteClient> getRemoteClient() {
......@@ -94,34 +117,68 @@ public class RemoteClientManager implements Service {
}
}
private void buildNewClients(List<RemoteInstance> remoteInstances) {
/**
* Compare clients between exist clients and remote instance collection. Move
* the clients into new client collection which are alive to avoid create a
* new channel. Shutdown the clients which could not find in cluster config.
*
* Create a gRPC client for remote instance except for self-instance.
*
* @param remoteInstances Remote instance collection by query cluster config.
*/
private synchronized void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
getFreeClients().clear();
Map<String, RemoteClient> currentClientsMap = new HashMap<>();
this.usingClients.forEach(remoteClient -> currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient));
Map<Address, RemoteClient> remoteClients = new HashMap<>();
getRemoteClient().forEach(client -> remoteClients.put(client.getAddress(), client));
Map<Address, Action> tempRemoteClients = new HashMap<>();
getRemoteClient().forEach(client -> tempRemoteClients.put(client.getAddress(), Action.Close));
remoteInstances.forEach(remoteInstance -> {
String address = address(remoteInstance.getHost(), remoteInstance.getPort());
RemoteClient client;
if (currentClientsMap.containsKey(address)) {
client = currentClientsMap.get(address);
if (tempRemoteClients.containsKey(remoteInstance.getAddress())) {
tempRemoteClients.put(remoteInstance.getAddress(), Action.Leave);
} else {
if (remoteInstance.isSelf()) {
client = new SelfRemoteClient(remoteInstance.getHost(), remoteInstance.getPort());
} else {
client = new GRPCRemoteClient(streamDataClassGetter, remoteInstance, 1, 3000);
}
tempRemoteClients.put(remoteInstance.getAddress(), Action.Create);
}
});
tempRemoteClients.forEach((address, action) -> {
switch (action) {
case Leave:
if (remoteClients.containsKey(address)) {
getFreeClients().add(remoteClients.get(address));
}
break;
case Create:
if (address.isSelf()) {
RemoteClient client = new SelfRemoteClient(address);
getFreeClients().add(client);
} else {
RemoteClient client = new GRPCRemoteClient(streamDataClassGetter, address, 1, 3000);
client.connect();
getFreeClients().add(client);
}
break;
}
getFreeClients().add(client);
});
Collections.sort(getFreeClients());
switchCurrentClients();
tempRemoteClients.forEach((address, action) -> {
if (Action.Close.equals(action) && remoteClients.containsKey(address)) {
remoteClients.get(address).close();
}
});
getFreeClients().clear();
}
private boolean compare(List<RemoteInstance> remoteInstances) {
if (usingClients.size() == remoteInstances.size()) {
for (int i = 0; i < usingClients.size(); i++) {
if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
if (!usingClients.get(i).getAddress().equals(remoteInstances.get(i).getAddress())) {
return false;
}
}
......@@ -131,7 +188,7 @@ public class RemoteClientManager implements Service {
}
}
private String address(String host, int port) {
return host + String.valueOf(port);
enum Action {
Close, Leave, Create
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
......@@ -26,23 +27,28 @@ import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
*/
public class SelfRemoteClient implements RemoteClient {
private final String host;
private final int port;
private final Address address;
public SelfRemoteClient(String host, int port) {
this.host = host;
this.port = port;
public SelfRemoteClient(Address address) {
this.address = address;
}
@Override public String getHost() {
return host;
@Override public Address getAddress() {
return address;
}
@Override public int getPort() {
return port;
@Override public void connect() {
}
@Override public void close() {
throw new UnexpectedException("Self remote client invoked to close.");
}
@Override public void push(int nextWorkerId, StreamData streamData) {
WorkerInstances.INSTANCES.get(nextWorkerId).in(streamData);
}
@Override public int compareTo(RemoteClient o) {
return address.compareTo(o.getAddress());
}
}
......@@ -20,29 +20,20 @@ package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import lombok.Setter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.DataTTL;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.Downsampling;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -67,8 +58,8 @@ public enum DataTTLKeeperTimer {
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).isSelf()) {
logger.info("The selected first address is {}. Skip.", remoteInstances.get(0).toString());
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
return;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote;
import io.grpc.inprocess.*;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
*/
public class RemoteServiceHandlerTestCase {
@Rule
public final GrpcCleanupRule gRPCCleanup = new GrpcCleanupRule();
@Test
public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
final int streamDataClassId = 1;
final int testWorkerId = 1;
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
StreamDataClassGetter classGetter = mock(StreamDataClassGetter.class);
Class<?> dataClass = TestRemoteData.class;
when(classGetter.findClassById(streamDataClassId)).thenReturn((Class<StreamData>)dataClass);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
WorkerInstances.INSTANCES.put(testWorkerId, new TestWorker());
String serverName = InProcessServerBuilder.generateName();
gRPCCleanup.register(InProcessServerBuilder
.forName(serverName).directExecutor().addService(new RemoteServiceHandler(moduleManager)).build().start());
RemoteServiceGrpc.RemoteServiceStub remoteServiceStub = RemoteServiceGrpc.newStub(
gRPCCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
StreamObserver<RemoteMessage> streamObserver = remoteServiceStub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() {
}
});
RemoteMessage.Builder remoteMessage = RemoteMessage.newBuilder();
remoteMessage.setStreamDataId(streamDataClassId);
remoteMessage.setNextWorkerId(testWorkerId);
RemoteData.Builder remoteData = RemoteData.newBuilder();
remoteData.addDataStrings("test1");
remoteData.addDataStrings("test2");
remoteData.addDataLongs(10);
remoteData.addDataLongs(20);
remoteMessage.setRemoteData(remoteData);
streamObserver.onNext(remoteMessage.build());
streamObserver.onCompleted();
}
static class TestRemoteData extends StreamData {
private String str1;
private String str2;
private long long1;
private long long2;
@Override public int remoteHashCode() {
return 10;
}
@Override public void deserialize(RemoteData remoteData) {
str1 = remoteData.getDataStrings(0);
str2 = remoteData.getDataStrings(1);
long1 = remoteData.getDataLongs(0);
long2 = remoteData.getDataLongs(1);
Assert.assertEquals("test1", str1);
Assert.assertEquals("test2", str2);
Assert.assertEquals(10, long1);
Assert.assertEquals(20, long2);
}
@Override public RemoteData.Builder serialize() {
return null;
}
}
static class TestWorker extends AbstractWorker {
public TestWorker() {
super(1);
}
@Override public void in(Object o) {
TestRemoteData data = (TestRemoteData)o;
Assert.assertEquals("test1", data.str1);
Assert.assertEquals("test2", data.str2);
Assert.assertEquals(10, data.long1);
Assert.assertEquals(20, data.long2);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.junit.Assert;
import static org.mockito.Mockito.spy;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientRealClient {
public static void main(String[] args) throws InterruptedException {
Address address = new Address("localhost", 10000, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(new TestClassGetter(), address, 1, 10));
remoteClient.connect();
for (int i = 0; i < 10000; i++) {
remoteClient.push(1, new TestStreamData());
TimeUnit.SECONDS.sleep(1);
}
TimeUnit.MINUTES.sleep(10);
}
public static class TestClassGetter implements StreamDataClassGetter {
@Override public int findIdByClass(Class streamDataClass) {
return 1;
}
@Override public Class<StreamData> findClassById(int id) {
Class<?> clazz = TestStreamData.class;
return (Class<StreamData>)clazz;
}
}
public static class TestStreamData extends StreamData {
private long value;
@Override public int remoteHashCode() {
return 0;
}
@Override public void deserialize(RemoteData remoteData) {
this.value = remoteData.getDataLongs(0);
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataLongs(987);
return builder;
}
}
static class TestWorker extends AbstractWorker {
public TestWorker(int workerId) {
super(workerId);
}
@Override public void in(Object o) {
TestStreamData streamData = (TestStreamData)o;
Assert.assertEquals(987, streamData.value);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.testing.module.*;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientRealServer {
public static void main(String[] args) throws ServerException, InterruptedException {
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, new GRPCRemoteClientRealClient.TestClassGetter());
GRPCServer server = new GRPCServer("localhost", 10000);
server.initialize();
server.addHandler(new RemoteServiceHandler(moduleManager));
server.start();
TimeUnit.MINUTES.sleep(10);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.testing.GrpcServerRule;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientTestCase {
private final int nextWorkerId = 1;
private ModuleManagerTesting moduleManager;
private StreamDataClassGetter classGetter;
@Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Before
public void before() {
moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
classGetter = mock(StreamDataClassGetter.class);
moduleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, classGetter);
TestWorker worker = new TestWorker(nextWorkerId);
WorkerInstances.INSTANCES.put(nextWorkerId, worker);
}
@Test
public void testPush() throws InterruptedException {
grpcServerRule.getServiceRegistry().addService(new RemoteServiceHandler(moduleManager));
Address address = new Address("not-important", 11, false);
GRPCRemoteClient remoteClient = spy(new GRPCRemoteClient(classGetter, address, 1, 10));
remoteClient.connect();
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
when(classGetter.findIdByClass(TestStreamData.class)).thenReturn(1);
Class<?> dataClass = TestStreamData.class;
when(classGetter.findClassById(1)).thenReturn((Class<StreamData>)dataClass);
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
}
TimeUnit.SECONDS.sleep(1);
}
public static class TestStreamData extends StreamData {
private long value;
@Override public int remoteHashCode() {
return 0;
}
@Override public void deserialize(RemoteData remoteData) {
this.value = remoteData.getDataLongs(0);
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataLongs(987);
return builder;
}
}
class TestWorker extends AbstractWorker {
public TestWorker(int workerId) {
super(workerId);
}
@Override public void in(Object o) {
TestStreamData streamData = (TestStreamData)o;
Assert.assertEquals(987, streamData.value);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
*/
public class RemoteClientManagerTestCase {
@Test
public void refresh() {
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting clusterModuleDefine = new ModuleDefineTesting();
moduleManager.put(ClusterModule.NAME, clusterModuleDefine);
ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, coreModuleDefine);
ClusterNodesQuery clusterNodesQuery = mock(ClusterNodesQuery.class);
clusterModuleDefine.provider().registerServiceImplementation(ClusterNodesQuery.class, clusterNodesQuery);
StreamDataClassGetter streamDataClassGetter = mock(StreamDataClassGetter.class);
coreModuleDefine.provider().registerServiceImplementation(StreamDataClassGetter.class, streamDataClassGetter);
RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();
List<RemoteClient> remoteClients = clientManager.getRemoteClient();
Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
Assert.assertEquals("host3", remoteClients.get(2).getAddress().getHost());
Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupTwoInstances());
clientManager.refresh();
remoteClients = clientManager.getRemoteClient();
Assert.assertEquals("host1", remoteClients.get(0).getAddress().getHost());
Assert.assertEquals("host2", remoteClients.get(1).getAddress().getHost());
Assert.assertEquals("host4", remoteClients.get(2).getAddress().getHost());
Assert.assertEquals("host5", remoteClients.get(3).getAddress().getHost());
Assert.assertTrue(remoteClients.get(0) instanceof GRPCRemoteClient);
Assert.assertTrue(remoteClients.get(1) instanceof SelfRemoteClient);
Assert.assertTrue(remoteClients.get(2) instanceof GRPCRemoteClient);
Assert.assertTrue(remoteClients.get(3) instanceof GRPCRemoteClient);
}
private List<RemoteInstance> groupOneInstances() {
List<RemoteInstance> instances = new ArrayList<>();
instances.add(new RemoteInstance(new Address("host3", 100, false)));
instances.add(new RemoteInstance(new Address("host1", 100, false)));
instances.add(new RemoteInstance(new Address("host2", 100, true)));
return instances;
}
private List<RemoteInstance> groupTwoInstances() {
List<RemoteInstance> instances = new ArrayList<>();
instances.add(new RemoteInstance(new Address("host5", 100, false)));
instances.add(new RemoteInstance(new Address("host1", 100, false)));
instances.add(new RemoteInstance(new Address("host2", 100, true)));
instances.add(new RemoteInstance(new Address("host4", 100, false)));
return instances;
}
}
......@@ -22,7 +22,8 @@ package org.apache.skywalking.oap.server.library.client;
* @author peng-yongsheng
*/
public interface Client {
void initialize() throws ClientException;
void connect() throws ClientException;
void shutdown();
}
......@@ -59,7 +59,7 @@ public class ElasticSearchClient implements Client {
this.namespace = namespace;
}
@Override public void initialize() {
@Override public void connect() {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
client = new RestHighLevelClient(
......
......@@ -21,12 +21,15 @@ package org.apache.skywalking.oap.server.library.client.grpc;
import io.grpc.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class GRPCClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(GRPCClient.class);
@Getter private final String host;
@Getter private final int port;
......@@ -38,12 +41,16 @@ public class GRPCClient implements Client {
this.port = port;
}
@Override public void initialize() {
@Override public void connect() {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
@Override public void shutdown() {
channel.shutdownNow();
try {
channel.shutdownNow();
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
public ManagedChannel getChannel() {
......
......@@ -18,19 +18,12 @@
package org.apache.skywalking.oap.server.library.client.jdbc.hikaricp;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import com.zaxxer.hikari.*;
import java.sql.*;
import java.util.Properties;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* JDBC Client uses HikariCP connection management lib to execute SQL.
......@@ -47,7 +40,7 @@ public class JDBCHikariCPClient implements Client {
hikariConfig = new HikariConfig(properties);
}
@Override public void initialize() throws ClientException {
@Override public void connect() {
dataSource = new HikariDataSource(hikariConfig);
}
......
......@@ -48,7 +48,7 @@ public class ElasticSearchClientTestCase {
builder.endObject();
ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
client.initialize();
client.connect();
String indexName = "test";
client.createIndex(indexName, settings, builder);
......
......@@ -43,10 +43,10 @@ enum ValueType {
// A point in time.
TIMESTAMP = 5;
// An IP address.
// An IP getAddress.
IP_ADDRESS = 6;
// An email address.
// An email getAddress.
EMAIL_ADDRESS = 7;
// A URI.
......
......@@ -59,7 +59,7 @@ public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
String networkAddress = networkAddresses.get(i).getAsString();
if (logger.isDebugEnabled()) {
logger.debug("network address register, network address: {}", networkAddress);
logger.debug("network getAddress register, network getAddress: {}", networkAddress);
}
int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress);
......
......@@ -92,7 +92,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network address: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
}
return false;
} else {
......
......@@ -99,7 +99,7 @@ public class SpringSleuthSegmentBuilderTest implements SegmentListener {
}
@Override public int getOrCreate(int serviceId, int addressId, long registerTime) {
String key = "VitualAppCode:" + serviceId + ",address:" + addressId;
String key = "VitualAppCode:" + serviceId + ",getAddress:" + addressId;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
......
......@@ -20,7 +20,7 @@ cluster:
# library the oap-libs folder with your ZooKeeper 3.4.x library.
# zookeeper:
# hostPort: localhost:2181
# # Retry Policy
# #Retry Policy
# baseSleepTimeMs: 1000 # initial amount of time to wait between retries
# maxRetries: 3 # max number of times to retry
# kubernetes:
......
......@@ -34,6 +34,7 @@
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core.alarm.AlarmStandardPersistence" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.core" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.core.remote.client" level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.library.buffer" level="INFO"/>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
......
......@@ -88,7 +88,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
public void start() throws ModuleStartException {
try {
nameSpace.setNameSpace(config.getNameSpace());
elasticSearchClient.initialize();
elasticSearchClient.connect();
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
installer.install(elasticSearchClient);
......
......@@ -19,43 +19,13 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.cache.*;
import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2RegisterLockDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
import org.slf4j.*;
/**
* H2 Storage provider is for demonstration and preview only.
......@@ -117,7 +87,7 @@ public class H2StorageProvider extends ModuleProvider {
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
try {
h2Client.initialize();
h2Client.connect();
H2TableInstaller installer = new H2TableInstaller(getManager());
installer.install(h2Client);
......@@ -125,8 +95,6 @@ public class H2StorageProvider extends ModuleProvider {
new H2RegisterLockInstaller().install(h2Client);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
} catch (ClientException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册