/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.snapshot;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.gridgain.grid.configuration.SnapshotConfiguration;
import org.gridgain.grid.internal.processors.cache.database.SnapshotMetricsMXBeanImpl;
import org.gridgain.grid.internal.processors.cache.database.SnapshotOperationStage;
import org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationStageFinishedMessage;
import org.gridgain.grid.internal.processors.cache.database.messages.SnapshotOperationStageFinishedMessage;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStage;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStageContext;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStagePayloadWrapper;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStagesConfiguration;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridCacheSnapshotManager;
import org.gridgain.grid.internal.processors.cache.database.snapshot.GridSnapshotOperationEx;
import org.gridgain.grid.internal.processors.cache.database.snapshot.PausableExecutor;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotConfigurableFuture;
import org.gridgain.grid.internal.processors.cache.database.snapshot.SnapshotOperationInfoImpl;
import org.gridgain.grid.persistentstore.SnapshotOperationType;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.class)
public class SnapshotConfigurableFutureUnitTest {
    private final IgniteUuid futureId = IgniteUuid.fromString((String)"2222-22222222-2222-2222-2222-222222222222");
    private final UUID initiatorId = UUID.randomUUID();
    @Mock
    private GridCacheSnapshotManager gridCacheSnapshotManager;
    @Mock
    private GridCacheSharedContext gridCacheSharedContext;
    @Mock
    private SnapshotMetricsMXBeanImpl snapshotMetricsMXBean;
    @Mock
    private SnapshotOperationInfoImpl snapshotOperationInfo;
    @Mock
    private GridSnapshotOperationEx snapshotOperationEx;
    @Mock
    private MarshallerContext marshallerContext;
    @Mock
    private GridDiscoveryManager gridDiscoveryManager;
    @Mock
    private DiscoCache discoCache;
    @Mock
    private DiscoveryDataClusterState discoDataClusterState;
    @Mock
    private GridKernalContext gridKernalContext;
    @Mock
    private GridClusterStateProcessor gridClusterStateProcessor;
    @Mock
    private GridIoManager gridIoManager;
    @Mock
    private GridCacheProcessor gridCacheProcessor;
    @Captor
    private ArgumentCaptor<Message> messageCaptor;
    private final ClusterNode crd = new LocalAwareTestNode(UUID.fromString("11111111-1111-1111-1111-111111111111"));
    private final ClusterNode nonCrd = new LocalAwareTestNode(UUID.fromString("22222222-2222-2222-2222-222222222222"));
    private final BaselineTopology baselineTopology = BaselineTopology.build(Arrays.asList(this.crd, this.nonCrd), (int)1);
    private final ListeningTestLogger logger = new ListeningTestLogger();
    private final IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
    private final BinaryMarshaller marshaller = new BinaryMarshaller();
    private final AffinityTopologyVersion topVer = new AffinityTopologyVersion(1L);
    private final CountDownLatch controllableExecutorServiceShutdownLatch = new CountDownLatch(1);
    @Mock
    private ExecutorService controllableExecutorService;
    private final PausableExecutor pausableExecutor = new PausableExecutor();
    private final List<String> logMessages = new CopyOnWriteArrayList<String>();
    private ClusterNode localNode;
    private SnapshotConfigurableFuture<Void> future;

