SqlTask.java 22.8 KB
Newer Older
K
Kirs 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.sql;

20 21
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
22 23
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
24
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
25 26 27 28 29 30
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
31 32 33 34 35 36 37 38 39 40
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
41
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
K
Kirs 已提交
42
import org.apache.dolphinscheduler.spi.enums.DbType;
43 44

import org.apache.commons.collections4.CollectionUtils;
45
import org.apache.commons.lang3.StringUtils;
46

K
Kirs 已提交
47 48 49 50 51 52 53 54
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.util.ArrayList;
55
import java.util.Collections;
K
Kirs 已提交
56 57 58 59 60 61 62 63
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

64 65 66 67 68
import org.slf4j.Logger;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

69
public class SqlTask extends AbstractTask {
K
Kirs 已提交
70 71 72 73

    /**
     * taskExecutionContext
     */
74
    private TaskExecutionContext taskExecutionContext;
K
Kirs 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87

    /**
     * sql parameters
     */
    private SqlParameters sqlParameters;

    /**
     * base datasource
     */
    private BaseConnectionParam baseConnectionParam;

    /**
     * create function format
88
     * include replace here which can be compatible with more cases, for example a long-running Spark session in Kyuubi will keep its own temp functions instead of destroying them right away
K
Kirs 已提交
89
     */
90 91
    private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT =
            "create or replace temporary function {0} as ''{1}''";
K
Kirs 已提交
92

93 94 95
    /**
     * default query sql limit
     */
96
    private static final int QUERY_LIMIT = 10000;
97

98 99
    private SQLTaskExecutionContext sqlTaskExecutionContext;

100 101
    public static final int TEST_FLAG_YES = 1;

102 103
    private static final String SQL_SEPARATOR = ";\n";

K
Kirs 已提交
104 105 106 107 108
    /**
     * Creates a SQL task from the given execution context.
     * <p>The task params JSON is parsed into {@link SqlParameters}, validated, and then used to
     * generate the extended SQL execution context.</p>
     *
     * @param taskRequest taskRequest
     * @throws RuntimeException if the task params cannot be parsed, if the test flag is set while
     *                          the datasource id is 0, or if the parameters fail validation
     */
    public SqlTask(TaskExecutionContext taskRequest) {
        super(taskRequest);
        this.taskExecutionContext = taskRequest;
        this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);

        // explicit check instead of `assert`: assertions are disabled unless the JVM runs with -ea,
        // which would otherwise turn a failed parse into a bare NullPointerException below
        if (sqlParameters == null) {
            throw new RuntimeException("sql task params can not be parsed");
        }
        if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) {
            throw new RuntimeException("unbound test data source");
        }
        if (!sqlParameters.checkParameters()) {
            throw new RuntimeException("sql task params is not valid");
        }

        sqlTaskExecutionContext =
                sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
    }

    @Override
    public AbstractParameters getParameters() {
K
Kirs 已提交
128
        return sqlParameters;
K
Kirs 已提交
129 130 131
    }

    @Override
132
    public void handle(TaskCallBack taskCallBack) throws TaskException {
K
Kirs 已提交
133
        logger.info("Full sql parameters: {}", sqlParameters);
134 135
        logger.info(
                "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
K
Kirs 已提交
136 137 138 139 140 141 142 143 144
                sqlParameters.getType(),
                sqlParameters.getDatasource(),
                sqlParameters.getSql(),
                sqlParameters.getLocalParams(),
                sqlParameters.getUdfs(),
                sqlParameters.getShowType(),
                sqlParameters.getConnParams(),
                sqlParameters.getVarPool(),
                sqlParameters.getLimit());
145
        String separator = SQL_SEPARATOR;
K
Kirs 已提交
146 147 148
        try {

            // get datasource
149
            baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(
K
Kirs 已提交
150 151
                    DbType.valueOf(sqlParameters.getType()),
                    sqlTaskExecutionContext.getConnectionParams());
152 153 154
            if (DbType.valueOf(sqlParameters.getType()).isSupportMultipleStatement()) {
                separator = "";
            }
K
Kirs 已提交
155
            // ready to execute SQL and parameter entity Map
156
            List<SqlBinds> mainStatementSqlBinds = split(sqlParameters.getSql(), separator)
157 158 159 160
                    .stream()
                    .map(this::getSqlAndSqlParamsMap)
                    .collect(Collectors.toList());

K
Kirs 已提交
161 162 163 164 165 166 167 168 169 170 171
            List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
                    .orElse(new ArrayList<>())
                    .stream()
                    .map(this::getSqlAndSqlParamsMap)
                    .collect(Collectors.toList());
            List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
                    .orElse(new ArrayList<>())
                    .stream()
                    .map(this::getSqlAndSqlParamsMap)
                    .collect(Collectors.toList());

172
            List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList(), logger);
K
Kirs 已提交
173 174

            // execute sql task
175
            executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
K
Kirs 已提交
176 177 178 179 180

            setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);

        } catch (Exception e) {
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
181
            logger.error("sql task error", e);
182
            throw new TaskException("Execute sql task failed", e);
K
Kirs 已提交
183 184 185
        }
    }

