package com.worktrans.pti.esb.mq.service;

import com.alibaba.fastjson.JSON;
import com.worktrans.commons.cons.StatusEnum;
import com.worktrans.commons.web.response.Response;
import com.worktrans.core.pagehelper.PageHelper;
import com.worktrans.pti.esb.mq.config.PtiCommonMqProperties;
import com.worktrans.pti.esb.mq.cons.enums.MqExecStatusEnum;
import com.worktrans.pti.esb.mq.dal.model.EsbMqMsgBodyDO;
import com.worktrans.pti.esb.mq.dal.model.EsbMqMsgRecordDO;
import com.worktrans.pti.esb.mq.dal.service.EsbMqMsgBodyService;
import com.worktrans.pti.esb.mq.dal.service.EsbMqMsgRecordService;
import com.worktrans.pti.esb.mq.model.MqHandleParamDTO;
import com.worktrans.pti.esb.mq.model.MqRecordOptDTO;
import com.worktrans.pti.esb.mq.utils.MqStringUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;

@RefreshScope
/* loaded from: input_file:com/worktrans/pti/esb/mq/service/MqCommonHandleAbstract.class */
public abstract class MqCommonHandleAbstract {
    private static final Logger log = LoggerFactory.getLogger(MqCommonHandleAbstract.class);

    @Autowired(required = false)
    private Map<String, MqCommonHandleAbstract> mqCommonHandleAbstracts;

    @Autowired
    private PtiCommonMqProperties properties;

    @Autowired
    private EsbMqMsgRecordService esbMqMsgRecordService;

    @Autowired
    private EsbMqMsgBodyService esbMqMsgBodyService;

    protected abstract Response handle(MqHandleParamDTO mqHandleParamDTO);

    public final void initMqTable() {
        try {
            log.info("====== START 初始化创建mq 相关数据表 ======");
            this.esbMqMsgRecordService.createMqTable();
            log.info("====== END 初始化创建mq 相关数据表 ======");
        } catch (Exception e) {
            log.error("====== 初始化mq数据表失败，错误信息:======,{}", ExceptionUtils.getStackTrace(e));
        }
    }

