提交 ccfef16c 编写于 作者: wu-sheng's avatar wu-sheng

Merge branch '6.0' of https://github.com/apache/incubator-skywalking into 6.0

......@@ -19,15 +19,14 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.junit.Assert;
import org.junit.Test;
import org.junit.*;
public class StandaloneManagerTest {
@Test
public void test() {
StandaloneManager standaloneManager = new StandaloneManager();
RemoteInstance remote1 = new RemoteInstance();
RemoteInstance remote2 = new RemoteInstance();
RemoteInstance remote1 = new RemoteInstance("A", 100, true);
RemoteInstance remote2 = new RemoteInstance("B", 100, false);
standaloneManager.registerRemote(remote1);
Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
......
......@@ -21,16 +21,9 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.curator.test.TestingServer;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.junit.*;
/**
* @author peng-yongsheng
......@@ -59,9 +52,7 @@ public class ClusterModuleZookeeperProviderTestCase {
ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
RemoteInstance remoteInstance = new RemoteInstance();
remoteInstance.setHost("ProviderAHost");
remoteInstance.setPort(1000);
RemoteInstance remoteInstance = new RemoteInstance("ProviderAHost", 1000, true);
moduleRegister.registerRemote(remoteInstance);
......
......@@ -41,6 +41,11 @@
<artifactId>library-util</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-server</artifactId>
......@@ -57,4 +62,49 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -19,7 +19,11 @@
package org.apache.skywalking.oap.server.core;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -38,6 +42,7 @@ public class CoreModule extends ModuleDefine {
List<Class> classes = new ArrayList<>();
addServerInterface(classes);
addReceiverInterface(classes);
addInsideService(classes);
return classes.toArray(new Class[] {});
}
......@@ -47,6 +52,13 @@ public class CoreModule extends ModuleDefine {
classes.add(JettyHandlerRegister.class);
}
private void addInsideService(List<Class> classes) {
classes.add(IndicatorMapper.class);
classes.add(WorkerMapper.class);
classes.add(RemoteClientManager.class);
classes.add(RemoteSenderService.class);
}
private void addReceiverInterface(List<Class> classes) {
classes.add(SourceReceiver.class);
}
......
......@@ -18,24 +18,18 @@
package org.apache.skywalking.oap.server.core;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
import org.apache.skywalking.oap.server.core.analysis.worker.define.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.receiver.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -47,17 +41,21 @@ public class CoreModuleProvider extends ModuleProvider {
private final CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private JettyServer jettyServer;
private final IndicatorMapper indicatorMapper;
private final WorkerMapper workerMapper;
public CoreModuleProvider() {
super();
this.moduleConfig = new CoreModuleConfig();
this.indicatorMapper = new IndicatorMapper();
this.workerMapper = new WorkerMapper(getManager());
}
@Override public String name() {
return "default";
}
@Override public Class module() {
@Override public Class<? extends ModuleDefine> module() {
return CoreModule.class;
}
......@@ -75,11 +73,24 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl(getManager()));
this.registerServiceImplementation(IndicatorMapper.class, indicatorMapper);
this.registerServiceImplementation(WorkerMapper.class, workerMapper);
this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager()));
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
}
@Override public void start() {
@Override public void start() throws ModuleStartException {
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
try {
indicatorMapper.load();
workerMapper.load();
} catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
......@@ -90,9 +101,7 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
RemoteInstance gRPCServerInstance = new RemoteInstance();
gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
RemoteInstance gRPCServerInstance = new RemoteInstance(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.receiver.Scope;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
......@@ -32,9 +33,9 @@ public class DispatcherManager {
private Map<Scope, SourceDispatcher> dispatcherMap;
public DispatcherManager() {
public DispatcherManager(ModuleManager moduleManager) {
this.dispatcherMap = new HashMap<>();
this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher(moduleManager));
}
public SourceDispatcher getDispatcher(Scope scope) {
......
......@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.remote.*;
/**
* @author peng-yongsheng
*/
public abstract class StreamData implements QueueData {
public abstract class StreamData implements QueueData, Serializable, Deserializable {
private EndOfBatchContext endOfBatchContext;
......
......@@ -18,18 +18,22 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.receiver.Endpoint;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
private final EndpointLatencyAvgAggregator avgAggregator;
private final ModuleManager moduleManager;
private EndpointLatencyAvgAggregateWorker avgAggregator;
public EndpointDispatcher() {
this.avgAggregator = new EndpointLatencyAvgAggregator();
public EndpointDispatcher(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public void dispatch(Endpoint source) {
......@@ -37,7 +41,14 @@ public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
}
private void avg(Endpoint source) {
EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
if (avgAggregator == null) {
WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
avgAggregator = (EndpointLatencyAvgAggregateWorker)workerMapper.findInstanceByClass(EndpointLatencyAvgAggregateWorker.class);
}
EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
indicator.setId(source.getId());
indicator.setTimeBucket(source.getTimeBucket());
indicator.combine(source.getLatency(), 1);
avgAggregator.in(indicator);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
private final EndpointLatencyAvgRemoteWorker remoter;
public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
super(moduleManager);
this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
}
@Override protected void onNext(EndpointLatencyAvgIndicator data) {
remoter.in(data);
}
}
......@@ -18,19 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgIndicator extends AvgIndicator {
private final int id;
public EndpointLatencyAvgIndicator(long timeBucket, int id) {
super(timeBucket);
this.id = id;
}
@Setter @Getter private int id;
@Override public int hashCode() {
int result = 17;
......@@ -55,4 +52,22 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getId());
remoteBuilder.setDataIntegers(1, getCount());
remoteBuilder.setDataLongs(0, getTimeBucket());
remoteBuilder.setDataLongs(1, getSummation());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setId(remoteData.getDataIntegers(0));
setCount(remoteData.getDataIntegers(1));
setTimeBucket(remoteData.getDataLongs(0));
setSummation(remoteData.getDataLongs(1));
}
}
......@@ -18,17 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.analysis.AbstractAggregator;
import org.slf4j.*;
import org.apache.skywalking.oap.server.core.analysis.worker.AbstractPersistentWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
@Override protected void onNext(EndpointLatencyAvgIndicator data) {
public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker<EndpointLatencyAvgIndicator> {
public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
super(moduleManager);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.analysis.worker.AbstractRemoteWorker;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgRemoteWorker extends AbstractRemoteWorker<EndpointLatencyAvgIndicator> {
public EndpointLatencyAvgRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public Selector selector() {
return Selector.HashCode;
}
@Override public Class nextWorkerClass() {
return EndpointLatencyAvgPersistentWorker.class;
}
}
......@@ -18,28 +18,26 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
*/
@IndicatorType
@IndicatorType(selector = Selector.HashCode)
public abstract class AvgIndicator extends Indicator {
private long summation;
private int count;
public AvgIndicator(long timeBucket) {
super(timeBucket);
}
@Getter @Setter private long summation;
@Getter @Setter private int count;
@Entrance
public void combine(@SourceFrom long summation, @ConstOne int count) {
public final void combine(@SourceFrom long summation, @ConstOne int count) {
this.summation += summation;
this.count += count;
}
@Override public void combine(Indicator indicator) {
@Override public final void combine(Indicator indicator) {
AvgIndicator avgIndicator = (AvgIndicator)indicator;
combine(avgIndicator.summation, avgIndicator.count);
}
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.Getter;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
/**
......@@ -26,11 +26,7 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
*/
public abstract class Indicator extends StreamData {
@Getter private final long timeBucket;
public Indicator(long timeBucket) {
this.timeBucket = timeBucket;
}
@Getter @Setter private long timeBucket;
public abstract void combine(Indicator indicator);
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
/**
* @author peng-yongsheng
......@@ -26,4 +27,5 @@ import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
Selector selector();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
/**
* @author peng-yongsheng
*/
public class IndicatorDefineLoadException extends Exception {
public IndicatorDefineLoadException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
import java.io.*;
import java.net.URL;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class IndicatorMapper implements Service {
private static final Logger logger = LoggerFactory.getLogger(IndicatorMapper.class);
private int id = 0;
private final Map<Class<Indicator>, Integer> classKeyMapping;
private final Map<Integer, Class<Indicator>> idKeyMapping;
public IndicatorMapper() {
this.classKeyMapping = new HashMap<>();
this.idKeyMapping = new HashMap<>();
}
@SuppressWarnings(value = "unchecked")
public void load() throws IndicatorDefineLoadException {
try {
List<String> indicatorClasses = new LinkedList<>();
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/indicator.def");
while (urlEnumeration.hasMoreElements()) {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("Load indicator definition file url: {}", definitionFileURL.getPath());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
Properties properties = new Properties();
properties.load(bufferedReader);
Enumeration defineItem = properties.propertyNames();
while (defineItem.hasMoreElements()) {
String fullNameClass = (String)defineItem.nextElement();
indicatorClasses.add(fullNameClass);
}
}
for (String indicatorClassName : indicatorClasses) {
Class<Indicator> indicatorClass = (Class<Indicator>)Class.forName(indicatorClassName);
id++;
classKeyMapping.put(indicatorClass, id);
idKeyMapping.put(id, indicatorClass);
}
} catch (IOException | ClassNotFoundException e) {
throw new IndicatorDefineLoadException(e.getMessage(), e);
}
}
public int findIdByClass(Class indicatorClass) {
return classKeyMapping.get(indicatorClass);
}
public Class<Indicator> findClassById(int id) {
return idKeyMapping.get(id);
}
}
......@@ -16,35 +16,36 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis;
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class AbstractAggregator<INPUT extends Indicator> {
public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends Worker<INPUT> {
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
private final DataCarrier<INPUT> dataCarrier;
private final MergeDataCache<INPUT> mergeDataCache;
private int messageNum;
public AbstractAggregator() {
public AbstractAggregatorWorker(ModuleManager moduleManager) {
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
}
public void in(INPUT message) {
message.setEndOfBatchContext(new EndOfBatchContext(false));
dataCarrier.produce(message);
@Override public final void in(INPUT input) {
input.setEndOfBatchContext(new EndOfBatchContext(false));
dataCarrier.produce(input);
}
private void onWork(INPUT message) {
......@@ -91,9 +92,9 @@ public abstract class AbstractAggregator<INPUT extends Indicator> {
private class AggregatorConsumer implements IConsumer<INPUT> {
private final AbstractAggregator<INPUT> aggregator;
private final AbstractAggregatorWorker<INPUT> aggregator;
private AggregatorConsumer(AbstractAggregator<INPUT> aggregator) {
private AggregatorConsumer(AbstractAggregatorWorker<INPUT> aggregator) {
this.aggregator = aggregator;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
public AbstractPersistentWorker(ModuleManager moduleManager) {
}
@Override public final void in(INPUT input) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class AbstractRemoteWorker<INPUT extends Indicator> extends Worker<INPUT> {
private static final Logger logger = LoggerFactory.getLogger(AbstractRemoteWorker.class);
private final ModuleManager moduleManager;
private RemoteSenderService remoteSender;
private WorkerMapper workerMapper;
public AbstractRemoteWorker(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
@Override public final void in(INPUT input) {
if (remoteSender == null) {
remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
}
if (workerMapper == null) {
workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
}
try {
int nextWorkerId = workerMapper.findIdByClass(nextWorkerClass());
remoteSender.send(nextWorkerId, input, selector());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
public abstract Class nextWorkerClass();
public abstract Selector selector();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
public abstract class Worker<INPUT extends Indicator> {
public abstract void in(INPUT input);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker.define;
/**
* @author peng-yongsheng
*/
public class WorkerDefineLoadException extends Exception {
public WorkerDefineLoadException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker.define;
import java.io.*;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.worker.Worker;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class WorkerMapper implements Service {
private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
private int id = 0;
private final ModuleManager moduleManager;
private final Map<Class<Worker>, Integer> classKeyMapping;
private final Map<Integer, Class<Worker>> idKeyMapping;
private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
private final Map<Integer, Worker> idKeyInstanceMapping;
public WorkerMapper(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.classKeyMapping = new HashMap<>();
this.idKeyMapping = new HashMap<>();
this.classKeyInstanceMapping = new HashMap<>();
this.idKeyInstanceMapping = new HashMap<>();
}
@SuppressWarnings(value = "unchecked")
public void load() throws WorkerDefineLoadException {
try {
List<String> workerClasses = new LinkedList<>();
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources("META-INF/defines/worker.def");
while (urlEnumeration.hasMoreElements()) {
URL definitionFileURL = urlEnumeration.nextElement();
logger.info("Load worker definition file url: {}", definitionFileURL.getPath());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(definitionFileURL.openStream()));
Properties properties = new Properties();
properties.load(bufferedReader);
Enumeration defineItem = properties.propertyNames();
while (defineItem.hasMoreElements()) {
String fullNameClass = (String)defineItem.nextElement();
workerClasses.add(fullNameClass);
}
}
for (String workerClassName : workerClasses) {
Class<Worker> workerClass = (Class<Worker>)Class.forName(workerClassName);
id++;
classKeyMapping.put(workerClass, id);
idKeyMapping.put(id, workerClass);
Constructor<Worker> constructor = workerClass.getDeclaredConstructor(ModuleManager.class);
Worker worker = constructor.newInstance(moduleManager);
classKeyInstanceMapping.put(workerClass, worker);
idKeyInstanceMapping.put(id, worker);
}
} catch (Exception e) {
throw new WorkerDefineLoadException(e.getMessage(), e);
}
}
public int findIdByClass(Class workerClass) {
return classKeyMapping.get(workerClass);
}
public Class<Worker> findClassById(int id) {
return idKeyMapping.get(id);
}
public Worker findInstanceByClass(Class workerClass) {
return classKeyInstanceMapping.get(workerClass);
}
public Worker findInstanceById(int id) {
return idKeyInstanceMapping.get(id);
}
}
......@@ -19,48 +19,29 @@
package org.apache.skywalking.oap.server.core.cluster;
import java.util.Objects;
import lombok.*;
/**
* @author peng-yongsheng
*/
public class RemoteInstance {
public class RemoteInstance implements Comparable<RemoteInstance> {
private String host;
private int port;
private boolean self = false;
@Getter private final String host;
@Getter private final int port;
@Getter @Setter private boolean isSelf = false;
public RemoteInstance() {
}
public RemoteInstance(String host, int port, boolean self) {
public RemoteInstance(String host, int port, boolean isSelf) {
this.host = host;
this.port = port;
this.self = self;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
this.isSelf = isSelf;
}
public boolean isSelf() {
return self;
@Override public int compareTo(RemoteInstance o) {
return toString().compareTo(toString());
}
public void setSelf(boolean self) {
this.self = self;
@Override public String toString() {
return host + String.valueOf(port);
}
@Override public boolean equals(Object o) {
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.receiver;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
......@@ -27,8 +28,8 @@ public class SourceReceiverImpl implements SourceReceiver {
private final DispatcherManager dispatcherManager;
public SourceReceiverImpl() {
this.dispatcherManager = new DispatcherManager();
public SourceReceiverImpl(ModuleManager moduleManager) {
this.dispatcherManager = new DispatcherManager(moduleManager);
}
@Override public void receive(Source source) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public interface Deserializable {
void deserialize(RemoteData remoteData);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.selector.*;
import org.apache.skywalking.oap.server.library.module.*;
/**
* @author peng-yongsheng
*/
public class RemoteSenderService implements Service {
private final ModuleManager moduleManager;
private final HashCodeSelector hashCodeSelector;
private final ForeverFirstSelector foreverFirstSelector;
private final RollingSelector rollingSelector;
public RemoteSenderService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.hashCodeSelector = new HashCodeSelector();
this.foreverFirstSelector = new ForeverFirstSelector();
this.rollingSelector = new RollingSelector();
}
public void send(int nextWorkId, Indicator indicator, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).getService(RemoteClientManager.class);
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), indicator);
remoteClient.push(nextWorkId, indicator);
case Rolling:
remoteClient = rollingSelector.select(clientManager.getRemoteClient(), indicator);
remoteClient.push(nextWorkId, indicator);
case ForeverFirst:
remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), indicator);
remoteClient.push(nextWorkId, indicator);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteServiceHandler.class);
private final IndicatorMapper indicatorMapper;
private final WorkerMapper workerMapper;
public RemoteServiceHandler(ModuleManager moduleManager) {
this.indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
this.workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
}
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
int indicatorId = message.getIndicatorId();
int nextWorkerId = message.getNextWorkerId();
RemoteData remoteData = message.getRemoteData();
Class<Indicator> indicatorClass = indicatorMapper.findClassById(indicatorId);
try {
indicatorClass.newInstance().deserialize(remoteData);
} catch (InstantiationException | IllegalAccessException e) {
logger.warn(e.getMessage());
}
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onNext(Empty.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}
......@@ -16,11 +16,13 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
package org.apache.skywalking.oap.server.core.remote;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public interface RemoteData {
String selectKey();
public interface Serializable {
RemoteData.Builder serialize();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClient implements RemoteClient, Comparable<GRPCRemoteClient> {
private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClient.class);
private final GRPCClient client;
private final DataCarrier<RemoteMessage> carrier;
private final IndicatorMapper indicatorMapper;
public GRPCRemoteClient(IndicatorMapper indicatorMapper, RemoteInstance remoteInstance, int channelSize,
int bufferSize) {
this.indicatorMapper = indicatorMapper;
this.client = new GRPCClient(remoteInstance.getHost(), remoteInstance.getPort());
this.carrier = new DataCarrier<>(channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
this.carrier.consume(new RemoteMessageConsumer(), 1);
}
@Override public void push(int nextWorkerId, Indicator indicator) {
int indicatorId = indicatorMapper.findIdByClass(indicator.getClass());
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setNextWorkerId(nextWorkerId);
builder.setIndicatorId(indicatorId);
builder.setRemoteData(indicator.serialize());
this.carrier.produce(builder.build());
}
class RemoteMessageConsumer implements IConsumer<RemoteMessage> {
@Override public void init() {
}
@Override public void consume(List<RemoteMessage> remoteMessages) {
StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
for (RemoteMessage remoteMessage : remoteMessages) {
streamObserver.onNext(remoteMessage);
}
streamObserver.onCompleted();
}
@Override public void onError(List<RemoteMessage> remoteMessages, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
}
private StreamObserver<RemoteMessage> createStreamObserver() {
RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel());
StreamStatus status = new StreamStatus(false);
return stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.finished();
}
});
}
class StreamStatus {
private final Logger logger = LoggerFactory.getLogger(StreamStatus.class);
private volatile boolean status;
StreamStatus(boolean status) {
this.status = status;
}
public boolean isFinish() {
return status;
}
void finished() {
this.status = true;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public void wait4Finish(long maxTimeout) {
long time = 0;
while (!status) {
if (time > maxTimeout) {
break;
}
try2Sleep(5);
time += 5;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
@Override public int compareTo(GRPCRemoteClient o) {
return this.client.toString().compareTo(o.client.toString());
}
public String getHost() {
return client.getHost();
}
public int getPort() {
return client.getPort();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
public interface RemoteClient {
String getHost();
int getPort();
void push(int nextWorkerId, Indicator indicator);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class RemoteClientManager implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);
private final ModuleManager moduleManager;
private IndicatorMapper indicatorMapper;
private ClusterNodesQuery clusterNodesQuery;
private final List<RemoteClient> clientsA;
private final List<RemoteClient> clientsB;
private List<RemoteClient> usingClients;
public RemoteClientManager(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
}
public void start() {
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).getService(ClusterNodesQuery.class);
this.indicatorMapper = moduleManager.find(ClusterModule.NAME).getService(IndicatorMapper.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 1, 2, TimeUnit.SECONDS);
}
private void refresh() {
List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
Collections.sort(instanceList);
if (!compare(instanceList)) {
buildNewClients(instanceList);
}
}
public List<RemoteClient> getRemoteClient() {
return usingClients;
}
private List<RemoteClient> getFreeClients() {
if (usingClients.equals(clientsA)) {
return clientsB;
} else {
return clientsA;
}
}
private void switchCurrentClients() {
if (usingClients.equals(clientsA)) {
usingClients = clientsB;
} else {
usingClients = clientsA;
}
}
private void buildNewClients(List<RemoteInstance> remoteInstances) {
getFreeClients().clear();
Map<String, RemoteClient> currentClientsMap = new HashMap<>();
this.usingClients.forEach(remoteClient -> {
currentClientsMap.put(address(remoteClient.getHost(), remoteClient.getPort()), remoteClient);
});
remoteInstances.forEach(remoteInstance -> {
String address = address(remoteInstance.getHost(), remoteInstance.getPort());
RemoteClient client;
if (currentClientsMap.containsKey(address)) {
client = currentClientsMap.get(address);
} else {
if (remoteInstance.isSelf()) {
client = new SelfRemoteClient(moduleManager, remoteInstance.getHost(), remoteInstance.getPort());
} else {
client = new GRPCRemoteClient(indicatorMapper, remoteInstance, 1, 3000);
}
}
getFreeClients().add(client);
});
switchCurrentClients();
}
private boolean compare(List<RemoteInstance> remoteInstances) {
if (usingClients.size() == remoteInstances.size()) {
for (int i = 0; i < usingClients.size(); i++) {
if (!address(usingClients.get(i).getHost(), usingClients.get(i).getPort()).equals(address(remoteInstances.get(i).getHost(), remoteInstances.get(i).getPort()))) {
return false;
}
}
return true;
} else {
return false;
}
}
private String address(String host, int port) {
return host + String.valueOf(port);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.client;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public class SelfRemoteClient implements RemoteClient {
private final ModuleManager moduleManager;
private final String host;
private final int port;
public SelfRemoteClient(ModuleManager moduleManager, String host, int port) {
this.moduleManager = moduleManager;
this.host = host;
this.port = port;
}
@Override public String getHost() {
return host;
}
@Override public int getPort() {
return port;
}
@Override public void push(int nextWorkerId, Indicator indicator) {
WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
workerMapper.findInstanceById(nextWorkerId).in(indicator);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.selector;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ForeverFirstSelector implements RemoteClientSelector {
private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
@Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
if (logger.isDebugEnabled()) {
logger.debug("clients size: {}", clients.size());
}
return clients.get(0);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.selector;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
public class HashCodeSelector implements RemoteClientSelector {
@Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
int size = clients.size();
int selectIndex = Math.abs(indicator.hashCode()) % size;
return clients.get(selectIndex);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.selector;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
public interface RemoteClientSelector {
RemoteClient select(List<RemoteClient> clients, Indicator indicator);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote.selector;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
/**
* @author peng-yongsheng
*/
public class RollingSelector implements RemoteClientSelector {
private int index = 0;
@Override public RemoteClient select(List<RemoteClient> clients, Indicator indicator) {
int size = clients.size();
index++;
int selectIndex = Math.abs(index) % size;
if (index == Integer.MAX_VALUE) {
index = 0;
}
return clients.get(selectIndex);
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.core.remote;
package org.apache.skywalking.oap.server.core.remote.selector;
/**
* @author peng-yongsheng
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.core.remote.grpc.proto";
service RemoteService {
rpc call (stream RemoteMessage) returns (Empty) {
}
}
message RemoteMessage {
int32 nextWorkerId = 1;
int32 indicatorId = 2;
RemoteData remoteData = 3;
}
message RemoteData {
repeated string dataStrings = 1;
repeated int64 dataLongs = 2;
repeated double dataDoubles = 3;
repeated int32 dataIntegers = 4;
}
message Empty {
}
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgIndicator
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgAggregateWorker
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgRemoteWorker
org.apache.skywalking.oap.server.core.analysis.endpoint.EndpointLatencyAvgPersistentWorker
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
import org.junit.*;
/**
* @author peng-yongsheng
*/
public class IndicatorMapperTestCase {
@Test
public void test() throws IndicatorDefineLoadException {
IndicatorMapper mapper = new IndicatorMapper();
mapper.load();
Assert.assertEquals(1, mapper.findIdByClass(TestAvgIndicator.class));
Assert.assertEquals(TestAvgIndicator.class, mapper.findClassById(1));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
* @author peng-yongsheng
*/
public class TestAvgIndicator extends AvgIndicator {
@Setter @Getter private int id;
@Override public RemoteData.Builder serialize() {
return null;
}
@Override public void deserialize(RemoteData remoteData) {
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.core.analysis.indicator.define.TestAvgIndicator
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.library.client.grpc;
import io.grpc.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
/**
......@@ -26,9 +27,9 @@ import org.apache.skywalking.oap.server.library.client.Client;
*/
public class GRPCClient implements Client {
private final String host;
@Getter private final String host;
private final int port;
@Getter private final int port;
private ManagedChannel channel;
......
......@@ -18,8 +18,7 @@
package org.apache.skywalking.oap.server.library.module;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
/**
* The <code>ModuleProvider</code> is an implementation of a {@link ModuleDefine}.
......
......@@ -35,8 +35,6 @@ public class MeshGRPCHandler extends ServiceMeshMetricServiceGrpc.ServiceMeshMet
if (logger.isDebugEnabled()) {
logger.debug("Received mesh metric: {}", metric);
}
}
@Override public void onError(Throwable throwable) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册