186 187 188 189 190
    /**
     * Cancel hook of the task. This implementation performs no action.
     *
     * @throws TaskException declared by the overridden method; never thrown here
     */
    @Override
    public void cancel() throws TaskException {

    }

191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
    /**
     * split sql by segment separator
     * <p>The segment separator is used
     * when the data source does not support multi-segment SQL execution,
     * and the client needs to split the SQL and execute it multiple times.</p>
     * <p>A segment is dropped only when it contains nothing to execute, i.e. every one of its
     * lines is blank or a {@code --} comment. A segment that starts with a comment line but
     * also contains sql is kept unchanged.</p>
     *
     * @param sql the sql text to split
     * @param segmentSeparator separator between segments (passed to {@link String#split(String)});
     *                         when null or empty the sql is returned as a single segment
     * @return the segments that contain sql, in their original order
     */
    public static List<String> split(String sql, String segmentSeparator) {
        if (segmentSeparator == null || segmentSeparator.isEmpty()) {
            return Collections.singletonList(sql);
        }

        List<String> segments = new ArrayList<>();
        for (String segment : sql.split(segmentSeparator)) {
            if (containsStatement(segment)) {
                segments.add(segment);
            }
        }
        return segments;
    }

    /**
     * Tells whether a segment has at least one line that is neither blank nor a "--" comment.
     */
    private static boolean containsStatement(String segment) {
        for (String line : segment.split("\n")) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty() && !trimmed.startsWith("--")) {
                return true;
            }
        }
        return false;
    }

K
Kirs 已提交
216 217 218
    /**
     * execute function and sql
     *
219
     * @param mainStatementsBinds main statements binds
K
Kirs 已提交
220 221 222 223
     * @param preStatementsBinds pre statements binds
     * @param postStatementsBinds post statements binds
     * @param createFuncs create functions
     */
224
    public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
K
Kirs 已提交
225 226 227 228 229 230 231
                                  List<SqlBinds> preStatementsBinds,
                                  List<SqlBinds> postStatementsBinds,
                                  List<String> createFuncs) throws Exception {
        Connection connection = null;
        try {

            // create connection
232 233
            connection = DataSourceClientProvider.getInstance().getConnection(DbType.valueOf(sqlParameters.getType()),
                    baseConnectionParam);
K
Kirs 已提交
234 235 236 237 238
            // create temp function
            if (CollectionUtils.isNotEmpty(createFuncs)) {
                createTempFunction(connection, createFuncs);
            }

239 240
            // pre execute
            executeUpdate(connection, preStatementsBinds, "pre");
K
Kirs 已提交
241

242
            // main execute
K
Kirs 已提交
243 244 245 246
            String result = null;
            // decide whether to executeQuery or executeUpdate based on sqlType
            if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
                // query statements need to be convert to JsonArray and inserted into Alert to send
247
                result = executeQuery(connection, mainStatementsBinds.get(0), "main");
K
Kirs 已提交
248 249
            } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
                // non query statement
250
                String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
K
Kirs 已提交
251 252
                result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
            }
253
            // deal out params
K
Kirs 已提交
254
            sqlParameters.dealOutParam(result);
