package org.apache.ignite.tensorflow.cluster;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessState;
import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;

/* loaded from: input_file:org/apache/ignite/tensorflow/cluster/TensorFlowClusterMaintainer.class */
/**
 * Ignite service that keeps the TensorFlow cluster of a single job alive.
 *
 * <p>While running, {@link #execute(ServiceContext)} polls every {@link #POLL_INTERVAL_NANOS}
 * nanoseconds. It restarts the cluster when any of the following happens:
 * <ul>
 *     <li>the partition-to-primary-node mapping of the upstream cache changes;</li>
 *     <li>a cluster process reports state {@code DONE};</li>
 *     <li>the chief reports an exception;</li>
 *     <li>the user script reports an exception.</li>
 * </ul>
 * The loop ends when the service is cancelled or the user script completes without an exception.
 * Afterwards the cluster is stopped.
 *
 * <p>Every (re)start is announced on {@code topicName} as {@code Optional.of(cluster)}. The final
 * stop is announced as {@link Optional#empty()}.
 */
public class TensorFlowClusterMaintainer implements Service {
    private static final long serialVersionUID = -3220563310643566419L;

    /** Pause between two consecutive health checks, in nanoseconds (1 ms). */
    private static final long POLL_INTERVAL_NANOS = 1_000_000L;

    /** Timeout passed to {@code sendOrdered} when forwarding user script output, in milliseconds. */
    private static final long OUTPUT_MSG_TIMEOUT_MS = 60_000L;

    /** Topic prefix (followed by the cluster id) for lines from the first output callback. */
    private static final String OUT_TOPIC_PREFIX = "us_out_";

    /** Topic prefix (followed by the cluster id) for lines from the second output callback. */
    private static final String ERR_TOPIC_PREFIX = "us_err_";

    /** Ignite instance, injected by Ignite. */
    @IgniteInstanceResource
    private transient Ignite ignite;

    /** Logger, injected by Ignite. */
    @LoggerResource
    private transient IgniteLogger log;

    /** Identifier of the maintained cluster. */
    private final UUID clusterId;

    /** Job archive used to create the cluster; also names the upstream cache. */
    private final TensorFlowJobArchive jobArchive;

    /** Topic on which cluster (re)starts and the final stop are announced. */
    private final String topicName;

    /** Cluster manager, created in {@link #init(ServiceContext)}. */
    private transient TensorFlowClusterManager clusterMgr;

    /** Primary-node ids per partition seen on the previous check; {@code null} before the first one. */
    private transient UUID[] prev;

    /**
     * Constructs a new maintainer.
     *
     * @param clusterId Cluster identifier (must not be {@code null}; checked only with {@code -ea}).
     * @param jobArchive Job archive (must not be {@code null}; checked only with {@code -ea}).
     * @param topicName Topic to announce cluster changes on (must not be {@code null}; checked only with {@code -ea}).
     */
    public TensorFlowClusterMaintainer(UUID clusterId, TensorFlowJobArchive jobArchive, String topicName) {
        assert clusterId != null : "Cluster identifier should not be null";
        assert jobArchive != null : "Job archive should not be null";
        assert topicName != null : "Topic name should not be null";

        this.clusterId = clusterId;
        this.jobArchive = jobArchive;
        this.topicName = topicName;
    }

    /** Stops the cluster if it exists. */
    public void cancel(ServiceContext ctx) {
        clusterMgr.stopClusterIfExists(clusterId);
        log.debug("Cluster maintainer canceled [clusterId=" + clusterId + "]");
    }

    /** Creates the cluster manager used by all other operations. */
    public void init(ServiceContext ctx) {
        clusterMgr = new TensorFlowClusterManager(ignite);
        log.debug("Cluster maintainer initialized [clusterId=" + clusterId + "]");
    }

    /**
     * Runs the maintenance loop until the service is cancelled or the user script completes
     * successfully. It then stops the cluster and announces the stop with {@link Optional#empty()}.
     */
    public void execute(ServiceContext ctx) {
        while (!ctx.isCancelled() && !hasUserScriptCompletedSuccessfully()) {
            LockSupport.parkNanos(POLL_INTERVAL_NANOS);

            if (isRestartRequired()) {
                log.debug("Cluster will be restarted [clusterId=" + clusterId + "]");
                restartCluster();
            }
        }

        stopCluster(true);
        log.debug("Cluster maintainer completed [clusterId=" + clusterId + "]");
    }

