package com.worktrans.kafka;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import com.worktrans.cons.JobConfig;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.AccessTableConf;
import com.worktrans.datacenter.datalink.domain.vo.ITopicTableRel;
import com.worktrans.job.operator.kafka.BasicVODeserializationSchema;
import com.worktrans.job.operator.kafka.BasicVOKeySerialization;
import com.worktrans.job.operator.kafka.WqKafkaPartitioner;
import com.worktrans.job.operator.kafka.WqKafkaRecordDeserializationSchema;
import com.worktrans.job.operator.kafka.WqTableNameTopicSelector;
import com.worktrans.job.vo.BasicVO;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/kafka/KafkaUtil.class */
public class KafkaUtil {
    private static final Logger log = LoggerFactory.getLogger(KafkaUtil.class);

    public static CreateTopicsResult createTopicIfNotExsist(Collection<String> collection, String str) {
        if (CollectionUtils.isEmpty(collection)) {
            return null;
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        AdminClient create = AdminClient.create(properties);
        create.listTopics();
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(new NewTopic(it.next(), 10, (short) 2));
        }
        return create.createTopics(arrayList);
    }

    public static Map<String, Integer> getTopicPartition(String str, Collection<String> collection) throws ExecutionException, InterruptedException {
        if (CollectionUtils.isEmpty(collection)) {
            return new HashMap();
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        Map values = AdminClient.create(properties).describeTopics(collection).values();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : values.entrySet()) {
            KafkaFuture kafkaFuture = (KafkaFuture) entry.getValue();
            log.info("开始检查topic:{}的分区数", entry.getKey());
            try {
                TopicDescription topicDescription = (TopicDescription) kafkaFuture.get();
                hashMap.put(topicDescription.name(), Integer.valueOf(topicDescription.partitions().size()));
            } catch (ExecutionException e) {
                throw new ExecutionException("检查topic:【" + ((String) entry.getKey()) + "】时出现异常！！！", e);
            }
        }
        return hashMap;
    }

    public static <T extends ITopicTableRel> KafkaSink<BasicVO> gainKafkaSink(String str, String str2, String str3, String str4, List<AccessTableConf> list) {
        Properties properties = new Properties();
        properties.setProperty("batch.size", "5000");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "50");
        if (StrUtil.isNotBlank(str2)) {
            properties.setProperty("compression.type", str2);
        }
        if (StrUtil.isNotBlank(str3)) {
            properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, str3);
        }
        log.info("kafkaProducerConfig:" + JSONUtil.toJsonStr(properties));
        return KafkaSink.builder().setKafkaProducerConfig(properties).setBootstrapServers(str).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopicSelector(new WqTableNameTopicSelector(str4, list)).setPartitioner(new WqKafkaPartitioner()).setValueSerializationSchema(new BasicVODeserializationSchema()).setKeySerializationSchema(new BasicVOKeySerialization()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();
    }

    public static KafkaSource<BasicVO> gainKafkaSource(String str, String str2, String str3, List<Long> list) {
        Properties properties = new Properties();
        properties.put("commit.offsets.on.checkpoint", true);
        properties.put("partition.discovery.interval.ms", "60000");
        return KafkaSource.builder().setBootstrapServers(str).setTopics(new String[]{str2 + str3}).setGroupId("my-group-sink-to-doris-" + str3 + StringUtils.join(list, CommonMark.COMMA)).setStartingOffsets(OffsetsInitializer.earliest()).setDeserializer(new WqKafkaRecordDeserializationSchema(list)).setProperties(properties).build();
    }

    public static void recordError(String str, BasicVO basicVO, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        kafkaProducer.send(new ProducerRecord(JobConfig.ERROR_MSG_TOPIC, str2, JSONUtil.toJsonStr(basicVO)));
        kafkaProducer.close();
    }
}
