package org.apache.ignite.internal.lowwatermark;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEventParameters;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkRequest;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessageGroup;
import org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.class */
public class LowWatermarkImpl extends AbstractEventProducer<LowWatermarkEvent, LowWatermarkEventParameters> implements LowWatermark, IgniteComponent {
    private static final IgniteLogger LOG;
    static final ByteArray LOW_WATERMARK_VAULT_KEY;
    private static final LowWatermarkMessagesFactory MESSAGES_FACTORY;
    private final LowWatermarkConfiguration lowWatermarkConfig;
    private final ClockService clockService;
    private final VaultManager vaultManager;
    private final ScheduledExecutorService scheduledThreadPool;

    @Nullable
    private volatile HybridTimestamp lowWatermark;
    private final FailureProcessor failureProcessor;
    private final MessagingService messagingService;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
    private final AtomicBoolean closeGuard = new AtomicBoolean();
    private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture = new AtomicReference<>();
    private final ReadWriteLock updateLowWatermarkLock = new ReentrantReadWriteLock();
    private final AtomicReference<LowWatermarkCandidate> lowWatermarkCandidate = new AtomicReference<>(new LowWatermarkCandidate(HybridTimestamp.MIN_VALUE, CompletableFutures.nullCompletedFuture()));

    public LowWatermarkImpl(String str, LowWatermarkConfiguration lowWatermarkConfiguration, ClockService clockService, VaultManager vaultManager, FailureProcessor failureProcessor, MessagingService messagingService) {
        this.lowWatermarkConfig = lowWatermarkConfiguration;
        this.clockService = clockService;
        this.vaultManager = vaultManager;
        this.failureProcessor = failureProcessor;
        this.messagingService = messagingService;
        this.scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(str, "low-watermark-updater", LOG));
    }

    public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            setLowWatermarkOnRecovery(readLowWatermarkFromVault());
            this.messagingService.addMessageHandler(LowWatermarkMessageGroup.class, this::onReceiveNetworkMessage);
            return CompletableFutures.nullCompletedFuture();
        });
    }

    public void scheduleUpdates() {
        IgniteUtils.inBusyLock(this.busyLock, this::scheduleUpdateLowWatermarkBusy);
    }

    @Nullable
    private HybridTimestamp readLowWatermarkFromVault() {
        VaultEntry vaultEntry = this.vaultManager.get(LOW_WATERMARK_VAULT_KEY);
        if (vaultEntry == null) {
            return null;
        }
        return (HybridTimestamp) ByteUtils.fromBytes(vaultEntry.value());
    }

    public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
        if (!this.closeGuard.compareAndSet(false, true)) {
            return CompletableFutures.nullCompletedFuture();
        }
        this.busyLock.block();
        ScheduledFuture<?> scheduledFuture = this.lastScheduledTaskFuture.get();
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        IgniteUtils.shutdownAndAwaitTermination(this.scheduledThreadPool, 10L, TimeUnit.SECONDS);
        return CompletableFutures.nullCompletedFuture();
    }

    @Override // org.apache.ignite.internal.lowwatermark.LowWatermark
    @Nullable
    public HybridTimestamp getLowWatermark() {
        return (HybridTimestamp) IgniteUtils.inBusyLock(this.busyLock, () -> {
            return this.lowWatermark;
        });
    }

    @Override // org.apache.ignite.internal.lowwatermark.LowWatermark
    public void getLowWatermarkSafe(Consumer<HybridTimestamp> consumer) {
        IgniteUtils.inBusyLock(this.busyLock, () -> {
            this.updateLowWatermarkLock.readLock().lock();
            try {
                consumer.accept(this.lowWatermark);
            } finally {
                this.updateLowWatermarkLock.readLock().unlock();
            }
        });
    }

    private void scheduleUpdateLowWatermarkBusy() {
        ScheduledFuture<?> scheduledFuture = this.lastScheduledTaskFuture.get();
        if (!$assertionsDisabled && scheduledFuture != null && !scheduledFuture.isDone()) {
            throw new AssertionError("previous scheduled task has not finished");
        }
        boolean compareAndSet = this.lastScheduledTaskFuture.compareAndSet(scheduledFuture, this.scheduledThreadPool.schedule(() -> {
            updateLowWatermark(createNewLowWatermarkCandidate());
        }, ((Long) this.lowWatermarkConfig.updateFrequency().value()).longValue(), TimeUnit.MILLISECONDS));
        if (!$assertionsDisabled && !compareAndSet) {
            throw new AssertionError("only one scheduled task is expected");
        }
    }

    HybridTimestamp createNewLowWatermarkCandidate() {
        return this.clockService.now().subtractPhysicalTime(((Long) this.lowWatermarkConfig.dataAvailabilityTime().value()).longValue() + this.clockService.maxClockSkewMillis());
    }

    private void setLowWatermark(HybridTimestamp hybridTimestamp) {
        this.updateLowWatermarkLock.writeLock().lock();
        try {
            if (!$assertionsDisabled && this.lowWatermark != null && hybridTimestamp.compareTo(this.lowWatermark) <= 0) {
                throw new AssertionError("Low watermark should only grow: [cur=" + this.lowWatermark + ", new=" + hybridTimestamp + "]");
            }
            this.lowWatermark = hybridTimestamp;
        } finally {
            this.updateLowWatermarkLock.writeLock().unlock();
        }
    }

    private void setLowWatermarkOnRecovery(@Nullable HybridTimestamp hybridTimestamp) {
        this.updateLowWatermarkLock.writeLock().lock();
        try {
            this.lowWatermark = hybridTimestamp;
        } finally {
            this.updateLowWatermarkLock.writeLock().unlock();
        }
    }

    void onReceiveNetworkMessage(NetworkMessage networkMessage, ClusterNode clusterNode, @Nullable Long l) {
        IgniteUtils.inBusyLock(this.busyLock, () -> {
            if (networkMessage instanceof GetLowWatermarkRequest) {
                if (!$assertionsDisabled && l == null) {
                    throw new AssertionError(clusterNode);
                }
                this.messagingService.respond(clusterNode, MESSAGES_FACTORY.getLowWatermarkResponse().lowWatermark(HybridTimestamp.hybridTimestampToLong(this.lowWatermark)).build(), l.longValue());
            }
        });
    }

    @Override // org.apache.ignite.internal.lowwatermark.LowWatermark
    public void updateLowWatermark(HybridTimestamp hybridTimestamp) {
        updateLowWatermark(hybridTimestamp, true);
    }

    @Override // org.apache.ignite.internal.lowwatermark.LowWatermark
    public void updateLowWatermark(HybridTimestamp hybridTimestamp, boolean z) {
        IgniteUtils.inBusyLock(this.busyLock, () -> {
            LowWatermarkCandidate lowWatermarkCandidate;
            LowWatermarkCandidate lowWatermarkCandidate2 = new LowWatermarkCandidate(hybridTimestamp, new CompletableFuture());
            do {
                lowWatermarkCandidate = this.lowWatermarkCandidate.get();
                if (lowWatermarkCandidate.lowWatermark().compareTo(hybridTimestamp) >= 0) {
                    return;
                }
            } while (!this.lowWatermarkCandidate.compareAndSet(lowWatermarkCandidate, lowWatermarkCandidate2));
            lowWatermarkCandidate.updateFuture().thenComposeAsync(r7 -> {
                return updateAndNotify(hybridTimestamp, z);
            }, (Executor) this.scheduledThreadPool).whenComplete((BiConsumer<? super U, ? super Throwable>) (r4, th) -> {
                if (th != null) {
                    lowWatermarkCandidate2.updateFuture().completeExceptionally(th);
                } else {
                    lowWatermarkCandidate2.updateFuture().complete(null);
                }
            });
        });
    }

    CompletableFuture<Void> updateAndNotify(HybridTimestamp hybridTimestamp) {
        return updateAndNotify(hybridTimestamp, true);
    }

    CompletableFuture<Void> updateAndNotify(HybridTimestamp hybridTimestamp, boolean z) {
        return IgniteUtils.inBusyLockAsync(this.busyLock, () -> {
            return fireEvent(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE, new ChangeLowWatermarkEventParameters(hybridTimestamp)).thenComposeAsync(r8 -> {
                this.vaultManager.put(LOW_WATERMARK_VAULT_KEY, ByteUtils.toBytes(hybridTimestamp));
                setLowWatermark(hybridTimestamp);
                return fireEvent(LowWatermarkEvent.LOW_WATERMARK_CHANGED, new ChangeLowWatermarkEventParameters(hybridTimestamp));
            }, (Executor) this.scheduledThreadPool).whenCompleteAsync((r11, th) -> {
                if (th == null) {
                    LOG.info("Successful low watermark update: {}", new Object[]{hybridTimestamp});
                    if (z) {
                        IgniteUtils.inBusyLock(this.busyLock, this::scheduleUpdateLowWatermarkBusy);
                        return;
                    }
                    return;
                }
                if (th instanceof NodeStoppingException) {
                    return;
                }
                LOG.error("Failed to update low watermark, will schedule again: {}", th, new Object[]{hybridTimestamp});
                this.failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, th));
                if (z) {
                    IgniteUtils.inBusyLock(this.busyLock, this::scheduleUpdateLowWatermarkBusy);
                }
            }, (Executor) this.scheduledThreadPool);
        });
    }

    static {
        $assertionsDisabled = !LowWatermarkImpl.class.desiredAssertionStatus();
        LOG = Loggers.forClass(LowWatermarkImpl.class);
        LOW_WATERMARK_VAULT_KEY = new ByteArray("low-watermark");
        MESSAGES_FACTORY = new LowWatermarkMessagesFactory();
    }
}
