package com.worktrans.custom.report.center.mq.kafka.starter.service.impl;

import com.worktrans.custom.report.center.mq.kafka.starter.beans.KafkaMessage;
import com.worktrans.custom.report.center.mq.kafka.starter.beans.KafkaRequest;
import com.worktrans.custom.report.center.mq.kafka.starter.service.IKafkaService;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

/* loaded from: input_file:com/worktrans/custom/report/center/mq/kafka/starter/service/impl/KafkaService.class */
public class KafkaService<K, V> implements IKafkaService<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<K, V> dataCenterKafkaTemplate;

    @Override // com.worktrans.custom.report.center.mq.kafka.starter.service.IKafkaService
    public ListenableFuture<SendResult<K, V>> handleRequest(KafkaRequest<K, V> kafkaRequest) throws KafkaException {
        if (Objects.isNull(kafkaRequest)) {
            if (log.isErrorEnabled()) {
                log.error("kafka request is empty");
            }
            throw new KafkaException("Kafka请求为空");
        }
        KafkaMessage<K, V> message = kafkaRequest.getMessage();
        ListenableFuture<SendResult<K, V>> send = this.dataCenterKafkaTemplate.send(message.getTopic(), message.getKey(), message.getValue());
        send.addCallback(sendResult -> {
            if (log.isInfoEnabled()) {
                log.info(String.format("message %s send successfully", message.toString()));
            }
            if (Objects.isNull(kafkaRequest.getSuccessCallback())) {
                return;
            }
            kafkaRequest.getSuccessCallback().onSuccess(sendResult);
        }, th -> {
            if (log.isErrorEnabled()) {
                log.error(String.format("failed to sent message %s, %s", message.toString(), th.getMessage()), th);
            }
            if (Objects.isNull(kafkaRequest.getFailureCallback())) {
                return;
            }
            kafkaRequest.getFailureCallback().onFailure(th);
        });
        return send;
    }
}
