Adding monitoring to xxl-job

xxl-job version: 2.3.0

Implementing JobAlarm

Monitoring job failures

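import com.xxl.job.admin.core.alarm.JobAlarm;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

// MultiTaggedCounter (shown further down) is assumed to live in the same package.
@Component
public class PromethuesAlarm implements JobAlarm, MeterBinder {
    private static final Logger logger = LoggerFactory.getLogger(PromethuesAlarm.class);

    // Created in bindTo, once the MeterRegistry is available
    private volatile MultiTaggedCounter multiTaggedCounter;

    @Override
    public void bindTo(MeterRegistry registry) {
        multiTaggedCounter = new MultiTaggedCounter("xxl.job.admin.alarm.counter", registry, "jobGroup", "jobId", "jobDesc");
    }

    /**
     * Fail alarm: counts one failure for the job that raised the alarm.
     *
     * @param info   the job that failed
     * @param jobLog the log entry of the failed execution
     * @return always true
     */
    @Override
    public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog) {
        // Skip counting if the job is unknown or the registry has not been bound yet
        if (info != null && multiTaggedCounter != null) {
            multiTaggedCounter.increment(String.valueOf(info.getJobGroup()), String.valueOf(info.getId()), info.getJobDesc());
        }
        return true;
    }
}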

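Why use MultiTaggedCounter? It avoids registering the same tag combination more than once. For a counter, calling registry.counter(…).increment() directly every time works just as well; the corresponding MultiTaggedGauge is where this wrapper is more useful.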
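The caching idea behind MultiTaggedCounter can be sketched with the JDK alone. In the sketch below, TagKeyedCounters is a hypothetical name and LongAdder is only a stand-in for Micrometer's Counter, so that the example runs without any dependency; it shows that one counter is created per tag-value combination and reused afterwards.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for MultiTaggedCounter: LongAdder replaces Micrometer's Counter
// so the sketch runs with the JDK alone.
class TagKeyedCounters {
    private final String[] tagNames;
    // One LongAdder per distinct combination of tag values
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    TagKeyedCounters(String... tagNames) {
        this.tagNames = tagNames.clone();
    }

    void increment(String... tagValues) {
        if (tagValues.length != tagNames.length) {
            throw new IllegalArgumentException(
                    "Expected " + tagNames.length + " tag values, got " + tagValues.length);
        }
        // computeIfAbsent creates the counter at most once per key,
        // even when several threads hit the same key concurrently.
        counters.computeIfAbsent(Arrays.toString(tagValues), key -> new LongAdder()).increment();
    }

    long count(String... tagValues) {
        LongAdder adder = counters.get(Arrays.toString(tagValues));
        return adder == null ? 0L : adder.sum();
    }

    int registeredCombinations() {
        return counters.size();
    }

    public static void main(String[] args) {
        TagKeyedCounters alarms = new TagKeyedCounters("jobGroup", "jobId", "jobDesc");
        alarms.increment("1", "42", "nightly report");
        alarms.increment("1", "42", "nightly report");
        alarms.increment("1", "43", "cleanup");
        System.out.println(alarms.count("1", "42", "nightly report"));
        System.out.println(alarms.registeredCombinations());
    }
}
```

The same tag values always map to the same cache key, so repeated failures of one job increment one counter instead of creating a new one each time.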

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiTaggedCounter {
    private final String name;
    private final String[] tagNames;
    private final MeterRegistry registry;
    // Cache keyed by the joined tag values: one Counter per tag-value combination
    private final Map<String, Counter> counterMap = new ConcurrentHashMap<>();

    public MultiTaggedCounter(String name, MeterRegistry registry, String... tags) {
        this.name = name;
        this.tagNames = tags;
        this.registry = registry;
    }

    public void increment(String... tagValues) {
        String valuesString = Arrays.toString(tagValues);
        if (tagValues.length != tagNames.length) {
            throw new IllegalArgumentException("Counter tags mismatch! Expected args are " + Arrays.toString(tagNames) + ", provided tags are " + valuesString);
        }
        // computeIfAbsent makes lookup-and-register a single atomic step per key
        Counter counter = counterMap.computeIfAbsent(valuesString, key -> {
            List<Tag> tags = new ArrayList<>(tagNames.length);
            for (int i = 0; i < tagNames.length; i++) {
                tags.add(new ImmutableTag(tagNames[i], tagValues[i]));
            }
            return Counter.builder(name).tags(tags).register(registry);
        });
        counter.increment();
    }
}

Scanning currently running jobs with a schedule

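A scheduled task periodically scans the log table and takes the running time of each job as the current time minus its trigger time.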
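A minimal sketch of one scan cycle, using only the JDK: it reads pages until an empty page comes back and computes, for each running log row, the current time minus the trigger time. RunningLogScan, LogRow and the in-memory page source are hypothetical stand-ins for the log table and its paged query, not xxl-job classes.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class RunningLogScan {
    // Hypothetical minimal view of a log row: id plus trigger time.
    static final class LogRow {
        final long id;
        final Date triggerTime;

        LogRow(long id, Date triggerTime) {
            this.id = id;
            this.triggerTime = triggerTime;
        }
    }

    // Reads pages (offset, pageSize) until an empty page is returned
    // and maps each log id to its elapsed time in milliseconds.
    static Map<Long, Long> elapsedByLogId(BiFunction<Integer, Integer, List<LogRow>> pageSource,
                                          int pageSize, long nowMillis) {
        Map<Long, Long> elapsed = new LinkedHashMap<>();
        int offset = 0;
        List<LogRow> page = pageSource.apply(offset, pageSize);
        while (!page.isEmpty()) {
            for (LogRow row : page) {
                elapsed.put(row.id, nowMillis - row.triggerTime.getTime());
            }
            offset += pageSize;
            page = pageSource.apply(offset, pageSize);
        }
        return elapsed;
    }

    // In-memory stand-in for a paged query over the log table.
    static BiFunction<Integer, Integer, List<LogRow>> inMemory(List<LogRow> rows) {
        return (offset, size) -> {
            int from = Math.min(offset, rows.size());
            int to = Math.min(offset + size, rows.size());
            return new ArrayList<>(rows.subList(from, to));
        };
    }
}
```

Because the elapsed time is recomputed from the stored trigger time on every read, the value keeps growing between scans for as long as the row is still reported as running.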

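1. Why two levels of map? With sharding broadcast and with concurrent execution strategies, the same jobId can be running on several machines at once. Hence the two-level map: Map<jobId, Map<logId, time>>.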

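2. Why not report the running time of every logId directly, by adding a tag tagId=logId? Too many executions are triggered and logId grows without bound, which would also put pressure on Prometheus.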

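3. With the schedule set to 1 minute, is the result imprecise? Yes: jobs that finish in under 1 minute cannot be observed. The goal, however, is to catch jobs whose running time since trigger has grown very long.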

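4. Why not record a data point when xxl-job triggers the job and compute the elapsed time when the completion callback arrives? Because triggering and callbacks are both asynchronous in xxl-job, and when xxl-job is deployed on several machines, the completion callback is not necessarily handled by the machine that triggered the job. Adding Redis would solve the distributed problem too, but it requires more changes.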
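The two-level map from point 1 and the per-job aggregates it supports can be sketched with the JDK alone. RunningJobTimes and its method names are hypothetical; the point is that metrics are aggregated per jobId (max, min, total, count), so no logId ever becomes a label value.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of xxl-job: tracks running executions per job.
class RunningJobTimes {
    // jobId -> (logId -> trigger time in epoch millis)
    private final Map<Integer, Map<Long, Long>> triggerTimes = new ConcurrentHashMap<>();

    void record(int jobId, long logId, long triggerTimeMillis) {
        triggerTimes.computeIfAbsent(jobId, id -> new ConcurrentHashMap<>())
                .put(logId, triggerTimeMillis);
    }

    // Forget every log id that the latest scan no longer reports as running.
    void retainOnly(Set<Long> stillRunningLogIds) {
        triggerTimes.values().forEach(byLogId -> byLogId.keySet().retainAll(stillRunningLogIds));
    }

    private Map<Long, Long> forJob(int jobId) {
        return triggerTimes.getOrDefault(jobId, Collections.emptyMap());
    }

    // Longest elapsed time among the running executions of a job, 0 if none is running.
    long maxElapsed(int jobId, long nowMillis) {
        return forJob(jobId).values().stream().mapToLong(t -> nowMillis - t).max().orElse(0L);
    }

    // Shortest elapsed time among the running executions of a job, 0 if none is running.
    long minElapsed(int jobId, long nowMillis) {
        return forJob(jobId).values().stream().mapToLong(t -> nowMillis - t).min().orElse(0L);
    }

    // Sum of elapsed times over all running executions of a job.
    long totalElapsed(int jobId, long nowMillis) {
        return forJob(jobId).values().stream().mapToLong(t -> nowMillis - t).sum();
    }

    int runningCount(int jobId) {
        return forJob(jobId).size();
    }
}
```

For example, if job 7 has two executions triggered at 1000 and 4000 and the current time is 10000, the aggregates are max 9000, min 6000, total 15000 and count 2; once a scan stops reporting the first execution, only the second remains.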

import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.dao.XxlJobInfoDao;
import com.xxl.job.admin.dao.XxlJobLogDao;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class JobRunningAlarm implements MeterBinder {

    // jobId -> (logId -> trigger time in epoch millis) for every log still marked as running
    private final Map<Integer, Map<Long, Long>> runningJobTriggerTimeMap = new ConcurrentHashMap<>();

    @Resource
    private XxlJobLogDao xxlJobLogDao;

    @Resource
    private XxlJobInfoDao xxlJobInfoDao;

    private MeterRegistry registry;

    @Override
    public void bindTo(MeterRegistry meterRegistry) {
        this.registry = meterRegistry;
    }

    // Runs one minute after the previous scan ends; requires scheduling to be enabled (e.g. @EnableScheduling)
    @Scheduled(fixedDelay = 60000)
    public void monitor() {
        int pageSize = 100;
        int offset = 0;
        // jobGroup 0 and jobId 0 mean "no filter"; logStatus 3 selects logs that are still running
        List<XxlJobLog> runningJob = xxlJobLogDao.pageList(offset, pageSize, 0, 0, null, null, 3);
        Set<Long> updateRunningJobSet = new HashSet<>();
        while (!runningJob.isEmpty()) {
            for (XxlJobLog log : runningJob) {
                int jobId = log.getJobId();
                boolean firstSeen = !runningJobTriggerTimeMap.containsKey(jobId);
                runningJobTriggerTimeMap.computeIfAbsent(jobId, id -> new ConcurrentHashMap<>())
                        .put(log.getId(), log.getTriggerTime().getTime());
                if (firstSeen) {
                    // Gauges are registered once per job and read the map lazily afterwards
                    registerGauges(jobId);
                }
                updateRunningJobSet.add(log.getId());
            }
            offset += pageSize;
            runningJob = xxlJobLogDao.pageList(offset, pageSize, 0, 0, null, null, 3);
        }
        remove(updateRunningJobSet);
    }

    private void registerGauges(int jobId) {
        XxlJobInfo jobInfo = xxlJobInfoDao.loadById(jobId);
        // The job may have been deleted while a log is still marked as running, and tag values must not be null
        String jobDesc = jobInfo == null ? "" : Objects.toString(jobInfo.getJobDesc(), "");
        String author = jobInfo == null ? "" : Objects.toString(jobInfo.getAuthor(), "");
        Tags tags = Tags.of("job_id", String.valueOf(jobId), "jobDesc", jobDesc, "author", author);
        Gauge.builder("xxl_job_running_time_max", () -> getMaxTime(jobId)).tags(tags).register(registry);
        Gauge.builder("xxl_job_running_time_min", () -> getMinTime(jobId)).tags(tags).register(registry);
        Gauge.builder("xxl_job_running_time_total", () -> getTotalTime(jobId)).tags(tags).register(registry);
        Gauge.builder("xxl_job_running_time_count", () -> getRunningJobCount(jobId)).tags(tags).register(registry);
    }

    // Longest running time in milliseconds; null when no execution of the job is running
    private Long getMaxTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).max(Long::compareTo).orElse(null);
    }

    // Shortest running time in milliseconds; null when no execution of the job is running
    private Long getMinTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).min(Long::compareTo).orElse(null);
    }

    // Sum of running times in milliseconds over all running executions of the job
    private Long getTotalTime(Integer jobId) {
        long now = System.currentTimeMillis();
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.values().stream().map(x -> now - x).reduce(Long::sum).orElse(null);
    }

    // Number of executions of the job that are currently running
    private Integer getRunningJobCount(Integer jobId) {
        Map<Long, Long> triggerTimeMap = runningJobTriggerTimeMap.get(jobId);
        return triggerTimeMap.size();
    }

    // Drop every logId that the latest scan no longer reports as running
    private void remove(Set<Long> updateRunningJobSet) {
        if (updateRunningJobSet.isEmpty()) {
            runningJobTriggerTimeMap.values().forEach(Map::clear);
        } else {
            runningJobTriggerTimeMap.values().forEach(map -> map.entrySet().removeIf(entry -> !updateRunningJobSet.contains(entry.getKey())));
        }
    }
}
