DefaultKubeClientFactory.java 4.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.kubernetes.kubeclient;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FileUtils;

import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
29
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
30 31 32 33 34 35 36 37
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

38
/** Default implementation of the {@link KubeClientFactory}. */
39 40
public class DefaultKubeClientFactory implements KubeClientFactory {

41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKubeClientFactory.class);

    private static final DefaultKubeClientFactory INSTANCE = new DefaultKubeClientFactory();

    public static DefaultKubeClientFactory getInstance() {
        return INSTANCE;
    }

    public FlinkKubeClient fromConfiguration(Configuration flinkConfig) {
        return fromConfiguration(flinkConfig, createThreadPoolForAsyncIO());
    }

    public FlinkKubeClient fromConfiguration(Configuration flinkConfig, Executor ioExecutor) {
        final Config config;

        final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
        if (kubeContext != null) {
            LOG.info("Configuring kubernetes client to use context {}.", kubeContext);
        }

        final String kubeConfigFile =
                flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
        if (kubeConfigFile != null) {
            LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigFile);
            try {
                // If kubeContext is null, the default context in the kubeConfigFile will be used.
                // Note: the third parameter kubeconfigPath is optional and is set to null. It is
                // only used to rewrite
                // relative tls asset paths inside kubeconfig when a file is passed, and in the case
                // that the kubeconfig
                // references some assets via relative paths.
                config =
                        Config.fromKubeconfig(
                                kubeContext,
                                FileUtils.readFileUtf8(new File(kubeConfigFile)),
                                null);
            } catch (IOException e) {
                throw new KubernetesClientException("Load kubernetes config failed.", e);
            }
        } else {
            LOG.debug("Trying to load default kubernetes config.");

            config = Config.autoConfigure(kubeContext);
        }

86 87 88 89 90
        final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
        LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
        config.setNamespace(namespace);

        final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
91 92 93 94 95 96 97

        return new Fabric8FlinkKubeClient(flinkConfig, client, () -> ioExecutor);
    }

    private static Executor createThreadPoolForAsyncIO() {
        return Executors.newFixedThreadPool(2, new ExecutorThreadFactory("FlinkKubeClient-IO"));
    }
98
}