CoreModuleProvider.java 19.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.core;

21 22
import java.io.File;
import java.io.FileInputStream;
wu-sheng's avatar
wu-sheng 已提交
23
import java.io.FileNotFoundException;
24
import java.io.IOException;
25
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
G
Gao Hongtao 已提交
26
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
27 28 29
import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
wu-sheng's avatar
wu-sheng 已提交
30
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
G
Gao Hongtao 已提交
31
import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
32
import org.apache.skywalking.oap.server.core.analysis.worker.ManagementStreamProcessor;
33
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
34
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
35
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
36
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
37
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
38 39 40
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
41
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
42
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
43
import org.apache.skywalking.oap.server.core.command.CommandService;
44 45
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
46
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
47
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
wu-sheng's avatar
wu-sheng 已提交
48 49 50
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
51 52
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
53
import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
54
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
55
import org.apache.skywalking.oap.server.core.profile.ProfileTaskMutationService;
56 57
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
58
import org.apache.skywalking.oap.server.core.query.BrowserLogQueryService;
59
import org.apache.skywalking.oap.server.core.query.EventQueryService;
60 61
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
wu-sheng's avatar
wu-sheng 已提交
62
import org.apache.skywalking.oap.server.core.query.MetricsMetadataQueryService;
wu-sheng's avatar
wu-sheng 已提交
63
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
64 65 66 67 68 69 70 71
import org.apache.skywalking.oap.server.core.query.ProfileTaskQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
72
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
73 74 75 76 77 78 79
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
80
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
81
import org.apache.skywalking.oap.server.core.storage.StorageException;
82
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
83 84
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
85
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
86
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
87 88 89 90 91 92 93 94
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
95 96 97
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
98
import org.apache.skywalking.oap.server.library.server.jetty.JettyServerConfig;
99
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
wu-sheng's avatar
wu-sheng 已提交
100
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
101
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
102

103 104 105 106 107 108 109 110
/**
 * Core module provider includes the recommended and default implementations of {@link CoreModule#services()}. All
 * services with these default implementations are widely used including data receiver, data analysis, streaming
 * process, storage and query.
 *
 * NOTICE: In our experience, nobody should re-implement the core module service implementations unless they are
 * very familiar with all mechanisms of SkyWalking.
 */
111 112 113 114 115
public class CoreModuleProvider extends ModuleProvider {

    private final CoreModuleConfig moduleConfig;
    private GRPCServer grpcServer;
    private JettyServer jettyServer;
116
    private RemoteClientManager remoteClientManager;
117
    private final AnnotationScan annotationScan;
118
    private final StorageModels storageModels;
119
    private final SourceReceiverImpl receiver;
G
Gao Hongtao 已提交
120
    private ApdexThresholdConfig apdexThresholdConfig;
wu-sheng's avatar
wu-sheng 已提交
121
    private EndpointNameGroupingRuleWatcher endpointNameGroupingRuleWatcher;
122
    private OALEngineLoaderService oalEngineLoaderService;
123 124 125 126

    /**
     * Creates the provider and instantiates the collaborators held in final fields: the module configuration
     * bean, the annotation scanner, the storage models and the source receiver.
     */
    public CoreModuleProvider() {
        // The superclass no-arg constructor is invoked implicitly.
        moduleConfig = new CoreModuleConfig();
        annotationScan = new AnnotationScan();
        storageModels = new StorageModels();
        receiver = new SourceReceiverImpl();
    }

132 133
    @Override
    public String name() {
134 135 136
        return "default";
    }

137 138
    @Override
    public Class<? extends ModuleDefine> module() {
139 140 141
        return CoreModule.class;
    }

142 143
    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
144 145 146
        return moduleConfig;
    }

147 148
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
149 150 151
        if (moduleConfig.isActiveExtraModelColumns()) {
            DefaultScopeDefine.activeExtraModelColumns();
        }
wu-sheng's avatar
wu-sheng 已提交
152 153
        EndpointNameGrouping endpointNameGrouping = new EndpointNameGrouping();
        this.registerServiceImplementation(NamingControl.class, new NamingControl(
154 155 156 157
            moduleConfig.getServiceNameMaxLength(),
            moduleConfig.getInstanceNameMaxLength(),
            moduleConfig.getEndpointNameMaxLength(),
            endpointNameGrouping
158
        ));
