package com.worktrans.job.operator;

import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn;
import com.alibaba.druid.sql.ast.statement.SQLAlterTableItem;
import com.alibaba.druid.sql.ast.statement.SQLAlterTableStatement;
import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableChangeColumn;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyColumn;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.core.JsonFactory;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.vo.AccessTableConf;
import com.worktrans.datacenter.datalink.domain.vo.Column;
import com.worktrans.datacenter.datalink.domain.vo.DorisConfigVO;
import com.worktrans.datacenter.datalink.domain.vo.SchemaChangeEvent;
import com.worktrans.db.DorisTableStructServer;
import com.worktrans.db.MysqlTableStructServer;
import com.worktrans.exception.BizException;
import com.worktrans.job.etl.EtlContext;
import com.worktrans.job.vo.BasicVO;
import com.worktrans.kafka.KafkaUtil;
import com.worktrans.util.DateUtil;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/worktrans/job/operator/SchemaAndDataTypeProcessFunction.class */
public class SchemaAndDataTypeProcessFunction extends ProcessFunction<BasicVO, BasicVO> {
    private boolean isNeedLowerCase;
    private final DorisConfigVO dorisConfigVO;
    private final List<AccessTableConf> accessTableConfs;
    private final MysqlTableStructServer mysqlTableStructServer;
    private final String bootstrapServers;
    private Map<String, List<AccessTableConf>> tableNameMap;
    private EtlContext etlContext;
    private static final Logger log = LoggerFactory.getLogger(SchemaAndDataTypeProcessFunction.class);
    private static final List<String> SPECIAL_TYLE = Arrays.asList(JsonFactory.FORMAT_NAME_JSON, "TEXT", "TINYTEXT", "MEDIUMTEXT", "LONGTEXT", "json", "text", "tinytext", "mediumtext", "longtext");
    private static final ZoneId ZONE_ID = ZoneId.of("GMT");

    public SchemaAndDataTypeProcessFunction(EtlContext etlContext, List<AccessTableConf> list, DorisConfigVO dorisConfigVO, MysqlTableStructServer mysqlTableStructServer, String str, boolean z) {
        this.etlContext = etlContext;
        this.dorisConfigVO = dorisConfigVO;
        this.accessTableConfs = list;
        this.mysqlTableStructServer = mysqlTableStructServer;
        this.bootstrapServers = str;
        this.isNeedLowerCase = z;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.tableNameMap = new HashMap();
        for (AccessTableConf accessTableConf : this.accessTableConfs) {
            this.tableNameMap.computeIfAbsent(accessTableConf.getSourceTableName(), str -> {
                return new ArrayList();
            }).add(accessTableConf);
        }
    }

    public void processElement(BasicVO basicVO, ProcessFunction<BasicVO, BasicVO>.Context context, Collector<BasicVO> collector) throws Exception {
        String fullTableName = basicVO.getFullTableName();
        List<AccessTableConf> list = this.tableNameMap.get(fullTableName);
        if (CollectionUtils.isEmpty(list)) {
            collector.collect(basicVO);
            return;
        }
        SchemaChangeEvent historyRecord = basicVO.getHistoryRecord();
        if (historyRecord == null) {
            dataTypeConert(basicVO, collector);
            return;
        }
        try {
            changeSinkTableSchema(fullTableName, list, historyRecord);
        } catch (Exception e) {
            if (StringUtils.isNotBlank(this.bootstrapServers)) {
                KafkaUtil.recordError(this.bootstrapServers, basicVO, "schema_change");
            }
            log.error("ddl同步失败！" + historyRecord.getDdl(), e);
        }
    }

    private void dataTypeConert(BasicVO basicVO, Collector<BasicVO> collector) throws Exception {
        Map<String, Column> gainConvertColumnMap = this.mysqlTableStructServer.gainConvertColumnMap(basicVO.getFullTableName(), this.isNeedLowerCase);
        dataTypeConvert4Map(basicVO.getBefore(), gainConvertColumnMap);
        dataTypeConvert4Map(basicVO.getAfter(), gainConvertColumnMap);
        collector.collect(basicVO);
    }

