package com.worktrans.job.etl;

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.worktrans.cons.MysqlInstanceEnum;
import com.worktrans.datacenter.datalink.domain.cons.CommonChar;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.AccessTableConf;
import com.worktrans.datacenter.datalink.domain.vo.JdbcSourceConfigVO;
import com.worktrans.datacenter.datalink.domain.vo.JobBasicParam;
import com.worktrans.datacenter.datalink.domain.vo.MysqlCdcSourceProps;
import com.worktrans.db.DorisTableStructServer;
import com.worktrans.db.MysqlTableStructServer;
import com.worktrans.exception.BizException;
import com.worktrans.job.config.ConfigHandler;
import com.worktrans.job.operator.SchemaAndDataTypeProcessFunction;
import com.worktrans.job.operator.map.CdcJsonToBasicVoMapFunction;
import com.worktrans.job.source.CdcSource;
import com.worktrans.job.vo.AccessJobParam;
import com.worktrans.job.vo.BasicVO;
import com.worktrans.kafka.KafkaUtil;
import com.worktrans.util.UidGenerateUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/worktrans/job/etl/MysqlOdsJob.class */
public class MysqlOdsJob extends AbstractStreamBaseJob {
    private static final Logger LOG = LogManager.getLogger(MysqlOdsJob.class);
    private AccessJobParam accessJobParam;
    private ConfigHandler configHandler;
    private EtlContext etlContext;

    public static void main(String[] strArr) throws Exception {
        new MysqlOdsJob().execute(strArr);
    }