wu-sheng's avatar
wu-sheng 已提交
159 160
        try {
            endpointNameGroupingRuleWatcher = new EndpointNameGroupingRuleWatcher(
161
                this, endpointNameGrouping);
wu-sheng's avatar
wu-sheng 已提交
162 163 164
        } catch (FileNotFoundException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
165

wu-sheng's avatar
wu-sheng 已提交
166 167 168
        AnnotationScan scopeScan = new AnnotationScan();
        scopeScan.registerListener(new DefaultScopeDefine.Listener());
        try {
wu-sheng's avatar
wu-sheng 已提交
169 170 171 172 173
            scopeScan.scan();
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

174
        this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
wu-sheng's avatar
wu-sheng 已提交
175

wu-sheng's avatar
wu-sheng 已提交
176 177 178 179 180
        AnnotationScan oalDisable = new AnnotationScan();
        oalDisable.registerListener(DisableRegister.INSTANCE);
        oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
        try {
            oalDisable.scan();
181
        } catch (IOException | StorageException e) {
wu-sheng's avatar
wu-sheng 已提交
182 183 184
            throw new ModuleStartException(e.getMessage(), e);
        }

185 186
        if (moduleConfig.isGRPCSslEnabled()) {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
187 188
                                        moduleConfig.getGRPCSslCertChainPath(),
                                        moduleConfig.getGRPCSslKeyPath()
189
            );
190 191 192
        } else {
            grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
        }
193 194 195 196 197 198
        if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
            grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
        }
        if (moduleConfig.getMaxMessageSize() > 0) {
            grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
        }
199 200 201 202 203 204
        if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
            grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
        }
        if (moduleConfig.getGRPCThreadPoolSize() > 0) {
            grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
        }
205 206
        grpcServer.initialize();

207 208 209 210 211 212 213 214 215 216 217 218 219
        JettyServerConfig jettyServerConfig = JettyServerConfig.builder()
                                                               .host(moduleConfig.getRestHost())
                                                               .port(moduleConfig.getRestPort())
                                                               .contextPath(moduleConfig.getRestContextPath())
                                                               .jettyIdleTimeOut(moduleConfig.getRestIdleTimeOut())
                                                               .jettyAcceptorPriorityDelta(
                                                                   moduleConfig.getRestAcceptorPriorityDelta())
                                                               .jettyMinThreads(moduleConfig.getRestMinThreads())
                                                               .jettyMaxThreads(moduleConfig.getRestMaxThreads())
                                                               .jettyAcceptQueueSize(
                                                                   moduleConfig.getRestAcceptQueueSize())
                                                               .build();
        jettyServer = new JettyServer(jettyServerConfig);
220 221
        jettyServer.initialize();

222
        this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
223
        this.registerServiceImplementation(
224
            DownSamplingConfigService.class, new DownSamplingConfigService(moduleConfig.getDownsampling()));
225

226 227 228
        this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
        this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));

229 230
        this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());

231
        this.registerServiceImplementation(SourceReceiver.class, receiver);
彭勇升 pengys 已提交
232

233 234 235 236
        WorkerInstancesService instancesService = new WorkerInstancesService();
        this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
        this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);

彭勇升 pengys 已提交
237
        this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
238
        this.registerServiceImplementation(ModelCreator.class, storageModels);
239
        this.registerServiceImplementation(IModelManager.class, storageModels);
240
        this.registerServiceImplementation(ModelManipulator.class, storageModels);
241

242
        this.registerServiceImplementation(
243
            NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));
244

245
        this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
wu-sheng's avatar
wu-sheng 已提交
246
        this.registerServiceImplementation(MetricsMetadataQueryService.class, new MetricsMetadataQueryService());
wu-sheng's avatar
wu-sheng 已提交
247
        this.registerServiceImplementation(MetricsQueryService.class, new MetricsQueryService(getManager()));
彭勇升 pengys 已提交
248
        this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
249
        this.registerServiceImplementation(BrowserLogQueryService.class, new BrowserLogQueryService(getManager()));
250
        this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
251
        this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
252
        this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
253
        this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
wu-sheng's avatar
wu-sheng 已提交
254
        this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
255
        this.registerServiceImplementation(EventQueryService.class, new EventQueryService(getManager()));