    public final Response saveOrUpdateMqRecord(MqRecordOptDTO mqRecordOptDTO) {
        Response error;
        Response error2;
        if (!this.properties.getEnabledMysql().booleanValue()) {
            log.info("====== cid:{},msgKey:{} enabledMq:{},当前mq公共处理逻辑开关未开启，直接触发handle处理======", new Object[]{mqRecordOptDTO.getCid(), mqRecordOptDTO.getMsgKey(), this.properties.getEnabledMysql()});
            MqHandleParamDTO mqHandleParamDTO = new MqHandleParamDTO();
            mqHandleParamDTO.setCid(mqRecordOptDTO.getCid());
            mqHandleParamDTO.setDataSource(mqRecordOptDTO.getDataSource());
            mqHandleParamDTO.setEventType(mqRecordOptDTO.getEventType());
            mqHandleParamDTO.setMsgKey(mqRecordOptDTO.getMsgKey());
            mqHandleParamDTO.setMsgBody(mqRecordOptDTO.getMsgBody());
            try {
                error2 = getHandle(mqRecordOptDTO.getCid(), mqRecordOptDTO.getHandleKey()).handle(mqHandleParamDTO);
            } catch (Exception e) {
                log.error("====== cid:{},msgKey:{} handle直接处理捕获异常，错误信息:======,{}", new Object[]{mqRecordOptDTO.getCid(), mqRecordOptDTO.getMsgKey(), ExceptionUtils.getStackTrace(e)});
                error2 = Response.error(e.getMessage());
            }
            return error2;
        }
        log.info("======  cid：{}, START 保存MQ记录，调用入参是：{} ======", mqRecordOptDTO.getCid(), JSON.toJSONString(mqRecordOptDTO));
        EsbMqMsgRecordDO esbMqMsgRecordDO = new EsbMqMsgRecordDO();
        long currentTimeMillis = System.currentTimeMillis();
        log.info("====== cid:{},开始时间：{} ======", mqRecordOptDTO.getCid(), Long.valueOf(currentTimeMillis));
        try {
            try {
                EsbMqMsgRecordDO esbMqMsgRecordDO2 = new EsbMqMsgRecordDO();
                esbMqMsgRecordDO2.setCid(mqRecordOptDTO.getCid());
                esbMqMsgRecordDO2.setMsgKey(mqRecordOptDTO.getMsgKey());
                esbMqMsgRecordDO2.setDataSource(mqRecordOptDTO.getDataSource());
                if (Objects.nonNull(this.esbMqMsgRecordService.findOne(esbMqMsgRecordDO2))) {
                    log.info("====== {}======", String.format("msgKey: %s, dataSource：%s, 当前MQ数据已存在，不允许处理", mqRecordOptDTO.getMsgKey(), mqRecordOptDTO.getDataSource()));
                    Response success = Response.success();
                    long currentTimeMillis2 = System.currentTimeMillis();
                    log.info("====== cid:{},结束时间：{} ,总计耗时：{} ======", new Object[]{mqRecordOptDTO.getCid(), Long.valueOf(currentTimeMillis2), Long.valueOf(currentTimeMillis2 - currentTimeMillis)});
                    return success;
                }
                log.info("topic: {},tag: {},msgKey: {},当前MQ数据记录不存在,执行新增操作", new Object[]{mqRecordOptDTO.getTopic(), mqRecordOptDTO.getTag(), mqRecordOptDTO.getMsgKey()});
                esbMqMsgRecordDO.setCid(mqRecordOptDTO.getCid());
                esbMqMsgRecordDO.setTopic(mqRecordOptDTO.getTopic());
                esbMqMsgRecordDO.setTag(mqRecordOptDTO.getTag());
                esbMqMsgRecordDO.setDataSource(mqRecordOptDTO.getDataSource());
                esbMqMsgRecordDO.setEventType(mqRecordOptDTO.getEventType());
                esbMqMsgRecordDO.setMsgId(mqRecordOptDTO.getMsgId());
                esbMqMsgRecordDO.setMsgKey(mqRecordOptDTO.getMsgKey());
                esbMqMsgRecordDO.setRetryCallCount(0);
                esbMqMsgRecordDO.setHandleKey(mqRecordOptDTO.getHandleKey());
                esbMqMsgRecordDO.setMaxCallCount(mqRecordOptDTO.getMaxCallCount());
                if (Objects.isNull(mqRecordOptDTO.getCid()) || StringUtils.isBlank(mqRecordOptDTO.getMsgKey())) {
                    esbMqMsgRecordDO.setExecStatus(MqExecStatusEnum.DATA_ERROR.getCode());
                } else {
                    esbMqMsgRecordDO.setExecStatus(mqRecordOptDTO.getExecStatus() == null ? MqExecStatusEnum.PENDING.getCode() : mqRecordOptDTO.getExecStatus().getCode());
                    if (!mqRecordOptDTO.getEnabledAsync().booleanValue()) {
                        esbMqMsgRecordDO.setExecStatus(MqExecStatusEnum.PROCESSING.getCode());
                    }
                }
                EsbMqMsgBodyDO esbMqMsgBodyDO = new EsbMqMsgBodyDO();
                esbMqMsgBodyDO.setCid(mqRecordOptDTO.getCid());
                esbMqMsgBodyDO.setBodyText(mqRecordOptDTO.getMsgBody());
                esbMqMsgBodyDO.setStatus(Integer.valueOf(StatusEnum.ENABLE.getValue()));
                this.esbMqMsgBodyService.save(esbMqMsgBodyDO);
                esbMqMsgRecordDO.setBodyBid(esbMqMsgBodyDO.getBid());
                this.esbMqMsgRecordService.save(esbMqMsgRecordDO);
                log.info("====== cid：{}, END 保存MQ记录 =====", mqRecordOptDTO.getCid());
                long currentTimeMillis3 = System.currentTimeMillis();
                log.info("====== cid:{},结束时间：{} ,总计耗时：{} ======", new Object[]{mqRecordOptDTO.getCid(), Long.valueOf(currentTimeMillis3), Long.valueOf(currentTimeMillis3 - currentTimeMillis)});
                return !mqRecordOptDTO.getEnabledAsync().booleanValue() ? retryMqHandle(mqRecordOptDTO.getCid(), esbMqMsgRecordDO.getBid()) : Response.success();
            } catch (Exception e2) {
                MqHandleParamDTO mqHandleParamDTO2 = new MqHandleParamDTO();
                mqHandleParamDTO2.setCid(mqRecordOptDTO.getCid());
                mqHandleParamDTO2.setDataSource(mqRecordOptDTO.getDataSource());
                mqHandleParamDTO2.setEventType(mqRecordOptDTO.getEventType());
                mqHandleParamDTO2.setMsgKey(mqRecordOptDTO.getMsgKey());
                mqHandleParamDTO2.setMsgBody(mqRecordOptDTO.getMsgBody());
                log.info("====== cid:{},msgKey:{} 当前消息处理出现未知异常,异常信息是:{}，直接触发handle处理======", new Object[]{mqRecordOptDTO.getCid(), mqRecordOptDTO.getMsgKey(), ExceptionUtils.getStackTrace(e2)});
                try {
                    error = getHandle(mqRecordOptDTO.getCid(), mqRecordOptDTO.getHandleKey()).handle(mqHandleParamDTO2);
                } catch (Exception e3) {
                    log.error("====== cid:{},msgKey:{} 异常捕获后执行handle抛出异常，错误信息:======,{}", new Object[]{mqRecordOptDTO.getCid(), mqRecordOptDTO.getMsgKey(), ExceptionUtils.getStackTrace(e3)});
                    error = Response.error(e3.getMessage());
                }
                Response response = error;
                long currentTimeMillis4 = System.currentTimeMillis();
                log.info("====== cid:{},结束时间：{} ,总计耗时：{} ======", new Object[]{mqRecordOptDTO.getCid(), Long.valueOf(currentTimeMillis4), Long.valueOf(currentTimeMillis4 - currentTimeMillis)});
                return response;
            }
        } catch (Throwable th) {
            long currentTimeMillis5 = System.currentTimeMillis();
            log.info("====== cid:{},结束时间：{} ,总计耗时：{} ======", new Object[]{mqRecordOptDTO.getCid(), Long.valueOf(currentTimeMillis5), Long.valueOf(currentTimeMillis5 - currentTimeMillis)});
            throw th;
        }
    }