255 256 257

            // post execute
            executeUpdate(connection, postStatementsBinds, "post");
K
Kirs 已提交
258 259 260 261
        } catch (Exception e) {
            logger.error("execute sql error: {}", e.getMessage());
            throw e;
        } finally {
262
            close(connection);
K
Kirs 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
        }
    }

    /**
     * Wraps the update result as the value of the first OUT property.
     *
     * @param updateResult result of the non-query execution
     * @param properties local params of the task, may be null
     * @return json of a one-element list holding a {prop: updateResult} map for the first
     *         property whose direct is OUT, or null when there is no such property
     */
    private String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
        // no local params configured: nothing can receive the result
        if (properties == null) {
            return null;
        }
        for (Property info : properties) {
            if (Direct.OUT == info.getDirect()) {
                List<Map<String, String>> updateRL = new ArrayList<>();
                Map<String, String> updateRM = new HashMap<>();
                updateRM.put(info.getProp(), updateResult);
                updateRL.add(updateRM);
                // only the first OUT property is used
                return JSONUtils.toJsonString(updateRL);
            }
        }
        return null;
    }

    /**
     * result process
     *
     * @param resultSet resultSet
     * @throws Exception Exception
     */
    private String resultProcess(ResultSet resultSet) throws Exception {
        ArrayNode resultJSONArray = JSONUtils.createArrayNode();
        if (resultSet != null) {
            ResultSetMetaData md = resultSet.getMetaData();
            int num = md.getColumnCount();

            int rowCount = 0;
294
            int limit = sqlParameters.getLimit() == 0 ? QUERY_LIMIT : sqlParameters.getLimit();
K
Kirs 已提交
295

296 297 298 299 300
            while (resultSet.next()) {
                if (rowCount == limit) {
                    logger.info("sql result limit : {} exceeding results are filtered", limit);
                    break;
                }
K
Kirs 已提交
301 302 303 304 305 306 307
                ObjectNode mapOfColValues = JSONUtils.createObjectNode();
                for (int i = 1; i <= num; i++) {
                    mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
                }
                resultJSONArray.add(mapOfColValues);
                rowCount++;
            }
308

309 310
            int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows()
                    : TaskConstants.DEFAULT_DISPLAY_ROWS;
311
            displayRows = Math.min(displayRows, rowCount);
K
Kirs 已提交
312 313 314 315 316 317
            logger.info("display sql result {} rows as follows:", displayRows);
            for (int i = 0; i < displayRows; i++) {
                String row = JSONUtils.toJsonString(resultJSONArray.get(i));
                logger.info("row {} : {}", i + 1, row);
            }
        }
318

319 320
        String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet))
                : JSONUtils.toJsonString(resultJSONArray);
321

322
        if (Boolean.TRUE.equals(sqlParameters.getSendEmail())) {
K
Kirs 已提交
323 324 325 326 327 328 329 330
            sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
                    ? sqlParameters.getTitle()
                    : taskExecutionContext.getTaskName() + " query result sets", result);
        }
        logger.debug("execute sql result : {}", result);
        return result;
    }

331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
    /**
     * Builds the placeholder returned for an empty result: an array with one row that maps
     * every column label to an empty string, or an "error" entry when the result set is null.
     *
     * @param resultSet resultSet, may be null
     * @return array node holding the single placeholder row
     * @throws SQLException if the result set meta data cannot be read
     */
    private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException {
        ObjectNode placeholderRow = JSONUtils.createObjectNode();
        if (resultSet == null) {
            placeholderRow.set("error", JSONUtils.toJsonNode("resultSet is null"));
        } else {
            ResultSetMetaData meta = resultSet.getMetaData();
            int columnCount = meta.getColumnCount();
            logger.info("sql query results is empty");
            for (int column = 1; column <= columnCount; column++) {
                placeholderRow.set(meta.getColumnLabel(column), JSONUtils.toJsonNode(""));
            }
        }
        ArrayNode wrapper = JSONUtils.createArrayNode();
        wrapper.add(placeholderRow);
        return wrapper;
    }

