package com.worktrans.job.operator.sink.doris;

import com.alibaba.fastjson.JSONObject;
import com.worktrans.exception.BizException;
import com.worktrans.job.vo.BasicVO;
import io.debezium.data.Envelope;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.table.DorisStreamLoad;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/operator/sink/doris/DorisOutputFormat.class */
public class DorisOutputFormat extends RichOutputFormat<BasicVO> {
    private static final Logger log = LoggerFactory.getLogger(DorisOutputFormat.class);
    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
    private final Properties streamLoadProp;
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    private final DorisExecutionOptions executionOptions;
    private DorisStreamLoad dorisStreamLoad;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private volatile transient Exception flushException;
    private final Map<String, Map> upsertBatch = new HashMap(10000);
    private final Map<String, Map> deleteBatch = new HashMap(10000);
    private volatile transient boolean closed = false;

    public DorisOutputFormat(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions) {
        this.options = dorisOptions;
        this.readOptions = dorisReadOptions;
        this.executionOptions = dorisExecutionOptions;
        this.streamLoadProp = dorisExecutionOptions.getStreamLoadProp();
        if (Boolean.parseBoolean(this.streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT)) && this.streamLoadProp.contains(ESCAPE_DELIMITERS_KEY)) {
            this.streamLoadProp.remove(ESCAPE_DELIMITERS_KEY);
        }
    }

    public void configure(Configuration configuration) {
    }

    public void open(int i, int i2) throws IOException {
        this.dorisStreamLoad = new DorisStreamLoad(getBackend(), this.options.getTableIdentifier().split("\\.")[0], this.options.getTableIdentifier().split("\\.")[1], this.options.getUsername(), this.options.getPassword(), this.streamLoadProp);
        log.info("Streamload BE:{}", this.dorisStreamLoad.getLoadUrlStr());
        if (this.executionOptions.getBatchIntervalMs().longValue() == 0 || this.executionOptions.getBatchSize().intValue() == 1) {
            return;
        }
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-order-streamLoad-output-format"));
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        flush();
                        if (Thread.currentThread().isInterrupted()) {
                            this.closed = true;
                            Thread.currentThread().interrupt();
                        }
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, this.executionOptions.getBatchIntervalMs().longValue(), this.executionOptions.getBatchIntervalMs().longValue(), TimeUnit.MILLISECONDS);
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new BizException("Writing records to streamload failed.", this.flushException);
        }
    }

    public synchronized void writeRecord(BasicVO basicVO) throws IOException {
        checkFlushException();
        addBatch(basicVO);
        if (this.executionOptions.getBatchSize().intValue() <= 0 || this.deleteBatch.size() + this.upsertBatch.size() < this.executionOptions.getBatchSize().intValue()) {
            return;
        }
        flush();
    }

    private void addBatch(BasicVO basicVO) {
        setDefaultTimestampAndDelSign(basicVO);
        String str = basicVO.getStr("cal_pk");
        if (Envelope.Operation.DELETE.code().equalsIgnoreCase(basicVO.getOp())) {
            this.deleteBatch.put(str, basicVO.getBefore());
            this.upsertBatch.remove(str);
        } else {
            this.upsertBatch.put(str, basicVO.getAfter());
            this.deleteBatch.remove(str);
        }
    }

    private void setDefaultTimestampAndDelSign(BasicVO basicVO) {
        Map<String, Object> before = Envelope.Operation.DELETE.code().equalsIgnoreCase(basicVO.getOp()) ? basicVO.getBefore() : basicVO.getAfter();
        if (before == null || before.get("data_load_timestamp") != null) {
            return;
        }
        before.put("data_load_timestamp", Long.valueOf(System.currentTimeMillis()));
        before.put("cal_count", 1);
    }

    public synchronized void close() throws IOException {
        if (!this.closed) {
            this.closed = true;
            try {
                flush();
                if (this.scheduledFuture != null) {
                    this.scheduledFuture.cancel(false);
                    this.scheduler.shutdown();
                }
            } catch (Exception e) {
                log.warn("Writing records to doris failed.", e);
                throw new BizException("Writing records to doris failed.", e);
            }
        }
        checkFlushException();
    }

    public synchronized Integer flush() throws IOException {
        checkFlushException();
        Integer num = 0;
        if (!this.deleteBatch.isEmpty()) {
            doFlush(Envelope.Operation.DELETE, this.deleteBatch.values());
            num = Integer.valueOf(num.intValue() + this.deleteBatch.size());
            this.deleteBatch.clear();
        }
        if (!this.upsertBatch.isEmpty()) {
            doFlush(Envelope.Operation.CREATE, this.upsertBatch.values());
            num = Integer.valueOf(num.intValue() + this.upsertBatch.size());
            this.upsertBatch.clear();
        }
        return num;
    }

    private void doFlush(Envelope.Operation operation, Collection<Map> collection) throws IOException {
        String jSONString = JSONObject.toJSONString(collection);
        for (int i = 0; i <= this.executionOptions.getMaxRetries().intValue(); i++) {
            try {
                if (Envelope.Operation.DELETE == operation) {
                    this.streamLoadProp.put("merge_type", "DELETE");
                } else {
                    this.streamLoadProp.remove("merge_type");
                }
                log.info("table:{},执行doFlush, merge_type={}, size={}", new Object[]{this.options.getTableIdentifier(), this.streamLoadProp.get("merge_type"), Integer.valueOf(collection.size())});
                this.dorisStreamLoad.load(jSONString);
                return;
            } catch (StreamLoadException e) {
                log.error("doris sink error, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.executionOptions.getMaxRetries().intValue()) {
                    throw new IOException((Throwable) e);
                }
                try {
                    this.dorisStreamLoad.setHostPort(getBackend());
                    log.warn("streamload error,switch be: {}", this.dorisStreamLoad.getLoadUrlStr(), e);
                    Thread.sleep(1000 * i);
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new IOException("unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    private String getBackend() throws IOException {
        try {
            return RestService.randomBackend(this.options, this.readOptions, log);
        } catch (IOException | DorisException e) {
            log.error("get backends info fail");
            throw new IOException(e);
        }
    }
}