    public final Response retryMqHandle(Long l, String str) {
        Response error;
        log.info("======  cid：{}, START 触发处理MQ记录，调用入参bid:{} ======", l, str);
        EsbMqMsgRecordDO esbMqMsgRecordDO = (EsbMqMsgRecordDO) this.esbMqMsgRecordService.findByBid(l, str);
        if (Objects.isNull(esbMqMsgRecordDO)) {
            log.error("====== cid:{},bid: {},当前MQ数据记录不存在 ======", l, str);
            return Response.error();
        }
        EsbMqMsgRecordDO esbMqMsgRecordDO2 = new EsbMqMsgRecordDO();
        esbMqMsgRecordDO2.setBid(esbMqMsgRecordDO.getBid());
        esbMqMsgRecordDO2.setCid(esbMqMsgRecordDO.getCid());
        if (MqExecStatusEnum.SUCCESS.getCode().equals(esbMqMsgRecordDO.getExecStatus())) {
            log.error("====== {} ======", String.format("cid:%s,bid: %s,当前数据已处理成功，不允许处理", l, str));
            return Response.error();
        }
        esbMqMsgRecordDO2.setRetryCallCount(Integer.valueOf(esbMqMsgRecordDO.getRetryCallCount().intValue() + 1));
        esbMqMsgRecordDO2.setExecStatus(MqExecStatusEnum.PROCESSING.getCode());
        this.esbMqMsgRecordService.updateSelective(esbMqMsgRecordDO2);
        log.info("======  cid：{}, 更新mq记录为处理中状态 结束 ======", l);
        log.info("======  cid：{}, START 直接调用处理服务 ======", l);
        MqHandleParamDTO mqHandleParamDTO = new MqHandleParamDTO();
        mqHandleParamDTO.setCid(l);
        mqHandleParamDTO.setDataSource(esbMqMsgRecordDO.getDataSource());
        mqHandleParamDTO.setEventType(esbMqMsgRecordDO.getEventType());
        mqHandleParamDTO.setMsgKey(esbMqMsgRecordDO.getMsgKey());
        if (StringUtils.isEmpty(esbMqMsgRecordDO.getMsgBody())) {
            EsbMqMsgBodyDO esbMqMsgBodyDO = (EsbMqMsgBodyDO) this.esbMqMsgBodyService.findByBid(l, esbMqMsgRecordDO.getBodyBid());
            if (Objects.nonNull(esbMqMsgBodyDO)) {
                mqHandleParamDTO.setMsgBody(esbMqMsgBodyDO.getBodyText());
            } else {
                log.info("====== cid:{},bodyBid:{}  未查询到消息body记录 ======", l, esbMqMsgRecordDO.getBodyBid());
            }
        } else {
            mqHandleParamDTO.setMsgBody(esbMqMsgRecordDO.getMsgBody());
        }
        try {
            error = getHandle(l, esbMqMsgRecordDO.getHandleKey()).handle(mqHandleParamDTO);
        } catch (Exception e) {
            log.error("====== handle处理捕获异常，错误信息:======,{}", ExceptionUtils.getStackTrace(e));
            error = Response.error("handle处理捕获异常：" + ExceptionUtils.getStackTrace(e));
        }
        if (Objects.nonNull(error) && error.isSuccess()) {
            log.info("======  cid：{}, msgKey:{},处理结果 success ======", l, esbMqMsgRecordDO.getMsgKey());
            esbMqMsgRecordDO2.setExecStatus(MqExecStatusEnum.SUCCESS.getCode());
        } else {
            log.error("======  cid：{}, msgKey:{},处理结果 fail ======", l, esbMqMsgRecordDO.getMsgKey());
            log.error("======  失败的response:{} ======", JSON.toJSONString(error));
            esbMqMsgRecordDO2.setExecStatus(MqExecStatusEnum.FAIL.getCode());
            esbMqMsgRecordDO2.setResultMsg(MqStringUtils.dealErrMsgLen(error.getMsg()));
            if (esbMqMsgRecordDO2.getRetryCallCount().intValue() >= esbMqMsgRecordDO.getMaxCallCount().intValue()) {
                log.error("====== cid:{},bid: {},当前MQ数据已达到最大重试次数，不允许处理 ======", l, str);
                esbMqMsgRecordDO2.setExecStatus(MqExecStatusEnum.RETRY_FAIL.getCode());
            }
        }
        this.esbMqMsgRecordService.updateSelective(esbMqMsgRecordDO2);
        log.info("======  cid：{}, msgKey:{},END 直接调用处理服务 ======", l, esbMqMsgRecordDO.getMsgKey());
        log.info("======  cid：{}, END 触发处理MQ记录，调用入参bid:{} ======", l, str);
        return Response.success();
    }

