package com.worktrans.pti.esb.mq.task;

import com.alibaba.fastjson.JSON;
import com.worktrans.pti.esb.mq.common.CustomerThreadPoolFactory;
import com.worktrans.pti.esb.mq.service.MqCommonHandleAbstract;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHandler;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@JobHandler("syncRetryMqJobTask")
@Component
/* loaded from: input_file:com/worktrans/pti/esb/mq/task/SyncRetryMqJobTask.class */
public class SyncRetryMqJobTask extends IJobHandler {

    @Autowired
    private Map<String, MqCommonHandleAbstract> mqCommonHandleAbstracts;
    private static final Logger log = LoggerFactory.getLogger(SyncRetryMqJobTask.class);
    private static final ExecutorService MQ_THREAD_POOL_EXECUTOR = CustomerThreadPoolFactory.generate("mq-job-handle", 20);

    public ReturnT<String> execute(String str) throws Exception {
        MqCommonHandleAbstract mqCommonHandleAbstract = this.mqCommonHandleAbstracts.get("baseMqCommonHandleAbstract");
        Long l = StringUtils.isNotBlank(str) ? JSON.parseObject(str).getLong("cid") : null;
        try {
            int i = 100;
            int queryRetryMqRecordCount = mqCommonHandleAbstract.queryRetryMqRecordCount(l);
            log.info("====== 入参cid:{},共查询到{}条需要处理的异常重试mq数据 ======", l, Integer.valueOf(queryRetryMqRecordCount));
            int i2 = ((queryRetryMqRecordCount + 100) - 1) / 100;
            log.info("====== START 处理mq 异常数据重试 ,当前时间：{} ======", LocalDateTime.now());
            if (queryRetryMqRecordCount > 0) {
                Long l2 = l;
                for (int i3 = 0; i3 < i2; i3++) {
                    int i4 = i3 + 1;
                    MQ_THREAD_POOL_EXECUTOR.execute(() -> {
                        mqCommonHandleAbstract.queryRetryMqRecord(l2, i4, i).forEach(esbMqMsgRecordDO -> {
                            MqCommonHandleAbstract mqCommonHandleAbstract2 = this.mqCommonHandleAbstracts.get(esbMqMsgRecordDO.getHandleKey());
                            if (Objects.isNull(mqCommonHandleAbstract2)) {
                                log.info("====== 当前cid：{} handleKey:{} 未获取到mq执行handle，使用baseHandle处理 ======", esbMqMsgRecordDO.getCid(), esbMqMsgRecordDO.getHandleKey());
                                mqCommonHandleAbstract2 = mqCommonHandleAbstract;
                            }
                            log.info("====== 当前cid：{}  获取到对应执行器:{} ======", esbMqMsgRecordDO.getCid(), mqCommonHandleAbstract2.getClass().getName());
                            try {
                                mqCommonHandleAbstract2.retryMqHandle(esbMqMsgRecordDO.getCid(), esbMqMsgRecordDO.getBid());
                            } catch (Exception e) {
                                log.error("====== 当前cid：{}, bid:{} 处理mq消息失败，错误信息:======,{}", new Object[]{esbMqMsgRecordDO.getCid(), esbMqMsgRecordDO.getBid(), ExceptionUtils.getStackTrace(e)});
                            }
                        });
                    });
                }
            } else {
                log.info("====== 未查询到需要处理的异常mq数据 ======");
            }
            log.info("====== END 处理mq 异常数据重试 ,结束时间：{} ======", LocalDateTime.now());
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            log.error("同步异常：" + ExceptionUtils.getStackTrace(e));
            return new ReturnT<>(500, e.getMessage());
        }
    }
}
