package com.worktrans.job.etl;

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONObject;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.AccessTableConf;
import com.worktrans.datacenter.datalink.domain.vo.JdbcSourceConfigVO;
import com.worktrans.datacenter.datalink.domain.vo.JobBasicParam;
import com.worktrans.datacenter.datalink.domain.vo.MysqlCdcSourceProps;
import com.worktrans.db.DorisTableStructServer;
import com.worktrans.db.MysqlTableStructServer;
import com.worktrans.exception.BizException;
import com.worktrans.job.config.ConfigHandler;
import com.worktrans.job.operator.AccessTableProgressCollectProcessFunction;
import com.worktrans.job.operator.DistributeByTableProcessFunction;
import com.worktrans.job.operator.DwdEmitElementsForDestinationsProcessFunction;
import com.worktrans.job.operator.DwdSpecialFieldProcessFunction;
import com.worktrans.job.operator.SchemaAndDataTypeProcessFunction;
import com.worktrans.job.operator.filter.FilterFunction;
import com.worktrans.job.operator.map.CdcJsonToBasicVoMapFunction;
import com.worktrans.job.operator.sink.doris.DorisProperty;
import com.worktrans.job.operator.sink.doris.DorisSinkFunction;
import com.worktrans.job.source.CdcSource;
import com.worktrans.job.vo.AccessJobParam;
import com.worktrans.job.vo.BasicVO;
import com.worktrans.util.UidGenerateUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/etl/MysqlJob.class */
/**
 * Streaming job that reads MySQL change data (CDC) from one or more MySQL instances and
 * writes it to Doris tables.
 *
 * <p>For every selected MySQL instance a separate pipeline is wired into the shared Flink
 * environment held by {@code FlinkEnvironmentHolder}: a CDC source, a chain of process/filter
 * operators, and one Doris sink per side output produced by
 * {@link DistributeByTableProcessFunction}.
 *
 * <p>Operator names and uids are part of the job's state identity; they must not be changed
 * casually, otherwise restoring from an existing savepoint may fail.
 */
public class MysqlJob extends AbstractStreamBaseJob {
    private static final Logger log = LoggerFactory.getLogger(MysqlJob.class);
    private AccessJobParam accessJobParam;
    private ConfigHandler configHandler;
    private EtlContext etlContext;

    /**
     * Command-line entry point; delegates to {@link #execute(String[])}.
     *
     * @param strArr raw program arguments
     * @throws Exception if preparation or job execution fails
     */
    public static void main(String[] strArr) throws Exception {
        new MysqlJob().execute(strArr);
    }

    /**
     * Loads the job configuration, builds one pipeline per selected MySQL instance and
     * submits the Flink job.
     *
     * @param strArr raw program arguments, forwarded to {@code prepareRun}
     * @throws BizException if no job name is given, no cid can be resolved, or the table
     *                      configuration is missing
     * @throws Exception    if building or executing the Flink job fails
     */
    public void execute(String[] strArr) throws Exception {
        JobBasicParam prepareRun = super.prepareRun(strArr);
        if (StringUtils.isEmpty(super.getBasicParam().getJobName())) {
            throw new BizException("ods任务请指定jobName");
        }
        this.configHandler = new ConfigHandler(prepareRun);
        List<AccessTableConf> accessTableConfs;
        try {
            this.etlContext = new EtlContext();
            List<Long> allCids = new DorisTableStructServer(this.etlContext.getDbSourceFactory(), prepareRun.getSinkConfig()).getAllCids();
            if (CollectionUtils.isEmpty(allCids)) {
                throw new BizException("ods任务没有获取到任何要运行的cid,请检查配置");
            }
            prepareRun.setCids(allCids);
            this.accessJobParam = this.configHandler.gainDwdJobParam(allCids);
            // Validate BEFORE the first dereference; otherwise a null result surfaces as an
            // NPE instead of the descriptive BizException below.
            if (this.accessJobParam == null || CollectionUtils.isEmpty(this.accessJobParam.getAccessTableConfs())) {
                log.error("ods任务配置不正确，dimJobParam:{}", JSONObject.toJSONString(this.accessJobParam));
                throw new BizException("参数信息不全，请检查维表配置！dimJobParam=" + JSONObject.toJSONString(this.accessJobParam));
            }
            accessTableConfs = this.accessJobParam.getAccessTableConfs();
            this.accessJobParam.setSinkProps(prepareRun.getSinkConfig());
            // NOTE(review): this dumps the whole job parameter, which presumably contains
            // connection credentials — confirm whether it should be masked.
            log.info("ods任务配置dimJobParam:{}", JSONUtil.toJsonPrettyStr(this.accessJobParam));

            Map<String, List<AccessTableConf>> tablesByInstance = groupTablesByInstance(accessTableConfs);
            for (MysqlCdcSourceProps mysqlCdcSourceProps : this.accessJobParam.gainMysqlCdcSourceProps()) {
                String instanceKey = mysqlCdcSourceProps.gainUniqueKey();
                // A null selection means "run every instance".
                if (super.getBasicParam().getSelectedMysqlInstance() != null
                        && !super.getBasicParam().getSelectedMysqlInstance().contains(instanceKey)) {
                    continue;
                }
                List<AccessTableConf> instanceTables = tablesByInstance.get(instanceKey);
                if (CollectionUtils.isEmpty(instanceTables)) {
                    // Without this guard a null list reaches handleOneMysqlInstance and fails
                    // with an NPE while iterating it.
                    log.warn("No table configuration found for mysql instance {}, skipping it", instanceKey);
                    continue;
                }
                handleOneMysqlInstance(mysqlCdcSourceProps, instanceTables);
            }
            this.configHandler.updateAccessJobTable(accessTableConfs);
        } finally {
            // Release the handler even when topology construction fails. It is still closed
            // before execute(), exactly as before.
            this.configHandler.close();
        }
        FlinkEnvironmentHolder.env.execute(prepareRun.getJobName());
    }

