package org.apache.flink.graph.library;

import java.lang.Comparable;
import java.util.Iterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.NullValue;

/* loaded from: input_file:org/apache/flink/graph/library/ConnectedComponents.class */
/**
 * Connected-components algorithm expressed as a scatter-gather iteration.
 *
 * <p>In every superstep each vertex sends its current value to all of its neighbors
 * ({@link CCMessenger}) and then replaces its value with the smallest value it received,
 * provided that value is smaller than the one it already holds ({@link CCUpdater}).
 * Vertex values therefore act as component labels and must be {@link Comparable}.
 *
 * @param <K> vertex key type
 * @param <VV> vertex value type, used as the component label
 * @param <EV> edge value type of the input graph (discarded before the iteration starts)
 */
public class ConnectedComponents<K, VV extends Comparable<VV>, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>> {

    /** Upper bound on the number of scatter-gather supersteps; unboxed in {@link #run(Graph)}. */
    private final Integer maxIterations;

    /**
     * Creates the algorithm with the given iteration limit.
     *
     * @param maxIterations maximum number of supersteps to run; must not be {@code null},
     *     otherwise {@link #run(Graph)} fails with a {@link NullPointerException} when the
     *     value is unboxed
     */
    public ConnectedComponents(Integer maxIterations) {
        this.maxIterations = maxIterations;
    }

    /**
     * Runs the iteration on the given graph and returns the resulting vertices.
     *
     * <p>Edge values are replaced by {@link NullValue} and {@code getUndirected()} is applied
     * before the scatter-gather iteration is started.
     *
     * @param graph the input graph; its vertex data set is expected to carry tuple type
     *     information whose field 1 is the vertex value type
     * @return the vertices of the graph after the iteration, with their final values
     * @throws Exception if the underlying graph operations fail
     */
    @Override // org.apache.flink.graph.GraphAlgorithm
    public DataSet<Vertex<K, VV>> run(Graph<K, VV, EV> graph) throws Exception {
        // getTypeAt(int) is declared on the tuple/composite type info rather than on
        // TypeInformation itself, so the vertex data set's type has to be narrowed first.
        TypeInformation<VV> valueTypeInfo =
                ((TupleTypeInfo<?>) graph.getVertices().getType()).getTypeAt(1);

        Graph<K, VV, NullValue> undirectedGraph = graph
                .mapEdges(new GraphUtils.MapTo<>(NullValue.getInstance()))
                .getUndirected();

        return undirectedGraph
                .runScatterGatherIteration(
                        new CCMessenger<>(valueTypeInfo),
                        new CCUpdater<>(),
                        maxIterations)
                .getVertices();
    }

    /**
     * Scatter function: forwards the current vertex value to every neighbor.
     *
     * @param <K> vertex key type
     * @param <VV> vertex value type, which is also the message type
     */
    public static final class CCMessenger<K, VV extends Comparable<VV>> extends ScatterFunction<K, VV, VV, NullValue> implements ResultTypeQueryable<VV> {

        /** Type information of the messages, reported through {@link #getProducedType()}. */
        private final TypeInformation<VV> typeInformation;

        /**
         * @param typeInformation type information of the vertex value / message type
         */
        public CCMessenger(TypeInformation<VV> typeInformation) {
            this.typeInformation = typeInformation;
        }

        /**
         * Sends the value of {@code vertex} to all of its neighbors.
         *
         * @param vertex the vertex being processed in the current superstep
         */
        @Override // org.apache.flink.graph.spargel.ScatterFunction
        public void sendMessages(Vertex<K, VV> vertex) throws Exception {
            sendMessageToAllNeighbors(vertex.getValue());
        }

        /**
         * @return the type information supplied at construction time
         */
        @Override // org.apache.flink.api.java.typeutils.ResultTypeQueryable
        public TypeInformation<VV> getProducedType() {
            return this.typeInformation;
        }
    }

    /**
     * Gather function: adopts the smallest received value when it differs from the current one.
     *
     * @param <K> vertex key type
     * @param <VV> vertex value type, which is also the message type
     */
    public static final class CCUpdater<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {

        /**
         * Computes the minimum of the current vertex value and all incoming messages and
         * stores it as the new vertex value if it is not equal to the current value.
         *
         * @param vertex the vertex being updated
         * @param messageIterator the messages received by the vertex in this superstep
         */
        @Override // org.apache.flink.graph.spargel.GatherFunction
        public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> messageIterator) throws Exception {
            final VV current = vertex.getValue();
            VV smallest = current;

            for (VV candidate : messageIterator) {
                if (candidate.compareTo(smallest) < 0) {
                    smallest = candidate;
                }
            }

            // Only write when the value actually changed; an unchanged vertex is left untouched.
            if (!smallest.equals(current)) {
                setNewVertexValue(smallest);
            }
        }
    }
}
