package com.worktrans.commons.kafka.util;

import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.IdUtil;
import com.worktrans.commons.core.header.WtHeader;
import com.worktrans.commons.kafka.configuration.KafkaTopicPartitions;
import com.worktrans.commons.util.JsonUtil;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

/* loaded from: input_file:com/worktrans/commons/kafka/util/KafkaProducerUtil.class */
public final class KafkaProducerUtil {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerUtil.class);
    private static final String X_AUTH_USER = "X-Auth-User";
    private static final String TRACE_ID = "traceId";
    private static String prefix;
    private static KafkaTemplate kafkaTemplate;
    private static KafkaTopicPartitions kafkaTopicPartitions;

    public static void setKafkaTemplate(KafkaTemplate kafkaTemplate2) {
        kafkaTemplate = kafkaTemplate2;
    }

    public static void setKafkaTopicPartitions(KafkaTopicPartitions kafkaTopicPartitions2) {
        kafkaTopicPartitions = kafkaTopicPartitions2;
    }

    public static synchronized void sendMessage(String str, String str2) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), (Long) null, (Object) null, str2, composeUserInfo()));
    }

    public static synchronized void sendMessage(String str, String str2, String str3) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), (Long) null, str2, str3, composeUserInfo()));
    }

    public static void sendMessageWithByte(String str, byte[] bArr, byte[] bArr2, WtHeader wtHeader) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), bArr, bArr2, composeUserInfo(wtHeader)));
    }

    public static void sendMessageWithByte(String str, byte[] bArr, byte[] bArr2, WtHeader wtHeader, String str2) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(str2, str), kafkaTopicPartitions.getCurrentPartition(str), bArr, bArr2, composeUserInfo(wtHeader)));
    }

    public static void sendMessageWithByte(String str, byte[] bArr, byte[] bArr2, WtHeader wtHeader, ListenableFutureCallback listenableFutureCallback) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), bArr, bArr2, composeUserInfo(wtHeader))).addCallback(listenableFutureCallback);
    }

    public static void sendMessageWithByte(String str, byte[] bArr, byte[] bArr2, WtHeader wtHeader, ListenableFutureCallback listenableFutureCallback, String str2) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(str2, str), kafkaTopicPartitions.getCurrentPartition(str), bArr, bArr2, composeUserInfo(wtHeader))).addCallback(listenableFutureCallback);
    }

    public static synchronized void sendMessage(String str, String str2, WtHeader wtHeader) {
        Assert.notNull(wtHeader, "wtHeader is not null");
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), (Long) null, (Object) null, str2, composeUserInfo(wtHeader)));
    }

    public static synchronized void sendMessage(String str, String str2, String str3, WtHeader wtHeader) {
        Assert.notNull(wtHeader, "wtHeader is not null");
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), (Long) null, str2, str3, composeUserInfo(wtHeader)));
    }

    public static void sendMessage(String str, Integer num, String str2, WtHeader wtHeader) {
        Assert.notNull(wtHeader, "wtHeader is not null");
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), num, (Long) null, (Object) null, str2, composeUserInfo(wtHeader)));
    }

    public static void sendMessage(String str, String... strArr) throws InterruptedException {
        for (String str2 : strArr) {
            kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, (Object) null, str2, composeUserInfo()));
        }
    }

    public static void sendMessage(String str, WtHeader wtHeader, String... strArr) throws InterruptedException {
        for (String str2 : strArr) {
            kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, (Object) null, str2, composeUserInfo(wtHeader)));
        }
    }

    public static void sendMessage(String str, Map<Object, Object> map) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, (Object) null, JsonUtil.toJson(map), composeUserInfo()));
    }

    public static void sendMessageCallback(String str, String str2, WtHeader wtHeader, ListenableFutureCallback listenableFutureCallback) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, (Object) null, str2, composeUserInfo(wtHeader))).addCallback(listenableFutureCallback);
    }

    public static void sendMessageCallback(String str, String str2, String str3, WtHeader wtHeader, ListenableFutureCallback listenableFutureCallback) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, str2, str3, composeUserInfo(wtHeader))).addCallback(listenableFutureCallback);
    }

    public static void sendMsgCallback(ProducerRecord<String, String> producerRecord, ListenableFutureCallback listenableFutureCallback) {
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, producerRecord.topic()), (Integer) null, (Long) null, (Object) null, producerRecord.value(), composeUserInfo())).addCallback(listenableFutureCallback);
    }

    public static RecordMetadata sendMessageSync(String str, String str2) throws Exception {
        RecordMetadata recordMetadata = ((SendResult) kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), (Integer) null, (Long) null, (Object) null, str2, composeUserInfo())).get()).getRecordMetadata();
        logger.info(JsonUtil.toJson(recordMetadata));
        return recordMetadata;
    }

    public static void sendMessage(String str, String str2, WtHeader wtHeader, RecordHeader recordHeader) {
        Assert.notNull(wtHeader, "wtHeader is not null");
        kafkaTemplate.send(new ProducerRecord(WrapUtil.wrapTopic(prefix, str), kafkaTopicPartitions.getCurrentPartition(str), (Long) null, (Object) null, str2, composeUserInfo(wtHeader, recordHeader)));
    }

    public static void setPrefix(String str) {
        prefix = str;
    }

    private static Iterable<RecordHeader> composeUserInfo(WtHeader wtHeader) {
        Optional info = wtHeader.getInfo();
        ArrayList arrayList = new ArrayList();
        if (info.isPresent()) {
            arrayList.add(new RecordHeader(X_AUTH_USER, ((String) wtHeader.getInfo().get()).getBytes(CharsetUtil.CHARSET_UTF_8)));
        }
        arrayList.add(new RecordHeader(TRACE_ID, ((String) wtHeader.getTraceId().orElse(IdUtil.objectId())).getBytes(CharsetUtil.CHARSET_UTF_8)));
        return arrayList;
    }

    private static Iterable<RecordHeader> composeUserInfo(WtHeader wtHeader, RecordHeader recordHeader) {
        Optional info = wtHeader.getInfo();
        ArrayList arrayList = new ArrayList();
        if (info.isPresent()) {
            arrayList.add(new RecordHeader(X_AUTH_USER, ((String) wtHeader.getInfo().get()).getBytes(CharsetUtil.CHARSET_UTF_8)));
        }
        arrayList.add(new RecordHeader(TRACE_ID, ((String) wtHeader.getTraceId().orElse(IdUtil.objectId())).getBytes(CharsetUtil.CHARSET_UTF_8)));
        if (recordHeader != null) {
            arrayList.add(recordHeader);
        }
        return arrayList;
    }

    private static Iterable<RecordHeader> composeUserInfo() {
        ServletRequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        if (requestAttributes == null) {
            return null;
        }
        HttpServletRequest request = requestAttributes.getRequest();
        String header = request.getHeader(X_AUTH_USER);
        logger.info("request userInfo:{}", header);
        ArrayList arrayList = new ArrayList();
        if (StringUtils.isNotBlank(header)) {
            arrayList.add(new RecordHeader(X_AUTH_USER, header.getBytes(CharsetUtil.CHARSET_UTF_8)));
        }
        String header2 = request.getHeader(TRACE_ID);
        if (StringUtils.isBlank(header2)) {
            header2 = IdUtil.objectId();
        }
        arrayList.add(new RecordHeader(TRACE_ID, header2.getBytes(CharsetUtil.CHARSET_UTF_8)));
        return arrayList;
    }
}
