提交 49f50a24 编写于 作者: K kezhenxu94 提交者: wu-sheng

Support upgrade backend w/o rebooting agents (#3170)

* Support upgrade backend w/o rebooting agents
上级 2b274f3c
......@@ -45,7 +45,11 @@ pipeline {
// thus save unnecessary E2E builds(which is expensive)
sh './mvnw checkstyle:check apache-rat:check'
sh './mvnw -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean package'
sh 'tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist'
// Some of the tests will modify files in the distribution folder, e.g. cluster test will modify the application.yml
// so we give each test a separate distribution folder here
sh 'mkdir -p dist-for-single-node-service && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-single-node-service'
sh 'mkdir -p dist-for-cluster && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-cluster'
sh 'mkdir -p dist-for-agent-reboot && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-agent-reboot'
}
}
......@@ -62,6 +66,12 @@ pipeline {
sh './mvnw -Dbuild.id=${BUILD_ID} -f test/e2e/pom.xml -pl e2e-cluster/test-runner -am verify'
}
}
stage('Run Agent Reboot Tests') {
steps {
sh './mvnw -Dbuild.id=${BUILD_ID} -f test/e2e/pom.xml -pl e2e-agent-reboot -am verify'
}
}
}
}
}
......@@ -78,8 +88,8 @@ pipeline {
// the container can not be deleted by `deleteDir()`, we need to mount it again and delete them
// inside the container
//
// Delete `/dist` folder
sh 'docker run -v $(pwd)/dist:/sw alpine sleep 10 && rm -rf /sw/*'
// Delete all distribution folders
sh 'docker run -v $(pwd):/sw alpine sleep 10 && rm -rf /sw/dist-for-cluster/* /sw/dist-for-single-node-service/* /sw/dist-for-agent-reboot/*'
deleteDir()
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.network.trace.component.command;
import org.apache.skywalking.apm.network.common.Command;
/**
 * Converts a raw {@link Command} received over the network into the matching
 * typed {@link BaseCommand}.
 *
 * @author kezhenxu94
 */
public class CommandDeserializer {
    /**
     * Deserializes the given command according to its command name.
     *
     * @param command the raw command to deserialize
     * @return the typed command built from {@code command}
     * @throws UnsupportedCommandException when the command name is not one of the known commands
     */
    public static BaseCommand deserialize(final Command command) {
        // Only the service reset command is known so far; everything else is rejected.
        if (!ServiceResetCommand.NAME.equals(command.getCommand())) {
            throw new UnsupportedCommandException(command);
        }
        return ServiceResetCommand.DESERIALIZER.deserialize(command);
    }
}
......@@ -23,6 +23,6 @@ import org.apache.skywalking.apm.network.common.Command;
/**
* @author peng-yongsheng
*/
public interface Deserializable {
void deserialize(Command command);
public interface Deserializable<T extends BaseCommand> {
T deserialize(Command command);
}
......@@ -19,19 +19,38 @@
package org.apache.skywalking.apm.network.trace.component.command;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import java.util.List;
/**
* Clear the service metadata cache and other metadata caches belonging to it, and re-register them.
*
* @author peng-yongsheng
*/
public class ServiceResetCommand extends BaseCommand implements Serializable {
public class ServiceResetCommand extends BaseCommand implements Serializable, Deserializable<ServiceResetCommand> {
public static final Deserializable<ServiceResetCommand> DESERIALIZER = new ServiceResetCommand("");
public static final String NAME = "ServiceMetadataReset";
public ServiceResetCommand(String serialNumber) {
super("ServiceMetadataReset", serialNumber);
super(NAME, serialNumber);
}
@Override public Command.Builder serialize() {
@Override
public Command.Builder serialize() {
return commandBuilder();
}
/**
 * Builds a {@link ServiceResetCommand} from the arguments carried by the raw command.
 *
 * @param command the raw command whose arguments are inspected
 * @return a new command carrying the value of the first {@code "SerialNumber"} argument,
 *         or a {@code null} serial number when no such argument is present
 */
@Override
public ServiceResetCommand deserialize(Command command) {
    return new ServiceResetCommand(findSerialNumber(command.getArgsList()));
}

/**
 * Returns the value of the first argument keyed {@code "SerialNumber"}, or {@code null} if there is none.
 */
private static String findSerialNumber(final List<KeyStringValuePair> arguments) {
    for (final KeyStringValuePair argument : arguments) {
        if ("SerialNumber".equals(argument.getKey())) {
            return argument.getValue();
        }
    }
    return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.network.trace.component.command;
import org.apache.skywalking.apm.network.common.Command;
/**
 * Thrown when a received {@link Command} cannot be mapped to any known command type.
 *
 * @author kezhenxu94
 */
public class UnsupportedCommandException extends RuntimeException {
    private final Command command;

    /**
     * Creates the exception with a detail message naming the unsupported command,
     * so that it is still meaningful when logged without inspecting {@link #getCommand()}.
     *
     * @param command the command that could not be handled, may be {@code null}
     */
    public UnsupportedCommandException(final Command command) {
        super(describe(command));
        this.command = command;
    }

    /**
     * @return the command that could not be handled
     */
    public Command getCommand() {
        return command;
    }

    /**
     * Builds the detail message; tolerates a {@code null} command so the constructor never fails.
     */
    private static String describe(final Command command) {
        if (command == null) {
            return "Unsupported command: null";
        }
        return "Unsupported command: " + command.getCommand();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.network.common.Command;
/**
 * Indicates that the execution of a command failed.
 *
 * This is a checked exception; it extends {@link Exception} rather than {@link Throwable}
 * directly, so that it follows the usual exception hierarchy and is handled by
 * ordinary {@code catch (Exception e)} blocks as well.
 *
 * @author Zhang Xin
 * @author kezhenxu94
 */
public class CommandExecutionException extends Exception {
    private final Command command;

    /**
     * Constructs a new {@code CommandExecutionException} with null detail message
     * and the command whose execution failed
     *
     * @param command the command whose execution failed
     */
    public CommandExecutionException(final Command command) {
        this(null, command);
    }

    /**
     * Constructs a new {@code CommandExecutionException} with given detail message
     * and the command whose execution failed
     *
     * @param message the detail message of the exception
     * @param command the command whose execution failed
     */
    public CommandExecutionException(final String message, final Command command) {
        super(message);
        this.command = command;
    }

    /**
     * @return the command whose execution failed
     */
    public Command command() {
        return command;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
/**
 * Command executor that can handle a given command. Implementations are required to be stateless,
 * i.e. the previous execution of a command cannot affect the next execution of another command.
 *
 * @author Zhang Xin
 * @author kezhenxu94
 */
public interface CommandExecutor {
/**
 * Executes the given command.
 *
 * @param command the command that is to be executed
 * @throws CommandExecutionException when the executor failed to execute the command
 */
void execute(BaseCommand command) throws CommandExecutionException;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor;
import org.apache.skywalking.apm.agent.core.commands.executor.ServiceResetCommandExecutor;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import java.util.HashMap;
import java.util.Map;
/**
 * Routing executor for all commands: it owns the mapping between command names and
 * their executors, so callers simply invoke {@link #execute(BaseCommand)} and the
 * command is dispatched to the executor registered for it.
 *
 * To support a new command, register its executor in {@link #commandExecutorMap}
 * (see {@link #prepare()}).
 *
 * @author Zhang Xin
 * @author kezhenxu94
 */
@DefaultImplementor
public class CommandExecutorService implements BootService, CommandExecutor {
    private Map<String, CommandExecutor> commandExecutorMap;

    /**
     * Creates the registry and registers every supported command with its executor.
     */
    @Override
    public void prepare() throws Throwable {
        commandExecutorMap = new HashMap<String, CommandExecutor>();

        // Register all the supported commands with their executors here
        commandExecutorMap.put(ServiceResetCommand.NAME, new ServiceResetCommandExecutor());
    }

    @Override
    public void boot() throws Throwable {
        // nothing to do
    }

    @Override
    public void onComplete() throws Throwable {
        // nothing to do
    }

    @Override
    public void shutdown() throws Throwable {
        // nothing to do
    }

    /**
     * Dispatches the command to the executor registered under its command name.
     *
     * @param command the command that is to be executed
     * @throws CommandExecutionException when the selected executor failed to execute the command
     */
    @Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        final CommandExecutor target = executorForCommand(command);
        target.execute(command);
    }

    /**
     * Looks up the executor for the command, falling back to the no-op executor
     * when nothing is registered for its name.
     */
    private CommandExecutor executorForCommand(final BaseCommand command) {
        final CommandExecutor registered = commandExecutorMap.get(command.getCommand());
        if (registered == null) {
            return NoopCommandExecutor.INSTANCE;
        }
        return registered;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands;
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Bounded FIFO cache of command serial numbers. Once the cache is full, adding
 * a new serial number evicts the oldest one.
 *
 * @author Zhang Xin
 */
public class CommandSerialNumberCache {
    private static final int DEFAULT_MAX_CAPACITY = 64;

    private final Deque<String> queue;

    /**
     * Creates a cache holding at most {@value #DEFAULT_MAX_CAPACITY} serial numbers.
     */
    public CommandSerialNumberCache() {
        this(DEFAULT_MAX_CAPACITY);
    }

    /**
     * @param maxCapacity the maximum number of serial numbers kept, must be greater than zero
     * @throws IllegalArgumentException if {@code maxCapacity} is less than 1
     */
    public CommandSerialNumberCache(int maxCapacity) {
        queue = new LinkedBlockingDeque<String>(maxCapacity);
    }

    /**
     * Remembers the given serial number, evicting the oldest entries if the cache is full.
     * A {@code null} serial number is ignored: the backing deque rejects null elements,
     * and nothing useful could be looked up for it anyway.
     *
     * @param number the serial number to remember, may be {@code null}
     */
    public void add(String number) {
        if (number == null) {
            return;
        }
        // offerLast() fails only when the deque is full, so evict from the head until it fits.
        // Unlike a size check followed by add(), this can never throw when the deque is full.
        while (!queue.offerLast(number)) {
            queue.pollFirst();
        }
    }

    /**
     * @param command the serial number to look up, may be {@code null}
     * @return {@code true} if the serial number is currently cached
     */
    public boolean contain(String command) {
        return command != null && queue.contains(command);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
import org.apache.skywalking.apm.network.trace.component.command.CommandDeserializer;
import org.apache.skywalking.apm.network.trace.component.command.UnsupportedCommandException;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
/**
 * Receives commands sent back by the backend, queues them, and executes them one by one
 * on a dedicated worker thread through the {@link CommandExecutorService}.
 * Serial numbers of executed commands are remembered so that the same command is not executed twice.
 */
@DefaultImplementor
public class CommandService implements BootService, Runnable {
    private static final ILog LOGGER = LogManager.getLogger(CommandService.class);

    private volatile boolean isRunning = true;
    // The worker spends its life blocked in commands.take(); it must be a daemon thread,
    // otherwise it would keep the host JVM alive after the application's own threads finished.
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(final Runnable runnable) {
            final Thread worker = new Thread(runnable, "SkywalkingAgent-CommandService");
            worker.setDaemon(true);
            return worker;
        }
    });
    private LinkedBlockingQueue<BaseCommand> commands = new LinkedBlockingQueue<BaseCommand>(64);
    private CommandSerialNumberCache serialNumberCache = new CommandSerialNumberCache();

    @Override
    public void prepare() throws Throwable {
    }

    /**
     * Starts the worker that consumes the command queue.
     */
    @Override
    public void boot() throws Throwable {
        executorService.submit(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
            @Override
            public void handle(final Throwable t) {
                LOGGER.error(t, "CommandService failed to execute commands");
            }
        }));
    }

    /**
     * Worker loop: takes queued commands and executes each one that has not been executed yet.
     * The loop ends when the service is shut down or the worker thread is interrupted.
     */
    @Override
    public void run() {
        final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);

        while (isRunning) {
            try {
                BaseCommand command = commands.take();

                if (isCommandExecuted(command)) {
                    continue;
                }

                commandExecutorService.execute(command);
                serialNumberCache.add(command.getSerialNumber());
            } catch (InterruptedException e) {
                // An interrupt during shutdown is expected; only report it when it is a surprise.
                if (isRunning) {
                    LOGGER.error(e, "Failed to take commands.");
                }
                // Restore the flag for whoever owns the thread, and stop: looping again
                // would make take() fail immediately and spin.
                Thread.currentThread().interrupt();
                return;
            } catch (CommandExecutionException e) {
                LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());
            } catch (Throwable e) {
                LOGGER.error(e, "There is unexpected exception");
            }
        }
    }

    private boolean isCommandExecuted(BaseCommand command) {
        return serialNumberCache.contain(command.getSerialNumber());
    }

    @Override
    public void onComplete() throws Throwable {
    }

    /**
     * Stops the worker and discards the commands that are still queued.
     */
    @Override
    public void shutdown() throws Throwable {
        isRunning = false;
        commands.drainTo(new ArrayList<BaseCommand>());
        // shutdown() alone would leave the worker blocked in take() forever;
        // shutdownNow() interrupts it so that the loop can observe isRunning == false.
        executorService.shutdownNow();
    }

    /**
     * Deserializes the received commands and queues those that have not been executed yet.
     * Unsupported commands and commands that do not fit into the queue are dropped with a warning.
     *
     * @param commands the commands received from the backend
     */
    public void receiveCommand(Commands commands) {
        for (Command command : commands.getCommandsList()) {
            try {
                BaseCommand baseCommand = CommandDeserializer.deserialize(command);

                if (isCommandExecuted(baseCommand)) {
                    LOGGER.warn("Command[{}] is executed, ignored", baseCommand.getCommand());
                    continue;
                }

                boolean success = this.commands.offer(baseCommand);

                if (!success && LOGGER.isWarnEnable()) {
                    LOGGER.warn("Command[{}, {}] cannot add to command list. because the command list is full.",
                        baseCommand.getCommand(), baseCommand.getSerialNumber());
                }
            } catch (UnsupportedCommandException e) {
                if (LOGGER.isWarnEnable()) {
                    LOGGER.warn("Received unsupported command[{}].", e.getCommand().getCommand());
                }
            }
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands.executor;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutor;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
/**
 * A dummy executor that does nothing when executing a command.
 * Implemented as a single-element enum, so {@link #INSTANCE} is the only instance.
 */
public enum NoopCommandExecutor implements CommandExecutor {
INSTANCE;
/**
 * Ignores the given command.
 *
 * @param command the command to ignore
 */
@Override
public void execute(final BaseCommand command) throws CommandExecutionException {
// intentionally empty
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.apm.agent.core.commands.executor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutionException;
import org.apache.skywalking.apm.agent.core.commands.CommandExecutor;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.EndpointNameDictionary;
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient;
import org.apache.skywalking.apm.network.trace.component.command.BaseCommand;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
/**
 * Executes the {@link ServiceResetCommand}: puts the register client into its cool-down
 * state, forgets the registration data held by the agent and clears the dictionary caches.
 *
 * @author Zhang Xin
 * @author kezhenxu94
 */
public class ServiceResetCommandExecutor implements CommandExecutor {
    private static final ILog LOGGER = LogManager.getLogger(ServiceResetCommandExecutor.class);

    /**
     * @param command the reset command; its content is not inspected
     */
    @Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        LOGGER.warn("Received ServiceResetCommand, a re-register task is scheduled.");

        final ServiceAndEndpointRegisterClient registerClient =
            ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class);
        // Start the cool-down before wiping the ids, as the original ordering does.
        registerClient.coolDown();

        forgetRegistration();
        dropDictionaryCaches();
    }

    /**
     * Resets the service id, the instance id and the registration time to the "null" value.
     */
    private static void forgetRegistration() {
        RemoteDownstreamConfig.Agent.SERVICE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue();
    }

    /**
     * Clears the network address and endpoint name dictionaries.
     */
    private static void dropDictionaryCaches() {
        NetworkAddressDictionary.INSTANCE.clear();
        EndpointNameDictionary.INSTANCE.clear();
    }
}
......@@ -93,6 +93,13 @@ public class Config {
* How depth the agent goes, when log cause exceptions.
*/
public static int CAUSE_EXCEPTION_DEPTH = 5;
/**
* How long the agent should wait (in minutes)
* before re-registering to the OAP server
* after receiving a reset command
*/
public static int COOL_DOWN_THRESHOLD = 10;
}
public static class Collector {
......
......@@ -32,5 +32,7 @@ public class RemoteDownstreamConfig {
public volatile static int SERVICE_ID = DictionaryUtil.nullValue();
public volatile static int SERVICE_INSTANCE_ID = DictionaryUtil.nullValue();
public volatile static long INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue();
}
}
......@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.EntrySpan;
......@@ -453,7 +454,7 @@ public class TracingContext implements AbstractTracerContext {
try {
if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode || asyncSpanCounter.get() == 0)) {
TraceSegment finishedSegment = segment.finish(isLimitMechanismWorking());
/**
/*
* Recheck the segment if the segment contains only one span.
* Because in the runtime, can't sure this segment is part of distributed trace.
*
......@@ -464,6 +465,16 @@ public class TracingContext implements AbstractTracerContext {
finishedSegment.setIgnore(true);
}
}
/*
* Check that the segment is created after the agent (re-)registered to backend,
* otherwise the segment may be created when the agent is still rebooting and should
* be ignored
*/
if (segment.createTime() < RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME) {
finishedSegment.setIgnore(true);
}
TracingContext.ListenerManager.notifyFinish(finishedSegment);
running = false;
......
......@@ -72,6 +72,8 @@ public class TraceSegment {
private boolean isSizeLimited = false;
private final long createTime;
/**
* Create a default/empty trace segment, with current time as start time, and generate a new segment id.
*/
......@@ -80,6 +82,7 @@ public class TraceSegment {
this.spans = new LinkedList<AbstractTracingSpan>();
this.relatedGlobalTraces = new DistributedTraceIds();
this.relatedGlobalTraces.append(new NewDistributedTraceId());
this.createTime = System.currentTimeMillis();
}
/**
......@@ -195,4 +198,8 @@ public class TraceSegment {
public int getApplicationInstanceId() {
// Read from the shared agent config on every call rather than cached in this segment
return RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID;
}
/**
 * @return the time this segment was created, as recorded from
 *         {@code System.currentTimeMillis()} in the constructor
 */
public long createTime() {
return this.createTime;
}
}
......@@ -91,6 +91,10 @@ public enum EndpointNameDictionary {
}
}
/**
 * Removes every entry from {@code endpointDictionary}.
 */
public void clear() {
endpointDictionary.clear();
}
private class OperationNameKey {
private int serviceId;
private String endpointName;
......
......@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.network.common.KeyIntValuePair;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.register.v2.NetAddressMapping;
import org.apache.skywalking.apm.network.register.v2.NetAddresses;
import org.apache.skywalking.apm.network.register.v2.RegisterGrpc;
......@@ -38,15 +37,15 @@ import static org.apache.skywalking.apm.agent.core.conf.Config.Dictionary.SERVIC
*/
public enum NetworkAddressDictionary {
INSTANCE;
private Map<String, Integer> applicationDictionary = new ConcurrentHashMap<String, Integer>();
private Map<String, Integer> serviceDictionary = new ConcurrentHashMap<String, Integer>();
private Set<String> unRegisterServices = new ConcurrentSet<String>();
public PossibleFound find(String networkAddress) {
Integer applicationId = applicationDictionary.get(networkAddress);
Integer applicationId = serviceDictionary.get(networkAddress);
if (applicationId != null) {
return new Found(applicationId);
} else {
if (applicationDictionary.size() + unRegisterServices.size() < SERVICE_CODE_BUFFER_SIZE) {
if (serviceDictionary.size() + unRegisterServices.size() < SERVICE_CODE_BUFFER_SIZE) {
unRegisterServices.add(networkAddress);
}
return new NotFound();
......@@ -61,9 +60,13 @@ public enum NetworkAddressDictionary {
if (networkAddressMappings.getAddressIdsCount() > 0) {
for (KeyIntValuePair keyWithIntegerValue : networkAddressMappings.getAddressIdsList()) {
unRegisterServices.remove(keyWithIntegerValue.getKey());
applicationDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
serviceDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
}
}
}
/**
 * Removes every entry from {@code serviceDictionary}.
 * The {@code unRegisterServices} set is left untouched.
 */
public void clear() {
this.serviceDictionary.clear();
}
}
......@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
......@@ -40,6 +41,7 @@ import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.JVMMetric;
import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
......@@ -61,7 +63,7 @@ public class JVMService implements BootService, Runnable {
@Override
public void prepare() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
queue = new LinkedBlockingQueue<JVMMetric>(Config.Jvm.BUFFER_SIZE);
sender = new Sender();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
}
......@@ -138,7 +140,8 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
......@@ -36,6 +37,7 @@ import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.common.KeyIntValuePair;
import org.apache.skywalking.apm.network.register.v2.RegisterGrpc;
import org.apache.skywalking.apm.network.register.v2.Service;
......@@ -61,6 +63,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
private volatile RegisterGrpc.RegisterBlockingStub registerBlockingStub;
private volatile ServiceInstancePingGrpc.ServiceInstancePingBlockingStub serviceInstancePingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile long coolDownStartTime = -1;
@Override
public void statusChanged(GRPCChannelStatus status) {
......@@ -107,6 +110,18 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
@Override
public void run() {
logger.debug("ServiceAndEndpointRegisterClient running, status:{}.", status);
if (coolDownStartTime > 0) {
final long coolDownDurationInMillis = TimeUnit.MINUTES.toMillis(Config.Agent.COOL_DOWN_THRESHOLD);
if (System.currentTimeMillis() - coolDownStartTime < coolDownDurationInMillis) {
logger.warn("The agent is cooling down, won't register itself");
return;
} else {
logger.warn("The agent is re-registering itself to backend");
}
}
coolDownStartTime = -1;
boolean shouldTry = true;
while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
......@@ -142,11 +157,13 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
int serviceInstanceId = serviceInstance.getValue();
if (serviceInstanceId != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = serviceInstanceId;
RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = System.currentTimeMillis();
}
}
}
} else {
serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doPing(ServiceInstancePingPkg.newBuilder()
final Commands commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
......@@ -154,6 +171,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
}
......@@ -163,4 +181,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
}
}
}
/**
 * Starts a cool-down period by recording the current time as its start.
 * While the period (see {@code Config.Agent.COOL_DOWN_THRESHOLD}, in minutes) has not elapsed,
 * {@link #run()} returns early without registering.
 */
public void coolDown() {
this.coolDownStartTime = System.currentTimeMillis();
}
}
......@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.*;
......@@ -90,7 +91,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
@Override
......
......@@ -23,3 +23,5 @@ org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
......@@ -55,7 +55,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(7));
assertThat(registryService.size(), is(9));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
......
......@@ -9,7 +9,7 @@ of starting backend.
## application.yml
The core concept behind this setting file is, SkyWalking collector is based on pure modulization design.
The core concept behind this setting file is, SkyWalking collector is based on pure modularization design.
End user can switch or assemble the collector features by their own requirements.
So, in `application.yml`, there are three levels.
......@@ -32,7 +32,7 @@ core:
1. `restHost`, `restPort`, ... `gRPCHost` are all setting items of the implementor.
At the same time, modules includes required and optional, the required modules provide the skeleton of backend,
even modulization supported pluggable, remove those modules are meanless. We highly recommend you don't try to
even though modularization supports pluggability, removing those modules is meaningless. We highly recommend you don't try to
change APIs of those modules, unless you understand SkyWalking project and its codes very well.
List the required modules here
......@@ -88,6 +88,19 @@ or 3rd party configuration management system.
OAP backend cluster itself underlying is a distributed streaming process system. For helping the Ops team,
we provide the telemetry for OAP backend itself. Follow [document](backend-telemetry.md) to use it.
## Agent hot reboot trigger mechanism in OAP server upgrade
**IMPORTANT**: Agent hot reboot requires both the OAP nodes and the agents to be version 6.3.0 or higher.
The reboot procedure works by the heartbeat between OAP nodes and the agents:
1. The agent sends a heartbeat package to the OAP server;
1. The OAP server has just restarted and finds no metadata for this agent, so it sends a reset command to that specific agent;
1. The agent receives the reset command and re-registers itself to the OAP node.
The agent reboot mechanism is not designed for every scenario where an agent needs to reboot, but only for the scenario where
the backend servers are to be upgraded with all storage data deleted/erased. Therefore, there are some noteworthy limitations:
1. Partially deleting the storage data may not work as expected, you **MUST** delete all the storage data.
1. Set an appropriate value for the config `agent.cool_down_threshold`, so that the agents wait before re-registering themselves to the backend
to avoid "dirty data"; see [`agent.cool_down_threshold`](../service-agent/java-agent/README.md#table-of-agent-configuration-properties)
for more details.
## FAQs
#### When and why do we need to set Timezone?
......
......@@ -71,10 +71,11 @@ property key | Description | Default |
`agent.active_v2_header`|Active V2 header in default.|`true`|
`agent.instance_uuid` |Instance uuid is the identity of an instance, skywalking treat same instance uuid as one instance.if empty, skywalking agent will generate an 32-bit uuid. |`""`|
`agent.cause_exception_depth`|How depth the agent goes, when log all cause exceptions.|`5`|
`agent.active_v1_header `|Deactive V1 header in default.|`false`|
`agent.active_v1_header `|Deactivate V1 header in default.|`false`|
`agent.cool_down_threshold `|How long the agent should wait (in minutes) before re-registering to the OAP server after receiving a reset command.|`10`|
`collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector skywalking trace receiver service addresses.|`127.0.0.1:11800`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
`logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.dir`|Log files directory. Default is blank string, means, use "system.out" to output logs.|`""`|
......@@ -116,7 +117,7 @@ All bootstrap plugins are optional, due to unexpected risk. Bootstrap plugins ar
* Use gRPC TLS to link backend. See [open TLS](TLS.md)
* Monitor a big cluster by different SkyWalking services. Use [Namespace](Namespace.md) to isolate the context propagation.
* Set client [token](Token-auth.md) if backend open [token authentication](../../backend/backend-token-auth.md).
* Application Toolkit, are a collection of libraries, provided by skywalking APM. Using them, you have a bridge between your application and skywalking APM agent.
* Application Toolkit, are a collection of libraries, provided by SkyWalking APM. Using them, you have a bridge between your application and SkyWalking APM agent.
* If you want to use OpenTracing Java APIs, try [SkyWalking OpenTracing compatible tracer](Opentracing.md). More details you could find at http://opentracing.io
* If you want to print trace context(e.g. traceId) in your logs, choose the log frameworks, [log4j](Application-toolkit-log4j-1.x.md),
[log4j2](Application-toolkit-log4j-2.x.md), [logback](Application-toolkit-logback-1.x.md)
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
......@@ -78,6 +79,8 @@ public class CoreModule extends ModuleDefine {
addCacheService(classes);
addQueryService(classes);
classes.add(CommandService.class);
return classes.toArray(new Class[] {});
}
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProces
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.oal.rt.*;
import org.apache.skywalking.oap.server.core.query.*;
......@@ -155,6 +156,8 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
annotationScan.registerListener(streamAnnotationListener);
this.remoteClientManager = new RemoteClientManager(getManager());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.command;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import java.util.UUID;
/**
 * Core-module service that creates the commands the OAP server sends to agents.
 *
 * @author kezhenxu94
 */
public class CommandService implements Service {
    private final ModuleManager moduleManager;

    /**
     * @param moduleManager the module manager, stored for later use
     */
    public CommandService(final ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
    }

    /**
     * Creates a new {@link ServiceResetCommand} carrying a freshly generated serial number.
     *
     * @param serviceInstanceId   id of the service instance the command is created for
     * @param time                time value supplied by the caller
     * @param serviceInstanceUUID UUID of the service instance the command is created for
     * @return a reset command with a new serial number
     */
    public ServiceResetCommand newResetCommand(final int serviceInstanceId, final long time, final String serviceInstanceUUID) {
        return new ServiceResetCommand(generateSerialNumber(serviceInstanceId, time, serviceInstanceUUID));
    }

    /**
     * Produces the serial number of a command.
     *
     * The parameters are currently not taken into account: the serial number
     * is simply a random UUID rendered as a string.
     */
    private String generateSerialNumber(final int serviceInstanceId, final long time, final String serviceInstanceUUID) {
        final UUID randomId = UUID.randomUUID();
        return randomId.toString();
    }
}
......@@ -29,6 +29,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import static java.util.Objects.isNull;
/**
* @author peng-yongsheng
*/
......@@ -56,6 +58,10 @@ class TopologyBuilder {
ServiceInventory source = serviceInventoryCache.get(clientCall.getSource());
ServiceInventory target = serviceInventoryCache.get(clientCall.getTarget());
if (isNull(source) || isNull(target)) {
continue;
}
if (target.getMappingServiceId() != Const.NONE) {
continue;
}
......@@ -94,6 +100,10 @@ class TopologyBuilder {
ServiceInventory source = serviceInventoryCache.get(serverCall.getSource());
ServiceInventory target = serviceInventoryCache.get(serverCall.getTarget());
if (isNull(source) || isNull(target)) {
continue;
}
if (source.getSequence() == Const.USER_SERVICE_ID) {
if (!nodes.containsKey(source.getSequence())) {
Node visualUserNode = new Node();
......
......@@ -30,6 +30,7 @@ import org.apache.skywalking.oap.server.core.query.entity.RefType;
import org.apache.skywalking.oap.server.core.query.entity.Trace;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.Service;
......@@ -177,8 +178,12 @@ public class TraceQueryService implements Service {
}
span.setEndpointName(endpointName);
String serviceCode = getServiceInventoryCache().get(serviceId).getName();
span.setServiceCode(serviceCode);
final ServiceInventory serviceInventory = getServiceInventoryCache().get(serviceId);
if (serviceInventory != null) {
span.setServiceCode(serviceInventory.getName());
} else {
span.setServiceCode("unknown");
}
if (spanObject.getComponentId() == 0) {
span.setComponent(spanObject.getComponent());
......@@ -282,8 +287,12 @@ public class TraceQueryService implements Service {
}
span.setEndpointName(endpointName);
String serviceCode = getServiceInventoryCache().get(serviceId).getName();
span.setServiceCode(serviceCode);
final ServiceInventory serviceInventory = getServiceInventoryCache().get(serviceId);
if (serviceInventory != null) {
span.setServiceCode(serviceInventory.getName());
} else {
span.setServiceCode("unknown");
}
if (spanObject.getComponentId() == 0) {
span.setComponent(spanObject.getComponent());
......
......@@ -55,7 +55,7 @@ public class CLRSourceDispatcher {
if (Objects.nonNull(serviceInstanceInventory)) {
serviceId = serviceInstanceInventory.getServiceId();
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId);
logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
return;
}
......
......@@ -39,7 +39,7 @@ public class JVMSourceDispatcher {
public JVMSourceDispatcher(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
......@@ -48,7 +48,7 @@ public class JVMSourceDispatcher {
if (Objects.nonNull(serviceInstanceInventory)) {
serviceId = serviceInstanceInventory.getServiceId();
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId);
logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
return;
}
......
......@@ -19,12 +19,14 @@
package org.apache.skywalking.oap.server.receiver.register.provider.handler.v6.grpc;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.apm.network.common.Command;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.register.v2.ServiceInstancePingGrpc;
import org.apache.skywalking.apm.network.register.v2.ServiceInstancePingPkg;
import org.apache.skywalking.apm.network.trace.component.command.ServiceResetCommand;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
......@@ -33,6 +35,8 @@ import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
/**
* @author wusheng
*/
......@@ -42,11 +46,13 @@ public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.S
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private final IServiceInventoryRegister serviceInventoryRegister;
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
private final CommandService commandService;
public ServiceInstancePingServiceHandler(ModuleManager moduleManager) {
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class);
}
@Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
......@@ -57,11 +63,17 @@ public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.S
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
responseObserver.onNext(Commands.getDefaultInstance());
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId);
logger.warn("Can't find service by service instance id from cache," +
" service instance id is: {}, will send a reset command to agent side", serviceInstanceId);
final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID());
final Command command = resetCommand.serialize().build();
final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
responseObserver.onNext(nextCommands);
}
responseObserver.onNext(Commands.getDefaultInstance());
responseObserver.onCompleted();
}
}
......@@ -23,6 +23,8 @@ import java.util.*;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
......@@ -48,6 +50,7 @@ public class SegmentParseV2 {
private final SegmentParserListenerManager listenerManager;
private final SegmentCoreInfo segmentCoreInfo;
private final TraceServiceModuleConfig config;
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
@Setter private SegmentStandardizationWorker standardizationWorker;
private volatile static CounterMetrics TRACE_BUFFER_FILE_RETRY;
private volatile static CounterMetrics TRACE_BUFFER_FILE_OUT;
......@@ -72,6 +75,8 @@ public class SegmentParseV2 {
TRACE_PARSE_ERROR = metricsCreator.createCounter("v6_trace_parse_error", "The number of trace segment out of the buffer file",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
}
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
}
public boolean parse(BufferData<UpstreamSegment> bufferData, SegmentSource source) {
......@@ -87,6 +92,13 @@ public class SegmentParseV2 {
}
SegmentObject segmentObject = bufferData.getV2Segment();
// Recheck in case that the segment comes from file buffer
final int serviceInstanceId = segmentObject.getServiceInstanceId();
if (serviceInstanceInventoryCache.get(serviceInstanceId) == null) {
logger.warn("Cannot recognize service instance id [{}] from cache, segment will be ignored", serviceInstanceId);
return true; // to mark it "completed" thus won't be retried
}
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
if (!preBuild(traceIds, segmentDecorator)) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apache-skywalking-e2e</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>e2e-agent-reboot</artifactId>
<properties>
<e2e.container.version>1.1</e2e.container.version>
<e2e.container.name.prefix>skywalking-e2e-container-${build.id}-agent-reboot</e2e.container.name.prefix>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>e2e-base</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<executable>true</executable>
<addResources>true</addResources>
<excludeDevtools>true</excludeDevtools>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<containerNamePattern>%a-%t-%i</containerNamePattern>
<images>
<image>
<name>skyapm/e2e-container:${e2e.container.version}</name>
<alias>${e2e.container.name.prefix}</alias>
<run>
<env>
<INSTRUMENTED_SERVICE>${project.build.finalName}.jar</INSTRUMENTED_SERVICE>
<INSTRUMENTED_SERVICE_OPTS>-Dskywalking.agent.cool_down_threshold=1</INSTRUMENTED_SERVICE_OPTS>
</env>
<ports>
<port>+webapp.host:webapp.port:8080</port>
<port>+client.host:client.port:9090</port>
<port>+reboot.host:reboot.port:9091</port>
</ports>
<volumes>
<bind>
<volume>
${project.basedir}/../../../dist-for-agent-reboot/apache-skywalking-apm-bin:/sw
</volume>
<volume>
${project.build.directory}:/home
</volume>
<volume>
${project.basedir}/src/docker/rc.d:/rc.d:ro
</volume>
</bind>
</volumes>
<wait>
<log>SkyWalking e2e container is ready for tests</log>
<time>2400000</time>
</wait>
</run>
</image>
</images>
</configuration>
</plugin>
<!-- expose the container host/port properties as system properties so that the test code can read them -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<sw.webapp.host>
${webapp.host}
</sw.webapp.host>
<sw.webapp.port>
${webapp.port}
</sw.webapp.port>
<client.host>
${client.host}
</client.host>
<client.port>
${client.port}
</client.port>
<reboot.host>
${reboot.host}
</reboot.host>
<reboot.port>
${reboot.port}
</reboot.port>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
# Licensed to the SkyAPM under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env bash
# In order to make it easier to restart the OAP server (by executing the restart script) from outside
# the container, a TCP port is exposed; whenever a message is received on that port, the OAP server is
# restarted. socat is used to execute the script when a message arrives on that port.
apk update && apk add socat

# socat executes the command in a new shell, which does not see the functions declared in this shell,
# so the restart commands are written to a standalone script file.
#
# The generated script:
#   1. kills the processes whose command line contains "oap.logDir";
#   2. removes the trace/mesh buffer directories;
#   3. starts the OAP server again in the background.
#
# The buffer paths are exported on purpose: a plain `VAR=value && command` only sets a shell variable,
# which child processes such as `bash bin/oapService.sh` do not inherit.
echo '
ps -ef | grep -v grep | grep oap.logDir | awk '"'"'{print $1}'"'"' | xargs --no-run-if-empty kill -9
rm -rf /tmp/oap/trace_buffer1
rm -rf /tmp/oap/mesh_buffer1
echo "restarting OAP server..." \
&& export SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
&& export SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
&& cd /sw \
&& bash bin/oapService.sh > /dev/null 2>&1 &
' > /usr/bin/restart_oap
sync
chmod +x /usr/bin/restart_oap
sync
# Licensed to the SkyAPM under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env bash
# Set up the TCP server that listens for the restart command: every connection on port 9091
# makes socat run `bash restart_oap`.
socat -u tcp-l:9091,fork system:'bash restart_oap' &

# The settings are exported so that they end up in the environment of any child process;
# a plain `VAR=value && command` would only set a shell variable that child processes do not inherit.
# NOTE(review): start_oap is defined outside this script; presumably it launches the OAP server
# as a child process - confirm.
echo 'starting OAP server...' \
    && export SW_STORAGE_ES_BULK_ACTIONS=1 \
    && export SW_STORAGE_ES_FLUSH_INTERVAL=1 \
    && export SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
    && export SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
    && start_oap 'init'

echo 'starting Web app...' \
    && start_webapp '0.0.0.0' 8080

echo 'starting instrumented services...' \
    && start_instrumented_services

# Wait until the instrumented service accepts connections on port 9090.
check_tcp 127.0.0.1 \
    9090 \
    60 \
    10 \
    "waiting for the instrumented service to be ready"

# Dump the service logs and abort when the service did not come up.
# NOTE(review): the message says "30 * 10 seconds" while check_tcp is called with 60 and 10 -
# confirm which values are intended.
if [[ $? -ne 0 ]]; then
    echo "instrumented service failed to start in 30 * 10 seconds: "
    cat ${SERVICE_LOG}/*
    exit 1
fi

# This exact line is what the docker-maven-plugin <wait><log> setting looks for.
echo "SkyWalking e2e container is ready for tests"

# Keep following all the logs in the foreground.
tail -f ${OAP_LOG_DIR}/* \
    ${WEBAPP_LOG_DIR}/* \
    ${SERVICE_LOG}/* \
    ${ES_HOME}/logs/stdout.log
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.sample.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
 * Entry point of the sample client service: a Spring Boot application
 * with JPA repositories enabled.
 *
 * @author kezhenxu94
 */
@EnableJpaRepositories
@SpringBootApplication
public class SampleClientApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command line arguments, handed over to Spring Boot unchanged
     */
    public static void main(String[] args) {
        final SpringApplication application = new SpringApplication(SampleClientApplication.class);
        application.run(args);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.sample.client;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller serving the endpoints under {@code /e2e}.
 *
 * @author kezhenxu94
 */
@RestController
@RequestMapping("/e2e")
public class TestController {
    private final UserRepo userRepo;

    /**
     * @param userRepo repository used to store the users posted to this controller
     */
    public TestController(final UserRepo userRepo) {
        this.userRepo = userRepo;
    }

    /**
     * Handles {@code GET /e2e/health-check}.
     *
     * @return the constant string {@code "healthy"}
     */
    @GetMapping("/health-check")
    public String hello() {
        final String status = "healthy";
        return status;
    }

    /**
     * Handles {@code POST /e2e/users}: saves the user taken from the request body.
     *
     * @param user the user deserialized from the request body
     * @return the entity returned by the repository's {@code save} call
     */
    @PostMapping("/users")
    public User createAuthor(@RequestBody final User user) {
        final User persisted = userRepo.save(user);
        return persisted;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.sample.client;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
/**
 * JPA entity with a generated identifier and a name.
 *
 * @author kezhenxu94
 */
@Entity
public class User {

    /** Identifier, annotated as a generated value. */
    @Id
    @GeneratedValue
    private Long id;

    /** Name of the user, mapped as a column. */
    @Column
    private String name;

    /** Public no-argument constructor. */
    public User() {
    }

    /** @return the identifier, possibly {@code null} if none has been set */
    public Long getId() {
        return this.id;
    }

    /** @param id the identifier to set */
    public void setId(final Long id) {
        this.id = id;
    }

    /** @return the name, possibly {@code null} if none has been set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to set */
    public void setName(final String name) {
        this.name = name;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.sample.client;
import org.springframework.data.jpa.repository.JpaRepository;
/**
 * Spring Data JPA repository of {@link User} entities, keyed by {@link Long}.
 * It declares no query methods of its own; everything comes from {@link JpaRepository}.
 *
 * @author kezhenxu94
 */
public interface UserRepo
    extends JpaRepository<User, Long> {
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Port of the sample client application.
server:
port: 9090
spring:
main:
# Do not print the Spring banner on startup.
banner-mode: 'off'
datasource:
# In-memory H2 database named `testdb`.
url: jdbc:h2:mem:testdb
driver-class-name: org.h2.Driver
# NOTE(review): in Spring Boot, `data-username` is the user for data-initialisation (DML) scripts;
# the login user is `spring.datasource.username`. Presumably `username: sa` was intended - confirm.
data-username: sa
password: sa
# NOTE(review): `platform` selects `schema-${platform}.sql` / `data-${platform}.sql` scripts in Spring Boot;
# a Hibernate dialect class name here looks unintended (the dialect is normally configured under `spring.jpa`) - confirm.
platform: org.hibernate.dialect.H2Dialect
jpa:
generate-ddl: true
hibernate:
# The schema is created on startup and dropped on shutdown.
ddl-auto: create-drop
properties:
hibernate.format_sql: true
# Log the executed SQL statements.
show-sql: true
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e;
import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.Metrics;
import org.apache.skywalking.e2e.metrics.MetricsQuery;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesMatcher;
import org.apache.skywalking.e2e.service.ServicesQuery;
import org.apache.skywalking.e2e.service.endpoint.Endpoint;
import org.apache.skywalking.e2e.service.endpoint.EndpointQuery;
import org.apache.skywalking.e2e.service.endpoint.Endpoints;
import org.apache.skywalking.e2e.service.endpoint.EndpointsMatcher;
import org.apache.skywalking.e2e.service.instance.Instance;
import org.apache.skywalking.e2e.service.instance.Instances;
import org.apache.skywalking.e2e.service.instance.InstancesMatcher;
import org.apache.skywalking.e2e.service.instance.InstancesQuery;
import org.apache.skywalking.e2e.topo.TopoData;
import org.apache.skywalking.e2e.topo.TopoMatcher;
import org.apache.skywalking.e2e.topo.TopoQuery;
import org.apache.skywalking.e2e.trace.Trace;
import org.apache.skywalking.e2e.trace.TracesMatcher;
import org.apache.skywalking.e2e.trace.TracesQuery;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.web.client.RestTemplate;
import org.yaml.snakeyaml.Yaml;
import java.io.InputStream;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_ENDPOINT_METRICS;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_INSTANCE_METRICS;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_SERVICE_METRICS;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author kezhenxu94
*/
@RunWith(SpringJUnit4ClassRunner.class)
public class AgentRebootITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(AgentRebootITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
private final int retryInterval = 30000;
private SimpleQueryClient queryClient;
private String instrumentedServiceUrl;
/**
 * Prepares the query client and the base URL of the instrumented service.
 * Hosts and ports are read from system properties, with local defaults as fallback.
 */
@Before
public void setUp() {
    // Address of the SkyWalking web application that the query client talks to.
    final String webappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
    final String webappPort = System.getProperty("sw.webapp.port", "32783");

    // Address of the instrumented sample service that receives the test requests.
    final String clientHost = System.getProperty("client.host", "127.0.0.1");
    final String clientPort = System.getProperty("client.port", "32782");

    this.queryClient = new SimpleQueryClient(webappHost, webappPort);
    this.instrumentedServiceUrl = String.format("http://%s:%s", clientHost, clientPort);
}
/**
 * End-to-end scenario: verifies the collected data, restarts the OAP, checks that the
 * previously collected traces are gone, then verifies again while the instrumented
 * service keeps running.
 *
 * The JUnit timeout is 1200000 ms (20 minutes); the retry loops used by the
 * verification steps have no upper bound of their own.
 */
@Test(timeout = 1200000)
@DirtiesContext
public void verify() throws Exception {
// First round: data reported before the restart.
doVerify();
LOGGER.info("Verifications passed before restarting");
// Trigger the restart and block until the query endpoint answers again.
restartOAP();
waitOAPStartUp();
// No traces from the last 10 minutes may be returned after the restart.
assertDataErased();
LOGGER.info("Verifying after restarting successfully");
// Second round: data reported after the restart.
doVerify();
LOGGER.info("Verifications passed after restarting");
}
/**
 * Sends requests to the instrumented service until at least one trace can be queried,
 * then verifies traces, services and topology, each through
 * {@link #doRetryableVerification(Runnable)}.
 *
 * @throws InterruptedException if the thread is interrupted while sleeping between attempts,
 *                              e.g. when the JUnit timeout fires
 */
private void doVerify() throws InterruptedException {
    final LocalDateTime minutesAgo = LocalDateTime.now(ZoneOffset.UTC);

    while (true) {
        try {
            final Map<String, String> user = new HashMap<>();
            user.put("name", "SkyWalking");
            final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
                instrumentedServiceUrl + "/e2e/users",
                user,
                String.class
            );
            assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);

            // The end of the window must use the same zone (UTC) as its start; mixing in the
            // local zone yields an empty or inverted window on hosts that are behind UTC.
            final List<Trace> traces = queryClient.traces(
                new TracesQuery()
                    .start(minutesAgo)
                    .end(LocalDateTime.now(ZoneOffset.UTC))
                    .orderByDuration()
            );
            if (!traces.isEmpty()) {
                break;
            }
        } catch (Exception e) {
            // The service or the query endpoint may not be reachable yet (this method is also
            // called right after a restart), so the attempt is repeated.
            LOGGER.warn("failed to send request or query traces, will retry: {}", e.getMessage());
        }
        // Sleep outside the try block: every attempt is throttled, even a failed one, and an
        // interrupt terminates the loop instead of being swallowed.
        Thread.sleep(10000L);
    }

    // Checked exceptions are rethrown unchecked so that doRetryableVerification sees the
    // failure and retries, instead of treating a logged exception as a passed verification.
    doRetryableVerification(() -> {
        try {
            verifyTraces(minutesAgo);
        } catch (Exception e) {
            LOGGER.warn(e.getMessage(), e);
            throw new IllegalStateException(e);
        }
    });

    doRetryableVerification(() -> {
        try {
            verifyServices(minutesAgo);
        } catch (Exception e) {
            LOGGER.warn(e.getMessage(), e);
            throw new IllegalStateException(e);
        }
    });

    doRetryableVerification(() -> {
        try {
            verifyTopo(minutesAgo);
        } catch (Exception e) {
            LOGGER.warn(e.getMessage(), e);
            throw new IllegalStateException(e);
        }
    });
}
/**
 * Queries the topology and verifies it against the expected topology loaded from the classpath.
 *
 * @param minutesAgo reference time (UTC); the query window starts one day before it
 * @throws Exception if the query or loading the expected data fails
 */
private void verifyTopo(LocalDateTime minutesAgo) throws Exception {
    final LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);

    final TopoData topoData = queryClient.topo(
        new TopoQuery()
            .stepByMinute()
            .start(minutesAgo.minusDays(1))
            .end(now)
    );

    final TopoMatcher topoMatcher;
    // try-with-resources: the classpath stream is closed once the expected data is parsed.
    try (InputStream expectedInputStream =
             new ClassPathResource("expected-data/org.apache.skywalking.e2e.SampleVerificationITCase.topo.yml").getInputStream()) {
        topoMatcher = new Yaml().loadAs(expectedInputStream, TopoMatcher.class);
    }

    topoMatcher.verify(topoData);
}
/**
 * Queries the services, verifies them against the expected data and then, for every
 * returned service, verifies its metrics, instances (and their metrics) and endpoints
 * (and their metrics).
 *
 * @param minutesAgo start of the query window (UTC)
 * @throws Exception if a query or loading the expected data fails
 */
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
    final LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);

    final List<Service> services = queryClient.services(
        new ServicesQuery()
            .start(minutesAgo)
            .end(now)
    );

    final ServicesMatcher servicesMatcher;
    // try-with-resources: the classpath stream is closed once the expected data is parsed.
    try (InputStream expectedInputStream =
             new ClassPathResource("expected-data/org.apache.skywalking.e2e.SampleVerificationITCase.services.yml").getInputStream()) {
        servicesMatcher = new Yaml().loadAs(expectedInputStream, ServicesMatcher.class);
    }

    servicesMatcher.verify(services);

    for (Service service : services) {
        LOGGER.info("verifying service instances: {}", service);

        verifyServiceMetrics(service);

        Instances instances = verifyServiceInstances(minutesAgo, now, service);
        verifyInstancesMetrics(instances);

        Endpoints endpoints = verifyServiceEndpoints(minutesAgo, now, service);
        verifyEndpointsMetrics(endpoints);
    }
}
/**
 * Queries the instances of a service and verifies them against the expected data.
 *
 * @param minutesAgo start of the query window
 * @param now        end of the query window
 * @param service    the service whose instances are queried, identified by its key
 * @return the queried instances
 * @throws Exception if the query or loading the expected data fails
 */
private Instances verifyServiceInstances(LocalDateTime minutesAgo, LocalDateTime now, Service service) throws Exception {
    final Instances instances = queryClient.instances(
        new InstancesQuery()
            .serviceId(service.getKey())
            .start(minutesAgo)
            .end(now)
    );

    final InstancesMatcher instancesMatcher;
    // try-with-resources: the classpath stream is closed once the expected data is parsed.
    try (InputStream expectedInputStream =
             new ClassPathResource("expected-data/org.apache.skywalking.e2e.SampleVerificationITCase.instances.yml").getInputStream()) {
        instancesMatcher = new Yaml().loadAs(expectedInputStream, InstancesMatcher.class);
    }

    instancesMatcher.verify(instances);
    return instances;
}
/**
 * Queries the endpoints of a service and verifies them against the expected data.
 *
 * @param minutesAgo not used by the endpoint query; kept so the signature mirrors
 *                   {@code verifyServiceInstances}
 * @param now        not used by the endpoint query; kept for the same reason
 * @param service    the service whose endpoints are queried, identified by its key
 * @return the queried endpoints
 * @throws Exception if the query or loading the expected data fails
 */
private Endpoints verifyServiceEndpoints(LocalDateTime minutesAgo, LocalDateTime now, Service service) throws Exception {
    final Endpoints endpoints = queryClient.endpoints(
        new EndpointQuery().serviceId(service.getKey())
    );

    final EndpointsMatcher endpointsMatcher;
    // try-with-resources: the classpath stream is closed once the expected data is parsed.
    try (InputStream expectedInputStream =
             new ClassPathResource("expected-data/org.apache.skywalking.e2e.SampleVerificationITCase.endpoints.yml").getInputStream()) {
        endpointsMatcher = new Yaml().loadAs(expectedInputStream, EndpointsMatcher.class);
    }

    endpointsMatcher.verify(endpoints);
    return endpoints;
}
/**
 * For every instance and every metric name in {@code ALL_INSTANCE_METRICS}, queries the
 * metric by the instance key and verifies it with an {@link AtLeastOneOfMetricsMatcher}
 * configured with the value expression {@code "gt 0"}.
 *
 * @param instances the instances whose metrics are verified
 * @throws Exception if a metrics query fails
 */
private void verifyInstancesMetrics(Instances instances) throws Exception {
    for (Instance instance : instances.getInstances()) {
        for (String metricsName : ALL_INSTANCE_METRICS) {
            LOGGER.info("verifying service instance response time: {}", instance);

            final MetricsQuery query = new MetricsQuery()
                .stepByMinute()
                .metricsName(metricsName)
                .id(instance.getKey());
            final Metrics queried = queryClient.metrics(query);

            final AtLeastOneOfMetricsMatcher atLeastOne = new AtLeastOneOfMetricsMatcher();
            final MetricsValueMatcher positive = new MetricsValueMatcher();
            positive.setValue("gt 0");
            atLeastOne.setValue(positive);
            atLeastOne.verify(queried);

            LOGGER.info("{}: {}", metricsName, queried);
        }
    }
}
/**
 * For every endpoint labelled {@code /e2e/users} and every metric name in
 * {@code ALL_ENDPOINT_METRICS}, queries the metric by the endpoint key and verifies it
 * with an {@link AtLeastOneOfMetricsMatcher} configured with the value expression
 * {@code "gt 0"}. Endpoints with any other label are skipped.
 *
 * @param endpoints the endpoints whose metrics are verified
 * @throws Exception if a metrics query fails
 */
private void verifyEndpointsMetrics(Endpoints endpoints) throws Exception {
    for (Endpoint endpoint : endpoints.getEndpoints()) {
        final boolean isUsersEndpoint = endpoint.getLabel().equals("/e2e/users");
        if (isUsersEndpoint) {
            for (String metricName : ALL_ENDPOINT_METRICS) {
                LOGGER.info("verifying endpoint {}, metrics: {}", endpoint, metricName);

                final MetricsQuery query = new MetricsQuery()
                    .stepByMinute()
                    .metricsName(metricName)
                    .id(endpoint.getKey());
                final Metrics queried = queryClient.metrics(query);

                final AtLeastOneOfMetricsMatcher atLeastOne = new AtLeastOneOfMetricsMatcher();
                final MetricsValueMatcher positive = new MetricsValueMatcher();
                positive.setValue("gt 0");
                atLeastOne.setValue(positive);
                atLeastOne.verify(queried);

                LOGGER.info("metrics: {}", queried);
            }
        }
    }
}
/**
 * For every metric name in {@code ALL_SERVICE_METRICS}, queries the metric by the service
 * key and verifies it with an {@link AtLeastOneOfMetricsMatcher} configured with the
 * value expression {@code "gt 0"}.
 *
 * @param service the service whose metrics are verified
 * @throws Exception if a metrics query fails
 */
private void verifyServiceMetrics(Service service) throws Exception {
    for (String metricName : ALL_SERVICE_METRICS) {
        LOGGER.info("verifying service {}, metrics: {}", service, metricName);

        final MetricsQuery query = new MetricsQuery()
            .stepByMinute()
            .metricsName(metricName)
            .id(service.getKey());
        final Metrics queried = queryClient.metrics(query);

        final AtLeastOneOfMetricsMatcher atLeastOne = new AtLeastOneOfMetricsMatcher();
        final MetricsValueMatcher positive = new MetricsValueMatcher();
        positive.setValue("gt 0");
        atLeastOne.setValue(positive);
        atLeastOne.verify(queried);

        LOGGER.info("instanceRespTime: {}", queried);
    }
}
/**
 * Queries the traces ordered by duration and loosely verifies them against the expected
 * traces loaded from the classpath.
 *
 * @param minutesAgo start of the query window (UTC)
 * @throws Exception if the query or loading the expected data fails
 */
private void verifyTraces(LocalDateTime minutesAgo) throws Exception {
    final LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);

    final List<Trace> traces = queryClient.traces(
        new TracesQuery()
            .start(minutesAgo)
            .end(now)
            .orderByDuration()
    );

    final TracesMatcher tracesMatcher;
    // try-with-resources: the classpath stream is closed once the expected data is parsed.
    try (InputStream expectedInputStream =
             new ClassPathResource("expected-data/org.apache.skywalking.e2e.SampleVerificationITCase.traces.yml").getInputStream()) {
        tracesMatcher = new Yaml().loadAs(expectedInputStream, TracesMatcher.class);
    }

    tracesMatcher.verifyLoosely(traces);
}
/**
 * Runs the given verification repeatedly until it completes without throwing, sleeping
 * {@code retryInterval} milliseconds after every failed attempt. There is no upper bound
 * on the number of attempts in this method.
 *
 * @param runnable the verification to run
 * @throws InterruptedException if the thread is interrupted while sleeping between attempts
 */
private void doRetryableVerification(Runnable runnable) throws InterruptedException {
    while (true) {
        try {
            runnable.run();
            break;
        } catch (AssertionError | RuntimeException e) {
            // Only assertion failures and runtime exceptions are retried; other Errors
            // (e.g. OutOfMemoryError) propagate instead of being retried forever.
            // The reason is logged so that a run that keeps retrying can be diagnosed.
            LOGGER.warn("verification failed, will retry in {} ms: {}", retryInterval, e.getMessage());
            Thread.sleep(retryInterval);
        }
    }
}
/**
 * Triggers the OAP restart by sending a GET request to the address given by the
 * {@code reboot.host} / {@code reboot.port} system properties. A failing request is
 * logged and otherwise ignored.
 */
private void restartOAP() {
    final String host = System.getProperty("reboot.host", "127.0.0.1");
    final String port = System.getProperty("reboot.port", "9091");
    final String rebootUrl = "http://" + host + ":" + port;

    try {
        LOGGER.info("restarting OAP: {}", rebootUrl);
        restTemplate.getForObject(rebootUrl, String.class);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}
/**
 * Blocks until a traces query succeeds, probing every 10 seconds. The result of the query
 * is not inspected; it only serves as a readiness probe.
 *
 * @throws IllegalStateException if the thread is interrupted while waiting (the interrupt
 *                               flag is restored first), so that e.g. the JUnit timeout
 *                               can actually stop the wait
 */
private void waitOAPStartUp() {
    for (int i = 0; ; i++) {
        try {
            // UTC, consistent with the other queries of this test.
            final LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
            queryClient.traces(
                new TracesQuery()
                    .start(now)
                    .end(now)
                    .orderByDuration()
            );
            break;
        } catch (Throwable e) {
            LOGGER.info("OAP restart not ready, waited {} seconds, {}", i * 10, e.getMessage());
            try {
                Thread.sleep(10000L);
            } catch (InterruptedException interrupted) {
                // Do not swallow the interrupt: restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the OAP to start up", interrupted);
            }
        }
    }
}
/**
 * Asserts that a traces query over the last 10 minutes (UTC) returns nothing.
 *
 * @throws Exception if the traces query fails
 */
private void assertDataErased() throws Exception {
    final LocalDateTime windowEnd = LocalDateTime.now(ZoneOffset.UTC);
    final LocalDateTime windowStart = windowEnd.minusMinutes(10);

    final TracesQuery lastTenMinutes = new TracesQuery()
        .start(windowStart)
        .end(windowEnd)
        .orderByDuration();
    final List<Trace> remaining = queryClient.traces(lastTenMinutes);

    assertThat(remaining).isEmpty();
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1 health-check by docker-maven-plugin
# 1 drop table if exists, because we have `ddl-auto: create-drop`
# 1 drop sequence
# 1 create sequence
# 1 create table statement
endpoints:
- key: not null
label: /e2e/users
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1 health-check by docker-maven-plugin
# 1 drop table if exists, because we have `ddl-auto: create-drop`
# 1 drop sequence
# 1 create sequence
# 1 create table statement
instances:
- key: 2
label: not null
attributes:
- name: os_name
value: not null
- name: host_name
value: not null
- name: process_no
value: gt 0
- name: ipv4s
value: not null
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1 health-check by docker-maven-plugin
# 1 drop table if exists, because we have `ddl-auto: create-drop`
# 1 drop sequence
# 1 create sequence
# 1 create table statement
services:
- key: 2
label: "Your_ApplicationName"
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1 health-check by docker-maven-plugin
# 1 drop table if exists, because we have `ddl-auto: create-drop`
# 1 drop sequence
# 1 create sequence
# 1 create table statement
nodes:
- id: 1
name: User
type: USER
isReal: false
- id: 2
name: Your_ApplicationName
type: Tomcat
isReal: true
- id: 3
name: "localhost:-1"
type: H2
isReal: false
calls:
- id: 2_3
source: 2
detectPoints:
- CLIENT
target: 3
- id: 1_2
source: 1
detectPoints:
- SERVER
target: 2
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 1 health-check by docker-maven-plugin
# 1 drop table if exists, because we have `ddl-auto: create-drop`
# 1 drop sequence
# 1 create sequence
# 1 create table statement
traces:
- key: not null
endpointNames:
- /e2e/users
duration: ge 0
start: gt 0
isError: false
traceIds:
- not null
......@@ -153,7 +153,7 @@
<volumes>
<bind>
<volume>
../../../../dist/apache-skywalking-apm-bin:/sw
../../../../dist-for-cluster/apache-skywalking-apm-bin:/sw
</volume>
<volume>
../${service0.name}/target/${service0.name}-${project.version}.jar:/home/${service0.name}-${project.version}.jar
......
......@@ -92,7 +92,7 @@
<volumes>
<bind>
<volume>
${project.basedir}/../../../dist/apache-skywalking-apm-bin:/skywalking
${project.basedir}/../../../dist-for-single-node-service/apache-skywalking-apm-bin:/skywalking
</volume>
<volume>
${project.build.directory}:/home
......
......@@ -72,7 +72,6 @@ public class SampleVerificationITCase {
private static final Logger LOGGER = LoggerFactory.getLogger(SampleVerificationITCase.class);
private final RestTemplate restTemplate = new RestTemplate();
private final int retryTimes = 5;
private final int retryInterval = 30;
private SimpleQueryClient queryClient;
......@@ -93,14 +92,29 @@ public class SampleVerificationITCase {
public void verify() throws Exception {
final LocalDateTime minutesAgo = LocalDateTime.now(ZoneOffset.UTC);
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
while (true) {
try {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = queryClient.traces(
new TracesQuery()
.start(minutesAgo)
.end(LocalDateTime.now())
.orderByDuration()
);
if (!traces.isEmpty()) {
break;
}
Thread.sleep(10000L);
} catch (Exception ignored) {
}
}
doRetryableVerification(() -> {
try {
......@@ -279,7 +293,7 @@ public class SampleVerificationITCase {
}
private void doRetryableVerification(Runnable runnable) throws InterruptedException {
for (int i = 0; i < retryTimes; i++) {
while (true) {
try {
runnable.run();
break;
......
......@@ -35,6 +35,7 @@
<module>e2e-base</module>
<module>e2e-single-service</module>
<module>e2e-cluster</module>
<module>e2e-agent-reboot</module>
</modules>
<properties>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册