未验证 提交 3eafda28 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #2360 from apache/async-context

Async core APIs in java agent
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
......@@ -28,41 +27,38 @@ import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
*/
public interface AbstractTracerContext {
/**
* Prepare for the cross-process propagation.
* How to initialize the carrier, depends on the implementation.
* Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.
*
* @param carrier to carry the context for crossing process.
*/
void inject(ContextCarrier carrier);
/**
* Build the reference between this segment and a cross-process segment.
* How to build, depends on the implementation.
* Build the reference between this segment and a cross-process segment. How to build, depends on the
* implementation.
*
* @param carrier carried the context from a cross-process segment.
*/
void extract(ContextCarrier carrier);
/**
* Capture a snapshot for cross-thread propagation.
* It's a similar concept with ActiveSpan.Continuation in OpenTracing-java
* How to build, depends on the implementation.
* Capture a snapshot for cross-thread propagation. It's a similar concept with ActiveSpan.Continuation in
* OpenTracing-java How to build, depends on the implementation.
*
* @return the {@link ContextSnapshot} , which includes the reference context.
*/
ContextSnapshot capture();
/**
* Build the reference between this segment and a cross-thread segment.
* How to build, depends on the implementation.
* Build the reference between this segment and a cross-thread segment. How to build, depends on the
* implementation.
*
* @param snapshot from {@link #capture()} in the parent thread.
*/
void continued(ContextSnapshot snapshot);
/**
* Get the global trace id, if needEnhance.
* How to build, depends on the implementation.
* Get the global trace id, if needEnhance. How to build, depends on the implementation.
*
* @return the string represents the id.
*/
......@@ -102,7 +98,21 @@ public interface AbstractTracerContext {
* Finish the given span, and the given span should be the active span of current tracing context(stack)
*
* @param span to finish
* @return true when context should be clear.
*/
boolean stopSpan(AbstractSpan span);
/**
* Notify this context, current span is going to be finished async in another thread.
*
* @return The current context
*/
void stopSpan(AbstractSpan span);
AbstractTracerContext awaitFinishAsync();
/**
* The given span could be stopped officially.
*
* @param span to be stopped.
*/
void asyncStop(AsyncSpan span);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
/**
* Span could use these APIs to active and extend its lift cycle across thread.
*
* This is typical used in async plugin, especially RPC plugins.
*
* @author wusheng
*/
public interface AsyncSpan {
/**
* The span finish at current tracing context, but the current span is still alive, until {@link #asyncFinish}
* called.
*
* This method must be called
*
* 1. In original thread(tracing context).
* 2. Current span is active span.
*
* During alive, tags, logs and attributes of the span could be changed, in any thread.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan prepareForAsync();
/**
* Notify the span, it could be finished.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan asyncFinish();
}
......@@ -18,14 +18,11 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.*;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -39,7 +36,7 @@ import org.apache.skywalking.apm.util.StringUtil;
*
* @author wusheng
*/
public class ContextManager implements TracingContextListener, BootService, IgnoreTracerContextListener {
public class ContextManager implements BootService {
private static final ILog logger = LogManager.getLogger(ContextManager.class);
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
......@@ -59,7 +56,7 @@ public class ContextManager implements TracingContextListener, BootService, Igno
} else {
if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()
&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()
) {
) {
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
} else {
/**
......@@ -152,6 +149,14 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
}
public static AbstractTracerContext awaitFinishAsync(AbstractSpan span) {
AbstractSpan activeSpan = activeSpan();
if (span != activeSpan) {
throw new RuntimeException("Span is not the active in current context.");
}
return get().awaitFinishAsync();
}
public static AbstractSpan activeSpan() {
return get().activeSpan();
}
......@@ -161,7 +166,9 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
public static void stopSpan(AbstractSpan span) {
get().stopSpan(span);
if (get().stopSpan(span)) {
CONTEXT.remove();
}
}
@Override
......@@ -171,8 +178,6 @@ public class ContextManager implements TracingContextListener, BootService, Igno
@Override
public void boot() {
ContextManagerExtendService service = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
service.registerListeners(this);
}
@Override
......@@ -184,16 +189,6 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
@Override
public void afterFinished(TraceSegment traceSegment) {
CONTEXT.remove();
}
@Override
public void afterFinished(IgnoredTracerContext traceSegment) {
CONTEXT.remove();
}
public static boolean isActive() {
return get() != null;
}
......
......@@ -18,9 +18,7 @@
package org.apache.skywalking.apm.agent.core.context;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
......@@ -45,11 +43,6 @@ public class ContextManagerExtendService implements BootService {
}
public void registerListeners(ContextManager manager) {
TracingContext.ListenerManager.add(manager);
IgnoredTracerContext.ListenerManager.add(manager);
}
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context;
int suffixIdx = operationName.lastIndexOf(".");
......
......@@ -16,17 +16,14 @@
*
*/
package org.apache.skywalking.apm.agent.core.context;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
import java.util.*;
import org.apache.skywalking.apm.agent.core.context.trace.*;
/**
* The <code>IgnoredTracerContext</code> represent a context should be ignored.
* So it just maintains the stack with an integer depth field.
* The <code>IgnoredTracerContext</code> represent a context should be ignored. So it just maintains the stack with an
* integer depth field.
*
* All operations through this will be ignored, and keep the memory and gc cost as low as possible.
*
......@@ -88,11 +85,20 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override
public void stopSpan(AbstractSpan span) {
public boolean stopSpan(AbstractSpan span) {
stackDepth--;
if (stackDepth == 0) {
ListenerManager.notifyFinish(this);
}
return stackDepth == 0;
}
@Override public AbstractTracerContext awaitFinishAsync() {
return this;
}
@Override public void asyncStop(AsyncSpan span) {
}
public static class ListenerManager {
......
......@@ -18,25 +18,14 @@
package org.apache.skywalking.apm.agent.core.context;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.EntrySpan;
import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopExitSpan;
import org.apache.skywalking.apm.agent.core.context.trace.NoopSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
import org.apache.skywalking.apm.agent.core.context.trace.WithPeerInfo;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.context.trace.*;
import org.apache.skywalking.apm.agent.core.dictionary.*;
import org.apache.skywalking.apm.agent.core.logging.api.*;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
......@@ -80,6 +69,13 @@ public class TracingContext implements AbstractTracerContext {
*/
private int spanIdGenerator;
/**
* The counter indicates
*/
private AtomicInteger asyncSpanCounter;
private volatile boolean isRunningInAsyncMode;
private ReentrantLock asyncFinishLock;
/**
* Initialize all fields with default value.
*/
......@@ -89,6 +85,9 @@ public class TracingContext implements AbstractTracerContext {
if (samplingService == null) {
samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
}
asyncSpanCounter = new AtomicInteger(0);
isRunningInAsyncMode = false;
asyncFinishLock = new ReentrantLock();
}
/**
......@@ -392,7 +391,7 @@ public class TracingContext implements AbstractTracerContext {
* @param span to finish
*/
@Override
public void stopSpan(AbstractSpan span) {
public boolean stopSpan(AbstractSpan span) {
AbstractSpan lastSpan = peek();
if (lastSpan == span) {
if (lastSpan instanceof AbstractTracingSpan) {
......@@ -407,9 +406,41 @@ public class TracingContext implements AbstractTracerContext {
throw new IllegalStateException("Stopping the unexpected span = " + span);
}
if (activeSpanStack.isEmpty()) {
this.finish();
if (checkFinishConditions()) {
finish();
}
return activeSpanStack.isEmpty();
}
@Override public AbstractTracerContext awaitFinishAsync() {
isRunningInAsyncMode = true;
asyncSpanCounter.addAndGet(1);
return this;
}
@Override public void asyncStop(AsyncSpan span) {
asyncSpanCounter.addAndGet(-1);
if (checkFinishConditions()) {
finish();
}
}
private boolean checkFinishConditions() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
if (activeSpanStack.isEmpty() && asyncSpanCounter.get() == 0) {
return true;
}
} finally {
if (isRunningInAsyncMode) {
asyncFinishLock.unlock();
}
}
return false;
}
/**
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.AsyncSpan;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.network.trace.component.Component;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
......@@ -28,7 +29,7 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
*
* @author wusheng
*/
public interface AbstractSpan {
public interface AbstractSpan extends AsyncSpan {
/**
* Set the component id, which defines in {@link ComponentsDefine}
*
......
......@@ -18,15 +18,10 @@
package org.apache.skywalking.apm.agent.core.context.trace;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
import org.apache.skywalking.apm.agent.core.context.util.KeyValuePair;
import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
import org.apache.skywalking.apm.agent.core.context.util.ThrowableTransformer;
import java.util.*;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.tag.*;
import org.apache.skywalking.apm.agent.core.context.util.*;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
......@@ -45,6 +40,9 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
protected String operationName;
protected int operationId;
protected SpanLayer layer;
protected boolean isInAsyncMode = false;
protected volatile AbstractTracerContext context;
/**
* The start time of this Span.
*/
......@@ -322,4 +320,20 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
refs.add(ref);
}
}
@Override public AbstractSpan prepareForAsync() {
context = ContextManager.awaitFinishAsync(this);
isInAsyncMode = true;
return this;
}
@Override public AbstractSpan asyncFinish() {
if (!isInAsyncMode) {
throw new RuntimeException("Span is not in async mode, please use '#prepareForAsync' to active.");
}
this.endTime = System.currentTimeMillis();
context.asyncStop(this);
return this;
}
}
......@@ -115,4 +115,12 @@ public class NoopSpan implements AbstractSpan {
@Override public AbstractSpan setPeer(String remotePeer) {
return this;
}
@Override public AbstractSpan prepareForAsync() {
return this;
}
@Override public AbstractSpan asyncFinish() {
return this;
}
}
......@@ -69,16 +69,13 @@ public class ServiceManagerTest {
private void assertIgnoreTracingContextListener() throws Exception {
List<TracingContextListener> listeners = getFieldValue(IgnoredTracerContext.ListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(1));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(listeners.size(), is(0));
}
private void assertTracingContextListener() throws Exception {
List<TracingContextListener> listeners = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(2));
assertThat(listeners.size(), is(1));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true));
}
......
......@@ -160,6 +160,42 @@ SpanLayer is the catalog of span. Here are 5 values:
Component IDs are defined and reserved by SkyWalking project.
For component name/ID extension, please follow [cComponent library definition and extension](Component-library-settings.md) document.
### Advanced APIs
#### Async Span APIs
There is a set of advanced APIs in Span, which work specific for async scenario. When tags, logs, attributes(including end time) of the span
needs to set in another thread, you should use these APIs.
```java
/**
* The span finish at current tracing context, but the current span is still alive, until {@link #asyncFinish}
* called.
*
* This method must be called<br/>
* 1. In original thread(tracing context).
* 2. Current span is active span.
*
* During alive, tags, logs and attributes of the span could be changed, in any thread.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan prepareForAsync();
/**
* Notify the span, it could be finished.
*
* The execution times of {@link #prepareForAsync} and {@link #asyncFinish()} must match.
*
* @return the current span
*/
AbstractSpan asyncFinish();
```
1. Call `#prepareForAsync` in original context.
1. Propagate the span to any other thread.
1. After all set, call `#asyncFinish` in any thread.
1. Tracing context will be finished and report to backend when all spans's `#prepareForAsync` finished(Judged by count of API execution).
## Develop a plugin
### Abstract
The basic method to trace is intercepting a Java method, by using byte code manipulation tech and AOP concept.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册