/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.replicator.secondary;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.ComponentStoppingException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.table.QualifiedName;
import org.jetbrains.annotations.Nullable;

/**
 * Base {@link Flow.Subscriber} that runs its callbacks under a shared {@link IgniteSpinBusyLock}
 * and reports failures to a {@link FailureProcessor}.
 *
 * <p>This class implements only {@link #onSubscribe(Flow.Subscription)} and
 * {@link #onError(Throwable)}; {@code onNext} and {@code onComplete} are left to subclasses.
 * Subclasses are expected to wrap their own work in one of the {@code executeWithBusyLock}
 * overloads so that it is skipped once the busy lock can no longer be entered or the stopped
 * flag has been set through {@link #tryStop()}.
 *
 * @param <T> Type of the items delivered to this subscriber.
 */
abstract class SecondaryStorageSubscriber<T> implements Flow.Subscriber<T> {
    /** Base message used for every failure reported by this class. */
    private static final String REPLICATION_ERROR_MESSAGE = "Replication error";

    /** Table name, used only to build the failure message. */
    private final QualifiedName tableName;

    /** Partition ID, used only to build the failure message. */
    private final int partitionId;

    /** Receives failures that are not on the ignore list in {@link #handleError(Throwable, String)}. */
    private final FailureProcessor failureProcessor;

    /** Busy lock entered around every guarded action. */
    private final IgniteSpinBusyLock busyLock;

    /** Subscription stored by {@link #onSubscribe(Flow.Subscription)}; {@code null} until then. */
    private volatile Flow.Subscription subscription;

    /** Stopped flag; flipped at most once by {@link #tryStop()}. */
    private final AtomicBoolean isStopped = new AtomicBoolean();

    /**
     * Constructor.
     *
     * @param tableName Table name included in failure messages.
     * @param partitionId Partition ID included in failure messages.
     * @param failureProcessor Processor that is notified about unexpected errors.
     * @param busyLock Busy lock that guards every action executed by this subscriber.
     */
    SecondaryStorageSubscriber(
            QualifiedName tableName,
            int partitionId,
            FailureProcessor failureProcessor,
            IgniteSpinBusyLock busyLock
    ) {
        this.tableName = tableName;
        this.partitionId = partitionId;
        this.failureProcessor = failureProcessor;
        this.busyLock = busyLock;
    }

    /**
     * Stores the subscription and requests the first item.
     *
     * <p>Nothing happens (the subscription is neither stored nor cancelled) if the busy lock cannot
     * be entered or the subscriber has already been stopped.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        executeWithBusyLock(() -> {
            this.subscription = subscription;

            subscription.request(1L);
        });
    }

    /** Routes an upstream error to {@link #handleError(Throwable, String)} under the busy lock. */
    @Override
    public void onError(Throwable throwable) {
        executeWithBusyLock(() -> handleError(throwable, REPLICATION_ERROR_MESSAGE));
    }

    /**
     * Returns the subscription stored by {@link #onSubscribe(Flow.Subscription)}.
     *
     * @return Current subscription, or {@code null} if none has been stored yet.
     */
    Flow.Subscription subscription() {
        return subscription;
    }

    /**
     * Stops this subscriber. Invoked by {@link #handleError(Throwable, String)} after an error has
     * been processed; what stopping entails is defined by the subclass.
     */
    abstract void stop();

    /**
     * Atomically switches the stopped flag from {@code false} to {@code true}.
     *
     * @return {@code true} only for the call that actually performed the switch.
     */
    boolean tryStop() {
        return isStopped.compareAndSet(false, true);
    }

    /**
     * Runs {@code action} while holding the busy lock, unless the lock cannot be entered or the
     * stopped flag is set, in which case the action is silently skipped.
     *
     * <p>Any {@link Throwable} thrown by the action is passed to
     * {@link #handleError(Throwable, String)} and is not rethrown.
     *
     * @param action Action to run.
     */
    void executeWithBusyLock(Runnable action) {
        if (!busyLock.enterBusy()) {
            return;
        }

        try {
            if (!isStopped.get()) {
                action.run();
            }
        } catch (Throwable e) {
            handleError(e, REPLICATION_ERROR_MESSAGE);
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Asynchronous counterpart of {@link #executeWithBusyLock(Runnable)}.
     *
     * <p>The busy lock is held only while {@code action} is being invoked, not until the returned
     * future completes. Only exceptions thrown synchronously by {@code action} are handled here; an
     * exceptional completion of the returned future is left to the caller.
     *
     * @param action Supplier of the future to return.
     * @return Future produced by {@code action}, or a future completed with {@code null} if the
     *         action was skipped or threw.
     */
    CompletableFuture<?> executeWithBusyLock(Supplier<CompletableFuture<?>> action) {
        if (!busyLock.enterBusy()) {
            return CompletableFutures.nullCompletedFuture();
        }

        try {
            return isStopped.get() ? CompletableFutures.nullCompletedFuture() : action.get();
        } catch (Throwable e) {
            handleError(e, REPLICATION_ERROR_MESSAGE);

            return CompletableFutures.nullCompletedFuture();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Reports {@code throwable} to the failure processor, unless its cause chain contains one of the
     * ignored exception types, and then calls {@link #stop()}. Does nothing for {@code null}.
     *
     * <p>The work runs through {@link #executeWithBusyLock(Runnable)}, so it is skipped when the busy
     * lock cannot be entered or the subscriber is already stopped.
     *
     * <p>NOTE(review): because of that routing, an exception thrown by the failure processor or by
     * {@link #stop()} re-enters this method. The re-entry only ends once the stopped flag is set;
     * confirm that {@link #stop()} implementations call {@link #tryStop()} before doing anything
     * that may throw.
     *
     * @param throwable Error to handle, or {@code null} if there is nothing to handle.
     * @param baseMessage Prefix of the message passed to the failure processor.
     */
    void handleError(@Nullable Throwable throwable, String baseMessage) {
        if (throwable == null) {
            return;
        }

        executeWithBusyLock(() -> {
            boolean shouldNotifyFailureHandler = !ExceptionUtils.hasCause(
                    throwable,
                    NodeStoppingException.class,
                    TrackerClosedException.class,
                    ComponentStoppingException.class,
                    TableNotFoundException.class
            );

            if (shouldNotifyFailureHandler) {
                String message = String.format("%s [tableName=%s, partitionId=%d].", baseMessage, tableName, partitionId);

                failureProcessor.process(new FailureContext(throwable, message));
            }

            stop();
        });
    }
}

