package com.worktrans.job.operator;

import cn.hutool.core.map.MapUtil;
import cn.hutool.db.Db;
import cn.hutool.json.JSONUtil;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.DorisConfigVO;
import com.worktrans.job.etl.EtlContext;
import com.worktrans.job.vo.AccessTableProgressMap;
import com.worktrans.job.vo.AccessTableProgressVO;
import com.worktrans.job.vo.BasicVO;
import com.worktrans.util.JdbcConnectUtil;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/operator/AccessTableProgressCollectProcessFunction.class */
/**
 * Pass-through {@link ProcessFunction} that keeps a running record count per
 * "source table -&gt; sink table" path.
 *
 * <p>Every incoming {@link BasicVO} is forwarded unchanged. As a side effect the
 * function updates an in-memory progress map, publishes that map through the
 * {@code access_progress} gauge, and stores its values in operator list state on
 * each checkpoint so the counters can be reloaded on restore.
 */
public class AccessTableProgressCollectProcessFunction extends ProcessFunction<BasicVO, BasicVO> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(AccessTableProgressCollectProcessFunction.class);

    /** Name under which the progress entries are registered in the operator state store. */
    private static final String STATE_KEY = "table_access_progress_state";

    private final String topic;
    private final DorisConfigVO dorisConfigVO;

    /** Progress entries keyed by {@code sourceTable + CommonMark.DOLLER + sinkTable}. */
    private final Map<String, AccessTableProgressVO> tranferPathProgressVOMap = new AccessTableProgressMap();

    /** Sink table name mapped to the source tables that feed it. */
    private final Map<String, Set<String>> sinkTableToSourceTableMap;

    private transient ListState<AccessTableProgressVO> listState;
    private final EtlContext etlContext;

    /**
     * @param etlContext                context used to obtain the data source for the Doris count queries
     * @param topic                     topic name; only used in the restore log message
     * @param sinkTableToSourceTableMap sink table name mapped to its source table names
     * @param dorisConfigVO             connection settings (ip, port, username, password) for Doris
     */
    public AccessTableProgressCollectProcessFunction(EtlContext etlContext, String topic, Map<String, Set<String>> sinkTableToSourceTableMap, DorisConfigVO dorisConfigVO) {
        this.topic = topic;
        this.sinkTableToSourceTableMap = sinkTableToSourceTableMap;
        this.dorisConfigVO = dorisConfigVO;
        this.etlContext = etlContext;
    }

    /**
     * Registers the {@code access_progress} gauge, whose value is the live progress map.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        getRuntimeContext().getMetricGroup().gauge("access_progress", () -> this.tranferPathProgressVOMap);
    }

    /**
     * Updates the progress counters for the record and forwards the record unchanged.
     *
     * <p>Metric collection is best-effort: any failure while updating the counters is
     * logged and the record is still emitted.
     */
    @Override
    public void processElement(BasicVO basicVO, ProcessFunction<BasicVO, BasicVO>.Context context, Collector<BasicVO> collector) throws Exception {
        try {
            updateProgress(basicVO);
        } catch (Exception e) {
            log.error("收集指标出错！", e);
        }
        collector.collect(basicVO);
    }

    /**
     * Creates the progress entry for the record's table path if it does not exist yet,
     * sets its stage and increments its counter.
     */
    private void updateProgress(BasicVO basicVO) {
        String sourceTable = basicVO.getSource().getFullOriginalTableName();
        String sinkTable = basicVO.getSource().getFullTableName();
        String key = progressKey(sourceTable, sinkTable);

        AccessTableProgressVO progress = this.tranferPathProgressVOMap.get(key);
        if (progress == null) {
            progress = new AccessTableProgressVO(sinkTable, sourceTable);
            this.tranferPathProgressVOMap.put(key, progress);
        }

        // Records carrying the READ operation code are marked with the snapshot stage;
        // every other operation is treated as streaming.
        boolean isRead = Envelope.Operation.READ.code().equals(basicVO.getOp());
        progress.setStage(isRead ? AbstractSourceInfo.SNAPSHOT_KEY : "stream");
        progress.getCount().incrementAndGet();
    }

    /**
     * Replaces the list state with the current progress entries. When the map is empty
     * the state is left untouched.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (MapUtil.isNotEmpty(this.tranferPathProgressVOMap)) {
            this.listState.update(new ArrayList<>(this.tranferPathProgressVOMap.values()));
        }
    }

    /**
     * Obtains the list state and fills the progress map.
     *
     * <p>On restore, the entries found in state are loaded first. Each configured sink
     * table that is still without an entry (all of them on a fresh start) is then
     * seeded with the row count queried from that table.
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        ListStateDescriptor<AccessTableProgressVO> descriptor =
                new ListStateDescriptor<>(STATE_KEY, TypeInformation.of(AccessTableProgressVO.class));
        this.listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);

        Set<String> sinkTablesToSeed = new HashSet<>(this.sinkTableToSourceTableMap.keySet());
        if (functionInitializationContext.isRestored()) {
            sinkTablesToSeed.removeAll(restoreProgress());
        }
        if (sinkTablesToSeed.isEmpty()) {
            return;
        }
        seedCountsFromDoris(sinkTablesToSeed);
    }

    /**
     * Loads every entry from the list state into the progress map.
     *
     * @return the sink table names of the restored entries
     */
    private Set<String> restoreProgress() throws Exception {
        Iterable<AccessTableProgressVO> restored = this.listState.get();
        log.info("{}进度指标收集算子从ck中恢复，{}", this.topic, JSONUtil.toJsonStr(restored));

        Set<String> restoredSinkTables = new HashSet<>();
        for (AccessTableProgressVO progress : restored) {
            this.tranferPathProgressVOMap.put(progressKey(progress.getSourceTable(), progress.getSinkTable()), progress);
            restoredSinkTables.add(progress.getSinkTable());
        }
        return restoredSinkTables;
    }

    /**
     * Queries the row count of each given sink table and creates, for every source
     * table mapped to it, a progress entry initialised with that count. Existing
     * entries are not overwritten.
     */
    private void seedCountsFromDoris(Set<String> sinkTables) throws Exception {
        Db db = Db.use(this.etlContext.getDbSourceFactory().getDbDataSource(JdbcConnectUtil.getConn(
                this.dorisConfigVO.getIp(),
                this.dorisConfigVO.getPort(),
                this.dorisConfigVO.getUsername(),
                this.dorisConfigVO.getPassword())));

        for (String sinkTable : sinkTables) {
            // NOTE(review): the table name is concatenated into the SQL text because an
            // identifier cannot be bound as a statement parameter. This is only safe
            // while the keys of sinkTableToSourceTableMap come from trusted configuration.
            long rowCount = db.count("select 1 from " + sinkTable, new Object[0]);
            for (String sourceTable : this.sinkTableToSourceTableMap.getOrDefault(sinkTable, new HashSet<>())) {
                this.tranferPathProgressVOMap.computeIfAbsent(progressKey(sourceTable, sinkTable), ignoredKey -> {
                    AccessTableProgressVO progress = new AccessTableProgressVO(sinkTable, sourceTable);
                    progress.setCount(new AtomicLong(rowCount));
                    return progress;
                });
            }
        }
    }

    /** Builds the progress-map key for a source/sink table pair. */
    private static String progressKey(String sourceTable, String sinkTable) {
        return sourceTable + CommonMark.DOLLER + sinkTable;
    }
}
