PythonCommandExecutor.java 5.6 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.worker.task;
L
ligang 已提交
18

Q
qiaozhanwei 已提交
19 20
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
21
import org.apache.dolphinscheduler.common.utils.StringUtils;
L
ligang 已提交
22
import org.slf4j.Logger;
23
import org.slf4j.LoggerFactory;
L
ligang 已提交
24

25
import java.io.*;
L
ligang 已提交
26 27 28 29 30 31 32 33 34 35 36 37
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;

/**
 * python command executor
 */
public class PythonCommandExecutor extends AbstractCommandExecutor {

38 39 40
    /**
     * logger
     */
41 42
    private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);

43 44 45
    /**
     * python
     */
L
ligang 已提交
46 47 48
    public static final String PYTHON = "python";


49 50 51 52 53 54 55 56 57 58 59 60
    /**
     * constructor
     * @param logHandler    log handler
     * @param taskDir       task dir
     * @param taskAppId     task app id
     * @param taskInstId    task instance id
     * @param tenantCode    tenant code
     * @param envFile       env file
     * @param startTime     start time
     * @param timeout       timeout
     * @param logger        logger
     */
L
ligang 已提交
61
    public PythonCommandExecutor(Consumer<List<String>> logHandler,
journey2018's avatar
journey2018 已提交
62 63 64 65 66 67 68 69 70
                                 String taskDir,
                                 String taskAppId,
                                 int taskInstId,
                                 String tenantCode,
                                 String envFile,
                                 Date startTime,
                                 int timeout,
                                 Logger logger) {
        super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
L
ligang 已提交
71 72 73 74 75 76
    }


    /**
     * build command file path
     *
77
     * @return command file path
L
ligang 已提交
78 79 80 81 82 83 84 85
     */
    @Override
    protected String buildCommandFilePath() {
        return String.format("%s/py_%s.command", taskDir, taskAppId);
    }

    /**
     * create command file if not exists
86 87 88
     * @param execCommand   exec command
     * @param commandFile   command file
     * @throws IOException  io exception
L
ligang 已提交
89 90 91
     */
    @Override
    protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
journey2018's avatar
journey2018 已提交
92
        logger.info("tenantCode :{}, task dir:{}", tenantCode, taskDir);
L
ligang 已提交
93 94 95 96

        if (!Files.exists(Paths.get(commandFile))) {
            logger.info("generate command file:{}", commandFile);

97
            StringBuilder sb = new StringBuilder();
L
ligang 已提交
98 99 100
            sb.append("#-*- encoding=utf8 -*-\n");

            sb.append("\n\n");
101
            sb.append(execCommand);
L
ligang 已提交
102 103 104
            logger.info(sb.toString());

            // write data to file
journey2018's avatar
journey2018 已提交
105 106 107
            FileUtils.writeStringToFile(new File(commandFile),
                    sb.toString(),
                    StandardCharsets.UTF_8);
L
ligang 已提交
108 109 110
        }
    }

111 112 113 114
    /**
     * get python home
     * @return python home
     */
L
ligang 已提交
115
    @Override
116
    protected String commandInterpreter() {
journey2018's avatar
journey2018 已提交
117
        String pythonHome = getPythonHome(envFile);
118 119 120
        if (StringUtils.isEmpty(pythonHome)){
            return PYTHON;
        }
journey2018's avatar
journey2018 已提交
121
        return pythonHome;
L
ligang 已提交
122 123
    }

124 125 126 127 128
    /**
     * check find yarn application id
     * @param line line
     * @return boolean
     */
L
ligang 已提交
129 130 131 132 133
    @Override
    protected boolean checkFindApp(String line) {
        return true;
    }

134 135

    /**
journey2018's avatar
journey2018 已提交
136 137 138 139 140 141 142 143
     *  get the absolute path of the Python command
     *  note :
     *  common.properties
     *  PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
     *
     *  for example :
     *  your PYTHON_HOM is /opt/python3.7/
     *  you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
144
     *  dolphinscheduler.env.path file.
journey2018's avatar
journey2018 已提交
145
     *
146 147
     * @param envPath env path
     * @return python home
148 149 150 151 152 153
     */
    private static String getPythonHome(String envPath){
        BufferedReader br = null;
        StringBuilder sb = new StringBuilder();
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
journey2018's avatar
journey2018 已提交
154
            String line;
155
            while ((line = br.readLine()) != null){
156
                if (line.contains(Constants.PYTHON_HOME)){
157 158 159 160 161 162 163 164
                    sb.append(line);
                    break;
                }
            }
            String result = sb.toString();
            if (org.apache.commons.lang.StringUtils.isEmpty(result)){
                return null;
            }
journey2018's avatar
journey2018 已提交
165
            String[] arrs = result.split(Constants.EQUAL_SIGN);
166 167 168 169 170
            if (arrs.length == 2){
                return arrs[1];
            }

        }catch (IOException e){
journey2018's avatar
journey2018 已提交
171
            logger.error("read file failure",e);
172 173 174 175 176 177 178 179 180 181 182 183
        }finally {
            try {
                if (br != null){
                    br.close();
                }
            } catch (IOException e) {
                logger.error(e.getMessage(),e);
            }
        }
        return null;
    }

L
ligang 已提交
184
}