package com.worktrans.job.etl;

import cn.hutool.core.lang.Assert;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import com.worktrans.cons.JobConfig;
import com.worktrans.datacenter.datalink.domain.cons.CommonChar;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.AccessTableConf;
import com.worktrans.datacenter.datalink.domain.vo.DorisConfigVO;
import com.worktrans.datacenter.datalink.domain.vo.JobBasicParam;
import com.worktrans.db.DorisTableStructServer;
import com.worktrans.job.config.ConfigHandler;
import com.worktrans.job.operator.AccessTableProgressCollectProcessFunction;
import com.worktrans.job.operator.DistributeByTableProcessFunction;
import com.worktrans.job.operator.DwdEmitElementsForDestinationsProcessFunction;
import com.worktrans.job.operator.DwdPkFieldUpdateFixProcessFunction;
import com.worktrans.job.operator.DwdSpecialFieldProcessFunction;
import com.worktrans.job.operator.filter.FilterFunction;
import com.worktrans.job.operator.kafka.WqKafkaRecordDeserializationSchema;
import com.worktrans.job.operator.sink.doris.DorisProperty;
import com.worktrans.job.operator.sink.doris.DorisSinkFunction;
import com.worktrans.job.vo.AccessJobParam;
import com.worktrans.job.vo.BasicVO;
import com.worktrans.kafka.KafkaUtil;
import com.worktrans.util.UidGenerateUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/etl/DwdJob.class */
/**
 * Flink streaming job that reads ODS Kafka topics and writes the records into Doris tables.
 *
 * <p>For every configured topic the job builds one Kafka source, runs the records through the
 * multi-destination / filter / special-field / progress-collect / distribute operators, and then
 * attaches one Doris sink per side output exposed by {@link DistributeByTableProcessFunction}.
 */
public class DwdJob extends AbstractStreamBaseJob {
    private static final Logger log = LoggerFactory.getLogger(DwdJob.class);
    private EtlContext etlContext;

    /**
     * Command-line entry point; delegates to {@link #execute(String[])}.
     *
     * @param strArr raw program arguments, forwarded unchanged
     * @throws Exception if job preparation or execution fails
     */
    public static void main(String[] strArr) throws Exception {
        new DwdJob().execute(strArr);
    }