    @Before
    public void initMocks() {
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.gridDiscoveryManager).when((Object)this.gridCacheSharedContext)).discovery();
        ((GridDiscoveryManager)Mockito.doReturn((Object)this.discoCache).when((Object)this.gridDiscoveryManager)).discoCache((AffinityTopologyVersion)ArgumentMatchers.any());
        ((DiscoCache)Mockito.doReturn((Object)this.discoDataClusterState).when((Object)this.discoCache)).state();
        ((DiscoCache)Mockito.doReturn(Arrays.asList(this.crd, this.nonCrd)).when((Object)this.discoCache)).aliveBaselineNodes();
        ((DiscoCache)Mockito.doReturn(Arrays.asList(this.crd, this.nonCrd)).when((Object)this.discoCache)).aliveServerNodes();
        ((DiscoCache)Mockito.doReturn(Arrays.asList(this.crd, this.nonCrd)).when((Object)this.discoCache)).serverNodes();
        ((DiscoveryDataClusterState)Mockito.doReturn((Object)this.baselineTopology).when((Object)this.discoDataClusterState)).baselineTopology();
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.gridKernalContext).when((Object)this.gridCacheSharedContext)).kernalContext();
        ((GridKernalContext)Mockito.doReturn((Object)this.gridClusterStateProcessor).when((Object)this.gridKernalContext)).state();
        ((GridClusterStateProcessor)Mockito.doReturn((Object)this.discoDataClusterState).when((Object)this.gridClusterStateProcessor)).clusterState();
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.marshaller).when((Object)this.gridCacheSharedContext)).marshaller();
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.gridIoManager).when((Object)this.gridCacheSharedContext)).gridIO();
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.gridCacheProcessor).when((Object)this.gridCacheSharedContext)).cache();
        ((GridCacheSnapshotManager)Mockito.doAnswer(this::executeOnPausableExecutor).when((Object)this.gridCacheSnapshotManager)).submitTaskToSnapshotExecutor((SnapshotOperationType)ArgumentMatchers.any(), (Runnable)ArgumentMatchers.any());
        ((ExecutorService)Mockito.doAnswer(invocation -> {
            this.controllableExecutorServiceShutdownLatch.await();
            return null;
        }).when((Object)this.controllableExecutorService)).shutdownNow();
    }

    @Nullable
    private Object executeOnPausableExecutor(InvocationOnMock invocation) {
        Runnable task = (Runnable)invocation.getArgument(1);
        this.pausableExecutor.execute(task);
        return null;
    }

    @Before
    public void initMarshaller() throws Exception {
        this.marshaller.setContext(this.marshallerContext);
        BinaryContext binaryContext = new BinaryContext((BinaryMetadataHandler)BinaryNoopMetadataHandler.instance(), this.igniteConfiguration, (IgniteLogger)this.logger);
        this.marshaller.setBinaryContext(binaryContext, this.igniteConfiguration);
    }

    @Before
    public void startTestClockTimer() {
        IgniteUtils.onGridStart();
    }

    @Before
    public void createFuture() {
        this.logger.registerListener(this.logMessages::add);
        ((GridCacheSharedContext)Mockito.doReturn((Object)this.logger).when((Object)this.gridCacheSharedContext)).logger((Class)ArgumentMatchers.any(Class.class));
        this.future = new SnapshotConfigurableFuture(2, this.futureId, false, this.initiatorId, null, null, this.gridCacheSnapshotManager, this.gridCacheSharedContext, new SnapshotConfiguration(), this.snapshotMetricsMXBean);
        this.future.executorSrvc = this.controllableExecutorService;
    }

    @After
    public void cleanup() throws Exception {
        this.controllableExecutorServiceShutdownLatch.countDown();
        this.pausableExecutor.shutdownNow();
        IgniteUtils.onGridStop();
    }

    @Test
    public void destroyRequestShouldNotInterfereWithLocalStageCompletionAckMessageSend() throws Exception {
        this.injectStagesConfiguration(new CustomStagesConfiguration().addStage((CustomStage)new NoOpStage()).addStage((CustomStage)new NoOpStage()));
        this.setLocalNodeTo(this.nonCrd);
        this.future.init(this.snapshotOperationInfo);
        this.future.start(this.topVer);
        this.waitTillFirstStageIsComplete();
        this.destroyFutureAfterProcessingStartedAndBeforeResultingMessageSent();
        boolean twoStagesFinished = GridTestUtils.waitForCondition(this::twoStagesAreFinished, (long)10000L);
        Assert.assertTrue((String)"Did not see two stages having finished in time", (boolean)twoStagesFinished);
        this.assertThatTwoMessagesAboutTwoFinishedStagesAreSent();
    }

    private void destroyFutureAfterProcessingStartedAndBeforeResultingMessageSent() throws IgniteCheckedException {
        this.pausableExecutor.pause();
        this.future.onMessage(this.crd, this.firstStageFinishedMessage());
        GridTestUtils.runAsync(() -> this.future.destroy((Throwable)new Exception("Stopping node...")));
        boolean markedAsDestroyed = GridTestUtils.waitForCondition(() -> this.future.destroyed.get(), (long)10000L);
        Assert.assertTrue((String)"Did not see marked-as-destroyed in time", (boolean)markedAsDestroyed);
        this.pausableExecutor.unpause();
    }

    private void injectStagesConfiguration(CustomStagesConfiguration stagesConfiguration) {
        HashMap<String, Object> extraParams = new HashMap<String, Object>();
        extraParams.put("CUSTOM_STAGES_CONFIGURATION", stagesConfiguration);
        extraParams.put("IMPLICIT_SNAPSHOT_OPERATION", Boolean.TRUE);
        ((SnapshotOperationInfoImpl)Mockito.doReturn((Object)this.snapshotOperationEx).when((Object)this.snapshotOperationInfo)).snapshotOperation();
        Mockito.when((Object)this.snapshotOperationEx.extraParameter()).thenReturn(extraParams);
    }

    private void setLocalNodeTo(ClusterNode node) {
        ((GridCacheSharedContext)Mockito.doReturn((Object)node).when((Object)this.gridCacheSharedContext)).localNode();
        ((GridCacheSharedContext)Mockito.doReturn((Object)node.id()).when((Object)this.gridCacheSharedContext)).localNodeId();
        this.localNode = node;
    }

    @NotNull
    private ClusterWideSnapshotOperationStageFinishedMessage firstStageFinishedMessage() throws IgniteCheckedException {
        byte[] payload = this.marshaller.marshal((Object)new CustomStagePayloadWrapper(0, null));
        return new ClusterWideSnapshotOperationStageFinishedMessage(this.futureId, true, null, this.topVer, Collections.emptyList(), SnapshotOperationStage.FIRST, payload);
    }

    private void waitTillFirstStageIsComplete() throws IgniteInterruptedCheckedException {
        this.waitTillLoggedMessageStartingWith("Finished FIRST:0 stage of snapshot operation");
    }

    private void waitTillLoggedMessageStartingWith(String prefix) throws IgniteInterruptedCheckedException {
        boolean conditionMet = GridTestUtils.waitForCondition(() -> this.logMessages.stream().anyMatch(s -> s.startsWith(prefix)), (long)10000L);
        Assert.assertTrue((String)("Did not see a message with prefix " + prefix), (boolean)conditionMet);
    }

    private CustomStagePayloadWrapper unmarshalCustomStageWrapper(byte[] payload) {
        try {
            return (CustomStagePayloadWrapper)this.marshaller.unmarshal(payload, this.getClass().getClassLoader());
        }
        catch (IgniteCheckedException e) {
            throw new IllegalStateException(e);
        }
    }

    private boolean twoStagesAreFinished() {
        long finishedStagesCount = this.logMessages.stream().filter(s -> s.matches("Finished (.+) stage of snapshot operation.+")).count();
        return finishedStagesCount >= 2L;
    }

    private void assertThatTwoMessagesAboutTwoFinishedStagesAreSent() throws IgniteCheckedException {
        ((GridIoManager)Mockito.verify((Object)this.gridIoManager, (VerificationMode)Mockito.atLeastOnce())).sendToGridTopic((ClusterNode)ArgumentMatchers.eq((Object)this.crd), (GridTopic)ArgumentMatchers.eq((Object)GridTopic.TOPIC_SNAPSHOT), (Message)this.messageCaptor.capture(), ArgumentMatchers.anyByte());
        List localFinishMessages = this.messageCaptor.getAllValues().stream().filter(m -> m instanceof SnapshotOperationStageFinishedMessage).map(SnapshotOperationStageFinishedMessage.class::cast).collect(Collectors.toList());
        MatcherAssert.assertThat(localFinishMessages, (Matcher)Matchers.hasSize((int)2));
        long cancelStageFinishedCount = localFinishMessages.stream().filter(m -> m.stage() == SnapshotOperationStage.CANCELLED).count();
        MatcherAssert.assertThat((Object)cancelStageFinishedCount, (Matcher)Matchers.is((Object)0L));
        long customStage1FinishedCount = localFinishMessages.stream().filter(m -> m.stage() == SnapshotOperationStage.CUSTOM).filter(m -> this.unmarshalCustomStageWrapper(m.payload()).stageNum() == 1).count();
        MatcherAssert.assertThat((Object)customStage1FinishedCount, (Matcher)Matchers.is((Object)1L));
    }

    @Test
    public void successfulLocalMessageWithStageCancelShouldBeIgnoredOnTheCoordinator() throws Exception {
        this.injectStagesConfiguration(new CustomStagesConfiguration().addStage((CustomStage)new NoOpStage()).addStage((CustomStage)new NoOpStage()));
        this.setLocalNodeTo(this.crd);
        this.future.init(this.snapshotOperationInfo);
        this.future.start(this.topVer);
        this.waitTillFirstStageIsComplete();
        this.future.onMessage(this.nonCrd, this.successfulLocalCancelMessage());
        this.waitTillLoggedMessageStartingWith("Received message with stage=CANCELLED but success=true");
    }

    @NotNull
    private SnapshotOperationStageFinishedMessage successfulLocalCancelMessage() throws IgniteCheckedException {
        byte[] payload = this.marshaller.marshal((Object)new CustomStagePayloadWrapper(1, null));
        return new SnapshotOperationStageFinishedMessage(this.futureId, SnapshotOperationStage.CANCELLED, true, System.currentTimeMillis(), null, payload);
    }

    private static class NoOpStage
    implements CustomStage {
        private NoOpStage() {
        }

        public boolean execute(CustomStageContext stageContext) {
            return true;
        }
    }

    private class LocalAwareTestNode
    extends GridTestNode {
        public LocalAwareTestNode(UUID id) {
            super(id);
        }

        public boolean isLocal() {
            if (SnapshotConfigurableFutureUnitTest.this.localNode == null) {
                throw new IllegalStateException("Make sure to initialize #localNode!");
            }
            return SnapshotConfigurableFutureUnitTest.this.localNode.equals((Object)this);
        }
    }
}

