/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;

import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 *  task scheduler thread
 */
T
Tboy 已提交
53
public class TaskExecuteThread implements Runnable {
L
ligang 已提交
54 55 56 57

    /**
     * logger
     */
T
Tboy 已提交
58
    private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
L
ligang 已提交
59 60 61 62

    /**
     *  task instance
     */
T
Tboy 已提交
63
    private TaskExecutionContext taskExecutionContext;
L
ligang 已提交
64 65

    /**
journey2018's avatar
journey2018 已提交
66
     *  abstract task
L
ligang 已提交
67 68 69
     */
    private AbstractTask task;

T
Tboy 已提交
70
    /**
T
Tboy 已提交
71
     *  task callback service
T
Tboy 已提交
72
     */
T
Tboy 已提交
73
    private TaskCallbackService taskCallbackService;
T
Tboy 已提交
74

75 76 77 78 79
    /**
     * taskExecutionContextCacheManager
     */
    private TaskExecutionContextCacheManager taskExecutionContextCacheManager;

80 81 82 83 84
    /**
     * task logger
     */
    private Logger taskLogger;

85
    /**
86
     *  constructor
T
Tboy 已提交
87
     * @param taskExecutionContext taskExecutionContext
T
Tboy 已提交
88
     * @param taskCallbackService taskCallbackService
89
     */
90 91 92
    public TaskExecuteThread(TaskExecutionContext taskExecutionContext
            , TaskCallbackService taskCallbackService
            , Logger taskLogger) {
T
Tboy 已提交
93
        this.taskExecutionContext = taskExecutionContext;
T
Tboy 已提交
94
        this.taskCallbackService = taskCallbackService;
95
        this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
96
        this.taskLogger = taskLogger;
L
ligang 已提交
97 98 99
    }

    @Override
journey2018's avatar
journey2018 已提交
100
    public void run() {
L
ligang 已提交
101

T
Tboy 已提交
102
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
L
ligang 已提交
103
        try {
T
Tboy 已提交
104
            logger.info("script path : {}", taskExecutionContext.getExecutePath());
journey2018's avatar
journey2018 已提交
105
            // task node
张世鸣 已提交
106
            TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
L
ligang 已提交
107

journey2018's avatar
journey2018 已提交
108
            // copy hdfs/minio file to local
109
            downloadResource(taskExecutionContext.getExecutePath(),
110
                    taskExecutionContext.getResources(),
L
ligang 已提交
111 112
                    logger);

113 114 115 116
            taskExecutionContext.setTaskParams(taskNode.getParams());
            taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
            taskExecutionContext.setDefinedParams(getGlobalParamsMap());

117
            // set task timeout
118
            setTaskTimeout(taskExecutionContext, taskNode);
119

120
            taskExecutionContext.setTaskAppId(String.format("%s_%s_%s",
T
Tboy 已提交
121 122
                    taskExecutionContext.getProcessDefineId(),
                    taskExecutionContext.getProcessInstanceId(),
T
Tboy 已提交
123
                    taskExecutionContext.getTaskInstanceId()));
124

125
            task = TaskManager.newTask(taskExecutionContext, taskLogger);
126 127 128 129 130 131 132 133 134 135

            // task init
            task.init();

            // task handle
            task.handle();

            // task result process
            task.after();

T
Tboy 已提交
136 137
            responseCommand.setStatus(task.getExitStatus().getCode());
            responseCommand.setEndTime(new Date());
138 139
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
140
            logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
L
ligang 已提交
141
        }catch (Exception e){
journey2018's avatar
journey2018 已提交
142
            logger.error("task scheduler failure", e);
L
ligang 已提交
143
            kill();
T
Tboy 已提交
144 145
            responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
            responseCommand.setEndTime(new Date());
146 147
            responseCommand.setProcessId(task.getProcessId());
            responseCommand.setAppIds(task.getAppIds());
T
Tboy 已提交
148
        } finally {
Q
qiaozhanwei 已提交
149
            try {
150
                taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
Q
qiaozhanwei 已提交
151 152 153 154 155
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }catch (Exception e){
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
                taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
            }
L
ligang 已提交
156 157 158 159
        }
    }

    /**
journey2018's avatar
journey2018 已提交
160 161 162 163 164 165 166
     * get global paras map
     * @return
     */
    private Map<String, String> getGlobalParamsMap() {
        Map<String,String> globalParamsMap = new HashMap<>(16);

        // global params string
T
Tboy 已提交
167
        String globalParamsStr = taskExecutionContext.getGlobalParams();
journey2018's avatar
journey2018 已提交
168
        if (globalParamsStr != null) {
169
            List<Property> globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
journey2018's avatar
journey2018 已提交
170 171 172 173
            globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
        }
        return globalParamsMap;
    }
174

journey2018's avatar
journey2018 已提交
175 176
    /**
     * set task timeout
177
     * @param taskExecutionContext TaskExecutionContext
L
ligang 已提交
178 179
     * @param taskNode
     */
180
    private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) {
journey2018's avatar
journey2018 已提交
181
        // the default timeout is the maximum value of the integer
182
        taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
L
ligang 已提交
183 184
        TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
        if (taskTimeoutParameter.getEnable()){
journey2018's avatar
journey2018 已提交
185
            // get timeout strategy
186
            taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
L
ligang 已提交
187 188 189 190 191
            switch (taskTimeoutParameter.getStrategy()){
                case WARN:
                    break;
                case FAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
192
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
193 194 195 196
                    }
                    break;
                case WARNFAILED:
                    if (Integer.MAX_VALUE > taskTimeoutParameter.getInterval() * 60) {
197
                        taskExecutionContext.setTaskTimeout(taskTimeoutParameter.getInterval() * 60);
L
ligang 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
                    }
                    break;
                default:
                    logger.error("not support task timeout strategy: {}", taskTimeoutParameter.getStrategy());
                    throw new IllegalArgumentException("not support task timeout strategy");

            }
        }
    }


    /**
     *  kill task
     */
    public void kill(){
        if (task != null){
            try {
                task.cancelApplication(true);
            }catch (Exception e){
                logger.error(e.getMessage(),e);
            }
        }
    }


    /**
224
     * download resource file
L
ligang 已提交
225 226 227 228 229
     *
     * @param execLocalPath
     * @param projectRes
     * @param logger
     */
230
    private void downloadResource(String execLocalPath,
B
bao liang 已提交
231
                                  Map<String,String> projectRes,
232
                                  Logger logger) throws Exception {
B
bao liang 已提交
233
        if (MapUtils.isEmpty(projectRes)){
234 235 236
            return;
        }

B
bao liang 已提交
237 238 239 240 241 242
        Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();

        for (Map.Entry<String,String> resource : resEntries) {
            String fullName = resource.getKey();
            String tenantCode = resource.getValue();
            File resFile = new File(execLocalPath, fullName);
L
ligang 已提交
243 244
            if (!resFile.exists()) {
                try {
journey2018's avatar
journey2018 已提交
245
                    // query the tenant code of the resource according to the name of the resource
B
bao liang 已提交
246
                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName);
L
ligang 已提交
247 248

                    logger.info("get resource file from hdfs :{}", resHdfsPath);
B
bao liang 已提交
249
                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
L
ligang 已提交
250 251 252 253 254 255 256 257 258 259
                }catch (Exception e){
                    logger.error(e.getMessage(),e);
                    throw new RuntimeException(e.getMessage());
                }
            } else {
                logger.info("file : {} exists ", resFile.getName());
            }
        }
    }
}