    private void dataTypeConvert4Map(Map<String, Object> map, Map<String, Column> map2) {
        if (Objects.nonNull(map)) {
            for (Map.Entry<String, Column> entry : map2.entrySet()) {
                String key = entry.getKey();
                String dataType = entry.getValue().getDataType();
                Object obj = map.get(key);
                if (obj != null) {
                    Object obj2 = null;
                    if ("DATETIME".equalsIgnoreCase(dataType)) {
                        obj2 = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Long) obj).longValue()), ZONE_ID);
                    } else if ("TIMESTAMP".equalsIgnoreCase(dataType)) {
                        obj2 = LocalDateTime.parse(obj.toString().substring(0, 19), DateUtil.dateTimeFormatter);
                    } else if ("DATE".equalsIgnoreCase(dataType)) {
                        obj2 = LocalDate.ofEpochDay(((Integer) obj).longValue());
                    } else if ("TIME".equalsIgnoreCase(dataType)) {
                        obj2 = getLocalTime(Long.valueOf(obj.toString()));
                    } else if ("DECIMAL".equalsIgnoreCase(dataType)) {
                        obj2 = new BigDecimal(obj.toString());
                    }
                    map.put(key, obj2);
                }
            }
        }
    }

    private void changeSinkTableSchema(String str, List<AccessTableConf> list, SchemaChangeEvent schemaChangeEvent) throws Exception {
        this.mysqlTableStructServer.refreshTableStruct(str, this.isNeedLowerCase);
        String replace = StringUtils.replace(schemaChangeEvent.getDdl(), "\\r\\n", CommonMark.BLANK);
        try {
            SQLAlterTableStatement parseAlter = new MySqlStatementParser(replace).parseAlter();
            if (parseAlter instanceof SQLAlterTableStatement) {
                DorisTableStructServer dorisTableStructServer = new DorisTableStructServer(this.etlContext.getDbSourceFactory(), this.dorisConfigVO);
                List<SQLAlterTableItem> items = parseAlter.getItems();
                if (CollectionUtils.isEmpty(items)) {
                    log.error("解析表结构变动消息失败，items为空，ddl:" + replace);
                    throw new BizException("解析表结构变动消息失败，items为空，ddl:" + replace);
                }
                for (SQLAlterTableItem sQLAlterTableItem : items) {
                    if (sQLAlterTableItem instanceof SQLAlterTableAddColumn) {
                        handleAddCol(list, dorisTableStructServer, (SQLAlterTableAddColumn) sQLAlterTableItem);
                    } else if (sQLAlterTableItem instanceof MySqlAlterTableModifyColumn) {
                        handleModifyCol(list, dorisTableStructServer, (MySqlAlterTableModifyColumn) sQLAlterTableItem);
                    } else {
                        if (!(sQLAlterTableItem instanceof MySqlAlterTableChangeColumn)) {
                            log.error("不支持自动同步表结构的操作类型：" + sQLAlterTableItem.toString());
                            throw new BizException("不支持自动同步表结构的操作类型：" + sQLAlterTableItem);
                        }
                        handleChangeCol(list, dorisTableStructServer, (MySqlAlterTableChangeColumn) sQLAlterTableItem);
                    }
                }
            }
        } catch (Exception e) {
            log.error("解析表结构变动消息失败，ddl:" + replace, e);
            throw new BizException("解析表结构变动消息失败，ddl:" + replace);
        }
    }

    private void handleAddCol(List<AccessTableConf> list, DorisTableStructServer dorisTableStructServer, SQLAlterTableAddColumn sQLAlterTableAddColumn) {
        for (SQLColumnDefinition sQLColumnDefinition : sQLAlterTableAddColumn.getColumns()) {
            String columnName = sQLColumnDefinition.getColumnName();
            String transformedFieldType = getTransformedFieldType(sQLColumnDefinition);
            for (AccessTableConf accessTableConf : list) {
                if (!dorisTableStructServer.judgeColumnExists(accessTableConf.getSinkTableName(), columnName)) {
                    dorisTableStructServer.executeDDLAndMonitor(accessTableConf.getSinkTableName(), "ALTER TABLE " + accessTableConf.getSinkTableName() + " ADD COLUMN " + columnName + CommonMark.BLANK + transformedFieldType);
                }
            }
        }
    }

    private String getTransformedFieldType(SQLColumnDefinition sQLColumnDefinition) {
        String obj = sQLColumnDefinition.getDataType().toString();
        Column column = new Column();
        if (obj.contains(CommonMark.LEFT_BRACKET)) {
            column.setDataType(obj.substring(0, obj.indexOf(CommonMark.LEFT_BRACKET)));
        } else {
            column.setDataType(obj);
        }
        column.setColumnType(obj);
        return transformFieldType(column);
    }

    private void handleModifyCol(List<AccessTableConf> list, DorisTableStructServer dorisTableStructServer, MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn) {
        SQLColumnDefinition newColumnDefinition = mySqlAlterTableModifyColumn.getNewColumnDefinition();
        String transformedFieldType = getTransformedFieldType(newColumnDefinition);
        for (AccessTableConf accessTableConf : list) {
            dorisTableStructServer.executeDDLAndMonitor(accessTableConf.getSinkTableName(), "ALTER TABLE " + accessTableConf.getSinkTableName() + " MODIFY COLUMN " + newColumnDefinition.getColumnName() + CommonMark.BLANK + transformedFieldType);
        }
    }

    private void handleChangeCol(List<AccessTableConf> list, DorisTableStructServer dorisTableStructServer, MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn) {
        SQLColumnDefinition newColumnDefinition = mySqlAlterTableChangeColumn.getNewColumnDefinition();
        String transformedFieldType = getTransformedFieldType(newColumnDefinition);
        for (AccessTableConf accessTableConf : list) {
            dorisTableStructServer.executeDDLAndMonitor(accessTableConf.getSinkTableName(), "ALTER TABLE " + accessTableConf.getSinkTableName() + " MODIFY COLUMN " + newColumnDefinition.getColumnName() + CommonMark.BLANK + transformedFieldType);
        }
    }

    private LocalDateTime getLocalTime(Long l) {
        LocalTime ofNanoOfDay = LocalTime.ofNanoOfDay(l.longValue());
        return LocalDateTime.of(1, 1, 1, ofNanoOfDay.getHour(), ofNanoOfDay.getMinute(), ofNanoOfDay.getSecond());
    }

    public void close() throws Exception {
        super.close();
        this.etlContext.close();
    }

    private String transformFieldType(Column column) {
        String dataType = column.getDataType();
        String columnType = column.getColumnType();
        if (SPECIAL_TYLE.contains(dataType.toUpperCase())) {
            return "string";
        }
        if ("TIMESTAMP".equalsIgnoreCase(dataType)) {
            return "bigint(20)";
        }
        if ("VARCHAR".equalsIgnoreCase(dataType)) {
            return "VARCHAR(" + (Integer.valueOf(columnType.substring(columnType.indexOf(CommonMark.LEFT_BRACKET) + 1, columnType.lastIndexOf(")"))).intValue() * 4) + ")";
        }
        if (!"DECIMAL".equalsIgnoreCase(dataType)) {
            return "datetime".equalsIgnoreCase(dataType) ? "datetime" : columnType;
        }
        Integer valueOf = Integer.valueOf(columnType.substring(columnType.indexOf(CommonMark.LEFT_BRACKET) + 1, columnType.lastIndexOf(CommonMark.COMMA)).trim());
        Integer valueOf2 = Integer.valueOf(columnType.substring(columnType.indexOf(CommonMark.COMMA) + 1, columnType.lastIndexOf(")")).trim());
        return "DECIMAL(" + (valueOf.intValue() > 27 ? 27 : valueOf.intValue()) + CommonMark.COMMA + (valueOf2.intValue() > 9 ? 9 : valueOf2.intValue()) + ")";
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((BasicVO) obj, (ProcessFunction<BasicVO, BasicVO>.Context) context, (Collector<BasicVO>) collector);
    }
}
