package com.worktrans.job.operator.sink.doris;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.TimestampConverter;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.job.etl.EtlContext;
import com.worktrans.job.vo.BasicVO;
import io.debezium.config.CommonConnectorConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/operator/sink/doris/DorisSinkFunction.class */
public class DorisSinkFunction extends RichSinkFunction<BasicVO> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(DorisSinkFunction.class);
    private final DorisOutputFormat outputFormat;
    private EtlContext etlContext;

    public DorisSinkFunction(EtlContext etlContext, DorisProperty dorisProperty) {
        this.etlContext = etlContext;
        this.outputFormat = new DorisOutputFormat(getOptions(dorisProperty), DorisReadOptions.builder().build(), getExecutionOptions(dorisProperty));
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        RuntimeContext runtimeContext = getRuntimeContext();
        this.outputFormat.setRuntimeContext(runtimeContext);
        this.outputFormat.open(runtimeContext.getIndexOfThisSubtask(), runtimeContext.getNumberOfParallelSubtasks());
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.outputFormat.flush();
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    public void invoke(BasicVO basicVO, SinkFunction.Context context) throws Exception {
        this.outputFormat.writeRecord(basicVO);
    }

    public void close() throws Exception {
        this.outputFormat.close();
        this.etlContext.close();
        super.close();
    }

    private DorisExecutionOptions getExecutionOptions(DorisProperty dorisProperty) {
        Properties properties = new Properties();
        properties.put("strip_outer_array", "true");
        properties.put(TimestampConverter.FORMAT_CONFIG, "json");
        if (StringUtils.isNotEmpty(dorisProperty.getSeqColumn())) {
            properties.put("function_column.sequence_col", dorisProperty.getSeqColumn());
        }
        Map<String, Object> prop = dorisProperty.getProp();
        if (prop == null) {
            prop = new HashMap();
        }
        return DorisExecutionOptions.builder().setBatchSize(Integer.valueOf(Integer.parseInt(prop.getOrDefault("batchSize", 10000).toString()))).setBatchIntervalMs(Long.valueOf(Long.parseLong(prop.getOrDefault("batchIntervalMs", Long.valueOf(CommonConnectorConfig.DEFAULT_RETRIABLE_RESTART_WAIT)).toString()))).setMaxRetries(Integer.valueOf(((Integer) prop.getOrDefault("maxRetries", 1)).intValue())).setStreamLoadProp(properties).build();
    }

    private DorisOptions getOptions(DorisProperty dorisProperty) {
        return DorisOptions.builder().setFenodes(dorisProperty.getIp() + CommonMark.COLON + dorisProperty.getHttpPort()).setTableIdentifier(dorisProperty.getDb() + CommonMark.DOT + dorisProperty.getTable()).setUsername(dorisProperty.getUsername()).setPassword(dorisProperty.getPassword()).build();
    }
}
