package com.worktrans.pti.folivora.kafka.consumer;

import com.alibaba.fastjson.JSONObject;
import com.worktrans.commons.core.util.EnvUtil;
import com.worktrans.commons.lang.Argument;
import com.worktrans.pti.folivora.kafka.dto.LogKafkaDTO;
import com.worktrans.pti.folivora.kafka.log.LogInfo;
import com.worktrans.pti.folivora.kafka.service.HandleInfoService;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/worktrans/pti/folivora/kafka/consumer/CalculateConsumer.class */
public class CalculateConsumer {
    private static final Logger log = LoggerFactory.getLogger(CalculateConsumer.class);
    private ThreadPoolTaskExecutor scheduleTimeConsumeThread;

    @Value("${calculate.cal-result.thread-core:10}")
    private int consumerThreadCore;

    @Value("${calculate.cal-result.thread-max:20}")
    private int consumerThreadMax;

    @Value("${calculate.cal-result.queue-capacity:64}")
    private int queueCapacity;

    @Resource
    private HandleInfoService handleInfoService;

    @KafkaListener(groupId = "${kafka.consumer.group.id:GID_C_PTI_FOLOVORA}", autoStartup = "${kafka.listen.auto.start:true}", topics = {"loginfo-record-topic"}, containerFactory = "ackContainerFactory")
    public void consume(ConsumerRecords<byte[], byte[]> consumerRecords, Acknowledgment acknowledgment) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            this.scheduleTimeConsumeThread.execute(() -> {
                execute(consumerRecord, EnvUtil.getCurrentTraceId());
            });
            acknowledgment.acknowledge();
        }
    }

    private void execute(ConsumerRecord<byte[], byte[]> consumerRecord, String str) {
        log.error("CalculateConsumer-execute-start:" + System.currentTimeMillis());
        MDC.put("traceId", str);
        try {
            String str2 = new String((byte[]) consumerRecord.value(), "UTF-8");
            log.error("业务操作日志数据 str = {}", str2);
            LogKafkaDTO logKafkaDTO = (LogKafkaDTO) JSONObject.parseObject(str2, LogKafkaDTO.class);
            LogInfo logInfo = (LogInfo) JSONObject.parseObject(logKafkaDTO.getMsg(), LogInfo.class);
            logInfo.setEnv(logKafkaDTO.getEnv());
            if (Argument.isBlank(logInfo.getCid())) {
                logInfo.setCid("0");
            }
            this.handleInfoService.handle(logInfo);
            log.error("业务操作日志数据 str = {}", str2);
        } catch (Exception e) {
            log.error("业务操作日志数据转换出错，错误信息={}", e);
        }
    }

    @PostConstruct
    public void init() {
        this.scheduleTimeConsumeThread = new ThreadPoolTaskExecutor();
        this.scheduleTimeConsumeThread.setCorePoolSize(this.consumerThreadCore);
        this.scheduleTimeConsumeThread.setAllowCoreThreadTimeOut(true);
        this.scheduleTimeConsumeThread.setAwaitTerminationSeconds(120);
        this.scheduleTimeConsumeThread.setMaxPoolSize(this.consumerThreadMax);
        this.scheduleTimeConsumeThread.setQueueCapacity(this.queueCapacity);
        this.scheduleTimeConsumeThread.setThreadNamePrefix("calculate.cal-result-thread-");
        this.scheduleTimeConsumeThread.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        this.scheduleTimeConsumeThread.initialize();
    }
}