    final MqCommonHandleAbstract getHandle(Long l, String str) {
        log.info("====== cid:{} 指定处理器名称={} ======", l, str);
        MqCommonHandleAbstract mqCommonHandleAbstract = this.mqCommonHandleAbstracts.get(str);
        if (mqCommonHandleAbstract == null) {
            log.info("====== cid:{} 未获取到处理器，使用当前调用的处理器 ======", l);
            return this;
        }
        log.info("====== cid:{} 获取到处理器Class={} ======", l, mqCommonHandleAbstract.getClass());
        return mqCommonHandleAbstract;
    }

    public final List<EsbMqMsgRecordDO> queryRetryMqRecord(Long l, int i, int i2) {
        PageHelper.startPage(i, i2);
        EsbMqMsgRecordDO esbMqMsgRecordDO = new EsbMqMsgRecordDO();
        esbMqMsgRecordDO.setCid(l);
        return this.esbMqMsgRecordService.queryRetryMqRecord(esbMqMsgRecordDO);
    }

    public final int queryRetryMqRecordCount(Long l) {
        EsbMqMsgRecordDO esbMqMsgRecordDO = new EsbMqMsgRecordDO();
        esbMqMsgRecordDO.setCid(l);
        return this.esbMqMsgRecordService.queryRetryMqRecordCount(esbMqMsgRecordDO);
    }
}
