Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DolphinScheduler社区
DolphinScheduler
提交
a9f72c20
DolphinScheduler
项目概览
DolphinScheduler社区
/
DolphinScheduler
与 Fork 源项目一致
Fork自
apache / DolphinScheduler
通知
67
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
DolphinScheduler
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
a9f72c20
编写于
6月 02, 2020
作者:
张
张世鸣
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Using Jackson instead of Fastjson
上级
8f8ddae0
变更
37
隐藏空白更改
内联
并排
Showing
37 changed file
with
118 addition
and
161 deletion
+118
-161
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
...hinscheduler/alert/template/impl/DefaultHTMLTemplate.java
+1
-1
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java
...rg/apache/dolphinscheduler/alert/utils/DingTalkUtils.java
+2
-2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
...e/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
+2
-4
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java
...a/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java
+1
-0
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/JSONUtils.java
...va/org/apache/dolphinscheduler/alert/utils/JSONUtils.java
+0
-69
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
...cheduler/alert/template/impl/DefaultHTMLTemplateTest.java
+1
-1
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java
...pache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java
+1
-1
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java
...lphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java
+1
-0
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java
...rg/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java
+2
-1
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java
...rg/apache/dolphinscheduler/alert/utils/MailUtilsTest.java
+1
-0
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
...pache/dolphinscheduler/api/service/DataSourceService.java
+3
-3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
.../apache/dolphinscheduler/api/service/ExecutorService.java
+5
-5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
...olphinscheduler/api/service/ProcessDefinitionService.java
+2
-2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
.../dolphinscheduler/api/service/ProcessInstanceService.java
+6
-6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
...apache/dolphinscheduler/api/service/ResourcesService.java
+4
-5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
...va/org/apache/dolphinscheduler/common/model/TaskNode.java
+1
-1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
...org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+5
-4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
...a/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+30
-3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
.../apache/dolphinscheduler/common/utils/ParameterUtils.java
+1
-1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
...ava/org/apache/dolphinscheduler/common/utils/ResInfo.java
+1
-1
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
...g/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
+4
-4
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
...che/dolphinscheduler/common/utils/ParameterUtilsTest.java
+4
-4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
...apache/dolphinscheduler/dao/entity/ProcessDefinition.java
+4
-3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
...ler/server/master/consumer/TaskPriorityQueueConsumer.java
+5
-5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
...eduler/server/master/runner/MasterBaseTaskExecThread.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
...lphinscheduler/server/master/runner/MasterExecThread.java
+2
-2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
...rg/apache/dolphinscheduler/server/utils/AlertManager.java
+2
-2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
...heduler/server/worker/processor/TaskExecuteProcessor.java
+2
-5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
...phinscheduler/server/worker/runner/TaskExecuteThread.java
+1
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
...he/dolphinscheduler/server/worker/task/http/HttpTask.java
+3
-6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
...heduler/server/worker/task/processdure/ProcedureTask.java
+2
-1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
...ache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+1
-1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
.../dolphinscheduler/server/master/MasterExecThreadTest.java
+1
-1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
...org/apache/dolphinscheduler/server/master/ParamsTest.java
+1
-1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
.../apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+4
-3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
...ache/dolphinscheduler/service/process/ProcessService.java
+10
-10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
...ache/dolphinscheduler/service/quartz/QuartzExecutors.java
+1
-1
未找到文件。
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplate.java
浏览文件 @
a9f72c20
...
...
@@ -18,11 +18,11 @@ package org.apache.dolphinscheduler.alert.template.impl;
import
org.apache.dolphinscheduler.alert.template.AlertTemplate
;
import
org.apache.dolphinscheduler.alert.utils.Constants
;
import
org.apache.dolphinscheduler.alert.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.enums.ShowType
;
import
org.apache.dolphinscheduler.common.utils.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
java.util.*
;
...
...
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java
浏览文件 @
a9f72c20
...
...
@@ -17,7 +17,7 @@
package
org.apache.dolphinscheduler.alert.utils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.commons.codec.binary.StringUtils
;
import
org.apache.http.HttpEntity
;
import
org.apache.http.HttpHost
;
...
...
@@ -129,7 +129,7 @@ public class DingTalkUtils {
textContent
.
put
(
"content"
,
txt
);
items
.
put
(
"text"
,
textContent
);
return
JSON
.
toJSON
String
(
items
);
return
JSON
Utils
.
toJson
String
(
items
);
}
...
...
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
浏览文件 @
a9f72c20
...
...
@@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.alert.utils;
import
org.apache.dolphinscheduler.common.enums.ShowType
;
import
org.apache.dolphinscheduler.common.utils.StringUtils
;
import
com.alibaba.fastjson.JSON
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
com.google.common.reflect.TypeToken
;
import
org.apache.dolphinscheduler.plugin.model.AlertData
;
...
...
@@ -99,9 +99,7 @@ public class EnterpriseWeChatUtils {
response
.
close
();
}
Map
<
String
,
Object
>
map
=
JSON
.
parseObject
(
resp
,
new
TypeToken
<
Map
<
String
,
Object
>>()
{
}.
getType
());
Map
<
String
,
Object
>
map
=
JSONUtils
.
parseObject
(
resp
,
Map
.
class
);
return
map
.
get
(
"access_token"
).
toString
();
}
finally
{
httpClient
.
close
();
...
...
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java
浏览文件 @
a9f72c20
...
...
@@ -30,6 +30,7 @@ import java.io.File;
import
java.io.FileOutputStream
;
import
java.io.IOException
;
import
java.util.*
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
/**
* excel utils
...
...
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/JSONUtils.java
已删除
100644 → 0
浏览文件 @
8f8ddae0
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.dolphinscheduler.alert.utils
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
org.apache.dolphinscheduler.common.utils.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.Collections
;
import
java.util.List
;
/**
* json utils
*/
public
class
JSONUtils
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
JSONUtils
.
class
);
/**
* object to json string
* @param object the object to be converted to json
* @return json string
*/
public
static
String
toJsonString
(
Object
object
)
{
try
{
return
JSON
.
toJSONString
(
object
,
false
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Json deserialization exception."
,
e
);
}
}
/**
* json to list
*
* @param json the json
* @param clazz c
* @param <T> the generic clazz
* @return the result list or empty list
*/
public
static
<
T
>
List
<
T
>
toList
(
String
json
,
Class
<
T
>
clazz
)
{
if
(
StringUtils
.
isEmpty
(
json
))
{
return
Collections
.
emptyList
();
}
try
{
return
JSON
.
parseArray
(
json
,
clazz
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"JSONArray.parseArray exception!"
,
e
);
}
return
Collections
.
emptyList
();
}
}
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/template/impl/DefaultHTMLTemplateTest.java
浏览文件 @
a9f72c20
...
...
@@ -16,7 +16,7 @@
*/
package
org.apache.dolphinscheduler.alert.template.impl
;
import
org.apache.dolphinscheduler.
alert
.utils.JSONUtils
;
import
org.apache.dolphinscheduler.
common
.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.enums.ShowType
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
...
...
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -118,7 +118,7 @@ public class DingTalkUtilsTest {
logger
.
info
(
"test support utf8, actual:"
+
msg
);
logger
.
info
(
"test support utf8, actual:"
+
DingTalkUtils
.
isEnableDingTalk
);
String
expect
=
"{\"
text\":{\"content\":\"this is test:中文\"},\"msgtype\":\"text\"
}"
;
String
expect
=
"{\"
msgtype\":\"text\",\"text\":{\"content\":\"this is test:中文\"}
}"
;
Assert
.
assertEquals
(
expect
,
msg
);
}
...
...
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -34,6 +34,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import
java.io.IOException
;
import
java.util.*
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
/**
* Please manually modify the configuration file before testing.
...
...
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import
java.util.ArrayList
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
static
org
.
junit
.
Assert
.*;
...
...
@@ -67,7 +68,7 @@ public class JSONUtilsTest {
logger
.
info
(
result
);
//Equal result with expected string
assertEquals
(
result
,
expected
);
assertEquals
(
result
,
"[{\"database client connections\":\"190\",\"mysql address\":\"192.168.xx.xx\",\"mysql service name\":\"mysql200\",\"no index of number\":\"80\",\"port\":\"3306\"}]"
);
//If param is null, then return null string
result
=
JSONUtils
.
toJsonString
(
null
);
...
...
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
java.util.*
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
/**
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
浏览文件 @
a9f72c20
...
...
@@ -317,7 +317,7 @@ public class DataSourceService extends BaseService{
String
connectionParams
=
dataSource
.
getConnectionParams
();
JSONObject
object
=
JSON
.
parseObject
(
connectionParams
);
object
.
put
(
Constants
.
PASSWORD
,
Constants
.
XXXXXX
);
dataSource
.
setConnectionParams
(
JSONUtils
.
toJson
(
object
));
dataSource
.
setConnectionParams
(
JSONUtils
.
toJson
String
(
object
));
}
}
...
...
@@ -542,9 +542,9 @@ public class DataSourceService extends BaseService{
}
if
(
logger
.
isDebugEnabled
()){
logger
.
info
(
"parameters map-----"
+
JSON
.
toJSON
String
(
parameterMap
));
logger
.
info
(
"parameters map-----"
+
JSON
Utils
.
toJson
String
(
parameterMap
));
}
return
JSON
.
toJSON
String
(
parameterMap
);
return
JSON
Utils
.
toJson
String
(
parameterMap
);
}
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
浏览文件 @
a9f72c20
...
...
@@ -509,7 +509,7 @@ public class ExecutorService extends BaseService{
if
(
warningType
!=
null
){
command
.
setWarningType
(
warningType
);
}
command
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
command
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
command
.
setExecutorId
(
executorId
);
command
.
setWarningGroupId
(
warningGroupId
);
command
.
setProcessInstancePriority
(
processInstancePriority
);
...
...
@@ -532,7 +532,7 @@ public class ExecutorService extends BaseService{
if
(
runMode
==
RunMode
.
RUN_MODE_SERIAL
){
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
,
DateUtils
.
dateToString
(
start
));
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_END_DATE
,
DateUtils
.
dateToString
(
end
));
command
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
command
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
return
processService
.
createCommand
(
command
);
}
else
if
(
runMode
==
RunMode
.
RUN_MODE_PARALLEL
){
List
<
Schedule
>
schedules
=
processService
.
queryReleaseSchedulerListByProcessDefinitionId
(
processDefineId
);
...
...
@@ -547,7 +547,7 @@ public class ExecutorService extends BaseService{
for
(
Date
date
:
listDate
)
{
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
,
DateUtils
.
dateToString
(
date
));
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_END_DATE
,
DateUtils
.
dateToString
(
date
));
command
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
command
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
processService
.
createCommand
(
command
);
}
return
listDate
.
size
();
...
...
@@ -558,7 +558,7 @@ public class ExecutorService extends BaseService{
runCunt
+=
1
;
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
,
DateUtils
.
dateToString
(
start
));
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_END_DATE
,
DateUtils
.
dateToString
(
start
));
command
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
command
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
processService
.
createCommand
(
command
);
start
=
DateUtils
.
getSomeDay
(
start
,
1
);
}
...
...
@@ -570,7 +570,7 @@ public class ExecutorService extends BaseService{
processDefineId
,
schedule
);
}
}
else
{
command
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
command
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
return
processService
.
createCommand
(
command
);
}
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
浏览文件 @
a9f72c20
...
...
@@ -636,7 +636,7 @@ public class ProcessDefinitionService extends BaseDAGService {
try
{
out
=
response
.
getOutputStream
();
buff
=
new
BufferedOutputStream
(
out
);
buff
.
write
(
JSON
.
toJSON
String
(
processDefinitionList
).
getBytes
(
StandardCharsets
.
UTF_8
));
buff
.
write
(
JSON
Utils
.
toJson
String
(
processDefinitionList
).
getBytes
(
StandardCharsets
.
UTF_8
));
buff
.
flush
();
buff
.
close
();
}
catch
(
IOException
e
)
{
...
...
@@ -756,7 +756,7 @@ public class ProcessDefinitionService extends BaseDAGService {
public
Map
<
String
,
Object
>
importProcessDefinition
(
User
loginUser
,
MultipartFile
file
,
String
currentProjectName
)
{
Map
<
String
,
Object
>
result
=
new
HashMap
<>(
5
);
String
processMetaJson
=
FileUtils
.
file2String
(
file
);
List
<
ProcessMeta
>
processMetaList
=
JSON
.
parseArray
(
processMetaJson
,
ProcessMeta
.
class
);
List
<
ProcessMeta
>
processMetaList
=
JSON
Utils
.
toList
(
processMetaJson
,
ProcessMeta
.
class
);
//check file content
if
(
CollectionUtils
.
isEmpty
(
processMetaList
))
{
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
浏览文件 @
a9f72c20
...
...
@@ -242,7 +242,7 @@ public class ProcessInstanceService extends BaseDAGService {
if
(
logResult
.
getCode
()
==
Status
.
SUCCESS
.
ordinal
()){
String
log
=
(
String
)
logResult
.
getData
();
Map
<
String
,
DependResult
>
resultMap
=
parseLogForDependentResult
(
log
);
taskInstance
.
setDependentResult
(
JSONUtils
.
toJson
(
resultMap
));
taskInstance
.
setDependentResult
(
JSONUtils
.
toJson
String
(
resultMap
));
}
}
}
...
...
@@ -380,7 +380,7 @@ public class ProcessInstanceService extends BaseDAGService {
return
result
;
}
originDefParams
=
JSONUtils
.
toJson
(
processData
.
getGlobalParams
());
originDefParams
=
JSONUtils
.
toJson
String
(
processData
.
getGlobalParams
());
List
<
Property
>
globalParamList
=
processData
.
getGlobalParams
();
Map
<
String
,
String
>
globalParamMap
=
globalParamList
.
stream
().
collect
(
Collectors
.
toMap
(
Property:
:
getProp
,
Property:
:
getValue
));
globalParams
=
ParameterUtils
.
curingGlobalParams
(
globalParamMap
,
globalParamList
,
...
...
@@ -529,16 +529,16 @@ public class ProcessInstanceService extends BaseDAGService {
List
<
Property
>
globalParams
=
new
ArrayList
<>();
if
(
userDefinedParams
!=
null
&&
userDefinedParams
.
length
()
>
0
)
{
globalParams
=
JSON
.
parseArray
(
userDefinedParams
,
Property
.
class
);
globalParams
=
JSON
Utils
.
toList
(
userDefinedParams
,
Property
.
class
);
}
List
<
TaskNode
>
taskNodeList
=
workflowData
.
getTasks
();
// global param string
String
globalParamStr
=
JSONUtils
.
toJson
(
globalParams
);
String
globalParamStr
=
JSONUtils
.
toJson
String
(
globalParams
);
globalParamStr
=
ParameterUtils
.
convertParameterPlaceholders
(
globalParamStr
,
timeParams
);
globalParams
=
JSON
.
parseArray
(
globalParamStr
,
Property
.
class
);
globalParams
=
JSON
Utils
.
toList
(
globalParamStr
,
Property
.
class
);
for
(
Property
property
:
globalParams
)
{
timeParams
.
put
(
property
.
getProp
(),
property
.
getValue
());
}
...
...
@@ -551,7 +551,7 @@ public class ProcessInstanceService extends BaseDAGService {
String
localParams
=
map
.
get
(
LOCAL_PARAMS
);
if
(
localParams
!=
null
&&
!
localParams
.
isEmpty
())
{
localParams
=
ParameterUtils
.
convertParameterPlaceholders
(
localParams
,
timeParams
);
List
<
Property
>
localParamsList
=
JSON
.
parseArray
(
localParams
,
Property
.
class
);
List
<
Property
>
localParamsList
=
JSON
Utils
.
toList
(
localParams
,
Property
.
class
);
Map
<
String
,
Object
>
localParamsMap
=
new
HashMap
<>();
localParamsMap
.
put
(
"taskType"
,
taskNode
.
getType
());
localParamsMap
.
put
(
"localParamsList"
,
localParamsList
);
...
...
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
浏览文件 @
a9f72c20
...
...
@@ -16,10 +16,9 @@
*/
package
org.apache.dolphinscheduler.api.service
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.serializer.SerializerFeature
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
import
com.fasterxml.jackson.databind.SerializationFeature
;
import
org.apache.commons.collections.BeanMap
;
import
org.apache.dolphinscheduler.api.dto.resources.ResourceComponent
;
import
org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter
;
...
...
@@ -544,7 +543,7 @@ public class ResourcesService extends BaseService {
}
List
<
Resource
>
allResourceList
=
resourcesMapper
.
queryResourceListAuthored
(
userId
,
type
.
ordinal
(),
0
);
Visitor
resourceTreeVisitor
=
new
ResourceTreeVisitor
(
allResourceList
);
//JSONArray jsonArray = JSON
.parseArray(JSON.toJSON
String(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
//JSONArray jsonArray = JSON
Utils.toList(JSONUtils.toJson
String(resourceTreeVisitor.visit().getChildren(), SerializerFeature.SortField));
result
.
put
(
Constants
.
DATA_LIST
,
resourceTreeVisitor
.
visit
().
getChildren
());
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
@@ -1128,8 +1127,8 @@ public class ResourcesService extends BaseService {
}
List
<
Resource
>
authedResources
=
resourcesMapper
.
queryAuthorizedResourceList
(
userId
);
Visitor
visitor
=
new
ResourceTreeVisitor
(
authedResources
);
logger
.
info
(
JSON
.
toJSONString
(
visitor
.
visit
(),
SerializerFeature
.
SortField
));
String
jsonTreeStr
=
JSON
.
toJSONString
(
visitor
.
visit
().
getChildren
(),
SerializerFeature
.
SortField
);
logger
.
info
(
JSON
Utils
.
toJsonString
(
visitor
.
visit
(),
SerializationFeature
.
ORDER_MAP_ENTRIES_BY_KEYS
));
String
jsonTreeStr
=
JSON
Utils
.
toJsonString
(
visitor
.
visit
().
getChildren
(),
SerializationFeature
.
ORDER_MAP_ENTRIES_BY_KEYS
);
logger
.
info
(
jsonTreeStr
);
result
.
put
(
Constants
.
DATA_LIST
,
visitor
.
visit
().
getChildren
());
putMsg
(
result
,
Status
.
SUCCESS
);
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
浏览文件 @
a9f72c20
...
...
@@ -195,7 +195,7 @@ public class TaskNode {
public
void
setDepList
(
List
<
String
>
depList
)
throws
JsonProcessingException
{
this
.
depList
=
depList
;
this
.
preTasks
=
JSONUtils
.
toJson
(
depList
);
this
.
preTasks
=
JSONUtils
.
toJson
String
(
depList
);
}
public
String
getLoc
()
{
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
浏览文件 @
a9f72c20
...
...
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONException
;
import
com.alibaba.fastjson.JSONObject
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.google.common.cache.CacheBuilder
;
import
com.google.common.cache.CacheLoader
;
import
com.google.common.cache.LoadingCache
;
...
...
@@ -421,15 +422,15 @@ public class HadoopUtils implements Closeable {
String
responseContent
=
HttpUtils
.
get
(
applicationUrl
);
if
(
responseContent
!=
null
)
{
JSONObject
jsonObject
=
JSON
.
parseObject
(
responseContent
);
result
=
jsonObject
.
getJSONObject
(
"app"
).
getString
(
"finalStatus"
);
ObjectNode
jsonObject
=
JSONUtils
.
parseObject
(
responseContent
);
result
=
jsonObject
.
path
(
"app"
).
path
(
"finalStatus"
).
asText
(
);
}
else
{
//may be in job history
String
jobHistoryUrl
=
getJobHistoryUrl
(
applicationId
);
logger
.
info
(
"jobHistoryUrl={}"
,
jobHistoryUrl
);
responseContent
=
HttpUtils
.
get
(
jobHistoryUrl
);
JSONObject
jsonObject
=
JSONObject
.
parseObject
(
responseContent
);
result
=
jsonObject
.
getJSONObject
(
"job"
).
getString
(
"state"
);
ObjectNode
jsonObject
=
JSONUtils
.
parseObject
(
responseContent
);
result
=
jsonObject
.
path
(
"job"
).
path
(
"state"
).
asText
(
);
}
switch
(
result
)
{
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
浏览文件 @
a9f72c20
...
...
@@ -50,15 +50,30 @@ public class JSONUtils {
objectMapper
.
configure
(
DeserializationFeature
.
ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT
,
true
).
setTimeZone
(
TimeZone
.
getDefault
());
}
public
static
ArrayNode
createArrayNode
()
{
return
objectMapper
.
createArrayNode
();
}
public
static
ObjectNode
createObjectNode
()
{
return
objectMapper
.
createObjectNode
();
}
public
static
JsonNode
toJsonNode
(
Object
obj
)
{
return
objectMapper
.
valueToTree
(
obj
);
}
/**
* json representation of object
*
* @param object object
* @param feature feature
* @return object to json string
*/
public
static
String
toJson
(
Object
object
)
{
public
static
String
toJson
String
(
Object
object
,
SerializationFeature
feature
)
{
try
{
return
objectMapper
.
writeValueAsString
(
object
);
ObjectWriter
writer
=
objectMapper
.
writer
(
feature
);
return
writer
.
writeValueAsString
(
object
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"object to json exception!"
,
e
);
}
...
...
@@ -66,7 +81,6 @@ public class JSONUtils {
return
null
;
}
/**
* This method deserializes the specified Json into an object of the specified class. It is not
* suitable to use if the specified class is a generic type since it will not have the generic
...
...
@@ -95,6 +109,19 @@ public class JSONUtils {
}
public
static
<
T
>
T
parseObject
(
String
json
,
TypeReference
typeReference
)
{
if
(
StringUtils
.
isEmpty
(
json
))
{
return
null
;
}
try
{
return
objectMapper
.
readValue
(
json
,
typeReference
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"parse object exception!"
,
e
);
}
return
null
;
}
/**
* json to list
*
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
浏览文件 @
a9f72c20
...
...
@@ -196,7 +196,7 @@ public class ParameterUtils {
property
.
setValue
(
val
);
}
}
return
JSONUtils
.
toJson
(
globalParamList
);
return
JSONUtils
.
toJson
String
(
globalParamList
);
}
...
...
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
浏览文件 @
a9f72c20
...
...
@@ -85,7 +85,7 @@ public class ResInfo {
*/
public
static
String
getResInfoJson
(
double
cpuUsage
,
double
memoryUsage
,
double
loadAverage
){
ResInfo
resInfo
=
new
ResInfo
(
cpuUsage
,
memoryUsage
,
loadAverage
);
return
JSONUtils
.
toJson
(
resInfo
);
return
JSONUtils
.
toJson
String
(
resInfo
);
}
...
...
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -87,7 +87,7 @@ public class JSONUtilsTest {
List
<
LinkedHashMap
<
String
,
String
>>
maps
=
new
ArrayList
<>();
maps
.
add
(
0
,
map1
);
String
resultJson
=
JSONUtils
.
toJson
(
maps
);
String
resultJson
=
JSONUtils
.
toJson
String
(
maps
);
return
resultJson
;
}
...
...
@@ -96,14 +96,14 @@ public class JSONUtilsTest {
Map
<
String
,
String
>
map
=
new
HashMap
<>();
map
.
put
(
"foo"
,
"bar"
);
Assert
.
assertEquals
(
"{\"foo\":\"bar\"}"
,
JSONUtils
.
toJson
(
map
));
Assert
.
assertEquals
(
"{\"foo\":\"bar\"}"
,
JSONUtils
.
toJson
String
(
map
));
Assert
.
assertEquals
(
String
.
valueOf
((
Object
)
null
),
JSONUtils
.
toJson
(
null
));
String
.
valueOf
((
Object
)
null
),
JSONUtils
.
toJson
String
(
null
));
}
@Test
public
void
testParseObject
()
{
Assert
.
assertNull
(
JSONUtils
.
parseObject
(
""
,
null
));
Assert
.
assertNull
(
JSONUtils
.
parseObject
(
""
));
Assert
.
assertNull
(
JSONUtils
.
parseObject
(
"foo"
,
String
.
class
));
}
...
...
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -91,13 +91,13 @@ public class ParameterUtilsTest {
globalParamList
.
add
(
property
);
String
result2
=
ParameterUtils
.
curingGlobalParams
(
null
,
globalParamList
,
CommandType
.
START_CURRENT_TASK_PROCESS
,
scheduleTime
);
Assert
.
assertEquals
(
result2
,
JSONUtils
.
toJson
(
globalParamList
));
Assert
.
assertEquals
(
result2
,
JSONUtils
.
toJson
String
(
globalParamList
));
String
result3
=
ParameterUtils
.
curingGlobalParams
(
globalParamMap
,
globalParamList
,
CommandType
.
START_CURRENT_TASK_PROCESS
,
null
);
Assert
.
assertEquals
(
result3
,
JSONUtils
.
toJson
(
globalParamList
));
Assert
.
assertEquals
(
result3
,
JSONUtils
.
toJson
String
(
globalParamList
));
String
result4
=
ParameterUtils
.
curingGlobalParams
(
globalParamMap
,
globalParamList
,
CommandType
.
START_CURRENT_TASK_PROCESS
,
scheduleTime
);
Assert
.
assertEquals
(
result4
,
JSONUtils
.
toJson
(
globalParamList
));
Assert
.
assertEquals
(
result4
,
JSONUtils
.
toJson
String
(
globalParamList
));
//test var $ startsWith
globalParamMap
.
put
(
"bizDate"
,
"${system.biz.date}"
);
...
...
@@ -113,7 +113,7 @@ public class ParameterUtilsTest {
globalParamList
.
add
(
property4
);
String
result5
=
ParameterUtils
.
curingGlobalParams
(
globalParamMap
,
globalParamList
,
CommandType
.
START_CURRENT_TASK_PROCESS
,
scheduleTime
);
Assert
.
assertEquals
(
result5
,
JSONUtils
.
toJson
(
globalParamList
));
Assert
.
assertEquals
(
result5
,
JSONUtils
.
toJson
String
(
globalParamList
));
}
/**
...
...
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
浏览文件 @
a9f72c20
...
...
@@ -25,6 +25,7 @@ import com.baomidou.mybatisplus.annotation.TableField;
import
com.baomidou.mybatisplus.annotation.TableId
;
import
com.baomidou.mybatisplus.annotation.TableName
;
import
com.baomidou.mybatisplus.core.toolkit.StringUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
java.util.Date
;
import
java.util.List
;
...
...
@@ -271,7 +272,7 @@ public class ProcessDefinition {
}
public
void
setGlobalParams
(
String
globalParams
)
{
this
.
globalParamList
=
JSON
.
parseArray
(
globalParams
,
Property
.
class
);
this
.
globalParamList
=
JSON
Utils
.
toList
(
globalParams
,
Property
.
class
);
this
.
globalParams
=
globalParams
;
}
...
...
@@ -280,7 +281,7 @@ public class ProcessDefinition {
}
public
void
setGlobalParamList
(
List
<
Property
>
globalParamList
)
{
this
.
globalParams
=
JSON
.
toJSON
String
(
globalParamList
);
this
.
globalParams
=
JSON
Utils
.
toJson
String
(
globalParamList
);
this
.
globalParamList
=
globalParamList
;
}
...
...
@@ -288,7 +289,7 @@ public class ProcessDefinition {
List
<
Property
>
propList
;
if
(
globalParamMap
==
null
&&
StringUtils
.
isNotEmpty
(
globalParams
))
{
propList
=
JSON
.
parseArray
(
globalParams
,
Property
.
class
);
propList
=
JSON
Utils
.
toList
(
globalParams
,
Property
.
class
);
globalParamMap
=
propList
.
stream
().
collect
(
Collectors
.
toMap
(
Property:
:
getProp
,
Property:
:
getValue
));
}
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
浏览文件 @
a9f72c20
...
...
@@ -157,7 +157,7 @@ public class TaskPriorityQueueConsumer extends Thread{
TaskType
taskType
=
TaskType
.
valueOf
(
taskInstance
.
getTaskType
());
// task node
TaskNode
taskNode
=
JSON
Object
.
parseObject
(
taskInstance
.
getTaskJson
(),
TaskNode
.
class
);
TaskNode
taskNode
=
JSON
Utils
.
parseObject
(
taskInstance
.
getTaskJson
(),
TaskNode
.
class
);
Integer
userId
=
taskInstance
.
getProcessDefine
()
==
null
?
0
:
taskInstance
.
getProcessDefine
().
getUserId
();
Tenant
tenant
=
processService
.
getTenantForProcess
(
taskInstance
.
getProcessInstance
().
getTenantId
(),
userId
);
...
...
@@ -225,7 +225,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskNode taskNode
*/
private
void
setProcedureTaskRelation
(
ProcedureTaskExecutionContext
procedureTaskExecutionContext
,
TaskNode
taskNode
)
{
ProcedureParameters
procedureParameters
=
JSON
Object
.
parseObject
(
taskNode
.
getParams
(),
ProcedureParameters
.
class
);
ProcedureParameters
procedureParameters
=
JSON
Utils
.
parseObject
(
taskNode
.
getParams
(),
ProcedureParameters
.
class
);
int
datasourceId
=
procedureParameters
.
getDatasource
();
DataSource
datasource
=
processService
.
findDataSourceById
(
datasourceId
);
procedureTaskExecutionContext
.
setConnectionParams
(
datasource
.
getConnectionParams
());
...
...
@@ -237,7 +237,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskNode taskNode
*/
private
void
setDataxTaskRelation
(
DataxTaskExecutionContext
dataxTaskExecutionContext
,
TaskNode
taskNode
)
{
DataxParameters
dataxParameters
=
JSON
Object
.
parseObject
(
taskNode
.
getParams
(),
DataxParameters
.
class
);
DataxParameters
dataxParameters
=
JSON
Utils
.
parseObject
(
taskNode
.
getParams
(),
DataxParameters
.
class
);
DataSource
dataSource
=
processService
.
findDataSourceById
(
dataxParameters
.
getDataSource
());
DataSource
dataTarget
=
processService
.
findDataSourceById
(
dataxParameters
.
getDataTarget
());
...
...
@@ -263,7 +263,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskNode taskNode
*/
private
void
setSqoopTaskRelation
(
SqoopTaskExecutionContext
sqoopTaskExecutionContext
,
TaskNode
taskNode
)
{
SqoopParameters
sqoopParameters
=
JSON
Object
.
parseObject
(
taskNode
.
getParams
(),
SqoopParameters
.
class
);
SqoopParameters
sqoopParameters
=
JSON
Utils
.
parseObject
(
taskNode
.
getParams
(),
SqoopParameters
.
class
);
SourceMysqlParameter
sourceMysqlParameter
=
JSONUtils
.
parseObject
(
sqoopParameters
.
getSourceParams
(),
SourceMysqlParameter
.
class
);
TargetMysqlParameter
targetMysqlParameter
=
JSONUtils
.
parseObject
(
sqoopParameters
.
getTargetParams
(),
TargetMysqlParameter
.
class
);
...
...
@@ -290,7 +290,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskNode taskNode
*/
private
void
setSQLTaskRelation
(
SQLTaskExecutionContext
sqlTaskExecutionContext
,
TaskNode
taskNode
)
{
SqlParameters
sqlParameters
=
JSON
Object
.
parseObject
(
taskNode
.
getParams
(),
SqlParameters
.
class
);
SqlParameters
sqlParameters
=
JSON
Utils
.
parseObject
(
taskNode
.
getParams
(),
SqlParameters
.
class
);
int
datasourceId
=
sqlParameters
.
getDatasource
();
DataSource
datasource
=
processService
.
findDataSourceById
(
datasourceId
);
sqlTaskExecutionContext
.
setConnectionParams
(
datasource
.
getConnectionParams
());
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
浏览文件 @
a9f72c20
...
...
@@ -191,7 +191,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return
true
;
}
catch
(
Exception
e
){
logger
.
error
(
"submit task Exception: "
,
e
);
logger
.
error
(
"task error : %s"
,
JSONUtils
.
toJson
(
taskInstance
));
logger
.
error
(
"task error : %s"
,
JSONUtils
.
toJson
String
(
taskInstance
));
return
false
;
}
}
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
浏览文件 @
a9f72c20
...
...
@@ -295,7 +295,7 @@ public class MasterExecThread implements Runnable {
processInstance
.
setScheduleTime
(
scheduleDate
);
if
(
cmdParam
.
containsKey
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
)){
cmdParam
.
remove
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
);
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
}
List
<
TaskInstance
>
taskInstanceList
=
processService
.
findValidTaskListByProcessId
(
processInstance
.
getId
());
...
...
@@ -467,7 +467,7 @@ public class MasterExecThread implements Runnable {
// process instance id
taskInstance
.
setProcessInstanceId
(
processInstance
.
getId
());
// task instance node json
taskInstance
.
setTaskJson
(
JSON
.
toJSON
String
(
taskNode
));
taskInstance
.
setTaskJson
(
JSON
Utils
.
toJson
String
(
taskNode
));
// task instance type
taskInstance
.
setTaskType
(
taskNode
.
getType
());
// task instance whether alert
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
浏览文件 @
a9f72c20
...
...
@@ -145,7 +145,7 @@ public class AlertManager {
failedTaskMap
.
put
(
"log path"
,
task
.
getLogPath
());
failedTaskList
.
add
(
failedTaskMap
);
}
res
=
JSONUtils
.
toJson
(
failedTaskList
);
res
=
JSONUtils
.
toJson
String
(
failedTaskList
);
}
return
res
;
...
...
@@ -170,7 +170,7 @@ public class AlertManager {
toleranceWorkerContentMap
.
put
(
"task retry times"
,
String
.
valueOf
(
taskInstance
.
getRetryTimes
()));
toleranceTaskInstanceList
.
add
(
toleranceWorkerContentMap
);
}
return
JSONUtils
.
toJson
(
toleranceTaskInstanceList
);
return
JSONUtils
.
toJson
String
(
toleranceTaskInstanceList
);
}
/**
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
浏览文件 @
a9f72c20
...
...
@@ -25,12 +25,9 @@ import io.netty.channel.Channel;
import
org.apache.dolphinscheduler.common.Constants
;
import
org.apache.dolphinscheduler.common.enums.ExecutionStatus
;
import
org.apache.dolphinscheduler.common.enums.TaskType
;
import
org.apache.dolphinscheduler.common.utils.OSUtils
;
import
org.apache.dolphinscheduler.common.utils.RetryerUtils
;
import
org.apache.dolphinscheduler.common.utils.*
;
import
org.apache.dolphinscheduler.server.log.TaskLogDiscriminator
;
import
org.apache.dolphinscheduler.common.thread.ThreadUtils
;
import
org.apache.dolphinscheduler.common.utils.FileUtils
;
import
org.apache.dolphinscheduler.common.utils.Preconditions
;
import
org.apache.dolphinscheduler.remote.command.Command
;
import
org.apache.dolphinscheduler.remote.command.CommandType
;
import
org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand
;
...
...
@@ -89,7 +86,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String
contextJson
=
taskRequestCommand
.
getTaskExecutionContext
();
TaskExecutionContext
taskExecutionContext
=
JSON
Object
.
parseObject
(
contextJson
,
TaskExecutionContext
.
class
);
TaskExecutionContext
taskExecutionContext
=
JSON
Utils
.
parseObject
(
contextJson
,
TaskExecutionContext
.
class
);
taskExecutionContext
.
setHost
(
OSUtils
.
getHost
()
+
":"
+
workerConfig
.
getListenPort
());
// local execute path
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
浏览文件 @
a9f72c20
...
...
@@ -80,7 +80,7 @@ public class TaskExecuteThread implements Runnable {
try
{
logger
.
info
(
"script path : {}"
,
taskExecutionContext
.
getExecutePath
());
// task node
TaskNode
taskNode
=
JSON
Object
.
parseObject
(
taskExecutionContext
.
getTaskJson
(),
TaskNode
.
class
);
TaskNode
taskNode
=
JSON
Utils
.
parseObject
(
taskExecutionContext
.
getTaskJson
(),
TaskNode
.
class
);
// copy hdfs/minio file to local
downloadResource
(
taskExecutionContext
.
getExecutePath
(),
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
浏览文件 @
a9f72c20
...
...
@@ -28,10 +28,7 @@ import org.apache.dolphinscheduler.common.process.HttpProperty;
import
org.apache.dolphinscheduler.common.process.Property
;
import
org.apache.dolphinscheduler.common.task.AbstractParameters
;
import
org.apache.dolphinscheduler.common.task.http.HttpParameters
;
import
org.apache.dolphinscheduler.common.utils.CollectionUtils
;
import
org.apache.dolphinscheduler.common.utils.DateUtils
;
import
org.apache.dolphinscheduler.common.utils.ParameterUtils
;
import
org.apache.dolphinscheduler.common.utils.StringUtils
;
import
org.apache.dolphinscheduler.common.utils.*
;
import
org.apache.dolphinscheduler.server.entity.TaskExecutionContext
;
import
org.apache.dolphinscheduler.server.utils.ParamUtils
;
import
org.apache.dolphinscheduler.server.worker.task.AbstractTask
;
...
...
@@ -102,7 +99,7 @@ public class HttpTask extends AbstractTask {
@Override
public
void
init
()
{
logger
.
info
(
"http task params {}"
,
taskExecutionContext
.
getTaskParams
());
this
.
httpParameters
=
JSON
Object
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
HttpParameters
.
class
);
this
.
httpParameters
=
JSON
Utils
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
HttpParameters
.
class
);
if
(!
httpParameters
.
checkParameters
())
{
throw
new
RuntimeException
(
"http task params is not valid"
);
...
...
@@ -152,7 +149,7 @@ public class HttpTask extends AbstractTask {
List
<
HttpProperty
>
httpPropertyList
=
new
ArrayList
<>();
if
(
CollectionUtils
.
isNotEmpty
(
httpParameters
.
getHttpParams
()
)){
for
(
HttpProperty
httpProperty:
httpParameters
.
getHttpParams
())
{
String
jsonObject
=
JSON
.
toJSON
String
(
httpProperty
);
String
jsonObject
=
JSON
Utils
.
toJson
String
(
httpProperty
);
String
params
=
ParameterUtils
.
convertParameterPlaceholders
(
jsonObject
,
ParamUtils
.
convert
(
paramsMap
));
logger
.
info
(
"http request params:{}"
,
params
);
httpPropertyList
.
add
(
JSON
.
parseObject
(
params
,
HttpProperty
.
class
));
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
浏览文件 @
a9f72c20
...
...
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import
org.apache.dolphinscheduler.common.task.AbstractParameters
;
import
org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters
;
import
org.apache.dolphinscheduler.common.utils.CollectionUtils
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.apache.dolphinscheduler.common.utils.ParameterUtils
;
import
org.apache.dolphinscheduler.dao.datasource.BaseDataSource
;
import
org.apache.dolphinscheduler.dao.datasource.DataSourceFactory
;
...
...
@@ -74,7 +75,7 @@ public class ProcedureTask extends AbstractTask {
logger
.
info
(
"procedure task params {}"
,
taskExecutionContext
.
getTaskParams
());
this
.
procedureParameters
=
JSON
Object
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
ProcedureParameters
.
class
);
this
.
procedureParameters
=
JSON
Utils
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
ProcedureParameters
.
class
);
// check parameters
...
...
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
浏览文件 @
a9f72c20
...
...
@@ -86,7 +86,7 @@ public class SqlTask extends AbstractTask {
this
.
taskExecutionContext
=
taskExecutionContext
;
logger
.
info
(
"sql task params {}"
,
taskExecutionContext
.
getTaskParams
());
this
.
sqlParameters
=
JSON
Object
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
SqlParameters
.
class
);
this
.
sqlParameters
=
JSON
Utils
.
parseObject
(
taskExecutionContext
.
getTaskParams
(),
SqlParameters
.
class
);
if
(!
sqlParameters
.
checkParameters
())
{
throw
new
RuntimeException
(
"sql task params is not valid"
);
...
...
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
浏览文件 @
a9f72c20
...
...
@@ -85,7 +85,7 @@ public class MasterExecThreadTest {
Map
<
String
,
String
>
cmdParam
=
new
HashMap
<>();
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
,
"2020-01-01 00:00:00"
);
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_END_DATE
,
"2020-01-31 23:00:00"
);
Mockito
.
when
(
processInstance
.
getCommandParam
()).
thenReturn
(
JSON
.
toJSON
String
(
cmdParam
));
Mockito
.
when
(
processInstance
.
getCommandParam
()).
thenReturn
(
JSON
Utils
.
toJson
String
(
cmdParam
));
ProcessDefinition
processDefinition
=
new
ProcessDefinition
();
processDefinition
.
setGlobalParamMap
(
Collections
.
EMPTY_MAP
);
processDefinition
.
setGlobalParamList
(
Collections
.
EMPTY_LIST
);
...
...
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
浏览文件 @
a9f72c20
...
...
@@ -95,7 +95,7 @@ public class ParamsTest {
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
globalParams
,
globalParamsMap
,
localParams
,
CommandType
.
START_PROCESS
,
new
Date
());
logger
.
info
(
JSON
.
toJSON
String
(
paramsMap
));
logger
.
info
(
JSON
Utils
.
toJson
String
(
paramsMap
));
}
...
...
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
浏览文件 @
a9f72c20
...
...
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import
org.apache.dolphinscheduler.common.enums.DataType
;
import
org.apache.dolphinscheduler.common.enums.Direct
;
import
org.apache.dolphinscheduler.common.process.Property
;
import
org.apache.dolphinscheduler.common.utils.JSONUtils
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.slf4j.Logger
;
...
...
@@ -92,7 +93,7 @@ public class ParamUtilsTest {
//Invoke convert
Map
<
String
,
Property
>
paramsMap
=
ParamUtils
.
convert
(
globalParams
,
globalParamsMap
,
localParams
,
CommandType
.
START_PROCESS
,
date
);
String
result
=
JSON
.
toJSON
String
(
paramsMap
);
String
result
=
JSON
Utils
.
toJson
String
(
paramsMap
);
assertEquals
(
expected
,
result
);
for
(
Map
.
Entry
<
String
,
Property
>
entry
:
paramsMap
.
entrySet
())
{
...
...
@@ -104,7 +105,7 @@ public class ParamUtilsTest {
//Invoke convert with null globalParams
Map
<
String
,
Property
>
paramsMap1
=
ParamUtils
.
convert
(
null
,
globalParamsMap
,
localParams
,
CommandType
.
START_PROCESS
,
date
);
String
result1
=
JSON
.
toJSON
String
(
paramsMap1
);
String
result1
=
JSON
Utils
.
toJson
String
(
paramsMap1
);
assertEquals
(
expected1
,
result1
);
//Null check, invoke convert with null globalParams and null localParams
...
...
@@ -123,7 +124,7 @@ public class ParamUtilsTest {
//Invoke convert
Map
<
String
,
String
>
paramsMap
=
ParamUtils
.
convert
(
globalParams
);
String
result
=
JSON
.
toJSON
String
(
paramsMap
);
String
result
=
JSON
Utils
.
toJson
String
(
paramsMap
);
assertEquals
(
expected
,
result
);
logger
.
info
(
result
);
...
...
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
浏览文件 @
a9f72c20
...
...
@@ -331,7 +331,7 @@ public class ProcessService {
for
(
TaskNode
taskNode
:
taskNodeList
){
String
parameter
=
taskNode
.
getParams
();
if
(
parameter
.
contains
(
CMDPARAM_SUB_PROCESS_DEFINE_ID
)){
SubProcessParameters
subProcessParam
=
JSON
.
parseObject
(
parameter
,
SubProcessParameters
.
class
);
SubProcessParameters
subProcessParam
=
JSON
Utils
.
parseObject
(
parameter
,
SubProcessParameters
.
class
);
ids
.
add
(
subProcessParam
.
getProcessDefinitionId
());
recurseFindSubProcessId
(
subProcessParam
.
getProcessDefinitionId
(),
ids
);
}
...
...
@@ -366,7 +366,7 @@ public class ProcessService {
processInstance
.
getFailureStrategy
(),
processInstance
.
getExecutorId
(),
processInstance
.
getProcessDefinitionId
(),
JSONUtils
.
toJson
(
cmdParam
),
JSONUtils
.
toJson
String
(
cmdParam
),
processInstance
.
getWarningType
(),
processInstance
.
getWarningGroupId
(),
processInstance
.
getScheduleTime
(),
...
...
@@ -386,7 +386,7 @@ public class ProcessService {
originCommand
.
setId
(
0
);
originCommand
.
setCommandType
(
CommandType
.
RECOVER_WAITTING_THREAD
);
originCommand
.
setUpdateTime
(
new
Date
());
originCommand
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
originCommand
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
originCommand
.
setProcessInstancePriority
(
processInstance
.
getProcessInstancePriority
());
saveCommand
(
originCommand
);
}
...
...
@@ -600,7 +600,7 @@ public class ProcessService {
}
cmdParam
.
put
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
,
String
.
join
(
Constants
.
COMMA
,
convertIntListToString
(
failedList
)));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
processInstance
.
setRunTimes
(
runTime
+
1
);
break
;
case
START_CURRENT_TASK_PROCESS:
...
...
@@ -619,7 +619,7 @@ public class ProcessService {
initTaskInstance
(
this
.
findTaskInstanceById
(
taskId
));
}
cmdParam
.
put
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
,
String
.
join
(
","
,
convertIntListToString
(
suspendedNodeList
)));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
processInstance
.
setRunTimes
(
runTime
+
1
);
break
;
case
RECOVER_TOLERANCE_FAULT_PROCESS:
...
...
@@ -639,7 +639,7 @@ public class ProcessService {
// delete the recover task names from command parameter
if
(
cmdParam
.
containsKey
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
)){
cmdParam
.
remove
(
Constants
.
CMDPARAM_RECOVERY_START_NODE_STRING
);
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
(
cmdParam
));
processInstance
.
setCommandParam
(
JSONUtils
.
toJson
String
(
cmdParam
));
}
// delete all the valid tasks when repeat running
List
<
TaskInstance
>
validTaskList
=
findValidTaskListByProcessId
(
processInstance
.
getId
());
...
...
@@ -717,7 +717,7 @@ public class ProcessService {
&&
CMDPARAM_EMPTY_SUB_PROCESS
.
equals
(
paramMap
.
get
(
CMDPARAM_SUB_PROCESS
))){
paramMap
.
remove
(
CMDPARAM_SUB_PROCESS
);
paramMap
.
put
(
CMDPARAM_SUB_PROCESS
,
String
.
valueOf
(
subProcessInstance
.
getId
()));
subProcessInstance
.
setCommandParam
(
JSONUtils
.
toJson
(
paramMap
));
subProcessInstance
.
setCommandParam
(
JSONUtils
.
toJson
String
(
paramMap
));
subProcessInstance
.
setIsSubProcess
(
Flag
.
YES
);
this
.
saveProcessInstance
(
subProcessInstance
);
}
...
...
@@ -761,7 +761,7 @@ public class ProcessService {
subPropertyList
.
add
(
parent
);
}
}
return
JSONUtils
.
toJson
(
subPropertyList
);
return
JSONUtils
.
toJson
String
(
subPropertyList
);
}
/**
...
...
@@ -896,7 +896,7 @@ public class ProcessService {
updateProcessInstance
(
childInstance
);
}
// set sub work process command
String
processMapStr
=
JSONUtils
.
toJson
(
instanceMap
);
String
processMapStr
=
JSONUtils
.
toJson
String
(
instanceMap
);
Map
<
String
,
String
>
cmdParam
=
JSONUtils
.
toMap
(
processMapStr
);
if
(
commandType
==
CommandType
.
COMPLEMENT_DATA
||
...
...
@@ -906,7 +906,7 @@ public class ProcessService {
String
startTime
=
parentParam
.
get
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
);
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_END_DATE
,
endTime
);
cmdParam
.
put
(
CMDPARAM_COMPLEMENT_DATA_START_DATE
,
startTime
);
processMapStr
=
JSONUtils
.
toJson
(
cmdParam
);
processMapStr
=
JSONUtils
.
toJson
String
(
cmdParam
);
}
updateSubProcessDefinitionByParent
(
parentProcessInstance
,
childDefineId
);
...
...
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
浏览文件 @
a9f72c20
...
...
@@ -334,7 +334,7 @@ public class QuartzExecutors {
Map
<
String
,
Object
>
dataMap
=
new
HashMap
<>(
3
);
dataMap
.
put
(
PROJECT_ID
,
projectId
);
dataMap
.
put
(
SCHEDULE_ID
,
scheduleId
);
dataMap
.
put
(
SCHEDULE
,
JSONUtils
.
toJson
(
schedule
));
dataMap
.
put
(
SCHEDULE
,
JSONUtils
.
toJson
String
(
schedule
));
return
dataMap
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录