package com.worktrans.datacenter.datalink.script;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.worktrans.commons.ex.BizException;
import com.worktrans.commons.lang.Argument;
import com.worktrans.datacenter.datalink.domain.cons.CommonColumn;
import com.worktrans.datacenter.datalink.domain.cons.CommonField;
import com.worktrans.datacenter.datalink.domain.cons.CommonMark;
import com.worktrans.datacenter.datalink.domain.dto.DataBaseDTO;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.table.DorisStreamLoad;

/* loaded from: input_file:com/worktrans/datacenter/datalink/script/StreamLoadScript.class */
public class StreamLoadScript implements CommonScript {
    private String db;
    private String table;
    private DataBaseDTO database;
    private String rows;

    @Override // com.worktrans.datacenter.datalink.script.CommonScript
    public void prepare(JsonNode jsonNode) {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode jsonNode2 = jsonNode.get(CommonColumn.DB);
        if (Argument.isNull(jsonNode2)) {
            throw new BizException("StreamLoad脚本:DB不能为空");
        }
        this.db = jsonNode2.asText();
        JsonNode jsonNode3 = jsonNode.get(CommonColumn.TABLE);
        if (Argument.isNull(jsonNode3)) {
            throw new BizException("StreamLoad脚本:TABLE不能为空");
        }
        this.table = jsonNode3.asText();
        JsonNode jsonNode4 = jsonNode.get(CommonColumn.IP);
        if (Argument.isNull(jsonNode4)) {
            throw new BizException("StreamLoad脚本:IP不能为空");
        }
        JsonNode jsonNode5 = jsonNode.get(CommonColumn.PORT);
        if (Argument.isNull(jsonNode5)) {
            throw new BizException("StreamLoad脚本:PORT不能为空");
        }
        JsonNode jsonNode6 = jsonNode.get(CommonColumn.USERNAME);
        if (Argument.isNull(jsonNode6)) {
            throw new BizException("StreamLoad脚本:USERNAME不能为空");
        }
        JsonNode jsonNode7 = jsonNode.get(CommonColumn.PASSWORD);
        if (Argument.isNull(jsonNode7)) {
            throw new BizException("StreamLoad脚本:PASSWORD不能为空");
        }
        this.database = new DataBaseDTO();
        this.database.setIp(jsonNode4.asText());
        this.database.setPort(Integer.valueOf(jsonNode5.asInt()));
        this.database.setUsername(jsonNode6.asText());
        this.database.setPassword(jsonNode7.asText());
        JsonNode jsonNode8 = jsonNode.get(CommonColumn.RESULT);
        if (Argument.isNull(jsonNode8)) {
            throw new BizException("StreamLoad脚本:获取DB-SQL结果失败");
        }
        JsonNode jsonNode9 = jsonNode8.get(CommonColumn.ROW_DATA);
        if (Argument.isNull(jsonNode9)) {
            throw new BizException("StreamLoad脚本:没有获取到待写入的数据,退出脚本执行");
        }
        List list = (List) objectMapper.convertValue(jsonNode9, List.class);
        list.forEach(map -> {
            map.put(CommonField.CAL_COUNT, 1);
            map.put(CommonField.GMT_MODIFIED, LocalDateTime.now());
            map.put("data_load_timestamp", Long.valueOf(System.currentTimeMillis()));
        });
        this.rows = JSON.toJSONString(list);
    }

    @Override // com.worktrans.datacenter.datalink.script.CommonScript
    public void execute() throws Exception {
        streamLoad(this.rows);
    }

    public void streamLoad(String str) throws StreamLoadException {
        Properties properties = new Properties();
        properties.put("format", "json");
        properties.put("strip_outer_array", "true");
        properties.put("function_column.sequence_col", CommonField.GMT_MODIFIED);
        new DorisStreamLoad(this.database.getIp() + CommonMark.COLON + this.database.getPort(), this.db, this.table, this.database.getUsername(), this.database.getPassword(), properties).load(str);
    }
}
