CoreModuleProvider.java 11.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.core;

21
import java.io.IOException;
22
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
G
Gao Hongtao 已提交
23
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
24
import org.apache.skywalking.oap.server.core.analysis.*;
G
Gao Hongtao 已提交
25
import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
26
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
27
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
28
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
29 30
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
31
import org.apache.skywalking.oap.server.core.command.CommandService;
32
import org.apache.skywalking.oap.server.core.config.*;
33
import org.apache.skywalking.oap.server.core.profile.ProfileTaskMutationService;
34 35 36 37 38
import org.apache.skywalking.oap.server.core.oal.rt.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
39
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
40 41
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
42
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
43
import org.apache.skywalking.oap.server.core.storage.model.*;
44
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
45 46
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
47 48 49
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
wu-sheng's avatar
wu-sheng 已提交
50
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
51 52 53 54 55 56 57 58 59

/**
 * @author peng-yongsheng
 */
public class CoreModuleProvider extends ModuleProvider {

    private final CoreModuleConfig moduleConfig;
    private GRPCServer grpcServer;
    private JettyServer jettyServer;
60
    private RemoteClientManager remoteClientManager;
61
    private final AnnotationScan annotationScan;
62
    private final StorageModels storageModels;
63
    private final SourceReceiverImpl receiver;
wu-sheng's avatar
wu-sheng 已提交
64
    private OALEngine oalEngine;
G
Gao Hongtao 已提交
65
    private ApdexThresholdConfig apdexThresholdConfig;
66 67 68 69

    public CoreModuleProvider() {
        super();
        this.moduleConfig = new CoreModuleConfig();
70
        this.annotationScan = new AnnotationScan();
71
        this.storageModels = new StorageModels();
72
        this.receiver = new SourceReceiverImpl();
73 74 75 76 77 78
    }

    @Override public String name() {
        return "default";
    }

彭勇升 pengys 已提交
79
    @Override public Class<? extends ModuleDefine> module() {
80 81 82 83 84 85 86
        return CoreModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        return moduleConfig;
    }

wu-sheng's avatar
wu-sheng 已提交
87
    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
88
        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(getManager());
wu-sheng's avatar
wu-sheng 已提交
89

wu-sheng's avatar
wu-sheng 已提交
90 91 92
        AnnotationScan scopeScan = new AnnotationScan();
        scopeScan.registerListener(new DefaultScopeDefine.Listener());
        try {
wu-sheng's avatar
wu-sheng 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
            scopeScan.scan();

            oalEngine = OALEngineLoader.get();
            oalEngine.setStreamListener(streamAnnotationListener);
            oalEngine.setDispatcherListener(receiver.getDispatcherManager());
            oalEngine.start(getClass().getClassLoader());
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

        AnnotationScan oalDisable = new AnnotationScan();
        oalDisable.registerListener(DisableRegister.INSTANCE);
        oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
        try {
            oalDisable.scan();
wu-sheng's avatar
wu-sheng 已提交
108 109 110 111
        } catch (IOException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

112
        grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
113 114 115 116 117 118
        if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
            grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
        }
        if (moduleConfig.getMaxMessageSize() > 0) {
            grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
        }
119 120 121 122 123 124
        if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
            grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
        }
        if (moduleConfig.getGRPCThreadPoolSize() > 0) {
            grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
        }
125 126
        grpcServer.initialize();

127
        jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
128 129
        jettyServer.initialize();

130
        this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
131 132
        this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));

133 134 135
        this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
        this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));

136 137
        this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());

138
        this.registerServiceImplementation(SourceReceiver.class, receiver);
彭勇升 pengys 已提交
139

140 141 142 143
        WorkerInstancesService instancesService = new WorkerInstancesService();
        this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
        this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);

彭勇升 pengys 已提交
144
        this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
145 146 147
        this.registerServiceImplementation(IModelSetter.class, storageModels);
        this.registerServiceImplementation(IModelGetter.class, storageModels);
        this.registerServiceImplementation(IModelOverride.class, storageModels);
148

wu-sheng's avatar
wu-sheng 已提交
149
        this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager(), moduleConfig));
150 151
        this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));

wu-sheng's avatar
wu-sheng 已提交
152
        this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager(), moduleConfig));
153 154
        this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));

wu-sheng's avatar
wu-sheng 已提交
155
        this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager(), moduleConfig));
156 157
        this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));

wu-sheng's avatar
wu-sheng 已提交
158
        this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager(), moduleConfig));
159 160
        this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));

161
        this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
彭勇升 pengys 已提交
162 163
        this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
        this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
164
        this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
165
        this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
166
        this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
167
        this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
wu-sheng's avatar
wu-sheng 已提交
168
        this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
169

170 171 172 173
        // add profile service implementations
        this.registerServiceImplementation(ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
        this.registerServiceImplementation(ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager()));

174 175
        this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));

wu-sheng's avatar
wu-sheng 已提交
176
        annotationScan.registerListener(streamAnnotationListener);
177

178
        this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
179
        this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
180 181

        MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
182
        TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
G
Gao Hongtao 已提交
183 184
        apdexThresholdConfig = new ApdexThresholdConfig(this);
        ApdexMetrics.setDICT(apdexThresholdConfig);
185 186
    }

彭勇升 pengys 已提交
187
    @Override public void start() throws ModuleStartException {
G
Gao Hongtao 已提交
188

彭勇升 pengys 已提交
189
        grpcServer.addHandler(new RemoteServiceHandler(getManager()));
190
        grpcServer.addHandler(new HealthCheckServiceHandler());
191
        remoteClientManager.start();
192

彭勇升 pengys 已提交
193
        try {
194
            receiver.scan();
wu-sheng's avatar
wu-sheng 已提交
195 196 197
            annotationScan.scan();

            oalEngine.notifyAllListeners();
198
        } catch (IOException | IllegalAccessException | InstantiationException e) {
彭勇升 pengys 已提交
199 200
            throw new ModuleStartException(e.getMessage(), e);
        }
201 202 203 204 205 206

        if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
            RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
            this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
        }

G
Gao Hongtao 已提交
207 208
        DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME).provider().getService(DynamicConfigurationService.class);
        dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
209 210
    }

wu-sheng's avatar
Fix CI.  
wu-sheng 已提交
211
    @Override public void notifyAfterCompleted() throws ModuleStartException {
212 213 214 215 216 217 218
        try {
            grpcServer.start();
            jettyServer.start();
        } catch (ServerException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }

219
        PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
220

221
        if (moduleConfig.isEnableDataKeeperExecutor()) {
K
kezhenxu94 已提交
222
            DataTTLKeeperTimer.INSTANCE.start(getManager(), moduleConfig);
223
        }
224 225

        CacheUpdateTimer.INSTANCE.start(getManager());
226
    }
wu-sheng's avatar
wu-sheng 已提交
227 228 229

    @Override
    public String[] requiredModules() {
230
        return new String[] {TelemetryModule.NAME, ConfigurationModule.NAME};
wu-sheng's avatar
wu-sheng 已提交
231
    }
232
}