package com.worktrans.framework.pt.log.config.async;

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.worktrans.commons.serializer.lz4.Lz4Serialization;
import com.worktrans.framework.pt.common.log.async.LogThreadFactory;
import com.worktrans.framework.pt.common.log.async.RingBufferLogEvent;
import com.worktrans.framework.pt.common.log.async.RingBufferLogEventTranslator;
import com.worktrans.framework.pt.log.config.kafka.KafkaProducer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/framework/pt/log/config/async/AsyncLoggerDisruptor.class */
public class AsyncLoggerDisruptor {
    private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
    private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
    private volatile Disruptor<RingBufferLogEvent> disruptor;
    private String contextName;
    private int ringBufferSize;
    private Lz4Serialization lz4Serialization;
    private KafkaProducer kafkaProducer;
    private String topic;
    private Integer partitions;
    private Integer consumeThreadNum;
    private Logger logger = LoggerFactory.getLogger(AsyncLoggerDisruptor.class);
    private boolean useThreadLocalTranslator = true;

    Disruptor<RingBufferLogEvent> getDisruptor() {
        return this.disruptor;
    }

    public synchronized void start() {
        if (this.disruptor != null) {
            this.logger.trace("[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.", this.contextName);
            return;
        }
        this.logger.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", this.contextName);
        this.ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
        WaitStrategy createWaitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
        this.disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, this.ringBufferSize, new LogThreadFactory("AsyncLogger[" + this.contextName + "]", true, 5) { // from class: com.worktrans.framework.pt.log.config.async.AsyncLoggerDisruptor.1
            public Thread newThread(Runnable runnable) {
                return super.newThread(runnable);
            }
        }, ProducerType.MULTI, createWaitStrategy);
        ExceptionHandler<RingBufferLogEvent> asyncLoggerExceptionHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
        this.disruptor.setDefaultExceptionHandler(asyncLoggerExceptionHandler);
        RingBufferLogEventHandler[] ringBufferLogEventHandlerArr = new RingBufferLogEventHandler[this.consumeThreadNum.intValue()];
        for (int i = 0; i < this.consumeThreadNum.intValue(); i++) {
            ringBufferLogEventHandlerArr[i] = new RingBufferLogEventHandler(this.kafkaProducer, this.topic, this.partitions.intValue(), this.lz4Serialization);
        }
        this.disruptor.handleEventsWithWorkerPool(ringBufferLogEventHandlerArr);
        this.logger.info("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, exceptionHandler={}...", new Object[]{this.contextName, Integer.valueOf(this.disruptor.getRingBuffer().getBufferSize()), createWaitStrategy.getClass().getSimpleName(), asyncLoggerExceptionHandler});
        this.disruptor.start();
    }

    public boolean stop(long j, TimeUnit timeUnit) {
        Disruptor<RingBufferLogEvent> disruptor = getDisruptor();
        if (disruptor == null) {
            this.logger.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", this.contextName);
            return true;
        }
        this.logger.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", this.contextName);
        this.disruptor = null;
        for (int i = 0; hasBacklog(disruptor) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
            }
        }
        try {
            disruptor.shutdown(j, timeUnit);
        } catch (TimeoutException e2) {
            this.logger.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", new Object[]{this.contextName, Long.valueOf(j), timeUnit});
            disruptor.halt();
        }
        this.logger.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", this.contextName);
        return true;
    }

    public boolean tryPublish(RingBufferLogEventTranslator ringBufferLogEventTranslator) {
        try {
            this.logger.info("tryPublish start");
            return this.disruptor.getRingBuffer().tryPublishEvent(ringBufferLogEventTranslator);
        } catch (NullPointerException e) {
            this.logger.warn("Ignoring log event after log was shut down: {} ", e);
            return false;
        }
    }

    private static boolean hasBacklog(Disruptor<?> disruptor) {
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
    }

    public void setContextName(String str) {
        this.contextName = str;
    }

    public void setLz4Serialization(Lz4Serialization lz4Serialization) {
        this.lz4Serialization = lz4Serialization;
    }

    public void setKafkaProducer(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public void setPartitions(Integer num) {
        this.partitions = num;
    }

    public void setConsumeThreadNum(Integer num) {
        this.consumeThreadNum = num;
    }
}
