PythonCommandExecutor.java 5.8 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17

Q
qiaozhanwei 已提交
18
package org.apache.dolphinscheduler.server.worker.task;
L
ligang 已提交
19

Q
qiaozhanwei 已提交
20 21
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
22
import org.apache.dolphinscheduler.common.utils.StringUtils;
Q
qiaozhanwei 已提交
23
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
L
ligang 已提交
24

25 26 27 28 29
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
L
ligang 已提交
30 31 32
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
33
import java.util.Collections;
L
ligang 已提交
34 35
import java.util.List;
import java.util.function.Consumer;
36 37 38 39
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
L
ligang 已提交
40 41 42 43 44 45

/**
 * python command executor
 */
public class PythonCommandExecutor extends AbstractCommandExecutor {

46 47 48
    /**
     * logger
     */
49 50
    private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);

51 52 53
    /**
     * python
     */
L
ligang 已提交
54
    public static final String PYTHON = "python";
55
    private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
L
ligang 已提交
56

57 58 59
    /**
     * constructor
     * @param logHandler    log handler
60
     * @param taskExecutionContext       taskExecutionContext
61 62
     * @param logger        logger
     */
L
ligang 已提交
63
    public PythonCommandExecutor(Consumer<List<String>> logHandler,
64
                                 TaskExecutionContext taskExecutionContext,
journey2018's avatar
journey2018 已提交
65
                                 Logger logger) {
66
        super(logHandler,taskExecutionContext,logger);
L
ligang 已提交
67 68 69 70 71 72
    }


    /**
     * build command file path
     *
73
     * @return command file path
L
ligang 已提交
74 75 76
     */
    @Override
    protected String buildCommandFilePath() {
77
        return String.format("%s/py_%s.command", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
L
ligang 已提交
78 79 80 81
    }

    /**
     * create command file if not exists
82 83 84
     * @param execCommand   exec command
     * @param commandFile   command file
     * @throws IOException  io exception
L
ligang 已提交
85 86 87
     */
    @Override
    protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
88
        logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
L
ligang 已提交
89 90 91 92

        if (!Files.exists(Paths.get(commandFile))) {
            logger.info("generate command file:{}", commandFile);

93
            StringBuilder sb = new StringBuilder();
L
ligang 已提交
94 95 96
            sb.append("#-*- encoding=utf8 -*-\n");

            sb.append("\n\n");
97
            sb.append(execCommand);
L
ligang 已提交
98 99 100
            logger.info(sb.toString());

            // write data to file
journey2018's avatar
journey2018 已提交
101 102 103
            FileUtils.writeStringToFile(new File(commandFile),
                    sb.toString(),
                    StandardCharsets.UTF_8);
L
ligang 已提交
104 105 106
        }
    }

107 108 109 110 111 112 113 114 115 116
    /**
     * get command options
     * @return command options list
     */
    @Override
    protected List<String> commandOptions() {
        // unbuffered binary stdout and stderr
        return Collections.singletonList("-u");
    }

117
    /**
118 119
     * Gets the command path to which Python can execute
     * @return python command path
120
     */
L
ligang 已提交
121
    @Override
122
    protected String commandInterpreter() {
123
        String pythonHome = getPythonHome(taskExecutionContext.getEnvFile());
124 125 126 127 128 129 130 131 132 133 134
        return getPythonCommand(pythonHome);
    }

    /**
     * get python command
     *
     * @param pythonHome python home
     * @return python command
     */
    public static String getPythonCommand(String pythonHome) {
        if (StringUtils.isEmpty(pythonHome)) {
135 136
            return PYTHON;
        }
137 138 139 140 141 142 143
        File file = new File(pythonHome);
        if (file.exists() && file.isFile()) {
            return pythonHome;
        }
        if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) {
            return pythonHome;
        }
144
        return Paths.get(pythonHome, "/bin/python").toString();
L
ligang 已提交
145 146
    }

147
    /**
148
     * get python home
journey2018's avatar
journey2018 已提交
149
     *
150 151
     * @param envPath env path
     * @return python home
152
     */
153
    public static String getPythonHome(String envPath) {
154 155 156 157
        BufferedReader br = null;
        StringBuilder sb = new StringBuilder();
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
journey2018's avatar
journey2018 已提交
158
            String line;
159 160
            while ((line = br.readLine()) != null) {
                if (line.contains(Constants.PYTHON_HOME)) {
161 162 163 164 165
                    sb.append(line);
                    break;
                }
            }
            String result = sb.toString();
166
            if (StringUtils.isEmpty(result)) {
167 168
                return null;
            }
journey2018's avatar
journey2018 已提交
169
            String[] arrs = result.split(Constants.EQUAL_SIGN);
170
            if (arrs.length == 2) {
171 172
                return arrs[1];
            }
173 174 175
        } catch (IOException e) {
            logger.error("read file failure", e);
        } finally {
176
            try {
177
                if (br != null) {
178 179 180
                    br.close();
                }
            } catch (IOException e) {
181
                logger.error(e.getMessage(), e);
182 183 184 185 186
            }
        }
        return null;
    }

L
ligang 已提交
187
}