package com.worktrans.shared.asynctask;

import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.worktrans.commons.core.util.BidUtils;
import com.worktrans.commons.web.context.LoadContext;
import com.worktrans.shared.excel.common.KafkaMsgParam;
import com.worktrans.shared.foundation.domain.request.asynctask.KafkaCancelMsgParam;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/worktrans/shared/asynctask/KafkaMsgReceive.class */
/**
 * Consumes asynchronous-task Kafka records and routes each one to the matching
 * {@link AsyncTaskItem} handler.
 *
 * <p>Handlers are registered by {@link #init()} in a static registry keyed by
 * {@link AsyncTaskItem#getType()}; because the registry is static it is shared by every
 * instance of this class. Records are routed by the suffix of their topic name:
 * export requests are executed on an internal thread pool, cancel requests are executed
 * synchronously on the calling thread.
 *
 * <p>NOTE(review): no assignment to {@code asyncTaskItems}, {@code asyncTaskAnnotation} or
 * {@code loadContext} is visible in this class; they are presumably populated externally
 * (e.g. by injection or reflection) before {@link #init()} is called — confirm.
 */
public class KafkaMsgReceive {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    /** Idle time, in seconds, before threads above the core size are reclaimed. */
    private static final long KEEP_ALIVE_TIME = 10;
    /** Maximum number of export tasks that may wait for a free worker thread. */
    private static final int TASK_QUEUE_CAPACITY = 100;

    /** Topic-name suffix identifying export (execute) requests. */
    private static final String EXPORT_TOPIC_SUFFIX = "SHARED-ASYNC-EXCEL-TOPIC";
    /** Topic-name suffix identifying cancel requests. */
    private static final String CANCEL_TOPIC_SUFFIX = "SHARED-ASYNC-EXCEL-CANCEL-TOPIC";
    /** JSON field of the record payload that selects the handler. */
    private static final String BUSINESS_TYPE_KEY = "businessType";
    /** MDC key whose value is forwarded to the task so it can be correlated in logs. */
    private static final String TRACE_ID_KEY = "traceId";

    private static final Logger logger = LoggerFactory.getLogger(KafkaMsgReceive.class);
    /** Registry of handlers keyed by task type; static, hence shared across instances. */
    private static final ConcurrentHashMap<String, AsyncTaskItem> EXPORT_TYPE_MAP = new ConcurrentHashMap<>(32);

    private List<AsyncTaskItem> asyncTaskItems;
    private AsyncTaskAnnotation asyncTaskAnnotation;
    private ExecutorService asyncTaskExecutorPool;
    private LoadContext loadContext;

    /**
     * Registers every configured {@link AsyncTaskItem} that has a non-blank type and creates
     * the worker pool used for export tasks.
     *
     * <p>The pool is created even when no items are configured: the registry is static, so
     * {@link #consumeTest(ConsumerRecords)} may still resolve a handler registered through
     * another instance and must not find a {@code null} executor.
     */
    public void init() {
        if (!CollectionUtils.isEmpty(this.asyncTaskItems)) {
            for (AsyncTaskItem item : this.asyncTaskItems) {
                if (item == null) {
                    continue;
                }
                String type = item.getType();
                if (StringUtils.isNotBlank(type)) {
                    EXPORT_TYPE_MAP.put(type, item);
                }
            }
        }
        // No RejectedExecutionHandler is supplied, so the JDK default applies: once all
        // MAX_POOL_SIZE threads are busy and the queue is full, execute() throws
        // RejectedExecutionException.
        this.asyncTaskExecutorPool = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(TASK_QUEUE_CAPACITY),
                new ThreadFactoryBuilder().setNameFormat(BidUtils.bid("ATEP") + "-%d").setDaemon(true).build());
    }

    /**
     * Processes a batch of records in iteration order.
     *
     * <p>Exceptions raised while handling a record (for example a payload that is not valid
     * JSON, or a rejected pool submission) are not caught here; they propagate to the caller
     * and the remaining records of the batch are not processed.
     *
     * @param consumerRecords the polled batch; each value is expected to be a JSON object
     *                        carrying a {@code businessType} field
     */
    public void consumeTest(ConsumerRecords<String, String> consumerRecords) {
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            handleRecord(consumerRecord);
        }
    }

    /** Loads the record's context, resolves its handler and routes it by topic suffix. */
    private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
        this.loadContext.loadContext(consumerRecord.headers());

        String value = consumerRecord.value();
        logger.info("收到异步任务的kafa消息--{}", value);
        logger.info("{}offset:{}timestamp:{}partition:{}",
                value, consumerRecord.offset(), consumerRecord.timestamp(), consumerRecord.partition());

        // A record without a usable payload yields no business type and therefore no handler,
        // which sends it down the existing error branch instead of failing with an NPE.
        JSONObject payload = JSONObject.parseObject(value);
        String businessType = payload == null ? null : payload.getString(BUSINESS_TYPE_KEY);
        AsyncTaskItem exportTaskItem = getExportTaskItem(businessType);

        // Serializing the whole registry for every record is costly; skip it when INFO is off.
        if (logger.isInfoEnabled()) {
            logger.info("EXPORT_TYPE_MAP--{}", JSONObject.toJSONString(EXPORT_TYPE_MAP));
        }

        String topic = consumerRecord.topic();
        logger.info("EXPORT_TYPE_TOPIC--{}", topic);

        if (exportTaskItem == null || StringUtils.isBlank(topic)) {
            logger.info("EXPORT_TYPE_TOPIC_ERROR--{}", topic);
            return;
        }
        if (topic.endsWith(EXPORT_TOPIC_SUFFIX)) {
            submitExportTask(value, exportTaskItem);
        }
        if (topic.endsWith(CANCEL_TOPIC_SUFFIX)) {
            runCancelTask(value, exportTaskItem);
        }
    }

    /** Parses an export request and hands it to the worker pool. */
    private void submitExportTask(String value, AsyncTaskItem exportTaskItem) {
        KafkaMsgParam kafkaMsgParam = JSONObject.parseObject(value, KafkaMsgParam.class);
        logger.info("exportTaskItem--{}", JSONObject.toJSONString(exportTaskItem));
        // Read the trace id on the calling thread and pass it explicitly to the task,
        // which runs on a pool thread.
        String traceId = MDC.get(TRACE_ID_KEY);
        this.asyncTaskExecutorPool.execute(
                () -> this.asyncTaskAnnotation.executeTask(kafkaMsgParam, exportTaskItem, traceId));
    }

    /** Parses a cancel request and executes it on the calling thread. */
    private void runCancelTask(String value, AsyncTaskItem exportTaskItem) {
        KafkaCancelMsgParam kafkaCancelMsgParam = JSONObject.parseObject(value, KafkaCancelMsgParam.class);
        logger.info("cancelExportTaskItem--{}", JSONObject.toJSONString(exportTaskItem));
        this.asyncTaskAnnotation.executeCancelTask(kafkaCancelMsgParam, MDC.get(TRACE_ID_KEY));
    }

    /**
     * Looks up the handler registered for a task type.
     *
     * @param type the task type, may be {@code null}
     * @return the registered handler, or {@code null} when {@code type} is {@code null} or
     *         no handler is registered for it
     */
    private AsyncTaskItem getExportTaskItem(String type) {
        // ConcurrentHashMap rejects null keys with a NullPointerException.
        return type == null ? null : EXPORT_TYPE_MAP.get(type);
    }
}
