/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.partition.replicator.handlers;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorageClosedException;
import org.apache.ignite.internal.tx.storage.state.TxStateStorageDestroyedException;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;

public class TxCleanupRecoveryRequestHandler {
    private static final IgniteLogger LOG = Loggers.forClass(TxCleanupRecoveryRequestHandler.class);
    private static final int THROTTLE_BATCH_SIZE = 1000;
    private final TxStatePartitionStorage txStatePartitionStorage;
    private final TxManager txManager;
    private final FailureProcessor failureProcessor;
    private final ZonePartitionId replicationGroupId;

    public TxCleanupRecoveryRequestHandler(TxStatePartitionStorage txStatePartitionStorage, TxManager txManager, FailureProcessor failureProcessor, ZonePartitionId replicationGroupId) {
        this.txStatePartitionStorage = txStatePartitionStorage;
        this.txManager = txManager;
        this.failureProcessor = failureProcessor;
        this.replicationGroupId = replicationGroupId;
    }

    public CompletableFuture<Void> handle(TxCleanupRecoveryRequest request) {
        this.runPersistentStorageScan();
        return CompletableFutures.nullCompletedFuture();
    }

    private void runPersistentStorageScan() {
        ArrayList<IgniteBiTuple> tasks;
        int abortedCount;
        int committedCount;
        block14: {
            committedCount = 0;
            abortedCount = 0;
            tasks = new ArrayList<IgniteBiTuple>();
            try (Cursor txs = this.txStatePartitionStorage.scan();){
                for (IgniteBiTuple tx : txs) {
                    UUID txId = (UUID)tx.getKey();
                    TxMeta txMeta = (TxMeta)tx.getValue();
                    assert (!txMeta.enlistedPartitions().isEmpty());
                    assert (TxState.isFinalState((TxState)txMeta.txState())) : "Unexpected state [txId=" + txId + ", state=" + txMeta.txState() + "].";
                    if (txMeta.txState() == TxState.COMMITTED) {
                        ++committedCount;
                    } else {
                        ++abortedCount;
                    }
                    tasks.add(tx);
                }
            }
            catch (IgniteInternalException e) {
                if (ExceptionUtils.hasCause((Throwable)e, (Class[])new Class[]{TxStateStorageClosedException.class, TxStateStorageDestroyedException.class})) break block14;
                String errorMessage = String.format("Failed to scan transaction state storage [commitPartition=%s].", this.replicationGroupId);
                this.failureProcessor.process(new FailureContext((Throwable)e, errorMessage));
            }
        }
        LOG.debug("Persistent storage scan finished [committed={}, aborted={}].", new Object[]{committedCount, abortedCount});
        if (!tasks.isEmpty()) {
            this.throttledCleanup(new CopyOnWriteArrayList<IgniteBiTuple<UUID, TxMeta>>(tasks));
        }
    }

    private void throttledCleanup(List<IgniteBiTuple<UUID, TxMeta>> tasks) {
        block3: {
            if (tasks.isEmpty()) {
                return;
            }
            List<IgniteBiTuple<UUID, TxMeta>> batch = tasks.subList(0, Math.min(tasks.size(), 1000));
            ArrayList<IgniteBiTuple<UUID, TxMeta>> toCleanup = new ArrayList<IgniteBiTuple<UUID, TxMeta>>(batch);
            batch.clear();
            try {
                this.callCleanup(toCleanup).whenComplete((r, e) -> this.throttledCleanup(tasks));
            }
            catch (IgniteInternalException e2) {
                if (ExceptionUtils.hasCause((Throwable)e2, (Class[])new Class[]{TxStateStorageClosedException.class, TxStateStorageDestroyedException.class})) break block3;
                String errorMessage = String.format("Failed to cleanup transaction states [commitPartition=%s].", this.replicationGroupId);
                this.failureProcessor.process(new FailureContext((Throwable)e2, errorMessage));
            }
        }
    }

    private CompletableFuture<?> callCleanup(List<IgniteBiTuple<UUID, TxMeta>> tasks) {
        CompletableFuture[] array = (CompletableFuture[])tasks.stream().map(task -> this.callCleanup((TxMeta)task.getValue(), (UUID)task.getKey())).toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(array);
    }

    private CompletableFuture<?> callCleanup(TxMeta txMeta, UUID txId) {
        return this.txManager.cleanup(this.replicationGroupId, txMeta.enlistedPartitions(), txMeta.txState() == TxState.COMMITTED, txMeta.commitTimestamp(), txId).exceptionally(throwable -> {
            LOG.warn("Failed to cleanup transaction [txId={}].", throwable, new Object[]{txId});
            return null;
        });
    }
}

