/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dr;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.RetriableTransactionException;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.gridgain.internal.dr.DrUpdate;
import org.gridgain.internal.dr.DrUpdateHandler;
import org.gridgain.internal.dr.DrUtils;
import org.gridgain.internal.dr.common.GridCacheVersion;
import org.jetbrains.annotations.Nullable;

public class DrUpdateHandlerImpl
implements DrUpdateHandler {
    private static final IgniteLogger LOG = Loggers.forClass(DrUpdateHandlerImpl.class);
    private static final long TX_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    private static final int MAX_RETRY_ATTEMPTS = Integer.MAX_VALUE;
    private final Ignite client;
    private final KeyValueView<Tuple, Tuple> tombstonesTable;
    private final long tombstoneTtl;

    public DrUpdateHandlerImpl(Ignite client, Duration tombstoneTtl) {
        assert (!tombstoneTtl.isNegative());
        this.client = client;
        this.tombstoneTtl = DrUtils.getOrDefaultTtlInSeconds(tombstoneTtl);
        this.tombstonesTable = DrUtils.initTombstonesTable(client).keyValueView();
    }

    @Override
    public CompletableFuture<Void> applyUpdates(QualifiedName tableName, List<DrUpdate> rowUpdates) {
        return this.client.tables().tableAsync(tableName).thenCompose(table -> table == null ? CompletableFuture.failedFuture(new TableNotFoundException(tableName)) : this.applyUpdates((Table)table, rowUpdates));
    }

    private CompletableFuture<Void> applyUpdates(Table table, List<DrUpdate> rowUpdates) {
        KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(rowUpdates.size());
        for (DrUpdate row : rowUpdates) {
            CompletableFuture<Void> opFuture = new CompletableFuture<Void>();
            futures.add(opFuture);
            try {
                if (row.isTombstone()) {
                    this.remove(table.name(), kvView, row).whenComplete(DrUpdateHandlerImpl.complete(opFuture));
                    continue;
                }
                this.put(table.name(), kvView, row).whenComplete(DrUpdateHandlerImpl.complete(opFuture));
            }
            catch (Throwable err) {
                opFuture.completeExceptionally(err);
            }
        }
        return CompletableFuture.allOf((CompletableFuture[])futures.toArray(CompletableFuture[]::new));
    }

    private CompletableFuture<Void> put(String tableName, KeyValueView<Tuple, Tuple> table, DrUpdate row) {
        Tuple tombstoneKey = DrUtils.createTombstoneKey(tableName, row);
        return this.runInTransactionAsync(tableName, "put", tx -> this.findOutOfOrderRemove((Transaction)tx, tombstoneKey, row.version()).thenCompose(outOfOrderRemoveFound -> outOfOrderRemoveFound != false ? CompletableFutures.nullCompletedFuture() : DrUpdateHandlerImpl.applyUpdate(table, tx, row)));
    }

    private CompletableFuture<Void> remove(String tableName, KeyValueView<Tuple, Tuple> table, DrUpdate newTombstone) {
        Tuple tombstoneKey = DrUtils.createTombstoneKey(tableName, newTombstone);
        return this.runInTransactionAsync(tableName, "remove", tx -> this.putOrRenewTombstone((Transaction)tx, tombstoneKey, newTombstone).thenCompose(tombstoneRenewed -> tombstoneRenewed != false ? DrUpdateHandlerImpl.applyRemove(table, tx, newTombstone.key(), newTombstone.version()).thenApply(ignore -> null) : CompletableFutures.nullCompletedFuture()));
    }

    private CompletableFuture<Boolean> findOutOfOrderRemove(Transaction tx, Tuple tombstoneKey, GridCacheVersion rowVersion) {
        return ((CompletableFuture)this.tombstonesTable.getAsync(tx, tombstoneKey).thenCompose(tombstone -> {
            if (tombstone == null) {
                return CompletableFutures.falseCompletedFuture();
            }
            GridCacheVersion tombstoneVersion = DrUtils.unmarshalVersion((byte[])tombstone.value("version"));
            assert (tombstoneVersion != null) : "tombstone must have a version";
            if (tombstoneVersion.compareTo(rowVersion) > 0) {
                return CompletableFutures.trueCompletedFuture();
            }
            return this.tombstonesTable.removeAsync(tx, tombstoneKey).thenApply(ignore -> false);
        })).whenComplete((res, err) -> {
            if (ExceptionUtils.isOrCausedBy(TableNotFoundException.class, err)) {
                LOG.error("System table for DR tombstones wasn't found: tableName={}", DrUtils.DR_TOMBSTONES_TABLE_NAME.toCanonicalForm());
            }
        });
    }

    private static CompletableFuture<Void> applyUpdate(KeyValueView<Tuple, Tuple> table, Transaction tx, DrUpdate row) {
        assert (!row.isTombstone());
        return table.getAsync(tx, row.key()).thenCompose(value -> value != null && DrUpdateHandlerImpl.keepExistingVersion(DrUpdateHandlerImpl.extractVersion(value), row.version()) ? CompletableFutures.nullCompletedFuture() : table.putAsync(tx, row.key(), row.value()).thenApply(unused -> null));
    }

    private CompletableFuture<Boolean> putOrRenewTombstone(Transaction tx, Tuple tombstoneKey, DrUpdate newTombstone) {
        assert (newTombstone.isTombstone());
        return ((CompletableFuture)this.tombstonesTable.getAsync(tx, tombstoneKey).thenCompose(oldTombstone -> {
            if (oldTombstone != null && DrUpdateHandlerImpl.keepExistingVersion(DrUpdateHandlerImpl.extractVersion(oldTombstone), newTombstone.version())) {
                return CompletableFutures.falseCompletedFuture();
            }
            Tuple tombstoneValue = DrUtils.createTombstoneValue(newTombstone.version(), this.tombstoneTtl);
            return this.tombstonesTable.putAsync(tx, tombstoneKey, tombstoneValue).thenApply(ignore -> true);
        })).whenComplete((res, err) -> {
            if (ExceptionUtils.isOrCausedBy(TableNotFoundException.class, err)) {
                LOG.error("System table for DR tombstones wasn't found: tableName={}", DrUtils.DR_TOMBSTONES_TABLE_NAME.toCanonicalForm());
            }
        });
    }

    private static CompletableFuture<Boolean> applyRemove(KeyValueView<Tuple, Tuple> table, Transaction tx, Tuple key, GridCacheVersion version) {
        return table.getAsync(tx, key).thenCompose(oldRow -> {
            if (oldRow == null) {
                return CompletableFutures.trueCompletedFuture();
            }
            if (DrUpdateHandlerImpl.keepExistingVersion(DrUpdateHandlerImpl.extractVersion(oldRow), version)) {
                return CompletableFutures.falseCompletedFuture();
            }
            return table.removeAsync(tx, key).thenApply(ignore -> true);
        });
    }

    @Nullable
    private static GridCacheVersion extractVersion(Tuple row) {
        byte[] version = (byte[])row.value("DrVersion");
        return DrUtils.unmarshalVersion(version);
    }

    private static <T> BiConsumer<T, Throwable> complete(CompletableFuture<Void> opFut) {
        return (res, err) -> {
            if (err == null) {
                opFut.complete(null);
            } else {
                opFut.completeExceptionally((Throwable)err);
            }
        };
    }

    private static boolean keepExistingVersion(@Nullable GridCacheVersion existingVersion, GridCacheVersion newVersion) {
        return existingVersion == null || existingVersion.dataCenterId() == newVersion.dataCenterId() && existingVersion.compareTo(newVersion) > 0;
    }

    private CompletableFuture<Void> runInTransactionAsync(String tableName, String operation, Function<Transaction, CompletableFuture<Void>> action) {
        TransactionOptions txOptsWithTimeout = new TransactionOptions().timeoutMillis(TX_TIMEOUT_MILLIS);
        TxRetryContext ctx = new TxRetryContext(action, txOptsWithTimeout, tableName, operation);
        return this.runInTransactionAsyncRec(ctx, 0);
    }

    private CompletableFuture<Void> runInTransactionAsyncRec(TxRetryContext ctx, int attempt) {
        return ((CompletableFuture)this.client.transactions().runInTransactionAsync(ctx.txAction(), ctx.txOptsWithTimeout()).handle((v, e) -> {
            if (e == null) {
                return CompletableFuture.completedFuture(null);
            }
            if (!DrUpdateHandlerImpl.isRetriable(e) || attempt >= Integer.MAX_VALUE) {
                return CompletableFuture.failedFuture(e);
            }
            LOG.debug("Retrying transaction [tableName={}, opName={}, attempt={}].", (Throwable)e, (Object)ctx.tableName(), (Object)ctx.opName(), (Object)attempt);
            return this.runInTransactionAsyncRec(ctx, attempt + 1);
        })).thenCompose(Function.identity());
    }

    private static boolean isRetriable(Throwable e) {
        return ExceptionUtils.hasCause(e, TimeoutException.class, RetriableTransactionException.class);
    }

    private static final class TxRetryContext {
        private final Function<Transaction, CompletableFuture<Void>> txAction;
        private final TransactionOptions txOptsWithTimeout;
        private final String tableName;
        private final String opName;

        private TxRetryContext(Function<Transaction, CompletableFuture<Void>> txAction, TransactionOptions txOptsWithTimeout, String tableName, String opName) {
            this.txAction = txAction;
            this.txOptsWithTimeout = txOptsWithTimeout;
            this.tableName = tableName;
            this.opName = opName;
        }

        Function<Transaction, CompletableFuture<Void>> txAction() {
            return this.txAction;
        }

        TransactionOptions txOptsWithTimeout() {
            return this.txOptsWithTimeout;
        }

        String tableName() {
            return this.tableName;
        }

        String opName() {
            return this.opName;
        }
    }
}