    /**
     * Groups the table configurations by {@code ip + CommonMark.COLON + port} of their source
     * JDBC config. Tables without a source JDBC config, or with a non-blank source topic, are
     * left out.
     *
     * @param accessTableConfs all table configurations of the job
     * @return table configurations keyed by instance; never {@code null}
     */
    private Map<String, List<AccessTableConf>> groupTablesByInstance(List<AccessTableConf> accessTableConfs) {
        Map<String, List<AccessTableConf>> tablesByInstance = new HashMap<>();
        for (AccessTableConf accessTableConf : accessTableConfs) {
            JdbcSourceConfigVO sourceJdbcConfig = accessTableConf.getSourceJdbcConfig();
            if (sourceJdbcConfig == null || !StringUtils.isBlank(accessTableConf.getSourceTopic())) {
                continue;
            }
            // NOTE(review): this key is later looked up with MysqlCdcSourceProps.gainUniqueKey();
            // the two formats are assumed to match — confirm.
            String instanceKey = sourceJdbcConfig.getIp() + CommonMark.COLON + sourceJdbcConfig.getPort();
            tablesByInstance.computeIfAbsent(instanceKey, key -> new ArrayList<>()).add(accessTableConf);
        }
        return tablesByInstance;
    }

    /**
     * Wires the complete pipeline for one MySQL instance: source, processing operators, one
     * Doris sink per distributed side output, and finally resets the rescan switch of every
     * table whose {@code rescan} value is greater than zero.
     *
     * @param mysqlCdcSourceProps CDC connection properties of the instance
     * @param list                table configurations served by this instance; must not be
     *                            {@code null}
     */
    private void handleOneMysqlInstance(MysqlCdcSourceProps mysqlCdcSourceProps, List<AccessTableConf> list) {
        MysqlTableStructServer mysqlTableStructServer = MysqlTableStructServer.getMysqlTableStructServer(this.etlContext.getDbSourceFactory(), mysqlCdcSourceProps.genJdbcConfig());
        DistributeByTableProcessFunction distributeByTableProcessFunction = new DistributeByTableProcessFunction(list);
        Map<String, OutputTag<BasicVO>> gainOutputTagMap = distributeByTableProcessFunction.gainOutputTagMap();
        // Names and uids below are kept exactly as they were to stay compatible with
        // existing job state.
        SingleOutputStreamOperator distributed = getSource(mysqlCdcSourceProps)
                .process(new SchemaAndDataTypeProcessFunction(this.etlContext, list, this.accessJobParam.getSinkProps(), mysqlTableStructServer, super.getBasicParam().getBootstrapServers(), getBasicParam().isNeedLowerCase()))
                .name("schema_datatype_process")
                .setParallelism(1)
                .process(new DwdEmitElementsForDestinationsProcessFunction(list, true))
                .name("multi_destination")
                .filter(new FilterFunction(list))
                .name("filter")
                .process(new DwdSpecialFieldProcessFunction(list))
                .name("udf")
                .process(new AccessTableProgressCollectProcessFunction(this.etlContext, mysqlCdcSourceProps.gainUniqueKey(), distributeByTableProcessFunction.gainSinkTableToSourceTableMap(), this.accessJobParam.getSinkProps()))
                .name("progress")
                .setParallelism(1)
                .uid(UidGenerateUtil.getUid("dwd", "progress_collect_" + mysqlCdcSourceProps.gainUniqueKey(), 0))
                .process(distributeByTableProcessFunction)
                .name("distribute");
        for (Map.Entry<String, OutputTag<BasicVO>> entry : gainOutputTagMap.entrySet()) {
            handleSingle(super.getBasicParam().getCids(), entry.getKey(), distributed.getSideOutput(entry.getValue()));
        }
        for (AccessTableConf accessTableConf : list) {
            Integer rescan = accessTableConf.getRescan();
            if (rescan != null && rescan.intValue() > 0) {
                log.info("source_table_name:{}重放任务开始运行，重置redis开关", accessTableConf.getSourceTableName());
                this.configHandler.resetRescan(accessTableConf.getCid(), accessTableConf.getTaskCode());
            }
        }
    }

