/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.raft;

import java.io.Serializable;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.network.command.SecondarySafeTimeSyncCommand;
import org.apache.ignite.internal.partition.replicator.network.command.TransferRowVersionsToSecondaryStorageCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllSecondaryStorageCommand;
import org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowWithTombstoneMessage;
import org.apache.ignite.internal.partition.replicator.raft.CommandResult;
import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.secondarystoragebridge.SecondaryStorageBridge;
import org.apache.ignite.internal.secondarystoragebridge.UpdatesStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.TimedBinaryRowAndRowId;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.storage.secondary.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.secondary.SecondaryStorage;
import org.apache.ignite.internal.tx.UpdateCommandResult;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class SecondaryPartitionListener
implements RaftGroupListener,
RaftTableProcessor {
    private static final IgniteLogger LOG = Loggers.forClass(SecondaryPartitionListener.class);
    private static final int BATCH_WRITE_SIZE = 16;
    private final Set<String> currentGroupTopology = new HashSet<String>();
    private final UpdatesStorage updatesStorage;
    private final SecondaryStorage secondaryStorage;
    private final int tableId;
    private final int partitionId;
    private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
    private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

    public SecondaryPartitionListener(SecondaryStorage secondaryStorage, SecondaryStorageBridge secondaryStorageBridge, int tableId, int partitionId, PendingComparableValuesTracker<HybridTimestamp, Void> safeTime) {
        this.secondaryStorage = secondaryStorage;
        this.updatesStorage = secondaryStorageBridge.getOrCreateUpdatesStorage(tableId, partitionId);
        this.tableId = tableId;
        this.partitionId = partitionId;
        this.safeTime = safeTime;
        RaftGroupConfiguration committedGroupConfiguration = this.raftGroupConfigurationConverter.fromBytes(this.updatesStorage.configuration());
        if (committedGroupConfiguration != null) {
            this.setCurrentGroupTopology(committedGroupConfiguration);
        }
    }

    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
        iterator.forEachRemaining(clo -> {
            Command command = clo.command();
            assert (false) : "No read commands expected, [cmd=" + command + "]";
        });
    }

    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
        iterator.forEachRemaining(clo -> {
            CommandResult result;
            WriteCommand command = (WriteCommand)clo.command();
            long commandIndex = clo.index();
            long commandTerm = clo.term();
            @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
            assert (safeTimestamp == null || command instanceof SafeTimePropagatingCommand) : command;
            this.checkCommandIndex(commandIndex);
            try {
                result = this.processCommand(command, commandIndex, commandTerm, safeTimestamp);
            }
            catch (Throwable error) {
                LOG.error("Got error while processing command [commandIndex={}, commandTerm={}, command={}]", error, new Object[]{clo.index(), clo.index(), command});
                clo.result((Serializable)error);
                throw error;
            }
            clo.result(result.result());
        });
    }

    private void checkCommandIndex(long commandIndex) {
        assert (commandIndex > this.updatesStorage.lastAppliedIndex()) : "Write command must have an index greater than that of the update storage [commandIndex=" + commandIndex + ", updatesStorageIndex=" + this.updatesStorage.lastAppliedIndex() + "]";
    }

    private CommandResult handleSafeTimeSyncCommand(long commandIndex, long commandTerm) {
        this.updatesStorage.lastApplied(commandIndex, commandTerm);
        return CommandResult.EMPTY_APPLIED_RESULT;
    }

    private CommandResult handleSecondarySafeTimeSyncCommand(SecondarySafeTimeSyncCommand command, long commandIndex, long commandTerm) {
        HybridTimestamp proposedSafeTime = command.proposedSafeTime();
        this.updateSafeTimeIfNeeded(proposedSafeTime);
        this.updatesStorage.lastApplied(commandIndex, commandTerm);
        return CommandResult.EMPTY_APPLIED_RESULT;
    }

    private CommandResult handleUpdateCommand(UpdateAllSecondaryStorageCommand command, long commandIndex, long commandTerm) {
        assert (this.secondaryStorage != null) : "Secondary storage is null";
        if (this.isNotPrimaryReplica(command.leaseStartTime())) {
            return this.isNotPrimaryResult();
        }
        ArrayList<BinaryRowAndRowId> batchToWrite = new ArrayList<BinaryRowAndRowId>();
        long commitTimestamp = command.rows().isEmpty() ? 0L : ((BinaryRowWithTombstoneMessage)command.rows().get(0)).commitTimestamp();
        for (BinaryRowWithTombstoneMessage row : command.rows()) {
            if (row.commitTimestamp() <= ((HybridTimestamp)this.safeTime.current()).longValue()) {
                LOG.info("Skipping partition update over potential concurrent replications [tableId={}, partitionId={}]", new Object[]{this.tableId, this.partitionId});
                continue;
            }
            boolean commitChanged = commitTimestamp != row.commitTimestamp();
            HybridTimestamp hybridCommitTimestamp = HybridTimestamp.hybridTimestamp((long)commitTimestamp);
            if (batchToWrite.size() >= 16 || commitChanged) {
                this.secondaryStorage.writeBatch(batchToWrite, hybridCommitTimestamp);
                batchToWrite.clear();
            }
            if (commitChanged) {
                this.updateSafeTimeIfNeeded(hybridCommitTimestamp);
            }
            commitTimestamp = row.commitTimestamp();
            BinaryRowAndRowId binaryRowAndRowId = this.createBinaryRowAndRowId(row);
            batchToWrite.add(binaryRowAndRowId);
        }
        if (!batchToWrite.isEmpty()) {
            HybridTimestamp hybridCommitTimestamp = HybridTimestamp.hybridTimestamp((long)commitTimestamp);
            this.secondaryStorage.writeBatch(batchToWrite, hybridCommitTimestamp);
        }
        this.updatesStorage.lastApplied(commandIndex, commandTerm);
        return CommandResult.EMPTY_APPLIED_RESULT;
    }

    private CommandResult handleUpdateRowVersionsCommand(TransferRowVersionsToSecondaryStorageCommand command, long commandIndex, long commandTerm) {
        assert (this.secondaryStorage != null) : "Secondary storage is null";
        if (this.isNotPrimaryReplica(command.leaseStartTime())) {
            return this.isNotPrimaryResult();
        }
        ArrayList<TimedBinaryRowAndRowId> batchToWrite = new ArrayList<TimedBinaryRowAndRowId>();
        UUID prevRowId = command.rows().isEmpty() ? RowId.lowestRowId((int)this.partitionId).uuid() : ((BinaryRowWithTombstoneMessage)command.rows().get(0)).rowId();
        for (BinaryRowWithTombstoneMessage currentRow : command.rows()) {
            boolean rowChanged;
            boolean bl = rowChanged = !prevRowId.equals(currentRow.rowId());
            if (batchToWrite.size() >= 16 || rowChanged) {
                this.secondaryStorage.writeBatch(batchToWrite);
                batchToWrite.clear();
            }
            prevRowId = currentRow.rowId();
            TimedBinaryRowAndRowId binaryRowAndRowId = this.createTimedBinaryRowAndRowId(currentRow);
            batchToWrite.add(binaryRowAndRowId);
        }
        if (!batchToWrite.isEmpty()) {
            this.secondaryStorage.writeBatch(batchToWrite);
        }
        this.updatesStorage.lastApplied(commandIndex, commandTerm);
        return CommandResult.EMPTY_APPLIED_RESULT;
    }

    private boolean isNotPrimaryReplica(HybridTimestamp cmdLeaseStartTime) {
        long leaseStartTime = this.updatesStorage.leaseStartTime();
        return leaseStartTime != cmdLeaseStartTime.longValue();
    }

    private CommandResult isNotPrimaryResult() {
        UUID nodeId = this.updatesStorage.primaryReplicaNodeId();
        String nodeName = this.updatesStorage.primaryReplicaNodeName();
        UpdateCommandResult updateCommandResult = new UpdateCommandResult(false, Long.valueOf(this.updatesStorage.leaseStartTime()), this.isPrimaryInGroupTopology(nodeId, nodeName), 0L);
        return new CommandResult((Serializable)updateCommandResult, false);
    }

    private void updateSafeTimeIfNeeded(HybridTimestamp proposedSafeTime) {
        if (proposedSafeTime.compareTo((HybridTimestamp)this.safeTime.current()) > 0) {
            this.safeTime.update((Comparable)proposedSafeTime, null);
        } else {
            LOG.info("Skipping partition safe time update over potential concurrent replications [tableId={}, partitionId={}]", new Object[]{this.tableId, this.partitionId});
        }
    }

    private BinaryRowAndRowId createBinaryRowAndRowId(BinaryRowWithTombstoneMessage row) {
        RowId rowId = new RowId(this.partitionId, row.rowId());
        BinaryRowImpl binaryRow = new BinaryRowImpl(row.schemaVersion(), row.binaryTuple());
        return new BinaryRowAndRowId((BinaryRow)binaryRow, rowId, row.tombstone());
    }

    private TimedBinaryRowAndRowId createTimedBinaryRowAndRowId(BinaryRowWithTombstoneMessage row) {
        RowId rowId = new RowId(this.partitionId, row.rowId());
        BinaryRowImpl binaryRow = new BinaryRowImpl(row.schemaVersion(), row.binaryTuple());
        return new TimedBinaryRowAndRowId((BinaryRow)binaryRow, rowId, HybridTimestamp.hybridTimestamp((long)row.commitTimestamp()), row.tombstone());
    }

    private CommandResult handlePrimaryReplicaChangeCommand(PrimaryReplicaChangeCommand command, long commandIndex, long commandTerm) {
        this.updatesStorage.updateLease(command.leaseStartTime(), command.primaryReplicaNodeId(), command.primaryReplicaNodeName());
        this.updatesStorage.lastApplied(commandIndex, commandTerm);
        return CommandResult.EMPTY_APPLIED_RESULT;
    }

    public CommandResult processCommand(WriteCommand command, long commandIndex, long commandTerm, @Nullable HybridTimestamp safeTimestamp) {
        if (command instanceof SafeTimeSyncCommand) {
            return this.handleSafeTimeSyncCommand(commandIndex, commandTerm);
        }
        if (command instanceof SecondarySafeTimeSyncCommand) {
            return this.handleSecondarySafeTimeSyncCommand((SecondarySafeTimeSyncCommand)command, commandIndex, commandTerm);
        }
        if (command instanceof UpdateAllSecondaryStorageCommand) {
            return this.handleUpdateCommand((UpdateAllSecondaryStorageCommand)command, commandIndex, commandTerm);
        }
        if (command instanceof TransferRowVersionsToSecondaryStorageCommand) {
            return this.handleUpdateRowVersionsCommand((TransferRowVersionsToSecondaryStorageCommand)command, commandIndex, commandTerm);
        }
        if (command instanceof PrimaryReplicaChangeCommand) {
            return this.handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand)command, commandIndex, commandTerm);
        }
        if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
            return CommandResult.EMPTY_APPLIED_RESULT;
        }
        LOG.warn("Received unsupported command [cmd = {}]", new Object[]{command});
        return CommandResult.EMPTY_NOT_APPLIED_RESULT;
    }

    public void onConfigurationCommitted(RaftGroupConfiguration config, long lastAppliedIndex, long lastAppliedTerm) {
        this.setCurrentGroupTopology(config);
        RaftGroupConfiguration raftConfig = new RaftGroupConfiguration(lastAppliedIndex, lastAppliedTerm, (Collection)config.peers(), (Collection)config.learners(), (Collection)config.oldPeers(), (Collection)config.oldLearners());
        this.updatesStorage.updateConfiguration(config.index(), config.term(), this.raftGroupConfigurationConverter.toBytes(raftConfig));
    }

    public void initialize(@Nullable RaftGroupConfiguration config, @Nullable LeaseInfo leaseInfo, long lastAppliedIndex, long lastAppliedTerm) {
        if (lastAppliedIndex <= this.updatesStorage.lastAppliedIndex()) {
            return;
        }
        if (config != null) {
            this.setCurrentGroupTopology(config);
            this.updatesStorage.updateConfiguration(lastAppliedIndex, lastAppliedTerm, this.raftGroupConfigurationConverter.toBytes(config));
        }
        if (leaseInfo != null) {
            this.updatesStorage.updateLease(leaseInfo.leaseStartTime(), leaseInfo.primaryReplicaNodeId(), leaseInfo.primaryReplicaNodeName());
        }
        this.updatesStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
        this.updatesStorage.flush();
    }

    public long lastAppliedIndex() {
        return this.updatesStorage.lastAppliedIndex();
    }

    public long lastAppliedTerm() {
        return this.updatesStorage.lastAppliedTerm();
    }

    public void lastApplied(long lastAppliedIndex, long lastAppliedTerm) {
        if (lastAppliedIndex <= this.updatesStorage.lastAppliedIndex()) {
            return;
        }
        this.updatesStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
    }

    public CompletableFuture<Void> flushStorage() {
        return this.updatesStorage.flush();
    }

    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
        IgniteUtils.inBusyLock((IgniteSpinBusyLock)this.busyLock, () -> this.onSnapshotSaveBusy(doneClo));
    }

    private void onSnapshotSaveBusy(Consumer<Throwable> doneClo) {
        CompletableFuture.allOf(this.updatesStorage.flush()).whenComplete((unused, throwable) -> doneClo.accept((Throwable)throwable));
    }

    public boolean onSnapshotLoad(Path path) {
        return true;
    }

    public void onShutdown() {
        this.busyLock.block();
    }

    @TestOnly
    public PendingComparableValuesTracker<HybridTimestamp, Void> safeTime() {
        return this.safeTime;
    }

    private void setCurrentGroupTopology(RaftGroupConfiguration config) {
        this.currentGroupTopology.clear();
        this.currentGroupTopology.addAll(config.peers());
        this.currentGroupTopology.addAll(config.learners());
    }

    private boolean isPrimaryInGroupTopology(@Nullable UUID nodeId, @Nullable String nodeName) {
        return nodeId == null || nodeName == null || this.currentGroupTopology.contains(nodeName);
    }
}

