package org.apache.doris.flink.table;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
import org.apache.doris.shaded.org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/table/DorisStreamLoad.class */
/**
 * Pushes batches of data to a Doris table through the HTTP stream load endpoint
 * ({@code http://<hostPort>/api/<db>/<tbl>/_stream_load?}).
 *
 * <p>Every request is an HTTP PUT carrying a Basic {@code Authorization} header, a
 * {@code label} header and one header per entry of the configured stream load properties.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but holds non-transient
 * {@code HttpClientBuilder} / {@code CloseableHttpClient} fields; confirm that instances are
 * never actually serialized, or make those fields transient and rebuild them lazily.
 */
public class DorisStreamLoad implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Values of the response "status" field that are treated as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
    /** Format arguments, in order: host:port, database, table. */
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
    /** Name of both the stream load property and the HTTP header that carry the load label. */
    private static final String LABEL_KEY = "label";
    private String user;
    private String passwd;
    private String loadUrlStr;
    private String hostPort;
    private String db;
    private String tbl;
    private String authEncoding;
    private Properties streamLoadProp;
    // Redirects are followed for every HTTP method (the default strategy would not follow them
    // for PUT) -- presumably because the contacted node redirects the load to another node.
    private final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
        @Override
        protected boolean isRedirectable(String str) {
            return true;
        }
    });
    private CloseableHttpClient httpClient = this.httpClientBuilder.build();

    /** Outcome of a single stream load HTTP request. */
    public static class LoadResponse {
        /** HTTP status code, or -1 when the request itself failed with an exception. */
        public int status;
        /** HTTP reason phrase, or the exception message when the request failed. */
        public String respMsg;
        /** Response body, or a failure description when the request failed. */
        public String respContent;

        public LoadResponse(int i, String str, String str2) {
            this.status = i;
            this.respMsg = str;
            this.respContent = str2;
        }

        /**
         * Renders this response as JSON; falls back to a plain-text rendering if JSON
         * serialization fails so that log lines never lose the response details.
         */
        @Override
        public String toString() {
            try {
                return DorisStreamLoad.OBJECT_MAPPER.writeValueAsString(this);
            } catch (JsonProcessingException e) {
                return "LoadResponse{status=" + this.status + ", respMsg=" + this.respMsg + ", respContent=" + this.respContent + "}";
            }
        }
    }

    /**
     * Creates a loader bound to one Doris table.
     *
     * @param str        host and port ({@code host:port}) of the node to send loads to
     * @param str2       database name
     * @param str3       table name
     * @param str4       user name used for HTTP Basic authentication
     * @param str5       password used for HTTP Basic authentication
     * @param properties stream load properties; each entry is sent as an HTTP header, and a
     *                   non-blank {@code label} entry is used as the load label
     */
    public DorisStreamLoad(String str, String str2, String str3, String str4, String str5, Properties properties) {
        this.hostPort = str;
        this.db = str2;
        this.tbl = str3;
        this.user = str4;
        this.passwd = str5;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, str2, str3);
        this.authEncoding = basicAuthHeader(str4, str5);
        this.streamLoadProp = properties;
    }

    /** Returns the stream load URL currently targeted by this loader. */
    public String getLoadUrlStr() {
        return this.loadUrlStr;
    }

    /**
     * Points this loader at another node and rebuilds the stream load URL accordingly.
     *
     * @param str host and port ({@code host:port}) of the new target node
     */
    public void setHostPort(String str) {
        this.hostPort = str;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, this.db, this.tbl);
    }

    /**
     * Sends one batch and verifies that Doris accepted it.
     *
     * @param str the request body to load
     * @throws StreamLoadException if the HTTP status is not 200, the response body cannot be
     *                             parsed, or the reported status is not one of the success values
     */
    public void load(String str) throws StreamLoadException {
        LoadResponse loadBatch = loadBatch(str);
        LOG.info("Streamload Response:{}", loadBatch);
        if (loadBatch.status != 200) {
            throw new StreamLoadException("stream load error: " + loadBatch.respContent);
        }
        try {
            RespContent respContent = OBJECT_MAPPER.readValue(loadBatch.respContent, RespContent.class);
            if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                throw new StreamLoadException(String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()));
            }
        } catch (IOException e) {
            throw new StreamLoadException(e);
        }
    }

    /**
     * Executes the stream load PUT request for one batch.
     *
     * <p>Never throws for request failures: any exception raised while building or executing
     * the request is logged and reported as a {@link LoadResponse} with status -1.
     *
     * @param payload the request body, sent as UTF-8
     * @return the HTTP status, reason phrase and body of the response
     */
    private LoadResponse loadBatch(String payload) {
        String label = this.streamLoadProp.getProperty(LABEL_KEY);
        if (StringUtils.isBlank(label)) {
            label = String.format("flink_connector_%s_%s", new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()), UUID.randomUUID().toString().replaceAll("-", ""));
        }
        try {
            HttpPut httpPut = new HttpPut(this.loadUrlStr);
            httpPut.setHeader("Expect", HTTP.EXPECT_CONTINUE);
            httpPut.setHeader("Authorization", this.authEncoding);
            httpPut.setHeader(LABEL_KEY, label);
            for (Map.Entry<Object, Object> entry : this.streamLoadProp.entrySet()) {
                String headerName = String.valueOf(entry.getKey());
                if (LABEL_KEY.equals(headerName)) {
                    // The label was resolved above; copying the raw property again would let a
                    // blank configured value overwrite the generated label.
                    continue;
                }
                httpPut.setHeader(headerName, String.valueOf(entry.getValue()));
            }
            httpPut.setEntity(new StringEntity(payload, StandardCharsets.UTF_8));
            // try-with-resources releases the response (and its connection) even when reading
            // the status line or the body throws.
            try (CloseableHttpResponse response = this.httpClient.execute(httpPut)) {
                // UTF-8 is only the fallback; a charset declared by the response still wins.
                String body = response.getEntity() != null ? EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8) : "";
                return new LoadResponse(response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase(), body);
            }
        } catch (Exception e) {
            String errorMessage = "failed to stream load data with label: " + label;
            LOG.warn(errorMessage, e);
            return new LoadResponse(-1, e.getMessage(), errorMessage);
        }
    }

    /**
     * Builds the value of the HTTP {@code Authorization} header for Basic authentication.
     *
     * @param username the user name
     * @param password the password
     * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
     */
    private String basicAuthHeader(String username, String password) {
        byte[] credentials = (username + ":" + password).getBytes(StandardCharsets.UTF_8);
        // Base64 output is pure ASCII; the charset is given explicitly so the result does not
        // depend on the platform default.
        return "Basic " + new String(Base64.encodeBase64(credentials), StandardCharsets.UTF_8);
    }

    /**
     * Closes the underlying HTTP client, if any.
     *
     * @throws IOException declared for callers; a failure to close is currently logged and
     *                     rethrown wrapped in a {@link RuntimeException}
     */
    public void close() throws IOException {
        if (null != this.httpClient) {
            try {
                this.httpClient.close();
            } catch (IOException e) {
                LOG.error("Closing httpClient failed.", e);
                throw new RuntimeException("Closing httpClient failed.", e);
            }
        }
    }
}