    /**
     * Builds the whole topology and submits it to the Flink environment.
     *
     * @param strArr raw program arguments, passed to {@code prepareRun}
     * @throws Exception if configuration loading, topology construction or job execution fails
     */
    public void execute(String[] strArr) throws Exception {
        this.etlContext = new EtlContext();
        JobBasicParam basicParam = super.prepareRun(strArr);
        String jobName = basicParam.getJobName();
        ConfigHandler configHandler = new ConfigHandler(basicParam);
        try {
            List<Long> cids = super.getBasicParam().getCids();
            AccessJobParam jobParam = configHandler.gainDwdJobParam(cids);
            Assert.notNull(basicParam.getSinkConfig(), "明细表任务必须指定sink配置！");
            jobParam.setSinkProps(basicParam.getSinkConfig());

            Map<String, List<AccessTableConf>> confsByTopic = groupByOdsTopic(jobParam.getAccessTableConfs());
            Map<String, String> seqColumnBySinkTable = getAllTableSeqColumn(jobParam, confsByTopic);
            String bootstrapServers = super.getBasicParam().getBootstrapServers();
            Map<String, Integer> partitionCounts = KafkaUtil.getTopicPartition(bootstrapServers, confsByTopic.keySet());

            // A cid list containing LONG_ZERO means "subscribe to the whole topic" instead of
            // selected partitions; the answer does not change per topic, so compute it once.
            boolean consumeWholeTopic = cids.contains(JobConfig.LONG_ZERO);
            // Sink uids already attached, shared across topics so a sink table is wired only once.
            Set<String> attachedSinkUids = new HashSet<>();

            for (Map.Entry<String, List<AccessTableConf>> topicEntry : confsByTopic.entrySet()) {
                String topic = topicEntry.getKey();
                List<AccessTableConf> topicConfs = topicEntry.getValue();

                // NOTE(review): raw types are kept on the Flink builder/operator chain because the
                // element type produced by WqKafkaRecordDeserializationSchema is not visible here.
                KafkaSourceBuilder sourceBuilder = newSourceBuilder(bootstrapServers, jobName, topic, cids);
                if (consumeWholeTopic) {
                    sourceBuilder.setTopics(new String[]{topic});
                } else {
                    sourceBuilder.setPartitions(resolvePartitions(topic, cids, partitionCounts));
                }

                SingleOutputStreamOperator transformed = FlinkEnvironmentHolder.env
                        .fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "Kafka Source:" + topic)
                        .uid(UidGenerateUtil.getUid("dwd", "kafka_source" + topic, 0))
                        .process(new DwdEmitElementsForDestinationsProcessFunction(topicConfs, consumeWholeTopic))
                        .name("multi_destination")
                        .filter(new FilterFunction(topicConfs))
                        .name("filter")
                        .process(new DwdSpecialFieldProcessFunction(topicConfs))
                        .name("udf");

                DistributeByTableProcessFunction distributor = new DistributeByTableProcessFunction(topicConfs);
                Map<String, OutputTag<BasicVO>> outputTags = distributor.gainOutputTagMap();
                SingleOutputStreamOperator distributed = transformed
                        .process(new AccessTableProgressCollectProcessFunction(
                                this.etlContext, topic, distributor.gainSinkTableToSourceTableMap(), jobParam.getSinkProps()))
                        .name("progress")
                        .setParallelism(1)
                        .uid(UidGenerateUtil.getUid("dwd", "progress_collect_" + topic, 0))
                        .process(distributor)
                        .name("distribute");

                log.info("{} topic {}", jobName, topic);
                for (Map.Entry<String, OutputTag<BasicVO>> tagEntry : outputTags.entrySet()) {
                    handleSingle(basicParam, jobParam.getSinkProps(), tagEntry.getKey(),
                            distributed.getSideOutput(tagEntry.getValue()), topicConfs, attachedSinkUids, seqColumnBySinkTable);
                }
            }
        } finally {
            // Release the handler even when topology construction fails part-way.
            configHandler.close();
        }
        FlinkEnvironmentHolder.env.execute(jobName);
    }

    /**
     * Groups the table configurations by their ODS Kafka topic name
     * ({@code CommonChar.KAFKA_TOPIC_PREFIX_ODS + conf.getTopic()}).
     */
    private static Map<String, List<AccessTableConf>> groupByOdsTopic(List<AccessTableConf> accessTableConfs) {
        Map<String, List<AccessTableConf>> confsByTopic = new HashMap<>(12);
        for (AccessTableConf conf : accessTableConfs) {
            confsByTopic
                    .computeIfAbsent(CommonChar.KAFKA_TOPIC_PREFIX_ODS + conf.getTopic(), key -> new ArrayList<>())
                    .add(conf);
        }
        return confsByTopic;
    }

    /**
     * Creates the Kafka source builder shared by both subscription modes: earliest starting
     * offsets, auto-commit disabled, a 60s request timeout and a per-job, per-topic group id.
     */
    private static KafkaSourceBuilder newSourceBuilder(String bootstrapServers, String jobName, String topic, List<Long> cids) {
        return KafkaSource.builder().setBootstrapServers(bootstrapServers).setGroupId("dwd_consumer_" + jobName + CommonMark.UNDERLINE + topic).setStartingOffsets(OffsetsInitializer.earliest()).setDeserializer(new WqKafkaRecordDeserializationSchema(cids)).setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest").setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").setProperty("request.timeout.ms", "60000");
    }

    /**
     * Computes the set of partitions of {@code topic} to read: one partition per cid, chosen as
     * {@code Utils.toPositive(cid.intValue()) % partitionCount}. Several cids may map to the same
     * partition; the set removes those duplicates.
     *
     * @throws IllegalStateException if no positive partition count is known for {@code topic}
     */
    private static Set<TopicPartition> resolvePartitions(String topic, List<Long> cids, Map<String, Integer> partitionCounts) {
        Integer partitionCount = partitionCounts.get(topic);
        if (partitionCount == null || partitionCount <= 0) {
            throw new IllegalStateException("No usable partition count for topic " + topic + ": " + partitionCount);
        }
        Set<TopicPartition> partitions = new HashSet<>();
        for (Long cid : cids) {
            partitions.add(new TopicPartition(topic, Utils.toPositive(cid.intValue()) % partitionCount));
        }
        return partitions;
    }

    /**
     * Collects every sink table name exposed by the per-topic distributors and asks
     * {@link DorisTableStructServer#getGmtModMap} for the value associated with each of them
     * (used by the caller as the Doris sequence column).
     *
     * @param accessJobParam job parameters carrying the Doris sink properties
     * @param map            table configurations grouped by topic
     * @return the map returned by {@code getGmtModMap} for all collected sink tables
     * @throws Exception if the Doris lookup fails
     */
    private Map<String, String> getAllTableSeqColumn(AccessJobParam accessJobParam, Map<String, List<AccessTableConf>> map) throws Exception {
        List<String> sinkTables = new ArrayList<>();
        for (List<AccessTableConf> topicConfs : map.values()) {
            sinkTables.addAll(new DistributeByTableProcessFunction(topicConfs).gainOutputTagMap().keySet());
        }
        return new DorisTableStructServer(this.etlContext.getDbSourceFactory(), accessJobParam.getSinkProps()).getGmtModMap(sinkTables);
    }

    /**
     * Attaches a Doris sink for one sink table, unless a sink with the same uid was already attached.
     *
     * @param jobBasicParam basic job parameters (used for the job name in log output)
     * @param dorisConfigVO Doris connection settings copied into the sink's {@link DorisProperty}
     * @param str           sink table name; split on {@code CommonMark.DOT}, part 0 is used as the
     *                      database and part 1 as the table
     * @param dataStream    side-output stream carrying the records for this sink table
     * @param list          table configurations of the originating topic
     * @param set           uids of sinks already attached; updated by this method
     * @param map           sequence column per sink table name
     * @throws IllegalArgumentException if {@code str} does not split into at least two parts
     */
    private void handleSingle(JobBasicParam jobBasicParam, DorisConfigVO dorisConfigVO, String str, DataStream<BasicVO> dataStream, List<AccessTableConf> list, Set<String> set, Map<String, String> map) {
        String jobName = jobBasicParam.getJobName();
        String uid = UidGenerateUtil.getUid("dwd", "kafka_sink_doris_" + str, 0);
        log.info("{} dwd uid {}", jobName, uid);
        // Set.add returns false when the uid is already present, i.e. the sink is a duplicate.
        if (!set.add(uid)) {
            log.info("{} 重复 {}", jobName, uid);
            return;
        }

        String[] dbAndTable = StringUtils.split(str, CommonMark.DOT);
        if (dbAndTable == null || dbAndTable.length < 2) {
            throw new IllegalArgumentException("Sink table name must contain a database and a table part: " + str);
        }

        DorisProperty dorisProperty = new DorisProperty();
        dorisProperty.setIp(dorisConfigVO.getIp());
        dorisProperty.setPort(dorisConfigVO.getPort());
        dorisProperty.setHttpPort(dorisConfigVO.getHttpPort());
        dorisProperty.setUsername(dorisConfigVO.getUsername());
        dorisProperty.setPassword(dorisConfigVO.getPassword());
        dorisProperty.setDb(dbAndTable[0]);
        dorisProperty.setTable(dbAndTable[1]);

        String seqColumn = map.get(str);
        if (seqColumn == null) {
            // Logged but not fatal: the sink is still created with a null sequence column.
            log.error("{} seqColumnMap is null,{}", jobName, str);
        }
        dorisProperty.setSeqColumn(seqColumn);

        dataStream.process(new DwdPkFieldUpdateFixProcessFunction(list))
                .addSink(new DorisSinkFunction(this.etlContext, dorisProperty))
                .setParallelism(1)
                .name("doris_sink_" + str)
                .uid(uid);
    }
}