    public void execute(String[] strArr) throws Exception {
        JobBasicParam prepareRun = super.prepareRun(strArr);
        if (StringUtils.isEmpty(super.getBasicParam().getJobName())) {
            throw new BizException("ods任务请指定jobName");
        }
        this.configHandler = new ConfigHandler(prepareRun);
        this.etlContext = new EtlContext();
        List<Long> allCids = new DorisTableStructServer(this.etlContext.getDbSourceFactory(), prepareRun.getSinkConfig()).getAllCids();
        if (CollectionUtils.isEmpty(allCids)) {
            throw new BizException("ods任务没有获取到任何要运行的cid,请检查配置");
        }
        LOG.error(prepareRun.getJobName() + " cids {}", JSONUtil.toJsonStr(allCids));
        prepareRun.setCids(allCids);
        this.accessJobParam = this.configHandler.gainDwdJobParam(allCids);
        this.accessJobParam.setSinkProps(prepareRun.getSinkConfig());
        List<AccessTableConf> accessTableConfs = this.accessJobParam.getAccessTableConfs();
        LOG.error(prepareRun.getJobName() + " accessTableConfs {}", JSONUtil.toJsonStr(accessTableConfs));
        if (this.accessJobParam == null || CollectionUtils.isEmpty(accessTableConfs)) {
            LOG.error("ods任务配置不正确，dimJobParam:{}", JSONObject.toJSONString(this.accessJobParam));
            throw new BizException("参数信息不全，请检查维表配置！dimJobParam=" + JSONObject.toJSONString(this.accessJobParam));
        }
        HashSet hashSet = new HashSet();
        Iterator<AccessTableConf> it = accessTableConfs.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().getTopic());
        }
        super.createTopicIfNotExist(hashSet, CommonChar.KAFKA_TOPIC_PREFIX_ODS);
        List<MysqlCdcSourceProps> gainMysqlCdcSourceProps = this.accessJobParam.gainMysqlCdcSourceProps();
        LOG.error(prepareRun.getJobName() + " before sources {}", JSONUtil.toJsonStr(gainMysqlCdcSourceProps));
        List<MysqlInstanceEnum> byName = MysqlInstanceEnum.getByName(prepareRun.getJobName());
        List<MysqlCdcSourceProps> list = (List) gainMysqlCdcSourceProps.stream().filter(mysqlCdcSourceProps -> {
            return byName.contains(MysqlInstanceEnum.getByIp(mysqlCdcSourceProps.getIp()));
        }).collect(Collectors.toList());
        LOG.error(prepareRun.getJobName() + " after sources {}", JSONUtil.toJsonStr(list));
        HashMap hashMap = new HashMap();
        for (AccessTableConf accessTableConf : accessTableConfs) {
            JdbcSourceConfigVO sourceJdbcConfig = accessTableConf.getSourceJdbcConfig();
            if (sourceJdbcConfig != null && !StringUtils.isNotBlank(accessTableConf.getSourceTopic())) {
                ((List) hashMap.computeIfAbsent(sourceJdbcConfig.getIp() + CommonMark.COLON + sourceJdbcConfig.getPort(), str -> {
                    return new ArrayList();
                })).add(accessTableConf);
            }
        }
        for (MysqlCdcSourceProps mysqlCdcSourceProps2 : list) {
            String gainUniqueKey = mysqlCdcSourceProps2.gainUniqueKey();
            if (super.getBasicParam().getSelectedMysqlInstance() == null || super.getBasicParam().getSelectedMysqlInstance().contains(gainUniqueKey)) {
                handleOneMysqlInstance(prepareRun, mysqlCdcSourceProps2, (List) hashMap.get(gainUniqueKey));
            }
        }
        this.configHandler.updateAccessJobTable(accessTableConfs);
        this.configHandler.close();
        FlinkEnvironmentHolder.env.execute(prepareRun.getJobName());
    }

    private void handleOneMysqlInstance(JobBasicParam jobBasicParam, MysqlCdcSourceProps mysqlCdcSourceProps, List<AccessTableConf> list) {
        LOG.error(jobBasicParam.getJobName() + " mysqlCdcSourceProps {}", JSONUtil.toJsonStr(mysqlCdcSourceProps));
        LOG.error(jobBasicParam.getJobName() + " accessTableConfs {}", JSONUtil.toJsonStr(list));
        SingleOutputStreamOperator parallelism = getSource(jobBasicParam, mysqlCdcSourceProps).process(new SchemaAndDataTypeProcessFunction(this.etlContext, list, this.accessJobParam.getSinkProps(), MysqlTableStructServer.getMysqlTableStructServer(this.etlContext.getDbSourceFactory(), mysqlCdcSourceProps.genJdbcConfig()), super.getBasicParam().getBootstrapServers(), getBasicParam().isNeedLowerCase())).name("schema_and_datatype_process").setParallelism(1);
        KafkaSink<BasicVO> gainKafkaSink = KafkaUtil.gainKafkaSink(getBasicParam().getBootstrapServers(), getBasicParam().getCompressionType(), getBasicParam().getMaxRequestSize(), CommonChar.KAFKA_TOPIC_PREFIX_ODS, this.accessJobParam.getAccessTableConfs());
        for (AccessTableConf accessTableConf : list) {
            Integer rescan = accessTableConf.getRescan();
            if (rescan != null && rescan.intValue() > 0) {
                LOG.info("source_table_name:{}重放任务开始运行，重置redis开关", accessTableConf.getSourceTableName());
                this.configHandler.resetRescan(accessTableConf.getCid(), accessTableConf.getTaskCode());
            }
        }
        parallelism.sinkTo(gainKafkaSink).name("kafka_sink_" + mysqlCdcSourceProps.gainUniqueKey()).uid(UidGenerateUtil.getUid("mysql_ods" + mysqlCdcSourceProps.gainUniqueKey(), "sink", 0));
    }

    private SingleOutputStreamOperator<BasicVO> getSource(JobBasicParam jobBasicParam, MysqlCdcSourceProps mysqlCdcSourceProps) {
        MySqlSource<String> mySqlSource = CdcSource.getMySqlSource(mysqlCdcSourceProps);
        MysqlInstanceEnum byIpPort = MysqlInstanceEnum.getByIpPort(mysqlCdcSourceProps.getIp(), mysqlCdcSourceProps.getPort());
        SingleOutputStreamOperator uid = FlinkEnvironmentHolder.env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), byIpPort != null ? "mysql_source_" + byIpPort.getDesc() : "mysql_source_" + mysqlCdcSourceProps.gainUniqueKey()).uid(UidGenerateUtil.getUid("ods", mysqlCdcSourceProps.gainUniqueKey(), UidGenerateUtil.DEFAULT_VERSION));
        LOG.info(jobBasicParam.getJobName() + " CdcJsonToBasicVoMapFunctionConfig {}", Boolean.valueOf(getBasicParam().isNeedLowerCase()));
        return uid.map(new CdcJsonToBasicVoMapFunction(getBasicParam().isNeedLowerCase())).name("cdcJson_to_basicBO");
    }
}
