/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.core;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskMutationService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsMetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
import org.apache.skywalking.oap.server.core.query.ProfileTaskQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;

92 93 94 95 96 97 98 99
/**
 * Core module provider includes the recommended and default implementations of {@link CoreModule#services()}. All
 * services with these default implementations are widely used including data receiver, data analysis, streaming
 * process, storage and query.
 *
 * NOTICE. In our experiences, no one should re-implement the core module service implementations, unless we are very
 * familiar with all mechanisms of SkyWalking.
 */
100 101 102 103 104
public class CoreModuleProvider extends ModuleProvider {

    private final CoreModuleConfig moduleConfig;
    private GRPCServer grpcServer;
    private JettyServer jettyServer;
105
    private RemoteClientManager remoteClientManager;
106
    private final AnnotationScan annotationScan;
107
    private final StorageModels storageModels;
108
    private final SourceReceiverImpl receiver;
G
Gao Hongtao 已提交
109
    private ApdexThresholdConfig apdexThresholdConfig;
wu-sheng's avatar
wu-sheng 已提交
110
    private EndpointNameGroupingRuleWatcher endpointNameGroupingRuleWatcher;
111 112 113 114

    public CoreModuleProvider() {
        super();
        this.moduleConfig = new CoreModuleConfig();
115
        this.annotationScan = new AnnotationScan();
116
        this.storageModels = new StorageModels();
117
        this.receiver = new SourceReceiverImpl();
118 119
    }

120 121
    @Override
    public String name() {
122 123 124
        return "default";
    }

125 126
    @Override
    public Class<? extends ModuleDefine> module() {
127 128 129
        return CoreModule.class;
    }

130 131
    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
132 133 134
        return moduleConfig;
    }

135 136
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
137 138 139
        if (moduleConfig.isActiveExtraModelColumns()) {
            DefaultScopeDefine.activeExtraModelColumns();
        }
wu-sheng's avatar
wu-sheng 已提交
140 141
        EndpointNameGrouping endpointNameGrouping = new EndpointNameGrouping();
        this.registerServiceImplementation(NamingControl.class, new NamingControl(
142 143
            moduleConfig.getServiceNameMaxLength(),
            moduleConfig.getInstanceNameMaxLength(),
wu-sheng's avatar
wu-sheng 已提交
144 145
            moduleConfig.getEndpointNameMaxLength(),
            endpointNameGrouping
146
        ));
wu-sheng's avatar
wu-sheng 已提交
147 148 149 150 151 152
        try {
            endpointNameGroupingRuleWatcher = new EndpointNameGroupingRuleWatcher(
                this, endpointNameGrouping);
        } catch (FileNotFoundException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
153

154
        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager());
wu-sheng's avatar
wu-sheng 已提交
155

wu-sheng's avatar
wu-sheng 已提交
156 157 158
        AnnotationScan scopeScan = new AnnotationScan();
        scopeScan.registerListener(new DefaultScopeDefine.Listener());
        try {
wu-sheng's avatar
wu-sheng 已提交
159 160 161 162 163
            scopeScan.scan();
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

164
        this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
wu-sheng's avatar
wu-sheng 已提交
165

wu-sheng's avatar
wu-sheng 已提交
166 167 168 169 170
        AnnotationScan oalDisable = new AnnotationScan();
        oalDisable.registerListener(DisableRegister.INSTANCE);
        oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
        try {
            oalDisable.scan();
171
        } catch (IOException | StorageException e) {
wu-sheng's avatar
wu-sheng 已提交
172 173 174
            throw new ModuleStartException(e.getMessage(), e);
        }

175 176 177
        if (moduleConfig.isGRPCSslEnabled()) {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
                                        Paths.get(moduleConfig.getGRPCSslCertChainPath()).toFile(),
178 179
                                        Paths.get(moduleConfig.getGRPCSslKeyPath()).toFile()
            );
180 181 182
        } else {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
        }
183 184 185 186 187 188
        if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
            grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
        }
        if (moduleConfig.getMaxMessageSize() > 0) {
            grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
        }
189 190 191 192 193 194
        if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
            grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
        }
        if (moduleConfig.getGRPCThreadPoolSize() > 0) {
            grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
        }
195 196
        grpcServer.initialize();

197 198
        jettyServer = new JettyServer(
            moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig
199
            .getJettySelectors());
200 201
        jettyServer.initialize();

202
        this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
203
        this.registerServiceImplementation(
204
            DownSamplingConfigService.class, new DownSamplingConfigService(moduleConfig.getDownsampling()));
205

206 207 208
        this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
        this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));

