未验证 提交 75a26de5 编写于 作者: K Kirs 提交者: GitHub

[1.3.9-prepare#6387][Task] sql limit param no default value (#6521)

* [1.3.9-prepare#6387][Task] sql limit param no default value

issue #6337
pr #6387

* [1.3.9-prepare#6387][Task] sql limit param no default value

issue #6337
pr #6387

* [1.3.9-prepare#6387][Task] sql limit param no default value

issue #6337
pr #6387

* [#6337]When the SQL result reaches the limit value, increase the log result prompt
上级 f28faaea
......@@ -75,17 +75,18 @@ import org.slf4j.Logger;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* sql task
*/
public class SqlTask extends AbstractTask {
/**
* sql parameters
* sql parameters
*/
private SqlParameters sqlParameters;
/**
* alert dao
* alert dao
*/
private AlertDao alertDao;
/**
......@@ -98,6 +99,11 @@ public class SqlTask extends AbstractTask {
*/
private TaskExecutionContext taskExecutionContext;
/**
* default query sql limit
*/
private static final int QUERY_LIMIT = 10000;
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
......@@ -168,10 +174,11 @@ public class SqlTask extends AbstractTask {
/**
* ready to execute SQL and parameter entity Map
*
* @return SqlBinds
*/
private SqlBinds getSqlAndSqlParamsMap(String sql) {
Map<Integer,Property> sqlParamsMap = new HashMap<>();
Map<Integer, Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id
......@@ -186,17 +193,17 @@ public class SqlTask extends AbstractTask {
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// spell SQL according to the final user-defined variable
if (StringUtils.isNotEmpty(sqlParameters.getTitle())){
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
ParamUtils.convert(paramsMap));
logger.info("SQL title : {}",title);
logger.info("SQL title : {}", title);
sqlParameters.setTitle(title);
}
//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
......@@ -220,15 +227,16 @@ public class SqlTask extends AbstractTask {
/**
* execute function and sql
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs) throws Exception {
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs) throws Exception {
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
......@@ -239,11 +247,11 @@ public class SqlTask extends AbstractTask {
connection = createConnection();
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs);
createTempFunction(connection, createFuncs);
}
// pre sql
preSql(connection,preStatementsBinds);
preSql(connection, preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
// decide whether to executeQuery or executeUpdate based on sqlType
......@@ -257,13 +265,13 @@ public class SqlTask extends AbstractTask {
stmt.executeUpdate();
}
postSql(connection,postStatementsBinds);
postSql(connection, postStatementsBinds);
} catch (Exception e) {
logger.error("execute sql error: {}", e.getMessage());
throw e;
} finally {
close(resultSet,stmt,connection);
close(resultSet, stmt, connection);
}
}
......@@ -271,16 +279,17 @@ public class SqlTask extends AbstractTask {
* result process
*
* @param resultSet resultSet
* @throws Exception
*/
private void resultProcess(ResultSet resultSet) throws Exception{
private void resultProcess(ResultSet resultSet) throws Exception {
JSONArray resultJSONArray = new JSONArray();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
int limit = sqlParameters.getLimit() == 0 ? QUERY_LIMIT : sqlParameters.getLimit();
while (rowCount < limit && resultSet.next()) {
JSONObject mapOfColValues = new JSONObject(true);
for (int i = 1; i <= num; i++) {
mapOfColValues.put(md.getColumnLabel(i), resultSet.getObject(i));
......@@ -288,16 +297,22 @@ public class SqlTask extends AbstractTask {
resultJSONArray.add(mapOfColValues);
rowCount++;
}
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql result : {}", result);
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : Constants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, resultJSONArray.size());
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
String row = JSONUtils.toJsonString(resultJSONArray.get(i));
logger.info("row {} : {}", i + 1, row);
}
if (resultSet.next()) {
logger.info("sql result limit : {} exceeding results are filtered", limit);
resultJSONArray.add(String.format("sql result limit : %d exceeding results are filtered", limit));
}
if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
......@@ -306,17 +321,17 @@ public class SqlTask extends AbstractTask {
}
/**
* pre sql
* pre sql
*
* @param connection connection
* @param preStatementsBinds preStatementsBinds
*/
private void preSql(Connection connection,
List<SqlBinds> preStatementsBinds) throws Exception{
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
List<SqlBinds> preStatementsBinds) throws Exception {
for (SqlBinds sqlBind : preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
int result = pstmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql());
}
}
......@@ -327,26 +342,25 @@ public class SqlTask extends AbstractTask {
*
* @param connection connection
* @param postStatementsBinds postStatementsBinds
* @throws Exception
*/
private void postSql(Connection connection,
List<SqlBinds> postStatementsBinds) throws Exception{
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
List<SqlBinds> postStatementsBinds) throws Exception {
for (SqlBinds sqlBind : postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
int result = pstmt.executeUpdate();
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql());
}
}
}
/**
* create temp function
*
* @param connection connection
* @param createFuncs createFuncs
* @throws Exception
*/
private void createTempFunction(Connection connection,
List<String> createFuncs) throws Exception{
List<String> createFuncs) throws Exception {
try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
......@@ -354,14 +368,14 @@ public class SqlTask extends AbstractTask {
}
}
}
/**
* create connection
*
* @return connection
* @throws Exception Exception
*/
private Connection createConnection() throws Exception{
private Connection createConnection() throws Exception {
// if hive , load connection params if exists
Connection connection = null;
if (HIVE == DbType.valueOf(sqlParameters.getType())) {
......@@ -375,7 +389,7 @@ public class SqlTask extends AbstractTask {
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
paramProp);
}else{
} else {
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());
......@@ -384,7 +398,7 @@ public class SqlTask extends AbstractTask {
}
/**
* close jdbc resource
* close jdbc resource
*
* @param resultSet resultSet
* @param pstmt pstmt
......@@ -392,36 +406,37 @@ public class SqlTask extends AbstractTask {
*/
private void close(ResultSet resultSet,
PreparedStatement pstmt,
Connection connection){
if (resultSet != null){
Connection connection) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
logger.error("close result set error : {}",e.getMessage(),e);
logger.error("close result set error : {}", e.getMessage(), e);
}
}
if (pstmt != null){
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
logger.error("close prepared statement error : {}", e.getMessage(), e);
}
}
if (connection != null){
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
logger.error("close connection error : {}", e.getMessage(), e);
}
}
}
/**
* preparedStatement bind
*
* @param connection connection
* @param sqlBinds sqlBinds
* @param sqlBinds sqlBinds
* @return PreparedStatement
* @throws Exception Exception
*/
......@@ -430,11 +445,11 @@ public class SqlTask extends AbstractTask {
boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
if(timeoutFlag){
if (timeoutFlag) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
if (params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
......@@ -446,23 +461,24 @@ public class SqlTask extends AbstractTask {
/**
* send mail as an attachment
* @param title title
* @param content content
*
* @param title title
* @param content content
*/
public void sendAttachment(String title,String content){
public void sendAttachment(String title, String content) {
List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list
List<String> receiversList = new ArrayList<>();
for(User user:users){
for (User user : users) {
receiversList.add(user.getEmail().trim());
}
// custom receiver
String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){
if (StringUtils.isNotEmpty(receivers)) {
String[] splits = receivers.split(COMMA);
for (String receiver : splits){
for (String receiver : splits) {
receiversList.add(receiver.trim());
}
}
......@@ -471,49 +487,49 @@ public class SqlTask extends AbstractTask {
List<String> receiversCcList = new ArrayList<>();
// Custom Copier
String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){
if (StringUtils.isNotEmpty(receiversCc)) {
String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits){
for (String receiverCc : splits) {
receiversCcList.add(receiverCc.trim());
}
}
String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
String showTypeName = sqlParameters.getShowType().replace(COMMA, "").trim();
if (EnumUtils.isValidEnum(ShowType.class, showTypeName)) {
Map<String, Object> mailResult = MailUtils.sendMails(receiversList,
receiversCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
if(!(boolean) mailResult.get(STATUS)){
if (!(boolean) mailResult.get(STATUS)) {
throw new RuntimeException("send mail failed!");
}
}else{
logger.error("showType: {} is not valid " ,showTypeName);
throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
} else {
logger.error("showType: {} is not valid ", showTypeName);
throw new RuntimeException(String.format("showType: %s is not valid ", showTypeName));
}
}
/**
* regular expressions match the contents between two specified strings
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer,Property> sqlParamsMap, Map<String,Property> paramsPropsMap){
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
}
else {
sqlParamsMap.put(index,prop);
index ++;
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
......@@ -522,22 +538,22 @@ public class SqlTask extends AbstractTask {
/**
* print replace sql
* @param content content
* @param formatSql format sql
* @param rgex rgex
* @param sqlParamsMap sql params map
*
* @param content content
* @param formatSql format sql
* @param rgex rgex
* @param sqlParamsMap sql params map
*/
public void printReplacedSql(String content, String formatSql,String rgex, Map<Integer,Property> sqlParamsMap){
public void printReplacedSql(String content, String formatSql, String rgex, Map<Integer, Property> sqlParamsMap) {
//parameter print style
logger.info("after replace sql , preparing : {}" , formatSql);
logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
if (sqlParamsMap == null) {
logger.info("printReplacedSql: sqlParamsMap is null.");
}
else {
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}
} else {
for (int i = 1; i <= sqlParamsMap.size(); i++) {
logPrint.append(sqlParamsMap.get(i).getValue() + "(" + sqlParamsMap.get(i).getType() + ")");
}
}
logger.info("Sql Params are {}", logPrint);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册