/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dcr;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.gridgain.internal.dcr.DcrErrorHandler;
import org.gridgain.internal.dcr.LocalReplicationManager;
import org.gridgain.internal.dcr.ReplicationStartOptions;
import org.gridgain.internal.dcr.ReplicationStatus;
import org.gridgain.internal.dcr.metastorage.DcrStorage;
import org.gridgain.internal.dcr.metastorage.DcrStorageListener;
import org.gridgain.internal.dcr.metastorage.ReplicationEntry;

/**
 * {@link DcrStorageListener} implementation that turns changes of replication entries into
 * start, stop and flush calls on the {@link LocalReplicationManager} given at construction.
 *
 * <p>Add and update notifications are acted on only when the entry's worker node equals the
 * configured node name; a removal notification always issues a stop for the named replication.
 */
class DcrStorageListenerImpl
implements DcrStorageListener {
    private static final IgniteLogger LOG = Loggers.forClass(DcrStorageListenerImpl.class);

    /** Name that is compared against {@code ReplicationEntry.workerNode()}. */
    private final String nodeName;

    /** Receiver of the start / stop / flush calls. */
    private final LocalReplicationManager dcrLocal;

    /** Storage that is updated when a replication finishes and handed to the error handler on failure. */
    private final DcrStorage store;

    /**
     * Creates the listener.
     *
     * @param nodeName Node name that entries' worker node is matched against.
     * @param dcrLocal Replication manager the listener delegates to.
     * @param store Replication storage.
     */
    public DcrStorageListenerImpl(String nodeName, LocalReplicationManager dcrLocal, DcrStorage store) {
        this.nodeName = nodeName;
        this.dcrLocal = dcrLocal;
        this.store = store;
    }

    /**
     * Starts the replication when the added entry is running and assigned to this node;
     * otherwise does nothing.
     *
     * @param name Replication name.
     * @param entry Added entry.
     * @return Future returned by the start call, or an already completed future.
     */
    @Override
    public CompletableFuture<Void> onAdd(String name, ReplicationEntry entry) {
        if (!entry.isRunning() || !Objects.equals(entry.workerNode(), this.nodeName)) {
            return completed();
        }
        return this.startReplication(name, entry);
    }

    /**
     * Reacts to a changed entry. Entries whose new worker node is not this node are ignored.
     * For the rest, the first matching rule wins, in this order: stop, flush, start.
     *
     * @param name Replication name.
     * @param oldEntry Entry state before the change.
     * @param newEntry Entry state after the change.
     * @return Future returned by the start call, or an already completed future in every other case.
     */
    @Override
    public CompletableFuture<Void> onUpdate(String name, ReplicationEntry oldEntry, ReplicationEntry newEntry) {
        boolean ranBefore = oldEntry.isRunning();
        boolean runsNow = newEntry.isRunning();

        boolean assignedToThisNode = Objects.equals(newEntry.workerNode(), this.nodeName);
        if (!assignedToThisNode) {
            return completed();
        }

        // Stop: the entry was running and either no longer is, or some table is marked STOPPED.
        if (ranBefore && (!runsNow || newEntry.tableStatuses().containsValue(ReplicationStatus.STOPPED))) {
            this.dcrLocal.stopReplication(name, stoppedTables(newEntry));
            return completed();
        }

        // Flush: a non-null flush point that differs from the previous one.
        Instant requestedFlush = newEntry.flushPoint();
        boolean flushPointChanged = requestedFlush != null && !requestedFlush.equals(oldEntry.flushPoint());
        if (flushPointChanged) {
            this.dcrLocal.flushReplication(name, requestedFlush);
            return completed();
        }

        return shouldStart(oldEntry, newEntry, ranBefore, runsNow)
                ? this.startReplication(name, newEntry)
                : completed();
    }

    /**
     * Issues a stop for the named replication; the worker node of the entry is not checked.
     *
     * @param name Replication name.
     * @param entry Removed entry (unused).
     * @return Already completed future.
     */
    @Override
    public CompletableFuture<Void> onRemove(String name, ReplicationEntry entry) {
        this.dcrLocal.stopReplication(name);
        return completed();
    }

    /**
     * Starts the replication with options derived from the entry and registers a callback for
     * its events: {@code FINISHED} marks the stored entry as {@link ReplicationStatus#STOPPED},
     * {@code FAILED} forwards the event's error, any other state is reported as an
     * {@link IllegalStateException}.
     */
    private CompletableFuture<Void> startReplication(String name, ReplicationEntry entry) {
        ReplicationStartOptions options = ReplicationStartOptions.fromEntry(entry);
        return this.dcrLocal.startReplication(name, options, event -> {
            switch (event.state()) {
                case FINISHED:
                    this.store.update(name, current -> current.toBuilder().status(ReplicationStatus.STOPPED).build(), true);
                    break;
                case FAILED:
                    this.processReplicationException(name, event.error());
                    break;
                default:
                    this.processReplicationException(name, new IllegalStateException("Unrecognized replication event type."));
                    break;
            }
        });
    }

    /**
     * Tells whether an update requires (re)starting the replication: it switched from not
     * running to running, some table is marked {@link ReplicationStatus#REPLICATING}, or it was
     * already running while the previous worker node was not this node.
     */
    private boolean shouldStart(ReplicationEntry oldEntry, ReplicationEntry newEntry, boolean ranBefore, boolean runsNow) {
        if (!ranBefore && runsNow) {
            return true;
        }
        if (newEntry.tableStatuses().containsValue(ReplicationStatus.REPLICATING)) {
            return true;
        }
        return ranBefore && !Objects.equals(oldEntry.workerNode(), this.nodeName);
    }

    /** Collects the keys of {@code entry.tableStatuses()} whose value is {@link ReplicationStatus#STOPPED}. */
    private static List<String> stoppedTables(ReplicationEntry entry) {
        return entry.tableStatuses()
                .entrySet()
                .stream()
                .filter(tableStatus -> tableStatus.getValue() == ReplicationStatus.STOPPED)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * Handles an error reported for a replication. When the unwrapped cause is a
     * {@link CancellationException} the event is only logged; otherwise the throwable, as
     * received (not the unwrapped one), is passed to {@link DcrErrorHandler}.
     */
    private void processReplicationException(String name, Throwable throwable) {
        Throwable unwrapped = ExceptionUtils.unwrapCause(throwable);
        if (!(unwrapped instanceof CancellationException)) {
            DcrErrorHandler.handleFail(this.store, this.nodeName, name, throwable);
            return;
        }
        LOG.info("Replication {} was stopped.", new Object[]{name});
    }

    /** Shorthand for the already completed {@code Void} future returned by most callbacks. */
    private static CompletableFuture<Void> completed() {
        return CompletableFutures.nullCompletedFuture();
    }
}

