SparkTask.java 3.8 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.worker.task.spark;
L
ligang 已提交
18

T
Tboy 已提交
19 20
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
Q
qiaozhanwei 已提交
21 22 23 24 25 26 27 28 29
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
L
ligang 已提交
30 31 32 33 34 35 36
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
37
 * spark task
L
ligang 已提交
38 39 40 41
 */
public class SparkTask extends AbstractYarnTask {

  /**
T
Tboy 已提交
42 43 44 45 46 47
   * spark1 command
   */
  private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit";

  /**
   * spark2 command
L
ligang 已提交
48
   */
T
Tboy 已提交
49
  private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit";
L
ligang 已提交
50 51 52 53 54 55

  /**
   *  spark parameters
   */
  private SparkParameters sparkParameters;

L
lgcareer 已提交
56 57
  public SparkTask(TaskProps props, Logger logger) {
    super(props, logger);
L
ligang 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
  }

  @Override
  public void init() {

    logger.info("spark task params {}", taskProps.getTaskParams());

    sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class);

    if (!sparkParameters.checkParameters()) {
      throw new RuntimeException("spark task params is not valid");
    }
    sparkParameters.setQueue(taskProps.getQueue());

    if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
      String args = sparkParameters.getMainArgs();

      /**
       *  combining local and global parameters
       */
      Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
              taskProps.getDefinedParams(),
              sparkParameters.getLocalParametersMap(),
journey2018's avatar
journey2018 已提交
81 82
              taskProps.getCmdTypeIfComplement(),
              taskProps.getScheduleTime());
L
ligang 已提交
83 84 85 86 87 88 89 90
      if (paramsMap != null ){
        args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
      }
      sparkParameters.setMainArgs(args);
    }
  }

  /**
91 92
   * create command
   * @return command
L
ligang 已提交
93 94 95 96 97
   */
  @Override
  protected String buildCommand() {
    List<String> args = new ArrayList<>();

T
Tboy 已提交
98 99 100 101 102 103 104 105
    //spark version
    String sparkCommand = SPARK2_COMMAND;

    if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
      sparkCommand = SPARK1_COMMAND;
    }

    args.add(sparkCommand);
L
ligang 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122

    // other parameters
    args.addAll(SparkArgsUtils.buildArgs(sparkParameters));

    String command = ParameterUtils
            .convertParameterPlaceholders(String.join(" ", args), taskProps.getDefinedParams());

    logger.info("spark task command : {}", command);

    return command;
  }

  @Override
  public AbstractParameters getParameters() {
    return sparkParameters;
  }
}