KubernetesLeaderElector.java 5.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;

import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Represents a leader elector in Kubernetes, backed by {@link LeaderElector}. {@link
 * LeaderElector#run()} is a blocking call, so it should be run in the IO executor, not the main
 * thread. The lifecycle is bound to a single leader election: once the leadership is revoked and
 * {@link LeaderCallbackHandler#notLeader()} has been called, {@link LeaderElector#run()} finishes.
 * To start another round of election, the election needs to be triggered again.
 *
 * <p>{@link LeaderElector#run()} is responsible for creating the leader ConfigMap and continuously
 * updating the annotation. The annotation key is {@link #LEADER_ANNOTATION_KEY} and the value is in
 * the following JSON format:
 *
 * <pre>{@code
 * metadata:
 *   annotations:
 *     control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"623e39fb-70c3-44f1-811f-561ec4a28d75","leaseDuration":15.000000000,"acquireTime":"2020-10-20T04:06:31.431000Z","renewTime":"2020-10-22T08:51:36.843000Z","leaderTransitions":37981}'
 * }</pre>
 */
public class KubernetesLeaderElector {

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElector.class);

    @VisibleForTesting
    public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader";

    private final ExecutorService executorService =
            Executors.newSingleThreadExecutor(
                    new ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));

    private final LeaderElector<NamespacedKubernetesClient> internalLeaderElector;

    public KubernetesLeaderElector(
            NamespacedKubernetesClient kubernetesClient,
            KubernetesLeaderElectionConfiguration leaderConfig,
            LeaderCallbackHandler leaderCallbackHandler) {
        final LeaderElectionConfig leaderElectionConfig =
                new LeaderElectionConfigBuilder()
                        .withName(leaderConfig.getConfigMapName())
                        .withLeaseDuration(leaderConfig.getLeaseDuration())
                        .withLock(
                                new ConfigMapLock(
72
                                        kubernetesClient.getNamespace(),
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
                                        leaderConfig.getConfigMapName(),
                                        leaderConfig.getLockIdentity()))
                        .withRenewDeadline(leaderConfig.getRenewDeadline())
                        .withRetryPeriod(leaderConfig.getRetryPeriod())
                        .withLeaderCallbacks(
                                new LeaderCallbacks(
                                        leaderCallbackHandler::isLeader,
                                        leaderCallbackHandler::notLeader,
                                        newLeader ->
                                                LOG.info(
                                                        "New leader elected {} for {}.",
                                                        newLeader,
                                                        leaderConfig.getConfigMapName())))
                        .build();
        internalLeaderElector = new LeaderElector<>(kubernetesClient, leaderElectionConfig);
        LOG.info(
                "Create KubernetesLeaderElector {} with lock identity {}.",
                leaderConfig.getConfigMapName(),
                leaderConfig.getLockIdentity());
    }

    public void run() {
        executorService.submit(internalLeaderElector::run);
    }

    public void stop() {
        executorService.shutdownNow();
    }

    public static boolean hasLeadership(KubernetesConfigMap configMap, String lockIdentity) {
        final String leader = configMap.getAnnotations().get(LEADER_ANNOTATION_KEY);
        return leader != null && leader.contains(lockIdentity);
    }

    /** Callback handler for leader election. */
    public abstract static class LeaderCallbackHandler {

        public abstract void isLeader();

        public abstract void notLeader();
    }
114
}