256

257
        // add profile service implementations
258
        this.registerServiceImplementation(
259
            ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
260
        this.registerServiceImplementation(
261
            ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
262
        this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
263

264 265
        this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));

266
        // add oal engine loader service implementations
267 268
        oalEngineLoaderService = new OALEngineLoaderService(getManager());
        this.registerServiceImplementation(OALEngineLoaderService.class, oalEngineLoaderService);
269

270
        annotationScan.registerListener(new StreamAnnotationListener(getManager()));
271

272 273
        if (moduleConfig.isGRPCSslEnabled()) {
            this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
274 275
                                                               moduleConfig.getGRPCSslTrustedCAPath()
            );
276 277 278
        } else {
            this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
        }
279
        this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
280

281
        // Management
282 283
        this.registerServiceImplementation(
            UITemplateManagementService.class, new UITemplateManagementService(getManager()));
284

285
        MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
286
        TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
G
Gao Hongtao 已提交
287 288
        apdexThresholdConfig = new ApdexThresholdConfig(this);
        ApdexMetrics.setDICT(apdexThresholdConfig);
289 290
    }

291 292
    @Override
    public void start() throws ModuleStartException {
彭勇升 pengys 已提交
293
        grpcServer.addHandler(new RemoteServiceHandler(getManager()));
294
        grpcServer.addHandler(new HealthCheckServiceHandler());
295
        remoteClientManager.start();
296

297 298 299
        // Disable OAL script has higher priority
        oalEngineLoaderService.load(DisableOALDefine.INSTANCE);

彭勇升 pengys 已提交
300
        try {
301
            receiver.scan();
wu-sheng's avatar
wu-sheng 已提交
302
            annotationScan.scan();
303
        } catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
彭勇升 pengys 已提交
304 305
            throw new ModuleStartException(e.getMessage(), e);
        }
306

307 308
        Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
        TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString());
309
        if (CoreModuleConfig.Role.Mixed.name()
310 311 312 313 314
                                       .equalsIgnoreCase(
                                           moduleConfig.getRole())
            || CoreModuleConfig.Role.Aggregator.name()
                                               .equalsIgnoreCase(
                                                   moduleConfig.getRole())) {
315
            RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
316
            this.getManager()
317 318 319 320
                .find(ClusterModule.NAME)
                .provider()
                .getService(ClusterRegister.class)
                .registerRemote(gRPCServerInstance);
321 322
        }

323 324
        OAPNodeChecker.setROLE(CoreModuleConfig.Role.fromName(moduleConfig.getRole()));

325
        DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
326 327 328
                                                                              .provider()
                                                                              .getService(
                                                                                  DynamicConfigurationService.class);
G
Gao Hongtao 已提交
329
        dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
wu-sheng's avatar
wu-sheng 已提交
330
        dynamicConfigurationService.registerConfigChangeWatcher(endpointNameGroupingRuleWatcher);
331 332
    }

333 334
    @Override
    public void notifyAfterCompleted() throws ModuleStartException {
335 336 337 338 339 340 341
        try {
            grpcServer.start();
            jettyServer.start();
        } catch (ServerException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

342
        PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
343

344
        if (moduleConfig.isEnableDataKeeperExecutor()) {
K
kezhenxu94 已提交
345
            DataTTLKeeperTimer.INSTANCE.start(getManager(), moduleConfig);
346
        }
347

348
        CacheUpdateTimer.INSTANCE.start(getManager(), moduleConfig.getMetricsDataTTL());
349 350

        try {
351 352 353
            final File[] templateFiles = ResourceUtils.getPathFiles("ui-initialized-templates");
            for (final File templateFile : templateFiles) {
                new UITemplateInitializer(new FileInputStream(templateFile))
354 355 356 357
                    .read()
                    .forEach(uiTemplate -> {
                        ManagementStreamProcessor.getInstance().in(uiTemplate);
                    });
358 359
            }

360 361 362
        } catch (FileNotFoundException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
363
    }
wu-sheng's avatar
wu-sheng 已提交
364 365 366

    @Override
    public String[] requiredModules() {
367 368 369
        return new String[] {
            TelemetryModule.NAME,
            ConfigurationModule.NAME
370
        };
wu-sheng's avatar
wu-sheng 已提交
371
    }
372
}