提交 08509753 编写于 作者: wu-sheng's avatar wu-sheng

Finish the codebase of Alarm.

上级 b9eafa08
......@@ -37,6 +37,7 @@
<module>server-library</module>
<module>server-starter</module>
<module>server-query-plugin</module>
<module>server-alarm-plugin</module>
</modules>
<properties>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server-alarm-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>6.0.0-alpha-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
/**
* Alarm core includes metric values in certain time windows based on alarm settings. By using its internal timer
* trigger and the alarm rules to decides whether send the alarm to database and webhook(s)
*
* @author wusheng
*/
public class AlarmCore {
private Map<String, List<RunningRule>> runningContext;
private LocalDateTime lastExecuteTime;
AlarmCore(Rules rules) {
runningContext = new HashMap<>();
rules.getRules().forEach(rule -> {
RunningRule runningRule = new RunningRule(rule);
String indicatorName = rule.getIndicatorName();
List<RunningRule> runningRules = runningContext.get(indicatorName);
if (runningRules == null) {
runningRules = new ArrayList<>();
runningContext.put(indicatorName, runningRules);
}
runningRules.add(runningRule);
});
}
public List<RunningRule> findRunningRule(String indicatorName) {
return runningContext.get(indicatorName);
}
public void start() {
LocalDateTime now = LocalDateTime.now();
lastExecuteTime = now;
runningContext.values().forEach(ruleList -> ruleList.forEach(runningRule -> runningRule.start(now)));
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->
runningContext.values().forEach(ruleList -> ruleList.forEach(runningRule -> {
LocalDateTime checkTime = LocalDateTime.now();
int minutes = Minutes.minutesBetween(checkTime, lastExecuteTime).getMinutes();
if (minutes > 0) {
runningRule.moveTo(checkTime);
/**
* At least, run 15 seconds in 1 min, alarm check should run.
*/
if (checkTime.getSecondOfMinute() > 15) {
runningRule.check();
// Set the last execute time, and make sure the second is `00`, such as: 18:30:00
lastExecuteTime = checkTime.minusSeconds(checkTime.getSecondOfMinute());
runningRule.check();
}
}
})), 10, 10, TimeUnit.SECONDS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.io.FileNotFoundException;
import java.io.Reader;
import org.apache.skywalking.oap.server.core.alarm.AlarmModule;
import org.apache.skywalking.oap.server.core.alarm.IndicatorNotify;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
public class AlarmModuleProvider extends ModuleProvider {
@Override public String name() {
return "default";
}
@Override public Class<? extends ModuleDefine> module() {
return AlarmModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return new AlarmSettings();
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
Reader applicationReader;
try {
applicationReader = ResourceUtils.read("alarm-settings.yml");
} catch (FileNotFoundException e) {
throw new ModuleStartException("can't load alarm-settings.yml",e);
}
RulesReader reader = new RulesReader(applicationReader);
Rules rules = reader.readRules();
this.registerServiceImplementation(IndicatorNotify.class, new NotifyHandler(rules));
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
/**
* @author wusheng
*/
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PUBLIC)
public class AlarmRule {
private String alarmRuleName;
private String indicatorName;
private String threshold;
private String op;
private int period;
private int count;
private int silencePeriod;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class AlarmSettings extends ModuleConfig {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
public enum IndicatorValueType {
LONG, INT, DOUBLE
}
......@@ -16,56 +16,46 @@
*
*/
package org.apache.skywalking.oap.server.core.alarm;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.indicator.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
import org.junit.Assert;
import org.junit.Test;
public class IndicatorAlarmListenerTest {
@Test
public void testIndicatorPreAnalysis() {
IndicatorAlarmListener listener = new IndicatorAlarmListener();
listener.notify(ATestClass.class);
listener.notify(UnCompleteIndicator.class);
listener.notify(MockIndicator.class);
Assert.assertNull(listener.getTarget(ATestClass.class));
Assert.assertNull(listener.getTarget(UnCompleteIndicator.class));
Assert.assertNotNull(listener.getTarget(MockIndicator.class));
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.List;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.IndicatorNotify;
import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
public class NotifyHandler implements IndicatorNotify {
private final AlarmCore core;
private final Rules rules;
private List<AlarmCallback> allCallbacks;
public NotifyHandler(Rules rules) {
this.rules = rules;
core = new AlarmCore(rules);
core.start();
}
public class ATestClass {
}
@IndicatorType
public class UnCompleteIndicator {
@Override public void notify(MetaInAlarm meta, Indicator indicator) {
switch (meta.getScope()) {
case Service:
break;
default:
return;
}
List<RunningRule> runningRules = core.findRunningRule(meta.getIndicatorName());
if (runningRules == null) {
return;
}
runningRules.forEach(rule -> rule.in(meta, indicator));
}
@IndicatorType
@StorageEntity(name = "mock_indicator", builder = MockBuilder.class)
public class MockIndicator implements LongValueHolder {
@Override public long getValue() {
return 0;
@Override
public void init(AlarmCallback... callbacks) {
for (AlarmCallback callback : callbacks) {
allCallbacks.add(callback);
}
allCallbacks.add(new WebhookCallback(rules.getWebhooks()));
}
public class MockBuilder implements StorageBuilder {
@Override public StorageData map2Data(Map dbMap) {
return null;
}
@Override public Map<String, Object> data2Map(StorageData storageData) {
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
public enum OP {
GREATER, LESS, EQUAL;
public static OP get(String op) {
switch (op) {
case ">":
return GREATER;
case "<":
return LESS;
case "==":
return EQUAL;
default:
throw new IllegalArgumentException("unknown op, " + op);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.List;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PUBLIC)
public class Rules {
private List<AlarmRule> rules;
private List<String> webhooks;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;
/**
* Rule Reader parses the given `alarm-settings.yml` config file, to the target {@link Rules}.
*
* @author wusheng
*/
public class RulesReader {
private Map yamlData;
public RulesReader(InputStream inputStream) {
Yaml yaml = new Yaml();
yamlData = yaml.loadAs(inputStream, Map.class);
}
public RulesReader(Reader io) {
Yaml yaml = new Yaml();
yamlData = yaml.loadAs(io, Map.class);
}
public Rules readRules() {
Rules rules = new Rules();
Map rulesData = (Map)yamlData.get("rules");
if (rulesData != null) {
rules.setRules(new ArrayList<>());
rulesData.forEach((k, v) -> {
if (((String)k).endsWith("_rule")) {
AlarmRule alarmRule = new AlarmRule();
alarmRule.setAlarmRuleName((String)k);
Map settings = (Map)v;
Object indicatorName = settings.get("indicator-name");
if (indicatorName == null) {
throw new IllegalArgumentException("indicator-name can't be null");
}
alarmRule.setIndicatorName((String)indicatorName);
alarmRule.setThreshold(settings.get("threshold").toString());
alarmRule.setOp((String)settings.get("op"));
alarmRule.setPeriod((Integer)settings.getOrDefault("period", 1));
alarmRule.setCount((Integer)settings.getOrDefault("count", 1));
alarmRule.setSilencePeriod((Integer)settings.getOrDefault("silence-period", alarmRule.getPeriod()));
rules.getRules().add(alarmRule);
}
});
}
List webhooks = (List)yamlData.get("webhooks");
if (webhooks != null) {
rules.setWebhooks(new ArrayList<>());
webhooks.forEach(url -> {
rules.getWebhooks().add((String)url);
});
}
return rules;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.DoubleValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.LongValueHolder;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RunningRule represents each rule in running status.
* Based on the {@link AlarmRule} definition,
*
*
* @author wusheng
*/
public class RunningRule {
private static final Logger logger = LoggerFactory.getLogger(RunningRule.class);
private static DateTimeFormatter TIME_BUCKET_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmm");
private String ruleName;
private final Threshold threshold;
private final OP op;
private final int countThreshold;
private final int silencePeriod;
private int counter;
private int silenceCountdown;
private Window window;
private volatile boolean isStarted = false;
private IndicatorValueType valueType;
private Scope targetScope;
public RunningRule(AlarmRule alarmRule) {
this.ruleName = alarmRule.getAlarmRuleName();
// Init the empty window for alarming rule.
window = new Window(alarmRule.getPeriod());
threshold = new Threshold(alarmRule.getAlarmRuleName(), alarmRule.getThreshold());
op = OP.get(alarmRule.getOp());
this.countThreshold = alarmRule.getCount();
this.silencePeriod = alarmRule.getSilencePeriod();
// -1 means silence countdown is not running.
silenceCountdown = -1;
}
/**
* Receive indicator result from persistence, after it is saved into storage.
* In alarm, only minute dimensionality indicators are expected to process.
*
* @param indicator
*/
public void in(MetaInAlarm meta, Indicator indicator) {
if (!isStarted) {
return;
}
if (valueType == null) {
if (indicator instanceof LongValueHolder) {
valueType = IndicatorValueType.LONG;
threshold.setType(IndicatorValueType.LONG);
} else if (indicator instanceof IntValueHolder) {
valueType = IndicatorValueType.INT;
threshold.setType(IndicatorValueType.INT);
} else if (indicator instanceof DoubleValueHolder) {
valueType = IndicatorValueType.DOUBLE;
threshold.setType(IndicatorValueType.DOUBLE);
}
targetScope = meta.getScope();
}
if (valueType != null) {
window.add(indicator);
}
}
/**
* Start this rule in running mode.
* @param current
*/
public void start(LocalDateTime current) {
window.start(current);
isStarted = true;
}
/**
* Move the buffer window to give time.
* @param targetTime of moving target
*/
public void moveTo(LocalDateTime targetTime) {
window.moveTo(targetTime);
}
/**
* Check the conditions, decide to whether trigger alarm.
*/
public void check() {
boolean isMatched = window.isMatch();
/**
* When
* 1. Metric value threshold triggers alarm by rule
* 2. Counter reaches the count threshold;
* 3. Isn't in silence stage, judged by SilenceCountdown(!=0).
*/
if (isMatched) {
counter++;
if (counter >= countThreshold && silenceCountdown < 1) {
triggerAlarm();
}
} else {
silenceCountdown--;
if (counter > 0) {
counter--;
}
}
}
/**
* Trigger alarm callbacks.
*/
private void triggerAlarm() {
silenceCountdown = silencePeriod;
}
/**
* A indicator window, based on {@link AlarmRule#period}.
* This window slides with time, just keeps the recent N(period) buckets.
*
* @author wusheng
*/
public class Window {
private LocalDateTime endTime;
private int period;
private LinkedList<Indicator> values;
private ReentrantLock lock = new ReentrantLock();
public Window(int period) {
this.period = period;
init();
}
public void start(LocalDateTime current) {
this.endTime = current;
}
public void moveTo(LocalDateTime current) {
lock.lock();
try {
int minutes = Minutes.minutesBetween(endTime, current).getMinutes();
if (minutes <= 0) {
return;
}
if (minutes > values.size()) {
// re-init
init();
} else {
for (int i = 0; i < minutes; i++) {
values.removeFirst();
values.addLast(null);
}
}
endTime = current;
} finally {
lock.unlock();
}
}
public void add(Indicator indicator) {
long bucket = indicator.getTimeBucket();
LocalDateTime timebucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");
int minutes = Minutes.minutesBetween(endTime, timebucket).getMinutes();
if (minutes == -1) {
this.moveTo(timebucket);
}
lock.lock();
try {
minutes = Minutes.minutesBetween(endTime, timebucket).getMinutes();
if (minutes < 0) {
// At any moment, should NOT be here
// Add this code just because of my obsession :P
return;
}
if (minutes >= values.size()) {
// too old data
// also should happen, but maybe if agent/probe mechanism time is not right.
return;
}
int idx = minutes - 1;
values.set(idx, indicator);
} finally {
lock.unlock();
}
}
public boolean isMatch() {
int matchCount = 0;
for (Indicator indicator : values) {
if (indicator == null) {
continue;
}
switch (valueType) {
case LONG:
long lvalue = ((LongValueHolder)indicator).getValue();
long lexpected = RunningRule.this.threshold.getLongThreshold();
switch (op) {
case GREATER:
if (lvalue > lexpected)
matchCount++;
break;
case LESS:
if (lvalue < lexpected)
matchCount++;
break;
case EQUAL:
if (lvalue == lexpected)
matchCount++;
break;
}
break;
case INT:
int ivalue = ((IntValueHolder)indicator).getValue();
int iexpected = RunningRule.this.threshold.getIntThreshold();
switch (op) {
case LESS:
if (ivalue < iexpected)
matchCount++;
break;
case GREATER:
if (ivalue > iexpected)
matchCount++;
break;
case EQUAL:
if (ivalue == iexpected)
matchCount++;
break;
}
break;
case DOUBLE:
double dvalue = ((DoubleValueHolder)indicator).getValue();
double dexpected = RunningRule.this.threshold.getDoubleThreadhold();
switch (op) {
case EQUAL:
// NOTICE: double equal is not reliable in Java,
// match result is not predictable
if (dvalue == dexpected)
matchCount++;
break;
case GREATER:
if (dvalue > dexpected)
matchCount++;
break;
case LESS:
if (dvalue < dexpected)
matchCount++;
break;
}
break;
}
}
// Reach the threshold in current bucket.
return matchCount >= countThreshold;
}
private void init() {
values = new LinkedList();
for (int i = 0; i < period; i++) {
values.add(null);
}
}
private Indicator clone(Indicator indicator) {
Class<? extends StorageBuilder> builder = StorageEntityAnnotationUtils.getBuilder(indicator.getClass());
try {
StorageBuilder indicatorBuilder = builder.newInstance();
return (Indicator)indicatorBuilder.map2Data(indicatorBuilder.data2Map(indicator));
} catch (InstantiationException e) {
logger.error("clone indicator error, {}", indicator.getClass());
return null;
} catch (IllegalAccessException e) {
logger.error("clone indicator error, {}", indicator.getClass());
return null;
} catch (Exception e) {
logger.error("clone indicator error, {}", indicator.getClass());
return null;
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public class Threshold {
private static final Logger logger = LoggerFactory.getLogger(Threshold.class);
private String alarmRuleName;
private final String threshold;
private int intThreshold;
private double doubleThreadhold;
private long longThreshold;
public Threshold(String alarmRuleName, String threshold) {
this.alarmRuleName = alarmRuleName;
this.threshold = threshold;
}
public int getIntThreshold() {
return intThreshold;
}
public double getDoubleThreadhold() {
return doubleThreadhold;
}
public long getLongThreshold() {
return longThreshold;
}
public void setType(IndicatorValueType type) {
try {
switch (type) {
case INT:
intThreshold = Integer.parseInt(threshold);
break;
case LONG:
longThreshold = Long.parseLong(threshold);
break;
case DOUBLE:
doubleThreadhold = Double.parseDouble(threshold);
break;
}
} catch (NumberFormatException e) {
logger.warn("Alarm rule {} threshold doesn't match the indicator type, expected type: {}", alarmRuleName, type);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.List;
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
/**
* Use SkyWalking alarm webhook API call a remote endpoints.
*/
public class WebhookCallback implements AlarmCallback {
private List<String> remoteEndpoints;
public WebhookCallback(List<String> remoteEndpoints) {
this.remoteEndpoints = remoteEndpoints;
}
@Override public void doAlarm(AlarmMessage alarmMessage) {
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.core.alarm.provider.AlarmModuleProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm.provider;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class AlarmRuleInitTest {
@Test
public void testInit() {
RulesReader reader = new RulesReader(this.getClass().getClassLoader()
.getResourceAsStream("alarm-settings.yml"));
Rules rules = reader.readRules();
List<AlarmRule> ruleList = rules.getRules();
Assert.assertEquals(2, ruleList.size());
Assert.assertEquals("85", ruleList.get(1).getThreshold());
Assert.assertEquals("endpoint_percent_rule", ruleList.get(0).getAlarmRuleName());
List<String> rulesWebhooks = rules.getWebhooks();
Assert.assertEquals(2, rulesWebhooks.size());
Assert.assertEquals("http://127.0.0.1/go-wechat/", rulesWebhooks.get(1));
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rules:
# Rule unique name, must be ended with `_rule`.
endpoint_percent_rule:
# Indicator value need to be long, double or int
indicator-name: endpoint_percent
threshold: 75
op: <
# The length of time to evaluate the metric
period: 10
# How many times after the metric match the condition, will trigger alarm
count: 3
# How many times of checks, the alarm keeps silence after alarm triggered, default as same as period.
silence-period: 10
service_percent_rule:
indicator-name: service_percent
threshold: 85
op: <
period: 10
count: 4
webhooks:
- http://127.0.0.1/notify/
- http://127.0.0.1/go-wechat/
......@@ -31,6 +31,10 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-module</artifactId>
......
......@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.alarm.IndicatorAlarmListener;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
......@@ -109,7 +108,6 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
annotationScan.registerListener(IndicatorAlarmListener.INSTANCE);
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
......
......@@ -19,11 +19,11 @@
package org.apache.skywalking.oap.server.core.alarm;
/**
* Indicator Alarm data type represents the indicator result data time, based on this, the core drives the {@link
* IndicatorNotify} to notify the alarm implementor.
* Alarm call back will be called by alarm implementor,
* after it decided alarm should be sent.
*
* @author wusheng
*/
public enum IndicatorAlarmDataType {
LONG, INT, DOUBLE
public interface AlarmCallback {
void doAlarm(AlarmMessage alarmMessage);
}
......@@ -18,58 +18,66 @@
package org.apache.skywalking.oap.server.core.alarm;
import java.lang.annotation.Annotation;
import java.util.HashMap;
import org.apache.skywalking.oap.server.core.analysis.indicator.DoubleValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntity;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* Indicator Alarm listener does the pre-analysis of each indicator implementation, when alarm happens, it drives
* notification based on indicator value data type.
*
* @author wusheng
*/
public class IndicatorAlarmListener implements AnnotationListener {
public static final IndicatorAlarmListener INSTANCE = new IndicatorAlarmListener();
private final HashMap<Class, NotifyTarget> notifyTarget;
IndicatorAlarmListener() {
notifyTarget = new HashMap<>();
}
public class AlarmEntrance {
private ModuleManager moduleManager;
private ServiceInventoryCache serviceInventoryCache;
private IndicatorNotify indicatorNotify;
private ReentrantLock initLock;
@Override public Class<? extends Annotation> annotation() {
return IndicatorType.class;
public AlarmEntrance(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.initLock = new ReentrantLock();
}
@Override public void notify(Class aClass) {
StorageEntity storageEntityAnnotation = (StorageEntity)aClass.getAnnotation(StorageEntity.class);
if (storageEntityAnnotation == null) {
public void forward(Indicator indicator) {
if (!moduleManager.has(AlarmModule.NAME)) {
return;
}
String indicatorName = storageEntityAnnotation.name();
NotifyTarget target = new NotifyTarget();
target.setIndicatorName(indicatorName);
if (DoubleValueHolder.class.isAssignableFrom(aClass)) {
target.setDataType(IndicatorAlarmDataType.DOUBLE);
} else if (LongValueHolder.class.isAssignableFrom(aClass)) {
target.setDataType(IndicatorAlarmDataType.LONG);
} else if (IntValueHolder.class.isAssignableFrom(aClass)) {
target.setDataType(IndicatorAlarmDataType.INT);
} else {
// If don't declare as any value holder, this is not an alarm candidate value.
return;
init();
AlarmMeta alarmMeta = ((AlarmSupported)indicator).getAlarmMeta();
MetaInAlarm metaInAlarm = null;
switch (alarmMeta.getScope()) {
case Service:
int serviceId = alarmMeta.getIds().getID(0);
ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId);
ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
serviceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
serviceMetaInAlarm.setId(serviceId);
serviceMetaInAlarm.setName(serviceInventory.getName());
metaInAlarm = serviceMetaInAlarm;
break;
default:
return;
}
notifyTarget.put(aClass, target);
indicatorNotify.notify(metaInAlarm, indicator);
}
public NotifyTarget getTarget(Class indicatorClass) {
return notifyTarget.get(indicatorClass);
private void init() {
if (serviceInventoryCache == null) {
initLock.lock();
try {
if (serviceInventoryCache == null) {
serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
indicatorNotify = moduleManager.find(AlarmModule.NAME).getService(IndicatorNotify.class);
indicatorNotify.init(new AlarmStandardPersistence());
}
} finally {
initLock.unlock();
}
}
}
}
......@@ -18,13 +18,16 @@
package org.apache.skywalking.oap.server.core.alarm;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
/**
* Alarm message represents the details of each alarm.
*
* @author wusheng
*/
public class NotifyTarget {
@Setter @Getter private String indicatorName;
@Setter @Getter private IndicatorAlarmDataType dataType;
@Setter(AccessLevel.PUBLIC)
@Getter(AccessLevel.PUBLIC)
public class AlarmMessage {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm;
/**
* Save the alarm info into storage for UI query.
*/
public class AlarmStandardPersistence implements AlarmCallback {
@Override public void doAlarm(AlarmMessage alarmMessage) {
//TODO Peng-yongsheng
/**
* This is just a callback entrance.
*/
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.alarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.library.module.Service;
/**
......@@ -31,9 +32,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
* @author wusheng
*/
public interface IndicatorNotify extends Service {
void notify(String indicatorName, double value);
void notify(MetaInAlarm indicatorName, Indicator indicator);
void notify(String indicatorName, int value);
void notify(String indicatorName, long value);
void init(AlarmCallback... callbacks);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm;
import org.apache.skywalking.oap.server.core.source.Scope;
public interface MetaInAlarm {
Scope getScope();
String getName();
String getIndicatorName();
/**
* In most scopes, there is only id0, as primary id. Such as Service, Endpoint.
* But in relation, the ID includes two, actually.
* Such as ServiceRelation,
* id0 represents the source service id
*
* @return the primary id.
*/
int getId0();
/**
* Only exist in multiple IDs case,
* Such as ServiceRelation,
* id1 represents the dest service id
*
* @return
*/
int getId1();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.alarm;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.source.Scope;
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PUBLIC)
public class ServiceMetaInAlarm implements MetaInAlarm {
private String indicatorName;
private int id;
private String name;
private String[] tags;
private String[] properties;
@Override public Scope getScope() {
return Scope.Service;
}
@Override public int getId0() {
return id;
}
@Override public int getId1() {
return 0;
}
}
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* Alarm notify worker, do a simple route to alarm core after the aggregation persistence.
......@@ -27,11 +30,18 @@ import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
* @author wusheng
*/
public class AlarmNotifyWorker extends AbstractWorker<Indicator> {
public AlarmNotifyWorker(int workerId) {
private ModuleManager moduleManager;
private AlarmEntrance entrance;
public AlarmNotifyWorker(int workerId, ModuleManager moduleManager) {
super(workerId);
this.moduleManager = moduleManager;
this.entrance = new AlarmEntrance(moduleManager);
}
@Override public void in(Indicator indicator) {
if (indicator instanceof AlarmSupported) {
entrance.forward(indicator);
}
}
}
......@@ -50,7 +50,7 @@ public enum IndicatorProcess {
throw new UnexpectedException("");
}
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate());
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker);
IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
......
......@@ -31,10 +31,6 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
......@@ -101,6 +97,13 @@
<version>${project.version}</version>
</dependency>
<!-- query module -->
<!-- alarm module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-alarm-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册