package com.xxl.job.core.thread; import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; import com.xxl.job.core.util.FileUtil; import com.xxl.job.core.util.JacksonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * Created by xuxueli on 16/7/22. */ public class TriggerCallbackThread { private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class); private static TriggerCallbackThread instance = new TriggerCallbackThread(); public static TriggerCallbackThread getInstance(){ return instance; } /** * job results callback queue */ private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue(); public static void pushCallBack(HandleCallbackParam callback){ getInstance().callBackQueue.add(callback); logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId()); } /** * callback thread */ private Thread triggerCallbackThread; private Thread triggerRetryCallbackThread; private volatile boolean toStop = false; public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List callbackParamList = new ArrayList(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); } public void toStop(){ toStop = true; // stop callback, interrupt and wait triggerCallbackThread.interrupt(); try { triggerCallbackThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } // stop retry, interrupt and wait triggerRetryCallbackThread.interrupt(); try { triggerRetryCallbackThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } /** * do callback, will retry if error * @param callbackParamList */ private void doCallback(List callbackParamList){ boolean callbackRet = false; // callback, will retry if error for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT callbackResult = adminBiz.callback(callbackParamList); if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) { callbackLog(callbackParamList, "
----------- xxl-job job callback finish."); callbackRet = true; break; } else { callbackLog(callbackParamList, "
----------- xxl-job job callback fail, callbackResult:" + callbackResult); } } catch (Exception e) { callbackLog(callbackParamList, "
----------- xxl-job job callback error, errorMsg:" + e.getMessage()); } } if (!callbackRet) { appendFailCallbackFile(callbackParamList); } } /** * callback log */ private void callbackLog(List callbackParamList, String logContent){ for (HandleCallbackParam callbackParam: callbackParamList) { String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); XxlJobLogger.log(logContent); } } // ---------------------- fail-callback file ---------------------- private static String failCallbackFileName = XxlJobFileAppender.getLogPath().concat(File.separator).concat("xxl-job-callback").concat(".log"); private void appendFailCallbackFile(List callbackParamList){ // append file String content = JacksonUtil.writeValueAsString(callbackParamList); FileUtil.appendFileLine(failCallbackFileName, content); } private void retryFailCallbackFile(){ // load and clear file List fileLines = FileUtil.loadFileLines(failCallbackFileName); FileUtil.deleteFile(failCallbackFileName); // parse List failCallbackParamList = new ArrayList<>(); if (fileLines!=null && fileLines.size()>0) { for (String line: fileLines) { List failCallbackParamListTmp = JacksonUtil.readValue(line, List.class, HandleCallbackParam.class); if (failCallbackParamListTmp!=null && failCallbackParamListTmp.size()>0) { failCallbackParamList.addAll(failCallbackParamListTmp); } } } // retry callback, 100 lines per page if (failCallbackParamList.size()>0) { int pagesize = 100; List pageData = new ArrayList<>(); for (int i = 0; i < failCallbackParamList.size(); i++) { pageData.add(failCallbackParamList.get(i)); if (i>0 && i%pagesize == 0) { doCallback(pageData); pageData.clear(); } } if (pageData.size() > 0) { doCallback(pageData); } } } }