From f803c968cb326777f832dbf62fe2c715f1545667 Mon Sep 17 00:00:00 2001 From: Alvari Date: Tue, 28 Jan 2025 12:41:17 +1300 Subject: [PATCH] refactored 2 instances of concatenating a string within a loop for performance reasons. Also added a missing override annotation to ErrorCode.java to align with codestyle conventions --- .../core/thread/JobTriggerPoolHelper.java | 400 +++++++++--------- .../cn/admin/service/impl/AdminBizImpl.java | 360 +++++++++------- .../com/larkmt/cn/admin/util/ErrorCode.java | 36 +- 3 files changed, 422 insertions(+), 374 deletions(-) diff --git a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/core/thread/JobTriggerPoolHelper.java b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/core/thread/JobTriggerPoolHelper.java index 15c66fc6..98b6dda1 100644 --- a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/core/thread/JobTriggerPoolHelper.java +++ b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/core/thread/JobTriggerPoolHelper.java @@ -4,29 +4,26 @@ import cn.hutool.core.util.IdUtil; import com.larkmt.cn.admin.core.conf.ExcecutorConfig; import com.larkmt.cn.admin.core.conf.JobAdminConfig; -import com.larkmt.cn.admin.core.trigger.TriggerTypeEnum; import com.larkmt.cn.admin.core.trigger.JobTrigger; +import com.larkmt.cn.admin.core.trigger.TriggerTypeEnum; import com.larkmt.cn.admin.entity.JobInfo; import com.larkmt.cn.admin.entity.JobLog; import com.larkmt.core.biz.model.ReturnT; import com.larkmt.core.log.JobLogger; import com.larkmt.core.util.Constants; -import com.larkmt.core.util.DateUtil; import com.larkmt.core.util.ProcessUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.io.FileNotFoundException; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.text.ParseException; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * job trigger thread pool helper @@ -34,202 +31,213 @@ * @author xuxueli 2018-07-03 21:08:07 */ public class JobTriggerPoolHelper { - private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); - - - // ---------------------- trigger pool ---------------------- - - // fast/slow thread pool - private ThreadPoolExecutor fastTriggerPool = null; - private ThreadPoolExecutor slowTriggerPool = null; - - public void start() { - fastTriggerPool = new ThreadPoolExecutor( - 10, - JobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(1000), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "LarkMidTable, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); - } - }); - - slowTriggerPool = new ThreadPoolExecutor( - 10, - JobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), - 60L, - TimeUnit.SECONDS, - new LinkedBlockingQueue(2000), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "LarkMidTable, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); - } - }); + private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); + + // ---------------------- trigger pool ---------------------- + + // fast/slow thread pool + private ThreadPoolExecutor fastTriggerPool = null; + private ThreadPoolExecutor slowTriggerPool = null; + + public void start() { + fastTriggerPool = + new ThreadPoolExecutor( + 10, + JobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(1000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread( + r, "LarkMidTable, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); + } + }); + + slowTriggerPool = + new ThreadPoolExecutor( + 10, + JobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(2000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread( + r, "LarkMidTable, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); + } + }); + } + + public void stop() { + // triggerPool.shutdown(); + fastTriggerPool.shutdownNow(); + slowTriggerPool.shutdownNow(); + logger.info(">>>>>>>>> LarkMidTable trigger thread pool shutdown success."); + } + + // job timeout count + private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min + private volatile ConcurrentMap jobTimeoutCountMap = + new ConcurrentHashMap<>(); + + /** add trigger */ + public void addTrigger( + final int jobId, + final TriggerTypeEnum triggerType, + final int failRetryCount, + final String executorShardingParam, + final String executorParam) { + + // choose thread pool + ThreadPoolExecutor triggerPool_ = fastTriggerPool; + AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); + if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min + triggerPool_ = slowTriggerPool; } - - - public void stop() { - //triggerPool.shutdown(); - fastTriggerPool.shutdownNow(); - slowTriggerPool.shutdownNow(); - logger.info(">>>>>>>>> LarkMidTable trigger thread pool shutdown success."); - } - - - // job timeout count - private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min - private volatile ConcurrentMap jobTimeoutCountMap = new ConcurrentHashMap<>(); - - - /** - * add trigger - */ - public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) { - - // choose thread pool - ThreadPoolExecutor triggerPool_ = fastTriggerPool; - AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); - if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min - triggerPool_ = slowTriggerPool; - } - // trigger - triggerPool_.execute(() -> { - long start = System.currentTimeMillis(); - try { - // do trigger - JobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } finally { - // check timeout-count-map - long minTim_now = System.currentTimeMillis() / 60000; - if (minTim != minTim_now) { - minTim = minTim_now; - jobTimeoutCountMap.clear(); - } - // incr timeout-count-map - long cost = System.currentTimeMillis() - start; - if (cost > 500) { // ob-timeout threshold 500ms - AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); - if (timeoutCount != null) { - timeoutCount.incrementAndGet(); - } - } + // trigger + triggerPool_.execute( + () -> { + long start = System.currentTimeMillis(); + try { + // do trigger + JobTrigger.trigger( + jobId, triggerType, failRetryCount, executorShardingParam, executorParam); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + // check timeout-count-map + long minTim_now = System.currentTimeMillis() / 60000; + if (minTim != minTim_now) { + minTim = minTim_now; + jobTimeoutCountMap.clear(); + } + // incr timeout-count-map + long cost = System.currentTimeMillis() - start; + if (cost > 500) { // ob-timeout threshold 500ms + AtomicInteger timeoutCount = + jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); + if (timeoutCount != null) { + timeoutCount.incrementAndGet(); + } } + } }); + } + + // ---------------------- helper ---------------------- + + private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); + + public static void toStart() { + helper.start(); + } + + public static void toStop() { + helper.stop(); + } + + /** + * @param jobId + * @param triggerType + * @param failRetryCount >=0: use this param <0: use param from job info config + * @param executorShardingParam + * @param executorParam null: use job param not null: cover job param + */ + public static void trigger( + int jobId, + TriggerTypeEnum triggerType, + int failRetryCount, + String executorShardingParam, + String executorParam) { + helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); + } + + public static String[] buildFlinkXExecutorCmd( + String flinkXShPath, String tmpFilePath, int jobId) { + long timestamp = System.currentTimeMillis(); + List cmdArr = new ArrayList<>(); + if (JobTriggerPoolHelper.isWindows()) { + cmdArr.add(Constants.CMDWINDOW); + cmdArr.add(flinkXShPath); + cmdArr.add(tmpFilePath); + } else { + cmdArr.add(Constants.CMDLINUX); + cmdArr.add(flinkXShPath); + cmdArr.add(tmpFilePath); } - - - // ---------------------- helper ---------------------- - - private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); - - public static void toStart() { - helper.start(); + String logHome = ExcecutorConfig.getExcecutorConfig().getFlinkxlogHome(); + File folder = new File(logHome); + if (!folder.exists() && !folder.isDirectory()) { + folder.mkdirs(); } - - public static void toStop() { - helper.stop(); + cmdArr.add(logHome + "/" + jobId + "" + timestamp + ".out"); + logger.info(cmdArr + " " + flinkXShPath + " " + tmpFilePath); + return cmdArr.toArray(new String[cmdArr.size()]); + } + + public static boolean isWindows() { + return System.getProperty("os.name").toLowerCase().contains("windows"); + } + + public static void runJob(int jobId) { + try { + JobInfo jobInfo = JobAdminConfig.getAdminConfig().getJobInfoMapper().loadById(jobId); + StringBuilder cmdstr = new StringBuilder(); + String tmpFilePath = ""; + String[] cmdarrayFinal = null; + tmpFilePath = generateTemJsonFile(jobInfo.getJobJson()); + cmdarrayFinal = + buildFlinkXExecutorCmd( + ExcecutorConfig.getExcecutorConfig().getFlinkxHome(), tmpFilePath, jobId); + for (int j = 0; j < cmdarrayFinal.length; j++) { + cmdstr.append(cmdarrayFinal[j] + " "); + } + final Process process = Runtime.getRuntime().exec(cmdstr.toString()); + String prcsId = ProcessUtil.getProcessId(process); + JobLogger.log("Execute: " + cmdstr); + JobLogger.log("process id: " + prcsId); + if (FileUtil.exist(tmpFilePath)) { + // FileUtil.del(new File(tmpFilePath)); + } + // 记录日志 + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + calendar.set(Calendar.MILLISECOND, 0); + Date triggerTime = calendar.getTime(); + JobLog jobLog = new JobLog(); + jobLog.setJobGroup(jobInfo.getJobGroup()); + jobLog.setJobId(jobInfo.getId()); + jobLog.setTriggerTime(triggerTime); + jobLog.setJobDesc(jobInfo.getJobDesc()); + jobLog.setHandleTime(triggerTime); + jobLog.setTriggerCode(ReturnT.SUCCESS_CODE); + jobLog.setHandleCode(0); + jobLog.setProcessId(prcsId); + // 设置job的执行路径 + jobLog.setExecutorAddress(cmdarrayFinal[3]); + JobAdminConfig.getAdminConfig().getJobLogMapper().save(jobLog); + } catch (Exception e) { + e.printStackTrace(); } + } - /** - * @param jobId - * @param triggerType - * @param failRetryCount >=0: use this param - * <0: use param from job info config - * @param executorShardingParam - * @param executorParam null: use job param - * not null: cover job param - */ - public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) { - helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); + private static String generateTemJsonFile(String jobJson) { + String jsonPath = ""; + jsonPath = ExcecutorConfig.getExcecutorConfig().getFlinkxjsonPath(); + if (!FileUtil.exist(jsonPath)) { + FileUtil.mkdir(jsonPath); } - - public static String[] buildFlinkXExecutorCmd(String flinkXShPath, String tmpFilePath,int jobId) { - long timestamp = System.currentTimeMillis(); - List cmdArr = new ArrayList<>(); - if(JobTriggerPoolHelper.isWindows()) { - cmdArr.add(Constants.CMDWINDOW); - cmdArr.add(flinkXShPath); - cmdArr.add(tmpFilePath); - } else { - cmdArr.add(Constants.CMDLINUX); - cmdArr.add(flinkXShPath); - cmdArr.add(tmpFilePath); - } - String logHome = ExcecutorConfig.getExcecutorConfig().getFlinkxlogHome(); - File folder = new File(logHome); - if (!folder.exists() && !folder.isDirectory()) { - folder.mkdirs(); - } - cmdArr.add(logHome+"/"+jobId+""+timestamp+".out"); - logger.info(cmdArr + " " + flinkXShPath + " " + tmpFilePath); - return cmdArr.toArray(new String[cmdArr.size()]); - } - - public static boolean isWindows() { - return System.getProperty("os.name").toLowerCase().contains("windows"); - } - - public static void runJob(int jobId) { - try { - JobInfo jobInfo = JobAdminConfig.getAdminConfig().getJobInfoMapper().loadById(jobId); - String cmdstr = ""; - String tmpFilePath =""; - String[] cmdarrayFinal = null; - tmpFilePath = generateTemJsonFile(jobInfo.getJobJson()); - cmdarrayFinal = buildFlinkXExecutorCmd(ExcecutorConfig.getExcecutorConfig().getFlinkxHome(), tmpFilePath,jobId); - for (int j = 0; j < cmdarrayFinal.length; j++) { - cmdstr += cmdarrayFinal[j] + " "; - } - final Process process = Runtime.getRuntime().exec(cmdstr); - String prcsId = ProcessUtil.getProcessId(process); - JobLogger.log("Execute: " + cmdstr); - JobLogger.log("process id: " + prcsId); - if (FileUtil.exist(tmpFilePath)) { - // FileUtil.del(new File(tmpFilePath)); - } - // 记录日志 - Calendar calendar = Calendar.getInstance(); - calendar.setTime(new Date()); - calendar.set(Calendar.MILLISECOND, 0); - Date triggerTime = calendar.getTime(); - JobLog jobLog = new JobLog(); - jobLog.setJobGroup(jobInfo.getJobGroup()); - jobLog.setJobId(jobInfo.getId()); - jobLog.setTriggerTime(triggerTime); - jobLog.setJobDesc(jobInfo.getJobDesc()); - jobLog.setHandleTime(triggerTime); - jobLog.setTriggerCode(ReturnT.SUCCESS_CODE); - jobLog.setHandleCode(0); - jobLog.setProcessId(prcsId); - // 设置job的执行路径 - jobLog.setExecutorAddress(cmdarrayFinal[3]); - JobAdminConfig.getAdminConfig().getJobLogMapper().save(jobLog); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private static String generateTemJsonFile(String jobJson) { - String jsonPath = ""; - jsonPath = ExcecutorConfig.getExcecutorConfig().getFlinkxjsonPath(); - if (!FileUtil.exist(jsonPath)) { - FileUtil.mkdir(jsonPath); - } - String tmpFilePath = jsonPath + "jobTmp-" + IdUtil.simpleUUID() + ".json"; - //jobJSON进行替换操作 - // 根据json写入到临时本地文件 - try (PrintWriter writer = new PrintWriter(tmpFilePath, "UTF-8")) { - writer.println(jobJson); - } catch (FileNotFoundException | UnsupportedEncodingException e) { - JobLogger.log("JSON 临时文件写入异常:" + e.getMessage()); - } - return tmpFilePath; - } - + String tmpFilePath = jsonPath + "jobTmp-" + IdUtil.simpleUUID() + ".json"; + // jobJSON进行替换操作 + // 根据json写入到临时本地文件 + try (PrintWriter writer = new PrintWriter(tmpFilePath, "UTF-8")) { + writer.println(jobJson); + } catch (FileNotFoundException | UnsupportedEncodingException e) { + JobLogger.log("JSON 临时文件写入异常:" + e.getMessage()); + } + return tmpFilePath; + } } diff --git a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/service/impl/AdminBizImpl.java b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/service/impl/AdminBizImpl.java index 7b01deb0..dc4b76d1 100644 --- a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/service/impl/AdminBizImpl.java +++ b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/service/impl/AdminBizImpl.java @@ -16,198 +16,238 @@ import com.larkmt.core.biz.model.ReturnT; import com.larkmt.core.enums.IncrementTypeEnum; import com.larkmt.core.handler.IJobHandler; +import java.text.MessageFormat; +import java.util.Date; +import java.util.List; +import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; -import javax.annotation.Resource; -import java.text.MessageFormat; -import java.util.Date; -import java.util.List; - /** * @author xuxueli 2017-07-27 21:54:20 */ @Service public class AdminBizImpl implements AdminBiz { - private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); - - @Resource - public JobLogMapper jobLogMapper; - @Resource - private JobInfoMapper jobInfoMapper; - @Resource - private JobRegistryMapper jobRegistryMapper; - - @Override - public ReturnT callback(List callbackParamList) { - for (HandleCallbackParam handleCallbackParam : callbackParamList) { - ReturnT callbackResult = callback(handleCallbackParam); - logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode() == IJobHandler.SUCCESS.getCode() ? "success" : "fail"), handleCallbackParam, callbackResult); - } - - return ReturnT.SUCCESS; + private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); + + @Resource public JobLogMapper jobLogMapper; + @Resource private JobInfoMapper jobInfoMapper; + @Resource private JobRegistryMapper jobRegistryMapper; + + @Override + public ReturnT callback(List callbackParamList) { + for (HandleCallbackParam handleCallbackParam : callbackParamList) { + ReturnT callbackResult = callback(handleCallbackParam); + logger.debug( + ">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}", + (callbackResult.getCode() == IJobHandler.SUCCESS.getCode() ? "success" : "fail"), + handleCallbackParam, + callbackResult); } - @Override - public ReturnT processCallback(List callbackParamList) { - for (HandleProcessCallbackParam handleProcessCallbackParam : callbackParamList) { - ReturnT callbackResult = processCallback(handleProcessCallbackParam); - logger.debug(">>>>>>>>> JobApiController.processCallback {}, handleCallbackParam={}, callbackResult={}", - (callbackResult.getCode() == IJobHandler.SUCCESS.getCode() ? "success" : "fail"), handleProcessCallbackParam, callbackResult); - } - return ReturnT.SUCCESS; + return ReturnT.SUCCESS; + } + + @Override + public ReturnT processCallback(List callbackParamList) { + for (HandleProcessCallbackParam handleProcessCallbackParam : callbackParamList) { + ReturnT callbackResult = processCallback(handleProcessCallbackParam); + logger.debug( + ">>>>>>>>> JobApiController.processCallback {}, handleCallbackParam={}," + + " callbackResult={}", + (callbackResult.getCode() == IJobHandler.SUCCESS.getCode() ? "success" : "fail"), + handleProcessCallbackParam, + callbackResult); } - - private ReturnT processCallback(HandleProcessCallbackParam handleProcessCallbackParam) { - int result = jobLogMapper.updateProcessId(handleProcessCallbackParam.getLogId(), handleProcessCallbackParam.getProcessId()); - return result > 0 ? ReturnT.FAIL : ReturnT.SUCCESS; + return ReturnT.SUCCESS; + } + + private ReturnT processCallback(HandleProcessCallbackParam handleProcessCallbackParam) { + int result = + jobLogMapper.updateProcessId( + handleProcessCallbackParam.getLogId(), handleProcessCallbackParam.getProcessId()); + return result > 0 ? ReturnT.FAIL : ReturnT.SUCCESS; + } + + private ReturnT callback(HandleCallbackParam handleCallbackParam) { + // valid log item + JobLog log = jobLogMapper.load(handleCallbackParam.getLogId()); + if (log == null) { + return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); + } + if (log.getHandleCode() > 0) { + return new ReturnT( + ReturnT.FAIL_CODE, + "log repeate callback."); // avoid repeat callback, trigger child job etc } + // trigger success, to trigger child job + StringBuilder callbackMsg = null; + + int resultCode = handleCallbackParam.getExecuteResult().getCode(); + + if (IJobHandler.SUCCESS.getCode() == resultCode) { + + JobInfo jobInfo = jobInfoMapper.loadById(log.getJobId()); + + updateIncrementParam(log, jobInfo.getIncrementType()); + + if (jobInfo != null + && jobInfo.getChildJobId() != null + && jobInfo.getChildJobId().trim().length() > 0) { + callbackMsg = + new StringBuilder( + "