    /**
     * Creates the CDC source stream for one MySQL instance and maps each record to a
     * {@link BasicVO}.
     *
     * @param mysqlCdcSourceProps CDC connection properties of the instance
     * @return the mapped source stream
     */
    private SingleOutputStreamOperator<BasicVO> getSource(MysqlCdcSourceProps mysqlCdcSourceProps) {
        return FlinkEnvironmentHolder.env
                .fromSource(CdcSource.getMySqlSource(mysqlCdcSourceProps), WatermarkStrategy.noWatermarks(), "mysql_source_" + mysqlCdcSourceProps.gainUniqueKey())
                .uid(UidGenerateUtil.getUid("ods", mysqlCdcSourceProps.gainUniqueKey(), UidGenerateUtil.DEFAULT_VERSION))
                .map(new CdcJsonToBasicVoMapFunction(getBasicParam().isNeedLowerCase()))
                .name("cdcJson_to_basicBO");
    }

    /**
     * Attaches a Doris sink to one distributed stream.
     *
     * @param list       cids of the job; currently unused by this method
     * @param str        sink table key, split on {@code CommonMark.DOT} into database (first
     *                   part) and table (second part)
     * @param dataStream stream of records destined for that table
     * @throws IllegalArgumentException if {@code str} does not yield at least two parts
     */
    private void handleSingle(List<Long> list, String str, DataStream<BasicVO> dataStream) {
        String[] split = StringUtils.split(str, CommonMark.DOT);
        if (split == null || split.length < 2) {
            // Fail with the offending key instead of a bare ArrayIndexOutOfBounds/NPE.
            throw new IllegalArgumentException("Invalid sink table key, expected database and table separated by CommonMark.DOT: " + str);
        }
        DorisProperty dorisProperty = new DorisProperty();
        dorisProperty.setIp(this.accessJobParam.getSinkProps().getIp());
        dorisProperty.setPort(this.accessJobParam.getSinkProps().getPort());
        dorisProperty.setHttpPort(this.accessJobParam.getSinkProps().getHttpPort());
        dorisProperty.setUsername(this.accessJobParam.getSinkProps().getUsername());
        dorisProperty.setPassword(this.accessJobParam.getSinkProps().getPassword());
        dorisProperty.setDb(split[0]);
        dorisProperty.setTable(split[1]);
        // The uid keeps its historical "kafka_sink_doris_" prefix on purpose: it identifies
        // the operator's state and must stay stable.
        dataStream.addSink(new DorisSinkFunction(this.etlContext, dorisProperty)).name("doris_" + str).uid(UidGenerateUtil.getUid("dwd", "kafka_sink_doris_" + str, 0));
    }

    /**
     * Always returns {@code false}.
     *
     * @return {@code false}
     */
    @Override // com.worktrans.job.etl.AbstractStreamBaseJob
    protected boolean needKafka() {
        return false;
    }
}
