/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite3.internal.partition.replicator.raft;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.ignite3.internal.hlc.HybridTimestamp;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.partition.replicator.network.command.TableAwareCommand;
import org.apache.ignite3.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import org.apache.ignite3.internal.partition.replicator.raft.CommandResult;
import org.apache.ignite3.internal.partition.replicator.raft.OnSnapshotSaveHandler;
import org.apache.ignite3.internal.partition.replicator.raft.PartitionSnapshotInfo;
import org.apache.ignite3.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
import org.apache.ignite3.internal.partition.replicator.raft.RaftTableProcessor;
import org.apache.ignite3.internal.partition.replicator.raft.handlers.AbstractCommandHandler;
import org.apache.ignite3.internal.partition.replicator.raft.handlers.CommandHandlers;
import org.apache.ignite3.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
import org.apache.ignite3.internal.partition.replicator.raft.handlers.VacuumTxStatesCommandHandler;
import org.apache.ignite3.internal.partition.replicator.raft.handlers.WriteIntentSwitchCommandHandler;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.PartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
import org.apache.ignite3.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots;
import org.apache.ignite3.internal.raft.RaftGroupConfiguration;
import org.apache.ignite3.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite3.internal.raft.ReadCommand;
import org.apache.ignite3.internal.raft.WriteCommand;
import org.apache.ignite3.internal.raft.service.CommandClosure;
import org.apache.ignite3.internal.raft.service.RaftGroupListener;
import org.apache.ignite3.internal.replicator.ZonePartitionId;
import org.apache.ignite3.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite3.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite3.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite3.internal.storage.lease.LeaseInfo;
import org.apache.ignite3.internal.tx.TxManager;
import org.apache.ignite3.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite3.internal.util.PendingComparableValuesTracker;
import org.apache.ignite3.internal.util.SafeTimeValuesTracker;
import org.apache.ignite3.internal.util.TrackerClosedException;
import org.apache.ignite3.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ZonePartitionRaftListener
implements RaftGroupListener {
    private static final IgniteLogger LOG = Loggers.forClass(ZonePartitionRaftListener.class);
    private final SafeTimeValuesTracker safeTimeTracker;
    private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
    private final Int2ObjectMap<RaftTableProcessor> tableProcessors = new Int2ObjectOpenHashMap();
    private final TxStatePartitionStorage txStateStorage;
    private final PartitionsSnapshots partitionsSnapshots;
    private final PartitionKey partitionKey;
    private long lastAppliedIndex;
    private long lastAppliedTerm;
    private final Object tableProcessorsStateLock = new Object();
    private final OnSnapshotSaveHandler onSnapshotSaveHandler;
    private final CommandHandlers commandHandlers;
    private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();

    public ZonePartitionRaftListener(ZonePartitionId zonePartitionId, TxStatePartitionStorage txStatePartitionStorage, TxManager txManager, SafeTimeValuesTracker safeTimeTracker, PendingComparableValuesTracker<Long, Void> storageIndexTracker, PartitionsSnapshots partitionsSnapshots, Executor partitionOperationsExecutor) {
        this.safeTimeTracker = safeTimeTracker;
        this.storageIndexTracker = storageIndexTracker;
        this.partitionsSnapshots = partitionsSnapshots;
        this.txStateStorage = txStatePartitionStorage;
        this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(), zonePartitionId.partitionId());
        this.onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor);
        this.commandHandlers = new CommandHandlers.Builder().addHandler((short)9, (short)40, new FinishTxCommandHandler(txStatePartitionStorage, zonePartitionId, txManager)).addHandler((short)9, (short)41, new WriteIntentSwitchCommandHandler(arg_0 -> this.tableProcessors.get(arg_0), txManager)).addHandler((short)5, (short)13, new VacuumTxStatesCommandHandler(txStatePartitionStorage)).build();
    }

    @Override
    public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
        iterator.forEachRemaining(clo -> {
            Object command = clo.command();
            assert (false) : "No read commands expected, [cmd=" + command + "]";
        });
    }

    @Override
    public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
        iterator.forEachRemaining(clo -> {
            try {
                this.processWriteCommand((CommandClosure<WriteCommand>)clo);
            }
            catch (Throwable t) {
                LOG.error("Unknown error while processing command [commandIndex={}, commandTerm={}, command={}]", t, clo.index(), clo.index(), clo.command());
                clo.result(t);
                throw t;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processWriteCommand(CommandClosure<WriteCommand> clo) {
        CommandResult result;
        WriteCommand command = clo.command();
        long commandIndex = clo.index();
        long commandTerm = clo.term();
        @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
        assert (safeTimestamp == null || command instanceof SafeTimePropagatingCommand) : command;
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            this.partitionSnapshots().acquireReadLock();
            try {
                if (command instanceof TableAwareCommand) {
                    result = this.processTableAwareCommand(((TableAwareCommand)((Object)command)).tableId(), command, commandIndex, commandTerm, safeTimestamp);
                } else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
                    result = this.processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
                } else if (command instanceof SafeTimeSyncCommand) {
                    result = this.processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
                } else if (command instanceof PrimaryReplicaChangeCommand) {
                    result = this.processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
                    if (this.updateLeaseInfoInTxStorage((PrimaryReplicaChangeCommand)command, commandIndex, commandTerm)) {
                        result = CommandResult.EMPTY_APPLIED_RESULT;
                    }
                } else {
                    AbstractCommandHandler<?> commandHandler = this.commandHandlers.handler(command.groupType(), command.messageType());
                    if (commandHandler == null) {
                        LOG.info("Message type {} is not supported by the zone partition RAFT listener yet", command.getClass());
                        result = CommandResult.EMPTY_APPLIED_RESULT;
                    } else {
                        result = commandHandler.handle(command, commandIndex, commandTerm, safeTimestamp);
                    }
                }
                if (result.wasApplied()) {
                    if (safeTimestamp != null) {
                        ZonePartitionRaftListener.updateTrackerIgnoringTrackerClosedException(this.safeTimeTracker, safeTimestamp);
                    }
                    ZonePartitionRaftListener.updateTrackerIgnoringTrackerClosedException(this.storageIndexTracker, commandIndex);
                }
                this.lastAppliedIndex = Math.max(this.lastAppliedIndex, commandIndex);
                this.lastAppliedTerm = Math.max(this.lastAppliedTerm, commandTerm);
            }
            finally {
                this.partitionSnapshots().releaseReadLock();
            }
        }
        clo.result(result.result());
    }

    private CommandResult processCrossTableProcessorsCommand(WriteCommand command, long commandIndex, long commandTerm, @Nullable HybridTimestamp safeTimestamp) {
        if (this.tableProcessors.isEmpty()) {
            return new CommandResult(null, this.lastAppliedIndex < commandIndex);
        }
        boolean wasApplied = false;
        for (RaftTableProcessor processor : this.tableProcessors.values()) {
            CommandResult r = processor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
            wasApplied = wasApplied || r.wasApplied();
        }
        return new CommandResult(null, wasApplied);
    }

    private CommandResult processTableAwareCommand(int tableId, WriteCommand command, long commandIndex, long commandTerm, @Nullable HybridTimestamp safeTimestamp) {
        RaftTableProcessor tableProcessor = (RaftTableProcessor)this.tableProcessors.get(tableId);
        if (tableProcessor == null) {
            LOG.warn("Table processor for table ID {} not found. Command will be ignored: {}", tableId, command.toStringForLightLogging());
            return CommandResult.EMPTY_APPLIED_RESULT;
        }
        return tableProcessor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
    }

    private boolean updateLeaseInfoInTxStorage(PrimaryReplicaChangeCommand command, long commandIndex, long commandTerm) {
        if (commandIndex <= this.txStateStorage.lastAppliedIndex()) {
            return false;
        }
        LeaseInfo leaseInfo = new LeaseInfo(command.leaseStartTime(), command.primaryReplicaNodeId(), command.primaryReplicaNodeName());
        this.txStateStorage.leaseInfo(leaseInfo, commandIndex, commandTerm);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onConfigurationCommitted(RaftGroupConfiguration config, long lastAppliedIndex, long lastAppliedTerm) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            this.partitionSnapshots().acquireReadLock();
            try {
                this.tableProcessors.values().forEach(listener -> listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm));
            }
            finally {
                this.partitionSnapshots().releaseReadLock();
            }
            byte[] configBytes = this.raftGroupConfigurationConverter.toBytes(config);
            this.txStateStorage.committedGroupConfiguration(configBytes, lastAppliedIndex, lastAppliedTerm);
            this.lastAppliedIndex = Math.max(this.lastAppliedIndex, lastAppliedIndex);
            this.lastAppliedTerm = Math.max(this.lastAppliedTerm, lastAppliedTerm);
            ZonePartitionRaftListener.updateTrackerIgnoringTrackerClosedException(this.storageIndexTracker, lastAppliedIndex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            byte[] configuration = this.txStateStorage.committedGroupConfiguration();
            assert (configuration != null) : "Trying to create a snapshot without Raft group configuration";
            PartitionSnapshotInfo snapshotInfo = new PartitionSnapshotInfo(this.lastAppliedIndex, this.lastAppliedTerm, this.txStateStorage.leaseInfo(), configuration, (Collection<Integer>)this.tableProcessors.keySet());
            this.onSnapshotSaveHandler.onSnapshotSave(snapshotInfo, (Collection<RaftTableProcessor>)this.tableProcessors.values()).whenComplete((unused, throwable) -> doneClo.accept((Throwable)throwable));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean onSnapshotLoad(Path path) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            this.lastAppliedIndex = Math.max(this.lastAppliedIndex, this.txStateStorage.lastAppliedIndex());
            this.lastAppliedTerm = Math.max(this.lastAppliedTerm, this.txStateStorage.lastAppliedTerm());
            this.storageIndexTracker.update(this.lastAppliedIndex, null);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onShutdown() {
        this.cleanupSnapshots();
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            this.tableProcessors.values().forEach(RaftTableProcessor::onShutdown);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addTableProcessor(int tableId, RaftTableProcessor processor) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            RaftGroupConfiguration configuration = this.raftGroupConfigurationConverter.fromBytes(this.txStateStorage.committedGroupConfiguration());
            LeaseInfo leaseInfo = this.txStateStorage.leaseInfo();
            processor.initialize(configuration, leaseInfo, this.lastAppliedIndex, this.lastAppliedTerm);
            RaftTableProcessor prev = (RaftTableProcessor)this.tableProcessors.put(tableId, (Object)processor);
            assert (prev == null) : "Listener for table " + tableId + " already exists";
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addTableProcessorOnRecovery(int tableId, RaftTableProcessor processor) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            PartitionSnapshotInfo snapshotInfo = this.snapshotInfo();
            if (snapshotInfo != null && !snapshotInfo.tableIds().contains(tableId)) {
                RaftGroupConfiguration configuration = this.raftGroupConfigurationConverter.fromBytes(snapshotInfo.configurationBytes());
                processor.initialize(configuration, snapshotInfo.leaseInfo(), snapshotInfo.lastAppliedIndex(), snapshotInfo.lastAppliedTerm());
            }
            RaftTableProcessor prev = (RaftTableProcessor)this.tableProcessors.put(tableId, (Object)processor);
            assert (prev == null) : "Listener for table " + tableId + " already exists";
        }
    }

    @Nullable
    private PartitionSnapshotInfo snapshotInfo() {
        byte[] snapshotInfoBytes = this.txStateStorage.snapshotInfo();
        if (snapshotInfoBytes == null) {
            return null;
        }
        return VersionedSerialization.fromBytes(snapshotInfoBytes, PartitionSnapshotInfoSerializer.INSTANCE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestOnly
    @Nullable
    public RaftTableProcessor tableProcessor(int tableId) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            return (RaftTableProcessor)this.tableProcessors.get(tableId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeTableProcessor(int tableId) {
        Object object = this.tableProcessorsStateLock;
        synchronized (object) {
            this.tableProcessors.remove(tableId);
        }
    }

    private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosedException(PendingComparableValuesTracker<T, Void> tracker, T newValue) {
        try {
            tracker.update(newValue, null);
        }
        catch (TrackerClosedException trackerClosedException) {
            // empty catch block
        }
    }

    private void cleanupSnapshots() {
        this.partitionsSnapshots.cleanupOutgoingSnapshots(this.partitionKey);
    }

    private PartitionSnapshots partitionSnapshots() {
        return this.partitionsSnapshots.partitionSnapshots(this.partitionKey);
    }

    @TestOnly
    public HybridTimestamp currentSafeTime() {
        return (HybridTimestamp)this.safeTimeTracker.current();
    }
}