K
Kirs 已提交
351 352 353 354 355 356 357 358 359 360 361 362
    /**
     * send alert as an attachment
     *
     * @param title title
     * @param content content
     */
    private void sendAttachment(int groupId, String title, String content) {
        setNeedAlert(Boolean.TRUE);
        TaskAlertInfo taskAlertInfo = new TaskAlertInfo();
        taskAlertInfo.setAlertGroupId(groupId);
        taskAlertInfo.setContent(content);
        taskAlertInfo.setTitle(title);
363
        setTaskAlertInfo(taskAlertInfo);
K
Kirs 已提交
364 365
    }

366
    private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception {
367
        try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) {
368
            logger.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql());
369 370
            ResultSet resultSet = statement.executeQuery();
            return resultProcess(resultSet);
K
Kirs 已提交
371 372 373
        }
    }

374 375
    private String executeUpdate(Connection connection, List<SqlBinds> statementsBinds,
                                 String handlerType) throws Exception {
376 377 378 379
        int result = 0;
        for (SqlBinds sqlBind : statementsBinds) {
            try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBind)) {
                result = statement.executeUpdate();
380 381
                logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result,
                        sqlBind.getSql());
K
Kirs 已提交
382 383
            }
        }
384
        return String.valueOf(result);
K
Kirs 已提交
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
    }

    /**
     * Executes every given create-function statement on one shared statement object.
     *
     * @param connection connection
     * @param createFuncs statements to execute, in order
     * @throws Exception if creating the statement or executing one of the sqls fails
     */
    private void createTempFunction(Connection connection,
                                    List<String> createFuncs) throws Exception {
        try (Statement statement = connection.createStatement()) {
            for (int index = 0; index < createFuncs.size(); index++) {
                String functionSql = createFuncs.get(index);
                logger.info("hive create function sql: {}", functionSql);
                statement.execute(functionSql);
            }
        }
    }

    /**
     * close jdbc resource
     *
     * @param connection connection
     */
408
    private void close(Connection connection) {
K
Kirs 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("close connection error : {}", e.getMessage(), e);
            }
        }
    }

    /**
     * preparedStatement bind
     *
     * @param connection connection
     * @param sqlBinds sqlBinds
     * @return PreparedStatement
     * @throws Exception Exception
     */
    private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) {
        // is the timeout set
        boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
                || taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
430 431
        try {
            PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
K
Kirs 已提交
432 433 434 435 436 437 438 439 440 441
            if (timeoutFlag) {
                stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
            }
            Map<Integer, Property> params = sqlBinds.getParamsMap();
            if (params != null) {
                for (Map.Entry<Integer, Property> entry : params.entrySet()) {
                    Property prop = entry.getValue();
                    ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
                }
            }
442 443
            logger.info("prepare statement replace sql : {}, sql parameters : {}", sqlBinds.getSql(),
                    sqlBinds.getParamsMap());
K
Kirs 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
            return stmt;
        } catch (Exception exception) {
            throw new TaskException("SQL task prepareStatementAndBind error", exception);
        }
    }

    /**
     * print replace sql
     *
     * @param content content
     * @param formatSql format sql
     * @param rgex rgex
     * @param sqlParamsMap sql params map
     */
    private void printReplacedSql(String content, String formatSql, String rgex, Map<Integer, Property> sqlParamsMap) {
459
        // parameter print style
K
Kirs 已提交
460 461 462 463 464 465
        logger.info("after replace sql , preparing : {}", formatSql);
        StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
        if (sqlParamsMap == null) {
            logger.info("printReplacedSql: sqlParamsMap is null.");
        } else {
            for (int i = 1; i <= sqlParamsMap.size(); i++) {
466 467
                logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType())
                        .append(")");
K
Kirs 已提交
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
            }
        }
        logger.info("Sql Params are {}", logPrint);
    }

    /**
     * ready to execute SQL and parameter entity Map
     *
     * @return SqlBinds
     */
    private SqlBinds getSqlAndSqlParamsMap(String sql) {
        Map<Integer, Property> sqlParamsMap = new HashMap<>();
        StringBuilder sqlBuilder = new StringBuilder();

        // combining local and global parameters
483
        Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
K
Kirs 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496 497

        // spell SQL according to the final user-defined variable
        if (paramsMap == null) {
            sqlBuilder.append(sql);
            return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
        }

        if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
            String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
                    ParamUtils.convert(paramsMap));
            logger.info("SQL title : {}", title);
            sqlParameters.setTitle(title);
        }