    /**
     * Evaluates the restart conditions in order and stops at the first one that holds.
     *
     * <p>The order matters. {@link #hasAffinityChanged()} runs first and records the current
     * mapping. On the very first call it always reports a change, so the initial cluster start
     * happens before any process-level check runs.
     *
     * @return {@code true} if the cluster must be restarted.
     */
    private boolean isRestartRequired() {
        return hasAffinityChanged()
            || hasAnyWorkerFailed()
            || hasChiefFailed()
            || hasUserScriptFailed();
    }

    /** Stops the current cluster (without announcing it) and starts a new one. */
    private void restartCluster() {
        stopCluster(false);
        startCluster();
    }

    /**
     * Stops the cluster if it exists.
     *
     * @param announce If {@code true}, also sends {@link Optional#empty()} on {@code topicName}.
     */
    private void stopCluster(boolean announce) {
        clusterMgr.stopClusterIfExists(clusterId);

        if (announce) {
            ignite.message().send(topicName, Optional.empty());
        }
    }

    /**
     * Creates the cluster and announces it on {@code topicName}. Lines passed to the two output
     * callbacks are forwarded, with ordered delivery, to the per-cluster "us_out_" and "us_err_" topics.
     */
    private void startCluster() {
        String outTopic = OUT_TOPIC_PREFIX + clusterId;
        String errTopic = ERR_TOPIC_PREFIX + clusterId;

        Optional<?> announcement = Optional.of(clusterMgr.createCluster(
            clusterId,
            jobArchive,
            line -> ignite.message().sendOrdered(outTopic, line, OUTPUT_MSG_TIMEOUT_MS),
            line -> ignite.message().sendOrdered(errTopic, line, OUTPUT_MSG_TIMEOUT_MS)
        ));

        ignite.message().send(topicName, announcement);
    }

    /**
     * Checks whether the primary node of any upstream-cache partition has changed since the
     * previous call, and remembers the current mapping.
     *
     * @return {@code true} on the first call or when the mapping differs from the previous one.
     */
    private boolean hasAffinityChanged() {
        Affinity<?> affinity = ignite.affinity(jobArchive.getUpstreamCacheName());

        int parts = affinity.partitions();
        UUID[] ids = new UUID[parts];

        for (int part = 0; part < parts; part++) {
            // mapPartitionToNode may return null (presumably when no node currently owns the
            // partition). Record that as a null id. The original dereference threw an NPE that
            // escaped execute() and ended the maintainer. Arrays.equals handles null elements.
            ids[part] = Optional.ofNullable(affinity.mapPartitionToNode(part))
                .map(node -> node.id())
                .orElse(null);
        }

        if (prev != null && Arrays.equals(ids, prev)) {
            return false;
        }

        prev = ids;
        return true;
    }

    /**
     * Pings all cluster processes.
     *
     * @return {@code true} if any process reports state {@code DONE} (a finished worker counts as
     *      failed), or if the statuses could not be obtained or inspected.
     */
    private boolean hasAnyWorkerFailed() {
        try {
            Map<UUID, List<LongRunningProcessStatus>> statuses = clusterMgr.getSrvProcMgr()
                .ping(clusterMgr.getCluster(clusterId).getProcesses());

            for (List<LongRunningProcessStatus> nodeStatuses : statuses.values()) {
                for (LongRunningProcessStatus status : nodeStatuses) {
                    if (status.getState().equals(LongRunningProcessState.DONE)) {
                        return true;
                    }
                }
            }

            return false;
        }
        catch (Exception e) {
            // Any failure to inspect the processes is treated as a reason to restart.
            log.error("Failed to check process statuses", e);
            return true;
        }
    }

    /** @return {@code true} if the cluster manager holds a chief exception for this cluster. */
    private boolean hasChiefFailed() {
        return clusterMgr.getChiefException(clusterId) != null;
    }

    /** @return {@code true} if the cluster manager holds a user script exception for this cluster. */
    private boolean hasUserScriptFailed() {
        return clusterMgr.getUserScriptException(clusterId) != null;
    }

    /** @return {@code true} if the user script has completed and no user script exception is recorded. */
    private boolean hasUserScriptCompletedSuccessfully() {
        return clusterMgr.isUserScriptCompleted(clusterId)
            && clusterMgr.getUserScriptException(clusterId) == null;
    }
}