>>>>>>>>>>>" + + I18nUtil.getString("jobconf_trigger_child_run") + + "<<<<<<<<<<<
"); + + String[] childJobIds = jobInfo.getChildJobId().split(","); + for (int i = 0; i < childJobIds.length; i++) { + int childJobId = + (childJobIds[i] != null + && childJobIds[i].trim().length() > 0 + && isNumeric(childJobIds[i])) + ? Integer.valueOf(childJobIds[i]) + : -1; + if (childJobId > 0) { + + JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null); + ReturnT triggerChildResult = ReturnT.SUCCESS; + + // add msg + callbackMsg.append( + MessageFormat.format( + I18nUtil.getString("jobconf_callback_child_msg1"), + (i + 1), + childJobIds.length, + childJobIds[i], + (triggerChildResult.getCode() == ReturnT.SUCCESS_CODE + ? I18nUtil.getString("system_success") + : I18nUtil.getString("system_fail")), + triggerChildResult.getMsg())); + } else { + callbackMsg.append( + MessageFormat.format( + I18nUtil.getString("jobconf_callback_child_msg2"), + (i + 1), + childJobIds.length, + childJobIds[i])); + } + } + } + } - private ReturnT callback(HandleCallbackParam handleCallbackParam) { - // valid log item - JobLog log = jobLogMapper.load(handleCallbackParam.getLogId()); - if (log == null) { - return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); - } - if (log.getHandleCode() > 0) { - return new ReturnT(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc - } - - // trigger success, to trigger child job - String callbackMsg = null; - int resultCode = handleCallbackParam.getExecuteResult().getCode(); - - if (IJobHandler.SUCCESS.getCode() == resultCode) { - - JobInfo jobInfo = jobInfoMapper.loadById(log.getJobId()); - - updateIncrementParam(log, jobInfo.getIncrementType()); - - if (jobInfo != null && jobInfo.getChildJobId() != null && jobInfo.getChildJobId().trim().length() > 0) { - callbackMsg = "

