Adding monitoring to xxl-job
xxl-job version 2.3.0
Implementing JobAlarm
Job failure monitoring
@Component
public class PromethuesAlarm implements JobAlarm, MeterBinder {
    private static Logger logger = LoggerFactory.getLogger(PromethuesAlarm.class);

    // created in bindTo(); caches one counter per tag combination
    private MultiTaggedCounter multiTaggedCounter;

    @Override
    public void bindTo(MeterRegistry registry) {
        multiTaggedCounter = new MultiTaggedCounter("xxl.job.admin.alarm.counter", registry,
                "jobGroup", "jobId", "jobDesc");
    }

    /**
     * fail alarm: increment the failure counter for this job
     *
     * @param jobLog
     */
    @Override
    public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog) {
        boolean alarmResult = true;
        if (info != null) {
            multiTaggedCounter.increment(String.valueOf(info.getJobGroup()),
                    String.valueOf(info.getId()), info.getJobDesc());
        }
        return alarmResult;
    }
}
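To see the counter end to end without a running Prometheus, the alarm can be bound to Micrometer's in-memory SimpleMeterRegistry. This is only a sketch, not part of the original setup; the XxlJobInfo setters used here are assumptions mirroring the getters called in doAlarm.

// Sanity-check sketch: bind the alarm to an in-memory registry and verify the counter.
SimpleMeterRegistry registry = new SimpleMeterRegistry();
PromethuesAlarm alarm = new PromethuesAlarm();
alarm.bindTo(registry);

XxlJobInfo info = new XxlJobInfo();
info.setJobGroup(1);           // assumed setter, matching getJobGroup() above
info.setJobDesc("demo job");   // assumed setter, matching getJobDesc() above
alarm.doAlarm(info, new XxlJobLog());

double failures = registry.get("xxl.job.admin.alarm.counter")
        .tags("jobGroup", "1", "jobDesc", "demo job")
        .counter()
        .count();               // expected: 1.0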
Why use MultiTaggedCounter at all? To avoid registering the same tag combination more than once. Strictly speaking a counter does not need it: calling registry.counter(…).increment() directly each time also works, because Micrometer returns the already-registered counter for an identical name and tag set. The pattern is more useful for the corresponding MultiTaggedGauge (a sketch of one follows the counter code below).
public class MultiTaggedCounter {
    private String name;
    private String[] tagNames;
    private MeterRegistry registry;
    private Map<String, Counter> counterMap = new ConcurrentHashMap<>();

    public MultiTaggedCounter(String name, MeterRegistry registry, String... tags) {
        this.name = name;
        this.tagNames = tags;
        this.registry = registry;
    }

    public void increment(String... tagValues) {
        String valuesString = Arrays.toString(tagValues);
        if (tagValues.length != tagNames.length) {
            throw new IllegalArgumentException("Counter tags mismatch! Expected args are "
                    + Arrays.toString(tagNames) + ", provided tags are " + valuesString);
        }
        Counter counter = counterMap.get(valuesString);
        if (counter == null) {
            List<Tag> tags = new ArrayList<>(tagNames.length);
            for (int i = 0; i < tagNames.length; i++) {
                tags.add(new ImmutableTag(tagNames[i], tagValues[i]));
            }
            counter = Counter.builder(name).tags(tags).register(registry);
            counterMap.put(valuesString, counter);
        }
        counter.increment();
    }
}
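As mentioned above, the gauge counterpart is where this pattern really pays off: a gauge must not be registered twice for the same tag combination, so the natural shape is to keep one value holder per combination, register the gauge exactly once, and only update the holder afterwards. A possible MultiTaggedGauge could look like the sketch below (this class is not part of the original code; it assumes Micrometer's Gauge.builder(name, obj, valueFunction)).

public class MultiTaggedGauge {
    private final String name;
    private final String[] tagNames;
    private final MeterRegistry registry;
    // one AtomicLong per tag-value combination; the gauge samples it on every scrape
    private final Map<String, AtomicLong> valueMap = new ConcurrentHashMap<>();

    public MultiTaggedGauge(String name, MeterRegistry registry, String... tagNames) {
        this.name = name;
        this.registry = registry;
        this.tagNames = tagNames;
    }

    public void set(long value, String... tagValues) {
        if (tagValues.length != tagNames.length) {
            throw new IllegalArgumentException("Gauge tags mismatch! Expected args are "
                    + Arrays.toString(tagNames));
        }
        String key = Arrays.toString(tagValues);
        AtomicLong holder = valueMap.computeIfAbsent(key, k -> {
            List<Tag> tags = new ArrayList<>(tagNames.length);
            for (int i = 0; i < tagNames.length; i++) {
                tags.add(new ImmutableTag(tagNames[i], tagValues[i]));
            }
            AtomicLong newHolder = new AtomicLong();
            // register once per tag combination; later calls only update the holder
            Gauge.builder(name, newHolder, AtomicLong::get).tags(tags).register(registry);
            return newHolder;
        });
        holder.set(value);
    }
}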
Scanning currently running jobs with a scheduled task
A @Scheduled task periodically scans the log table and computes each job's elapsed time as the current time minus its trigger time.
1. Why two levels of maps? With the sharding-broadcast and concurrent routing strategies, the same jobId can be running on several machines at once, hence Map<jobId, Map<logId, triggerTime>>.
2. Why not export the running time of every logId directly, i.e. add a tag like tagId=logId? Too many executions are triggered, so logId grows without bound, and such high-cardinality labels put pressure on Prometheus.
3. Isn't a one-minute schedule interval imprecise? It is: jobs that finish within a minute may never be observed. But the goal here is to catch jobs whose running time is far too long, not short ones.
4. Why not start timing when xxl-job triggers the task and stop when the completion callback arrives? Because triggering and the callback are both asynchronous, and with multiple admin instances the machine that handles the completion callback is not necessarily the one that triggered the job. Adding Redis could solve the distributed-state problem, but it is a bigger change.
@Component
public class JobRunningAlarm implements MeterBinder {
    // jobId -> (logId -> trigger time in ms); two levels because the same jobId can be
    // running on several executors at once (sharding broadcast / concurrent strategies)
    private Map<Integer, Map<Long, Long>> runningJobTriggerTimeMap = new ConcurrentHashMap<>();

    @Resource
    private XxlJobLogDao xxlJobLogDao;
    @Resource
    private XxlJobInfoDao xxlJobInfoDao;

    private MeterRegistry registry;

    @Override
    public void bindTo(MeterRegistry meterRegistry) {
        this.registry = meterRegistry;
    }

    @Scheduled(fixedDelay = 60000)
    public void monitor() {
        int pageSize = 100;
        int offset = 0;
        // last argument (logStatus = 3) selects running logs: triggered but not yet finished
        List<XxlJobLog> runningJob = xxlJobLogDao.pageList(offset, pageSize, 0, 0, null, null, 3);
        Set<Long> updateRunningJobSet = new HashSet<>();
        while (!runningJob.isEmpty()) {
            runningJob.forEach(x -> {
                Map<Long, Long> triggerTimeMap;
                if (runningJobTriggerTimeMap.containsKey(x.getJobId())) {
                    triggerTimeMap = runningJobTriggerTimeMap.get(x.getJobId());
                    triggerTimeMap.put(x.getId(), x.getTriggerTime().getTime());
                } else {
                    // first time this jobId is seen: record the trigger time and register the
                    // gauges once; later scans only update the inner map
                    triggerTimeMap = new ConcurrentHashMap<>();
                    triggerTimeMap.put(x.getId(), x.getTriggerTime().getTime());
                    runningJobTriggerTimeMap.put(x.getJobId(), triggerTimeMap);
                    XxlJobInfo jobInfo = xxlJobInfoDao.loadById(x.getJobId());
                    Gauge.builder("xxl_job_running_time_max", () -> getMaxTime(x.getJobId()))
                            .tags(Tags.of("job_id", String.valueOf(x.getJobId()),
                                    "jobDesc", jobInfo.getJobDesc(), "author", jobInfo.getAuthor()))
                            .register(registry);
                    Gauge.builder("xxl_job_running_time_min", () -> getMinTime(x.getJobId()))
                            .tags(Tags.of("job_id", String.valueOf(x.getJobId()),
                                    "jobDesc", jobInfo.getJobDesc(), "author", jobInfo.getAuthor()))
                            .register(registry);
                    Gauge.builder("xxl_job_running_time_total", () -> getTotalTime(x.getJobId()))
                            .tags(Tags.of("job_id", String.valueOf(x.getJobId()),
                                    "jobDesc", jobInfo.getJobDesc(), "author", jobInfo.getAuthor()))
                            .register(registry);
                    Gauge.builder("xxl_job_running_time_count", () -> getRunningJobCount(x.getJobId()))
                            .tags(Tags.of("job_id", String.valueOf(x.getJobId()),
                                    "jobDesc", jobInfo.getJobDesc(), "author", jobInfo.getAuthor()))
                            .register(registry);
                }
            });
            updateRunningJobSet.addAll(runningJob.stream().map(XxlJobLog::getId).collect(Collectors.toList()));
            offset += pageSize;
            runningJob = xxlJobLogDao.pageList(offset, pageSize, 0, 0, null, null, 3);
        }
        // drop logIds that were not seen as running in this scan
        remove(updateRunningJobSet);
    }

    // elapsed milliseconds of the longest still-running execution of this job
    private Long getMaxTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).max(Long::compareTo).orElse(null);
    }

    // elapsed milliseconds of the most recently triggered still-running execution
    private Long getMinTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).min(Long::compareTo).orElse(null);
    }

    // sum of elapsed milliseconds over all still-running executions of this job
    private Long getTotalTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).reduce(Long::sum).orElse(null);
    }

    // number of still-running executions of this job
    private Integer getRunningJobCount(Integer jobId) {
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.size();
    }

    private void remove(Set<Long> updateRunningJobSet) {
        if (updateRunningJobSet.isEmpty()) {
            // nothing is running any more: clear every inner map
            // (count then reports 0, max/min/total report NaN)
            runningJobTriggerTimeMap.values().forEach(Map::clear);
        } else {
            // keep only the logIds that are still running according to this scan
            runningJobTriggerTimeMap.values().forEach(
                    map -> map.entrySet().removeIf(entry -> !updateRunningJobSet.contains(entry.getKey())));
        }
    }
}
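One practical note: Spring only honors @Scheduled when scheduling is enabled somewhere in the application context. If the xxl-job-admin project does not already enable it, a minimal configuration class (the class name here is arbitrary) would be:

@Configuration
@EnableScheduling   // without this, Spring never invokes the @Scheduled monitor() above
public class MonitorSchedulingConfig {
}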