RunningRule.java 10.8 KB
Newer Older
wu-sheng's avatar
wu-sheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.core.alarm.provider;

import java.util.LinkedList;
wu-sheng's avatar
wu-sheng 已提交
22
import java.util.List;
wu-sheng's avatar
wu-sheng 已提交
23
import java.util.concurrent.locks.ReentrantLock;
wu-sheng's avatar
wu-sheng 已提交
24 25
import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
wu-sheng's avatar
wu-sheng 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39
import org.apache.skywalking.oap.server.core.alarm.MetaInAlarm;
import org.apache.skywalking.oap.server.core.analysis.indicator.DoubleValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.indicator.LongValueHolder;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
wu-sheng's avatar
wu-sheng 已提交
40
 * RunningRule represents each rule in running status. Based on the {@link AlarmRule} definition,
wu-sheng's avatar
wu-sheng 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
 *
 * @author wusheng
 */
public class RunningRule {
    private static final Logger logger = LoggerFactory.getLogger(RunningRule.class);
    private static DateTimeFormatter TIME_BUCKET_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmm");

    private String ruleName;
    private final Threshold threshold;
    private final OP op;
    private final int countThreshold;
    private final int silencePeriod;
    private int counter;
    private int silenceCountdown;
    private Window window;
    private volatile boolean isStarted = false;
wu-sheng's avatar
wu-sheng 已提交
57
    private volatile IndicatorValueType valueType;
wu-sheng's avatar
wu-sheng 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
    private Scope targetScope;

    public RunningRule(AlarmRule alarmRule) {
        this.ruleName = alarmRule.getAlarmRuleName();

        // Init the empty window for alarming rule.
        window = new Window(alarmRule.getPeriod());

        threshold = new Threshold(alarmRule.getAlarmRuleName(), alarmRule.getThreshold());
        op = OP.get(alarmRule.getOp());

        this.countThreshold = alarmRule.getCount();
        this.silencePeriod = alarmRule.getSilencePeriod();
        // -1 means silence countdown is not running.
        silenceCountdown = -1;
    }

    /**
wu-sheng's avatar
wu-sheng 已提交
76 77
     * Receive indicator result from persistence, after it is saved into storage. In alarm, only minute dimensionality
     * indicators are expected to process.
wu-sheng's avatar
wu-sheng 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
     *
     * @param indicator
     */
    public void in(MetaInAlarm meta, Indicator indicator) {
        if (!isStarted) {
            return;
        }

        if (valueType == null) {
            if (indicator instanceof LongValueHolder) {
                valueType = IndicatorValueType.LONG;
                threshold.setType(IndicatorValueType.LONG);
            } else if (indicator instanceof IntValueHolder) {
                valueType = IndicatorValueType.INT;
                threshold.setType(IndicatorValueType.INT);
            } else if (indicator instanceof DoubleValueHolder) {
                valueType = IndicatorValueType.DOUBLE;
                threshold.setType(IndicatorValueType.DOUBLE);
            }
            targetScope = meta.getScope();
        }

        if (valueType != null) {
            window.add(indicator);
        }
    }

    /**
     * Start this rule in running mode.
wu-sheng's avatar
wu-sheng 已提交
107
     *
wu-sheng's avatar
wu-sheng 已提交
108 109
     * @param current
     */
110
    public void start(LocalDateTime current) {
wu-sheng's avatar
wu-sheng 已提交
111 112 113 114 115 116
        window.start(current);
        isStarted = true;
    }

    /**
     * Move the buffer window to give time.
wu-sheng's avatar
wu-sheng 已提交
117
     *
wu-sheng's avatar
wu-sheng 已提交
118 119 120 121 122 123 124 125 126
     * @param targetTime of moving target
     */
    public void moveTo(LocalDateTime targetTime) {
        window.moveTo(targetTime);
    }

    /**
     * Check the conditions, decide to whether trigger alarm.
     */
127
    public AlarmMessage check() {
wu-sheng's avatar
wu-sheng 已提交
128 129 130 131 132 133 134 135 136 137 138
        boolean isMatched = window.isMatch();

        /**
         * When
         * 1. Metric value threshold triggers alarm by rule
         * 2. Counter reaches the count threshold;
         * 3. Isn't in silence stage, judged by SilenceCountdown(!=0).
         */
        if (isMatched) {
            counter++;
            if (counter >= countThreshold && silenceCountdown < 1) {
139
                return triggerAlarm();
wu-sheng's avatar
wu-sheng 已提交
140 141
            } else {
                silenceCountdown--;
wu-sheng's avatar
wu-sheng 已提交
142 143 144 145 146 147 148
            }
        } else {
            silenceCountdown--;
            if (counter > 0) {
                counter--;
            }
        }
149
        return AlarmMessage.NONE;
wu-sheng's avatar
wu-sheng 已提交
150 151 152 153 154
    }