>>>>>>>>>>>" + I18nUtil.getString("jobconf_trigger_child_run") + "<<<<<<<<<<<
"; - - String[] childJobIds = jobInfo.getChildJobId().split(","); - for (int i = 0; i < childJobIds.length; i++) { - int childJobId = (childJobIds[i] != null && childJobIds[i].trim().length() > 0 && isNumeric(childJobIds[i])) ? Integer.valueOf(childJobIds[i]) : -1; - if (childJobId > 0) { - - JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null); - ReturnT triggerChildResult = ReturnT.SUCCESS; - - // add msg - callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"), - (i + 1), - childJobIds.length, - childJobIds[i], - (triggerChildResult.getCode() == ReturnT.SUCCESS_CODE ? I18nUtil.getString("system_success") : I18nUtil.getString("system_fail")), - triggerChildResult.getMsg()); - } else { - callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"), - (i + 1), - childJobIds.length, - childJobIds[i]); - } - } - - } - } - - //kill execution timeout FlinkX process - if (!StringUtils.isEmpty(log.getProcessId()) && IJobHandler.FAIL_TIMEOUT.getCode() == resultCode) { - KillJob.trigger(log.getProcessId()); - } - - // handle msg - StringBuffer handleMsg = new StringBuffer(); - if (log.getHandleMsg() != null) { - handleMsg.append(log.getHandleMsg()).append("
"); - } - if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); - } - if (callbackMsg != null) { - handleMsg.append(callbackMsg); - } - - if (handleMsg.length() > 15000) { - handleMsg = new StringBuffer(handleMsg.substring(0, 15000)); // text最大64kb 避免长度过长 - } - - // success, save log - log.setHandleTime(new Date()); - log.setHandleCode(resultCode); - log.setHandleMsg(handleMsg.toString()); - - jobLogMapper.updateHandleInfo(log); - jobInfoMapper.updateLastHandleCode(log.getJobId(), resultCode); - - return ReturnT.SUCCESS; + // kill execution timeout FlinkX process + if (!StringUtils.isEmpty(log.getProcessId()) + && IJobHandler.FAIL_TIMEOUT.getCode() == resultCode) { + KillJob.trigger(log.getProcessId()); } - private void updateIncrementParam(JobLog log, Integer incrementType) { - if (IncrementTypeEnum.ID.getCode() == incrementType) { - jobInfoMapper.incrementIdUpdate(log.getJobId(),log.getMaxId()); - } else if (IncrementTypeEnum.TIME.getCode() == incrementType) { - jobInfoMapper.incrementTimeUpdate(log.getJobId(), log.getTriggerTime()); - } + // handle msg + StringBuffer handleMsg = new StringBuffer(); + if (log.getHandleMsg() != null) { + handleMsg.append(log.getHandleMsg()).append("
"); + } + if (handleCallbackParam.getExecuteResult().getMsg() != null) { + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); + } + if (callbackMsg != null) { + handleMsg.append(callbackMsg); } - private boolean isNumeric(String str) { - try { - Integer.valueOf(str); - return true; - } catch (NumberFormatException e) { - return false; - } + if (handleMsg.length() > 15000) { + handleMsg = new StringBuffer(handleMsg.substring(0, 15000)); // text最大64kb 避免长度过长 } - @Override - public ReturnT registry(RegistryParam registryParam) { + // success, save log + log.setHandleTime(new Date()); + log.setHandleCode(resultCode); + log.setHandleMsg(handleMsg.toString()); - // valid - if (!StringUtils.hasText(registryParam.getRegistryGroup()) - || !StringUtils.hasText(registryParam.getRegistryKey()) - || !StringUtils.hasText(registryParam.getRegistryValue())) { - return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); - } + jobLogMapper.updateHandleInfo(log); + jobInfoMapper.updateLastHandleCode(log.getJobId(), resultCode); - int ret = jobRegistryMapper.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), - registryParam.getRegistryValue(), registryParam.getCpuUsage(), registryParam.getMemoryUsage(), registryParam.getLoadAverage(), new Date()); - if (ret < 1) { - jobRegistryMapper.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), - registryParam.getRegistryValue(), registryParam.getCpuUsage(), registryParam.getMemoryUsage(), registryParam.getLoadAverage(), new Date()); + return ReturnT.SUCCESS; + } - // fresh - freshGroupRegistryInfo(registryParam); - } - return ReturnT.SUCCESS; + private void updateIncrementParam(JobLog log, Integer incrementType) { + if (IncrementTypeEnum.ID.getCode() == incrementType) { + jobInfoMapper.incrementIdUpdate(log.getJobId(), log.getMaxId()); + } else if (IncrementTypeEnum.TIME.getCode() == incrementType) { + jobInfoMapper.incrementTimeUpdate(log.getJobId(), log.getTriggerTime()); + } + } + + private boolean isNumeric(String str) { + try { + Integer.valueOf(str); + return true; + } catch (NumberFormatException e) { + return false; } + } - @Override - public ReturnT registryRemove(RegistryParam registryParam) { + @Override + public ReturnT registry(RegistryParam registryParam) { - // valid - if (!StringUtils.hasText(registryParam.getRegistryGroup()) - || !StringUtils.hasText(registryParam.getRegistryKey()) - || !StringUtils.hasText(registryParam.getRegistryValue())) { - return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); - } + // valid + if (!StringUtils.hasText(registryParam.getRegistryGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); + } + + int ret = + jobRegistryMapper.registryUpdate( + registryParam.getRegistryGroup(), + registryParam.getRegistryKey(), + registryParam.getRegistryValue(), + registryParam.getCpuUsage(), + registryParam.getMemoryUsage(), + registryParam.getLoadAverage(), + new Date()); + if (ret < 1) { + jobRegistryMapper.registrySave( + registryParam.getRegistryGroup(), + registryParam.getRegistryKey(), + registryParam.getRegistryValue(), + registryParam.getCpuUsage(), + registryParam.getMemoryUsage(), + registryParam.getLoadAverage(), + new Date()); + + // fresh + freshGroupRegistryInfo(registryParam); + } + return ReturnT.SUCCESS; + } - int ret = jobRegistryMapper.registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); - if (ret > 0) { + @Override + public ReturnT registryRemove(RegistryParam registryParam) { - // fresh - freshGroupRegistryInfo(registryParam); - } - return ReturnT.SUCCESS; + // valid + if (!StringUtils.hasText(registryParam.getRegistryGroup()) + || !StringUtils.hasText(registryParam.getRegistryKey()) + || !StringUtils.hasText(registryParam.getRegistryValue())) { + return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument."); } - private void freshGroupRegistryInfo(RegistryParam registryParam) { - // Under consideration, prevent affecting core tables + int ret = + jobRegistryMapper.registryDelete( + registryParam.getRegistryGroup(), + registryParam.getRegistryKey(), + registryParam.getRegistryValue()); + if (ret > 0) { + + // fresh + freshGroupRegistryInfo(registryParam); } + return ReturnT.SUCCESS; + } + private void freshGroupRegistryInfo(RegistryParam registryParam) { + // Under consideration, prevent affecting core tables + } } diff --git a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/util/ErrorCode.java b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/util/ErrorCode.java index 8befdf1d..f860ebef 100644 --- a/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/util/ErrorCode.java +++ b/larkmidtable-web/larkmt-admin/src/main/java/com/larkmt/cn/admin/util/ErrorCode.java @@ -2,32 +2,32 @@ /** * 尤其注意:最好提供toString()实现。例如: - * + * *
- * 
+ *
  * @Override
  * public String toString() {
  * 	return String.format("Code:[%s], Description:[%s]. ", this.code, this.describe);
  * }
  * 
- * */ public interface ErrorCode { - // 错误码编号 - String getCode(); + // 错误码编号 + String getCode(); - // 错误码描述 - String getDescription(); + // 错误码描述 + String getDescription(); - /** 必须提供toString的实现 - * - *
-	 * @Override
-	 * public String toString() {
-	 * 	return String.format("Code:[%s], Description:[%s]. ", this.code, this.describe);
-	 * }
-	 * 
- * - */ - String toString(); + /** + * 必须提供toString的实现 + * + *
+   * @Override
+   * public String toString() {
+   * 	return String.format("Code:[%s], Description:[%s]. ", this.code, this.describe);
+   * }
+   * 
+ */ + @Override + String toString(); }