498 499 500 501
        // new
        // replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
        sql = ParameterUtils.replaceScheduleTime(sql,
                DateUtils.timeStampToDate(taskExecutionContext.getScheduleTime()));
K
Kirs 已提交
502
        // special characters need to be escaped, ${} needs to be escaped
503 504
        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
        // Replace the original value in sql !{...} ,Does not participate in precompilation
K
Kirs 已提交
505 506 507 508
        String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
        sql = replaceOriginalValue(sql, rgexo, paramsMap);
        // replace the ${} of the SQL statement with the Placeholder
        String formatSql = sql.replaceAll(rgex, "?");
509 510
        // Convert the list parameter
        formatSql = ParameterUtils.expandListParameter(sqlParamsMap, formatSql);
K
Kirs 已提交
511
        sqlBuilder.append(formatSql);
512
        // print replace sql
K
Kirs 已提交
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
        printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
        return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
    }

    /**
     * Replaces every match of the pattern by the value of the parameter named in group 1.
     * <p>The content is scanned again after each replacement, so a placeholder contained in
     * an inserted value is replaced as well.</p>
     *
     * @param content text containing the placeholders
     * @param rgex pattern whose first group is the parameter name
     * @param sqlParamsMap parameters by name
     * @return the content with all placeholders replaced
     * @throws IllegalArgumentException if a referenced parameter is missing or has no value
     */
    private String replaceOriginalValue(String content, String rgex, Map<String, Property> sqlParamsMap) {
        Pattern pattern = Pattern.compile(rgex);
        Matcher m = pattern.matcher(content);
        while (m.find()) {
            String paramName = m.group(1);
            Property property = sqlParamsMap.get(paramName);
            if (property == null || property.getValue() == null) {
                throw new IllegalArgumentException("no value found for sql parameter: " + paramName);
            }
            // quote the value so that '$' and '\' in it are inserted literally
            content = m.replaceFirst(Matcher.quoteReplacement(property.getValue()));
            m = pattern.matcher(content);
        }
        return content;
    }

    /**
     * create function list
     *
534
     * @param udfFuncParameters udfFuncParameters
K
Kirs 已提交
535
     * @param logger logger
536
     * @return
K
Kirs 已提交
537
     */
538
    private List<String> createFuncs(List<UdfFuncParameters> udfFuncParameters, Logger logger) {
K
Kirs 已提交
539

540
        if (CollectionUtils.isEmpty(udfFuncParameters)) {
K
Kirs 已提交
541 542 543
            logger.info("can't find udf function resource");
            return null;
        }
544
        // build jar sql
545
        List<String> funcList = buildJarSql(udfFuncParameters);
546

K
Kirs 已提交
547
        // build temp function sql
548 549
        List<String> tempFuncList = buildTempFuncSql(udfFuncParameters);
        funcList.addAll(tempFuncList);
K
Kirs 已提交
550 551 552 553 554
        return funcList;
    }

    /**
     * build temp function sql
555 556
     * @param udfFuncParameters udfFuncParameters
     * @return
K
Kirs 已提交
557
     */
558 559
    private List<String> buildTempFuncSql(List<UdfFuncParameters> udfFuncParameters) {
        return udfFuncParameters.stream().map(value -> MessageFormat
560 561
                .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName()))
                .collect(Collectors.toList());
K
Kirs 已提交
562 563
    }

564 565
    /**
     * build jar sql
566 567
     * @param udfFuncParameters udfFuncParameters
     * @return
568
     */
569 570 571
    private List<String> buildJarSql(List<UdfFuncParameters> udfFuncParameters) {
        return udfFuncParameters.stream().map(value -> {
            String defaultFS = value.getDefaultFS();
572
            String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
573 574
            String uploadPath = CommonUtils.getHdfsUdfDir(value.getTenantCode());
            String resourceFullName = value.getResourceName();
575
            return String.format("add jar %s", resourceFullName);
576
        }).collect(Collectors.toList());
577 578
    }

K
Kirs 已提交
579
}