From 7029062f4c2f247e9eac333e28e36e66b03fb435 Mon Sep 17 00:00:00 2001 From: Kirs Date: Tue, 14 Sep 2021 01:41:54 +0800 Subject: [PATCH] =?UTF-8?q?[Improvement][Task]Check=20the=20task=20plugin?= =?UTF-8?q?=20configuration=20when=20the=20worke=E2=80=A6=20(#6184)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [Improvement][Task]Check the task plugin configuration when the worker starts --- .../server/worker/WorkerServer.java | 10 ++++-- .../worker/runner/TaskExecuteThread.java | 19 ++++++++---- .../exception/PluginNotFoundException.java | 31 +++++++++++++++++++ .../server/StandaloneServer.java | 13 ++++++++ 4 files changed, 65 insertions(+), 8 deletions(-) create mode 100644 dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 05a01403a..9705b4480 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -35,10 +35,13 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader; import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig; import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.apache.commons.collections4.MapUtils; + import java.util.Set; import javax.annotation.PostConstruct; @@ -180,12 +183,15 @@ public class WorkerServer implements IStoppable { taskPluginManagerConfig.setMavenLocalRepository(workerConfig.getMavenLocalRepository().trim()); } - DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(taskPluginManagerConfig, ImmutableList.of(taskPluginManager)); + DolphinPluginLoader taskPluginLoader = new DolphinPluginLoader(taskPluginManagerConfig, ImmutableList.of(taskPluginManager)); try { - alertPluginLoader.loadPlugins(); + taskPluginLoader.loadPlugins(); } catch (Exception e) { throw new RuntimeException("Load Task Plugin Failed !", e); } + if (MapUtils.isEmpty(taskPluginManager.getTaskChannelMap())) { + throw new PluginNotFoundException("Task Plugin Not Found,Please Check Config File"); + } } public void close(String cause) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index c06b2653d..5a164e887 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskChannel; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; @@ -61,7 +62,7 @@ import org.slf4j.LoggerFactory; import com.github.rholder.retry.RetryException; /** - * task scheduler thread + * task scheduler thread */ public class TaskExecuteThread implements Runnable, Delayed { @@ -103,7 +104,8 @@ public class TaskExecuteThread implements Runnable, Delayed { private TaskPluginManager taskPluginManager; /** - * constructor + * constructor + * * @param taskExecutionContext taskExecutionContext * @param taskCallbackService taskCallbackService */ @@ -128,7 +130,7 @@ public class TaskExecuteThread implements Runnable, Delayed { @Override public void run() { - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(),taskExecutionContext.getProcessInstanceId()); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); // check if the OS user exists @@ -161,6 +163,9 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.getTaskInstanceId())); TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); + if (null == taskChannel) { + throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); + } //TODO Temporary operation, To be adjusted TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class); @@ -229,6 +234,7 @@ public class TaskExecuteThread implements Runnable, Delayed { /** * get global paras map + * * @return map */ private Map getGlobalParamsMap() { @@ -251,7 +257,7 @@ public class TaskExecuteThread implements Runnable, Delayed { try { task.cancelApplication(true); } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); } } } @@ -270,7 +276,7 @@ public class TaskExecuteThread implements Runnable, Delayed { Set> resEntries = projectRes.entrySet(); - for (Map.Entry resource : resEntries) { + for (Map.Entry resource : resEntries) { String fullName = resource.getKey(); String tenantCode = resource.getValue(); File resFile = new File(execLocalPath, fullName); @@ -282,7 +288,7 @@ public class TaskExecuteThread implements Runnable, Delayed { logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); } catch (Exception e) { - logger.error(e.getMessage(),e); + logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage()); } } else { @@ -329,6 +335,7 @@ public class TaskExecuteThread implements Runnable, Delayed { /** * get current TaskExecutionContext + * * @return TaskExecutionContext */ public TaskExecutionContext getTaskExecutionContext() { diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java new file mode 100644 index 000000000..2153299f1 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/exception/PluginNotFoundException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.exception; + +public class PluginNotFoundException extends RuntimeException { + + private static final long serialVersionUID = -5487812425126112159L; + + public PluginNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public PluginNotFoundException(String message) { + super(message); + } +} diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java index 5360ddabe..94b6ca7bc 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java @@ -61,6 +61,8 @@ public class StandaloneServer { startAlertServer(); + setTaskPlugin(); + new SpringApplicationBuilder( ApiApplicationServer.class, MasterServer.class, @@ -114,4 +116,15 @@ public class StandaloneServer { final ScriptRunner runner = new ScriptRunner(ds.getConnection(), true, true); runner.runScript(new FileReader("sql/dolphinscheduler_h2.sql")); } + + private static void setTaskPlugin() { + final Path taskPluginPath = Paths.get( + StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(), + "../../../dolphinscheduler-task-plugin/dolphinscheduler-task-shell/pom.xml" + ).toAbsolutePath(); + if (Files.exists(taskPluginPath)) { + System.setProperty("task.plugin.binding", taskPluginPath.toString()); + System.setProperty("task.plugin.dir", ""); + } + } } -- GitLab