package org.apache.flink.state.api;

import java.util.OptionalInt;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.functions.Timestamper;
import org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner;
import org.apache.flink.state.api.output.OperatorSubtaskStateReducer;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
import org.apache.flink.state.api.output.partitioner.HashSelector;
import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.TernaryBoolean;

/**
 * Bundles an input {@link DataSet} with everything needed to turn its records into operator
 * state: the factory creating the bootstrap {@link StreamOperator}, an optional key selector
 * with its key type, an optional operator-specific max parallelism and an optional {@link
 * Timestamper}.
 *
 * @param <T> the type of the records in the input data set.
 */
@PublicEvolving
public class BootstrapTransformation<T> {

    /** The records from which state is bootstrapped. */
    private final DataSet<T> dataSet;

    /** Creates the stream operator that is run over the data set. */
    private final SavepointWriterOperatorFactory factory;

    /** User key selector; {@code null} when the transformation is not keyed. */
    @Nullable
    private final KeySelector<T, ?> originalKeySelector;

    /** Wraps {@link #originalKeySelector} for custom partitioning; {@code null} when not keyed. */
    @Nullable
    private final HashSelector<T> hashKeySelector;

    /** Type information of the key; {@code null} when the transformation is not keyed. */
    @Nullable
    private final TypeInformation<?> keyType;

    /** Max parallelism specific to this operator; empty means "use the caller-supplied value". */
    private final OptionalInt operatorMaxParallelism;

    /** Optional timestamper handed to the task runner; may be {@code null}. */
    @Nullable
    private final Timestamper<T> timestamper;

    /**
     * Creates a non-keyed transformation; all key-related fields are set to {@code null}.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param dataSet the input records.
     * @param operatorMaxParallelism optional max parallelism override for this operator.
     * @param timestamper optional timestamper, may be {@code null}.
     * @param factory factory for the bootstrap stream operator.
     */
    public BootstrapTransformation(
            DataSet<T> dataSet,
            OptionalInt operatorMaxParallelism,
            @Nullable Timestamper<T> timestamper,
            SavepointWriterOperatorFactory factory) {
        this.dataSet = dataSet;
        this.operatorMaxParallelism = operatorMaxParallelism;
        this.factory = factory;
        this.timestamper = timestamper;
        this.originalKeySelector = null;
        this.hashKeySelector = null;
        this.keyType = null;
    }

    /**
     * Creates a keyed transformation.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param dataSet the input records.
     * @param operatorMaxParallelism optional max parallelism override for this operator.
     * @param timestamper optional timestamper, may be {@code null}.
     * @param factory factory for the bootstrap stream operator.
     * @param keySelector extracts the key of each record; also wrapped in a {@link HashSelector}.
     * @param keyType type information of the extracted key.
     * @param <K> the key type.
     */
    public <K> BootstrapTransformation(
            DataSet<T> dataSet,
            OptionalInt operatorMaxParallelism,
            @Nullable Timestamper<T> timestamper,
            SavepointWriterOperatorFactory factory,
            @Nonnull KeySelector<T, K> keySelector,
            @Nonnull TypeInformation<K> keyType) {
        this.dataSet = dataSet;
        this.operatorMaxParallelism = operatorMaxParallelism;
        this.factory = factory;
        this.timestamper = timestamper;
        this.originalKeySelector = keySelector;
        this.hashKeySelector = new HashSelector<>(keySelector);
        this.keyType = keyType;
    }

    /**
     * Returns the operator-specific max parallelism if one was configured, otherwise the given
     * fallback.
     *
     * @param globalMaxParallelism the value to use when no operator-specific value is present.
     * @return the effective max parallelism.
     */
    int getMaxParallelism(int globalMaxParallelism) {
        return this.operatorMaxParallelism.orElse(globalMaxParallelism);
    }

    /**
     * Writes the per-subtask states for the operator and reduces them into a data set of {@link
     * OperatorState}.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally package-private;
     * it is kept {@code public} here so existing callers keep compiling.
     *
     * @param operatorID the id of the operator whose state is written.
     * @param stateBackend the state backend set on the operator's stream config.
     * @param config additional configuration merged over the environment configuration.
     * @param globalMaxParallelism max parallelism used when no operator-specific one is set.
     * @param savepointPath path passed to the operator factory.
     * @return the reduced operator state, named {@code reduce(OperatorSubtaskState)}.
     */
    public DataSet<OperatorState> writeOperatorState(
            OperatorID operatorID,
            StateBackend stateBackend,
            Configuration config,
            int globalMaxParallelism,
            Path savepointPath) {
        final int localMaxParallelism = getMaxParallelism(globalMaxParallelism);

        return writeOperatorSubtaskStates(
                        operatorID, stateBackend, config, savepointPath, localMaxParallelism)
                .reduceGroup(new OperatorSubtaskStateReducer(operatorID, localMaxParallelism))
                .name("reduce(OperatorSubtaskState)");
    }

