diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 104f3167c396a9965aad00b2e9f22d1cfdd583d8..e367f4ddb4d5b89657634a051c87f674fa46a5f8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.remote.client; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; @@ -40,6 +41,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable carrier; private final StreamDataClassGetter streamDataClassGetter; + private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0); public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize, int bufferSize) { @@ -67,6 +69,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable remoteMessages) { StreamObserver streamObserver = createStreamObserver(); + for (RemoteMessage remoteMessage : remoteMessages) { streamObserver.onNext(remoteMessage); } @@ -84,67 +87,39 @@ public class GRPCRemoteClient implements RemoteClient, Comparable createStreamObserver() { RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel()); - StreamStatus status = new StreamStatus(false); + int sleepTotalMillis = 0; + int sleepMillis = 10; + while (concurrentStreamObserverNumber.incrementAndGet() > 10) { + concurrentStreamObserverNumber.addAndGet(-1); + + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + + sleepTotalMillis += sleepMillis; + + if (sleepTotalMillis > 60000) { + logger.warn("Remote client block times over 60 seconds."); + } + } + return stub.call(new StreamObserver() { @Override public void onNext(Empty empty) { } @Override public void onError(Throwable throwable) { + concurrentStreamObserverNumber.addAndGet(-1); logger.error(throwable.getMessage(), throwable); } @Override public void onCompleted() { - status.finished(); + concurrentStreamObserverNumber.addAndGet(-1); } }); } - class StreamStatus { - - private final Logger logger = LoggerFactory.getLogger(StreamStatus.class); - - private volatile boolean status; - - StreamStatus(boolean status) { - this.status = status; - } - - public boolean isFinish() { - return status; - } - - void finished() { - this.status = true; - } - - /** - * @param maxTimeout max wait time, milliseconds. - */ - public void wait4Finish(long maxTimeout) { - long time = 0; - while (!status) { - if (time > maxTimeout) { - break; - } - try2Sleep(5); - time += 5; - } - } - - /** - * Try to sleep, and ignore the {@link InterruptedException} - * - * @param millis the length of time to sleep in milliseconds - */ - private void try2Sleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - } - @Override public int compareTo(GRPCRemoteClient o) { return this.client.toString().compareTo(o.client.toString()); }