/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
38 39 40 41 42
 * Represent {@link KubernetesLeaderElector} in kubernetes. {@link LeaderElector#run()} is a
 * blocking call. It should be run in the IO executor, not the main thread. The lifecycle is bound
 * to single leader election. Once the leadership is revoked, as well as the {@link
 * LeaderCallbackHandler#notLeader()} is called, the {@link LeaderElector#run()} will finish. To
 * start another round of election, we need to trigger again.
43
 *
44 45 46 47
 * <p>{@link LeaderElector#run()} is responsible for creating the leader ConfigMap and continuously
 * update the annotation. The annotation key is {@link #LEADER_ANNOTATION_KEY} and the value is in
 * the following json format. metadata: annotations: control-plane.alpha.kubernetes.io/leader:
 * '{"holderIdentity":"623e39fb-70c3-44f1-811f-561ec4a28d75","leaseDuration":15.000000000,"acquireTime":"2020-10-20T04:06:31.431000Z","renewTime":"2020-10-22T08:51:36.843000Z","leaderTransitions":37981}'
48 49 50
 */
public class KubernetesLeaderElector {

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElector.class);

    @VisibleForTesting
    public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader";

    private final ExecutorService executorService =
            Executors.newSingleThreadExecutor(
                    new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));

    private final LeaderElector<NamespacedKubernetesClient> internalLeaderElector;

    public KubernetesLeaderElector(
            NamespacedKubernetesClient kubernetesClient,
            String namespace,
            KubernetesLeaderElectionConfiguration leaderConfig,
            LeaderCallbackHandler leaderCallbackHandler) {
        final LeaderElectionConfig leaderElectionConfig =
                new LeaderElectionConfigBuilder()
                        .withName(leaderConfig.getConfigMapName())
                        .withLeaseDuration(leaderConfig.getLeaseDuration())
                        .withLock(
                                new ConfigMapLock(
                                        namespace,
                                        leaderConfig.getConfigMapName(),
                                        leaderConfig.getLockIdentity()))
                        .withRenewDeadline(leaderConfig.getRenewDeadline())
                        .withRetryPeriod(leaderConfig.getRetryPeriod())
                        .withLeaderCallbacks(
                                new LeaderCallbacks(
                                        leaderCallbackHandler::isLeader,
                                        leaderCallbackHandler::notLeader,
                                        newLeader ->
                                                LOG.info(
                                                        "New leader elected {} for {}.",
                                                        newLeader,
                                                        leaderConfig.getConfigMapName())))
                        .build();
        internalLeaderElector = new LeaderElector<>(kubernetesClient, leaderElectionConfig);
        LOG.info(
                "Create KubernetesLeaderElector {} with lock identity {}.",
                leaderConfig.getConfigMapName(),
                leaderConfig.getLockIdentity());
    }

    public void run() {
        executorService.submit(internalLeaderElector::run);
    }

    public void stop() {
        executorService.shutdownNow();
    }

    public static boolean hasLeadership(KubernetesConfigMap configMap, String lockIdentity) {
        final String leader = configMap.getAnnotations().get(LEADER_ANNOTATION_KEY);
        return leader != null && leader.contains(lockIdentity);
    }

    /** Callback handler for leader election. */
    public abstract static class LeaderCallbackHandler {

        public abstract void isLeader();

        public abstract void notLeader();
    }
115
}