    /**
     * Test-only overload that writes the subtask states with an empty additional {@link
     * Configuration}.
     *
     * @param operatorID the id of the operator whose state is written.
     * @param stateBackend the state backend set on the operator's stream config.
     * @param savepointPath path passed to the operator factory.
     * @param localMaxParallelism max parallelism used for partitioning and as parallelism cap.
     * @return the operator producing the tagged subtask states.
     */
    @VisibleForTesting
    MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(
            OperatorID operatorID,
            StateBackend stateBackend,
            Path savepointPath,
            int localMaxParallelism) {
        return writeOperatorSubtaskStates(
                operatorID, stateBackend, new Configuration(), savepointPath, localMaxParallelism);
    }

    /**
     * Builds the map-partition stage that runs the bootstrap operator over the input.
     *
     * <p>When a key selector is present the input is first custom-partitioned with a {@link
     * KeyGroupRangePartitioner}. The resulting operator is named after the operator id's hex
     * string. Its parallelism is forced to 1 for a {@link BroadcastStateBootstrapOperator}, and
     * otherwise lowered to {@code localMaxParallelism} when it would exceed it.
     */
    private MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(
            OperatorID operatorID,
            StateBackend stateBackend,
            Configuration additionalConfig,
            Path savepointPath,
            int localMaxParallelism) {

        // Declared as DataSet<T>: it holds either the raw input or its partitioned variant
        // (PartitionOperator is a DataSet, but not the other way around).
        DataSet<T> input = this.dataSet;
        if (this.originalKeySelector != null) {
            input =
                    this.dataSet.partitionCustom(
                            new KeyGroupRangePartitioner(localMaxParallelism),
                            this.hashKeySelector);
        }

        // The current wall-clock time is handed to the factory together with the path.
        StreamOperator<TaggedOperatorSubtaskState> operator =
                this.dataSet.clean(
                        this.factory.createOperator(System.currentTimeMillis(), savepointPath));

        final StreamConfig streamConfig =
                getConfig(operatorID, stateBackend, additionalConfig, operator);

        final BoundedOneInputStreamTaskRunner<T> operatorRunner =
                new BoundedOneInputStreamTaskRunner<>(
                        streamConfig, localMaxParallelism, this.timestamper);

        MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates =
                input.mapPartition(operatorRunner).name(operatorID.toHexString());

        if (operator instanceof BroadcastStateBootstrapOperator) {
            subtaskStates = subtaskStates.setParallelism(1);
        } else if (getParallelism(subtaskStates) > localMaxParallelism) {
            subtaskStates.setParallelism(localMaxParallelism);
        }
        return subtaskStates;
    }

    /**
     * Assembles the {@link StreamConfig} for the bootstrap operator.
     *
     * <p>The configuration starts as a copy of the execution environment's configuration with
     * {@code additionalConfig} added on top. Checkpointing is enabled in {@link
     * CheckpointingMode#EXACTLY_ONCE} mode, the changelog state backend is disabled, and the
     * {@link ManagedMemoryUseCase#STATE_BACKEND} managed memory fraction is set to 1.0. Key
     * serializer and state partitioner are only set when a key type is present.
     *
     * @param operatorID the operator id; its hex string is also used as operator name.
     * @param stateBackend the state backend to set.
     * @param additionalConfig configuration entries added over the environment configuration.
     * @param operator the stream operator to set.
     * @return the populated stream config.
     */
    @VisibleForTesting
    StreamConfig getConfig(
            OperatorID operatorID,
            StateBackend stateBackend,
            Configuration additionalConfig,
            StreamOperator<TaggedOperatorSubtaskState> operator) {
        final Configuration merged =
                new Configuration(this.dataSet.getExecutionEnvironment().getConfiguration());
        merged.addAll(additionalConfig);

        final StreamConfig streamConfig = new StreamConfig(merged);
        streamConfig.setChainStart();
        streamConfig.setCheckpointingEnabled(true);
        streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);

        if (this.keyType != null) {
            streamConfig.setStateKeySerializer(
                    this.keyType.createSerializer(
                            this.dataSet.getExecutionEnvironment().getConfig()));
            // Registered under index 0.
            streamConfig.setStatePartitioner(0, this.originalKeySelector);
        }

        streamConfig.setStreamOperator(operator);
        streamConfig.setOperatorName(operatorID.toHexString());
        streamConfig.setOperatorID(operatorID);
        streamConfig.setStateBackend(stateBackend);
        streamConfig.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
        streamConfig.setManagedMemoryFractionOperatorOfUseCase(
                ManagedMemoryUseCase.STATE_BACKEND, 1.0d);
        return streamConfig;
    }

    /**
     * Returns the operator's parallelism, falling back to the execution environment's
     * parallelism when the operator reports {@code -1} (presumably the "not explicitly set"
     * sentinel -- confirm against the DataSet API).
     */
    private static <T> int getParallelism(
            MapPartitionOperator<T, TaggedOperatorSubtaskState> subtaskStates) {
        final int parallelism = subtaskStates.getParallelism();
        if (parallelism == -1) {
            return subtaskStates.getExecutionEnvironment().getParallelism();
        }
        return parallelism;
    }
}