    /**
     * Trigger alarm callbacks.
     */
155
    private AlarmMessage triggerAlarm() {
wu-sheng's avatar
wu-sheng 已提交
156
        silenceCountdown = silencePeriod;
wu-sheng's avatar
wu-sheng 已提交
157
        AlarmMessage message = new AlarmMessage();
158
        return message;
wu-sheng's avatar
wu-sheng 已提交
159 160 161
    }

    /**
wu-sheng's avatar
wu-sheng 已提交
162 163
     * A indicator window, based on {@link AlarmRule#period}. This window slides with time, just keeps the recent
     * N(period) buckets.
wu-sheng's avatar
wu-sheng 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
     *
     * @author wusheng
     */
    public class Window {
        private LocalDateTime endTime;
        private int period;

        private LinkedList<Indicator> values;
        private ReentrantLock lock = new ReentrantLock();

        public Window(int period) {
            this.period = period;
            init();
        }

        public void start(LocalDateTime current) {
            this.endTime = current;
        }

        public void moveTo(LocalDateTime current) {
            lock.lock();
            try {
                int minutes = Minutes.minutesBetween(endTime, current).getMinutes();
                if (minutes <= 0) {
                    return;
                }
                if (minutes > values.size()) {
                    // re-init
                    init();
                } else {
                    for (int i = 0; i < minutes; i++) {
                        values.removeFirst();
                        values.addLast(null);
                    }
                }
                endTime = current;
            } finally {
                lock.unlock();
            }
        }

        public void add(Indicator indicator) {
            long bucket = indicator.getTimeBucket();

            LocalDateTime timebucket = TIME_BUCKET_FORMATTER.parseLocalDateTime(bucket + "");

wu-sheng's avatar
wu-sheng 已提交
210
            int minutes = Minutes.minutesBetween(timebucket, endTime).getMinutes();
wu-sheng's avatar
wu-sheng 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
            if (minutes == -1) {
                this.moveTo(timebucket);

            }

            lock.lock();
            try {
                if (minutes < 0) {
                    // At any moment, should NOT be here
                    // Add this code just because of my obsession :P
                    return;
                }

                if (minutes >= values.size()) {
                    // too old data
                    // also should happen, but maybe if agent/probe mechanism time is not right.
                    return;
                }

wu-sheng's avatar
wu-sheng 已提交
230
                values.set(values.size() - minutes - 1, indicator);
wu-sheng's avatar
wu-sheng 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
            } finally {
                lock.unlock();
            }
        }

        public boolean isMatch() {
            int matchCount = 0;
            for (Indicator indicator : values) {
                if (indicator == null) {
                    continue;
                }

                switch (valueType) {
                    case LONG:
                        long lvalue = ((LongValueHolder)indicator).getValue();
                        long lexpected = RunningRule.this.threshold.getLongThreshold();
                        switch (op) {
                            case GREATER:
                                if (lvalue > lexpected)
                                    matchCount++;
                                break;
                            case LESS:
                                if (lvalue < lexpected)
                                    matchCount++;
                                break;
                            case EQUAL:
                                if (lvalue == lexpected)
                                    matchCount++;
                                break;
                        }
                        break;
                    case INT:
                        int ivalue = ((IntValueHolder)indicator).getValue();
                        int iexpected = RunningRule.this.threshold.getIntThreshold();
                        switch (op) {
                            case LESS:
                                if (ivalue < iexpected)
                                    matchCount++;
                                break;
                            case GREATER:
                                if (ivalue > iexpected)
                                    matchCount++;
                                break;
                            case EQUAL:
                                if (ivalue == iexpected)
                                    matchCount++;
                                break;
                        }
                        break;
                    case DOUBLE:
                        double dvalue = ((DoubleValueHolder)indicator).getValue();
                        double dexpected = RunningRule.this.threshold.getDoubleThreadhold();
                        switch (op) {
                            case EQUAL:
                                // NOTICE: double equal is not reliable in Java,
                                // match result is not predictable
                                if (dvalue == dexpected)
                                    matchCount++;
                                break;
                            case GREATER:
                                if (dvalue > dexpected)
                                    matchCount++;
                                break;
                            case LESS:
                                if (dvalue < dexpected)
                                    matchCount++;
                                break;
                        }
                        break;
                }
            }

            // Reach the threshold in current bucket.
            return matchCount >= countThreshold;
        }

        private void init() {
            values = new LinkedList();
            for (int i = 0; i < period; i++) {
                values.add(null);
            }
        }
    }
}