209 210
        this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());

211
        this.registerServiceImplementation(SourceReceiver.class, receiver);
彭勇升 pengys 已提交
212

213 214 215 216
        WorkerInstancesService instancesService = new WorkerInstancesService();
        this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
        this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);

彭勇升 pengys 已提交
217
        this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
218
        this.registerServiceImplementation(ModelCreator.class, storageModels);
219
        this.registerServiceImplementation(IModelManager.class, storageModels);
220
        this.registerServiceImplementation(ModelManipulator.class, storageModels);
221

222
        this.registerServiceImplementation(
223
            NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));
224

225
        this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
wu-sheng's avatar
wu-sheng 已提交
226
        this.registerServiceImplementation(MetricsMetadataQueryService.class, new MetricsMetadataQueryService());
wu-sheng's avatar
wu-sheng 已提交
227
        this.registerServiceImplementation(MetricsQueryService.class, new MetricsQueryService(getManager()));
彭勇升 pengys 已提交
228
        this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
229
        this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
230
        this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
231
        this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
232
        this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
wu-sheng's avatar
wu-sheng 已提交
233
        this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
234

235
        // add profile service implementations
236 237 238 239
        this.registerServiceImplementation(
            ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
        this.registerServiceImplementation(
            ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
240
        this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
241

242 243
        this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));

244 245 246
        // add oal engine loader service implementations
        this.registerServiceImplementation(OALEngineLoaderService.class, new OALEngineLoaderService(getManager()));

wu-sheng's avatar
wu-sheng 已提交
247
        annotationScan.registerListener(streamAnnotationListener);
248

249 250
        if (moduleConfig.isGRPCSslEnabled()) {
            this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
251 252 253
                                                               Paths.get(moduleConfig.getGRPCSslTrustedCAPath())
                                                                    .toFile()
            );
254 255 256
        } else {
            this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
        }
257
        this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
258 259

        MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
260
        TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
G
Gao Hongtao 已提交
261 262
        apdexThresholdConfig = new ApdexThresholdConfig(this);
        ApdexMetrics.setDICT(apdexThresholdConfig);
263 264
    }

265 266
    @Override
    public void start() throws ModuleStartException {
彭勇升 pengys 已提交
267
        grpcServer.addHandler(new RemoteServiceHandler(getManager()));
268
        grpcServer.addHandler(new HealthCheckServiceHandler());
269
        remoteClientManager.start();
270

彭勇升 pengys 已提交
271
        try {
272
            receiver.scan();
wu-sheng's avatar
wu-sheng 已提交
273
            annotationScan.scan();
274
        } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
彭勇升 pengys 已提交
275 276
            throw new ModuleStartException(e.getMessage(), e);
        }
277

278
        if (CoreModuleConfig.Role.Mixed.name()
279
                                       .equalsIgnoreCase(
280 281 282 283
                                           moduleConfig.getRole())
            || CoreModuleConfig.Role.Aggregator.name()
                                               .equalsIgnoreCase(
                                                   moduleConfig.getRole())) {
284 285
            RemoteInstance gRPCServerInstance = new RemoteInstance(
                new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
286 287 288 289 290
            this.getManager()
                .find(ClusterModule.NAME)
                .provider()
                .getService(ClusterRegister.class)
                .registerRemote(gRPCServerInstance);
291 292
        }

293 294
        DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
                                                                              .provider()
295 296
                                                                              .getService(
                                                                                  DynamicConfigurationService.class);
G
Gao Hongtao 已提交
297
        dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
wu-sheng's avatar
wu-sheng 已提交
298
        dynamicConfigurationService.registerConfigChangeWatcher(endpointNameGroupingRuleWatcher);
299 300
    }

301 302
    @Override
    public void notifyAfterCompleted() throws ModuleStartException {
303 304 305 306 307 308 309
        try {
            grpcServer.start();
            jettyServer.start();
        } catch (ServerException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

310
        PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
311

312
        if (moduleConfig.isEnableDataKeeperExecutor()) {
K
kezhenxu94 已提交
313
            DataTTLKeeperTimer.INSTANCE.start(getManager(), moduleConfig);
314
        }
315

316
        CacheUpdateTimer.INSTANCE.start(getManager(), moduleConfig.getMetricsDataTTL());
317
    }
wu-sheng's avatar
wu-sheng 已提交
318 319 320

    @Override
    public String[] requiredModules() {
321 322 323 324
        return new String[] {
            TelemetryModule.NAME,
            ConfigurationModule.NAME
        };
wu-sheng's avatar
wu-sheng 已提交
325
    }
326
}