/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.database.snapshot;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.internal.processors.cache.database.AbstractSnapshotTest;
import org.gridgain.grid.internal.processors.cache.database.GridSnapshotEx;
import org.gridgain.grid.internal.processors.cache.database.SnapshotOperationStage;
import org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationStageFinishedMessage;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStage;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStageContext;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStagePayloadWrapper;
import org.gridgain.grid.internal.processors.cache.database.snapshot.CustomStagesConfiguration;
import org.gridgain.grid.internal.processors.cache.database.snapshot.FaultyStageFinishedCommunicationSpi;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SnapshotCustomOperationCoordinatorChangeDuringStageFinishTest
extends AbstractSnapshotTest {
    /** Grid name used for the coordinator node; {@code getConfiguration} also installs the grid name as the consistent ID. */
    private static final String COORDINATOR_CONSISTENT_ID = "crd";
    /** Number of non-coordinator nodes; equals the number of nodes started by {@code startNonCrdNodes()}. */
    private static final int NODES = 3;
    /** Equals the per-node stage result payload reported by {@code TestStage.execute}; presumably its named form - TODO confirm. */
    private static final int MAGIC_NUMBER = 5;
    /** Parameter of the current run, injected by the {@code Parameterized} runner from {@code testData()}. */
    @Parameterized.Parameter
    public TestData testData;

    /**
     * Builds a node configuration on top of the parent one: the grid name is used as the consistent ID
     * and the communication SPI is set to {@code FaultyStageFinishedCommunicationSpi.neverFailing()}.
     *
     * @param gridName Grid name, also used as the consistent ID.
     * @return Configuration returned by the last setter of the chain.
     * @throws Exception Propagated from the parent {@code getConfiguration}.
     */
    @Override
    protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration parentCfg = super.getConfiguration(gridName);

        Serializable consistentId = gridName;
        IgniteConfiguration cfgWithId = parentCfg.setConsistentId(consistentId);

        CommunicationSpi dfltCommSpi = (CommunicationSpi)FaultyStageFinishedCommunicationSpi.neverFailing();

        return cfgWithId.setCommunicationSpi(dfltCommSpi);
    }

    /**
     * Per-test set-up: runs the parent set-up, calls {@code cleanSnapshotDirs()} and resets the static
     * stage bookkeeping so that state recorded by a previous parameterized run is not visible.
     *
     * @throws Exception Propagated from the parent set-up or from {@code cleanSnapshotDirs()}.
     */
    @Override
    protected void beforeTest() throws Exception {
        super.beforeTest();

        cleanSnapshotDirs();
        StageStaticData.reset();
    }

    /**
     * Per-test tear-down: runs the parent tear-down and then stops all grids.
     * The grids are stopped in a {@code finally} block so that nodes are not left running
     * for the next parameterized run when the parent tear-down throws.
     *
     * @throws Exception Propagated from the parent tear-down or from {@code stopAllGrids()}.
     */
    @Override
    protected void afterTest() throws Exception {
        try {
            super.afterTest();
        }
        finally {
            stopAllGrids();
        }
    }

    /**
     * Starts a coordinator with a faulty communication SPI plus the non-coordinator nodes, launches a
     * three-stage custom snapshot operation on the coordinator and closes the coordinator once its SPI
     * reports that it tried to send the stage-finished message to all nodes. Then checks that the
     * operation still completes, that every non-null payload observed in cluster-wide finish messages
     * is a {@code GlobalPayload}, and that the merged payloads match the expectation for the current parameter.
     *
     * @throws Exception If the test fails.
     */
    @Test
    public void testCustomSnapshotOperation() throws Exception {
        FaultyStageFinishedCommunicationSpi crdCommSpi = createCrdCommSpi();
        IgniteEx coordinator = startCrd(crdCommSpi);
        IgniteEx spyNode = startNonCrdNodes();

        List<Object> observedGlobalPayloads = startCollectingClusterWideFinishMessagePayloads(spyNode);

        coordinator.cluster().state(ClusterState.ACTIVE);

        GridGain gridGain = (GridGain)coordinator.plugin("GridGain");
        GridSnapshotEx snapshot = (GridSnapshotEx)gridGain.snapshot();

        // Stage objects only store their arguments, so building them up front does not change behaviour.
        CustomStage firstStage = new TestStage(1000);
        CustomStage secondStage = new TestStage(1000, () -> StageStaticData.stage2Done.set(true));
        CustomStage lastStage = new TestStage(1000, () -> StageStaticData.finalStageDone.set(true));

        CustomStagesConfiguration stagesCfg = new CustomStagesConfiguration()
            .addStage(firstStage)
            .addStage(secondStage)
            .finalStage(lastStage);

        String operationMsg = "Custom operation from " + getClass();

        snapshot.customSnapshotOperation(stagesCfg, operationMsg);

        // Take the coordinator down in the middle of finishing a stage.
        crdCommSpi.waitTillTriedToSendStageFinishedMessageToAllNodes();
        coordinator.close();

        waitTillCustomOperationHasFinished();
        assertThatAllStagesAreDone();

        MatcherAssert.assertThat(observedGlobalPayloads, (Matcher)Matchers.everyItem((Matcher)Matchers.instanceOf(GlobalPayload.class)));

        assertThatCorrectPayloadsAreMerged();
    }

    /** Fails unless both the stage 2 flag and the final stage flag of {@code StageStaticData} are set. */
    private void assertThatAllStagesAreDone() {
        boolean secondStageDone = StageStaticData.stage2Done.get();

        assertTrue("Stage 2 is not done", secondStageDone);

        boolean lastStageDone = StageStaticData.finalStageDone.get();

        assertTrue("Final stage is not done", lastStageDone);
    }

    /**
     * Compares the merged global payloads recorded by the coordinator hooks with the expected sequence.
     * The first entry is the sum over the non-coordinator nodes plus the coordinator, every following
     * entry is the sum over the non-coordinator nodes only. Three entries are expected when the original
     * coordinator was allowed to deliver a finish message to at least one node, four entries otherwise
     * (presumably because the first stage is then finished again on the new coordinator - TODO confirm).
     */
    private void assertThatCorrectPayloadsAreMerged() {
        int nonCrdOnlySum = NODES * MAGIC_NUMBER;
        int nonCrdAndCrdSum = nonCrdOnlySum + MAGIC_NUMBER;

        List<Integer> expectedPayloads = testData.allowsAnyClusterWideFinishMessagesFromOriginalCrd()
            ? Arrays.asList(nonCrdAndCrdSum, nonCrdOnlySum, nonCrdOnlySum)
            : Arrays.asList(nonCrdAndCrdSum, nonCrdOnlySum, nonCrdOnlySum, nonCrdOnlySum);

        MatcherAssert.assertThat((Object)StageStaticData.mergedGlobalPayloads, (Matcher)Matchers.equalTo(expectedPayloads));
    }

    /**
     * Creates the communication SPI for the coordinator: a {@code failing} SPI built from the
     * allowance flags of the current parameter, bound to the next communication port for this
     * test class and with TCP no-delay enabled.
     *
     * @return Configured SPI.
     */
    private FaultyStageFinishedCommunicationSpi createCrdCommSpi() {
        boolean[] allowance = testData.allowance;

        FaultyStageFinishedCommunicationSpi faultySpi = FaultyStageFinishedCommunicationSpi.failing(allowance);

        faultySpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
        faultySpi.setTcpNoDelay(true);

        return faultySpi;
    }

    /**
     * Starts the coordinator node with the given communication SPI in place of the default one.
     *
     * @param commSpi Communication SPI for the coordinator.
     * @return Started node.
     * @throws Exception Propagated from configuration creation or node start.
     */
    private IgniteEx startCrd(FaultyStageFinishedCommunicationSpi commSpi) throws Exception {
        IgniteConfiguration crdCfg = getConfiguration(COORDINATOR_CONSISTENT_ID);
        IgniteConfiguration crdCfgWithSpi = crdCfg.setCommunicationSpi((CommunicationSpi)commSpi);

        return startGrid(crdCfgWithSpi);
    }

    /**
     * Starts the non-coordinator nodes, named {@code node0}, {@code node1} and so on.
     *
     * @return The node started last.
     * @throws Exception Propagated from node start.
     */
    private IgniteEx startNonCrdNodes() throws Exception {
        IgniteEx lastStarted = null;
        int idx = 0;

        while (idx < NODES) {
            lastStarted = startGrid("node" + idx);
            idx++;
        }

        return lastStarted;
    }

    /**
     * Registers a listener on the snapshot topic of the given node. For every
     * {@code ClusterWideSnapshotOperationStageFinishedMessage} it unmarshals the payload wrapper and
     * records the wrapped payload; messages whose wrapper holds a {@code null} payload are only logged.
     *
     * @param spy Node on which the messages are observed.
     * @return Thread-safe list that the listener keeps appending to.
     */
    private List<Object> startCollectingClusterWideFinishMessagePayloads(IgniteEx spy) {
        List<Object> collectedPayloads = new CopyOnWriteArrayList<>();

        spy.context().io().addMessageListener(GridTopic.TOPIC_SNAPSHOT, (nodeId, msg, plc) -> {
            // Other snapshot-topic messages are of no interest here.
            if (!(msg instanceof ClusterWideSnapshotOperationStageFinishedMessage)) {
                return;
            }

            ClusterWideSnapshotOperationStageFinishedMessage finishMsg = (ClusterWideSnapshotOperationStageFinishedMessage)msg;
            CustomStagePayloadWrapper payloadWrapper = unmarshalWrapper(spy, finishMsg);

            if (payloadWrapper.payload() == null) {
                log.info("The message contains null payload: " + msg + ", it is from " + spy.cluster().node(nodeId).consistentId());

                return;
            }

            collectedPayloads.add(payloadWrapper.payload());
        });

        return collectedPayloads;
    }

    /**
     * Unmarshals the payload of a cluster-wide finish message with the marshaller of the given node.
     *
     * @param client Node whose configured marshaller is used.
     * @param broadcastMsg Message carrying the marshalled wrapper.
     * @return Unmarshalled wrapper.
     * @throws AssertionError If unmarshalling fails; the checked exception becomes the cause.
     */
    private CustomStagePayloadWrapper unmarshalWrapper(IgniteEx client, ClusterWideSnapshotOperationStageFinishedMessage broadcastMsg) {
        CustomStagePayloadWrapper unmarshalled;

        try {
            unmarshalled = (CustomStagePayloadWrapper)client.configuration().getMarshaller().unmarshal(broadcastMsg.payload(), U.gridClassLoader());
        }
        catch (IgniteCheckedException unmarshalErr) {
            throw new AssertionError(unmarshalErr);
        }

        return unmarshalled;
    }

    /**
     * Waits up to 60 seconds until the final stage flag of {@code StageStaticData} is set.
     * <p>
     * The previous timeout, {@code TimeUnit.SECONDS.toMillis(60000L)}, evaluated to 60,000,000 ms
     * (roughly 16.7 hours), so a hung operation would block the run instead of failing it.
     * The boolean result of the wait is deliberately not checked here: the caller asserts the
     * stage flags right after this method returns.
     *
     * @throws IgniteInterruptedCheckedException If the waiting thread is interrupted.
     */
    private void waitTillCustomOperationHasFinished() throws IgniteInterruptedCheckedException {
        long timeoutMs = TimeUnit.SECONDS.toMillis(60L);

        GridTestUtils.waitForCondition(StageStaticData.finalStageDone::get, timeoutMs);
    }

    /**
     * Produces one {@code TestData} per combination of per-node allowance flags.
     * Bit {@code i} of the combination index becomes flag {@code i}, so index 0 yields all
     * {@code false} and the last index yields all {@code true}.
     *
     * @return Parameters for the {@code Parameterized} runner.
     */
    @Parameterized.Parameters(name="{0}")
    public static Collection<TestData> testData() {
        int combinations = 1 << NODES;

        return IntStream.range(0, combinations).mapToObj(mask -> {
            boolean[] allowance = new boolean[NODES];

            for (int bit = 0; bit < NODES; bit++) {
                allowance[bit] = (mask & (1 << bit)) != 0;
            }

            return new TestData(allowance);
        }).collect(Collectors.toList());
    }

    /**
     * Global payload of a stage: wraps the integer sum accumulated by {@code TestStage.mergePayloadOnCrd}.
     */
    private static class GlobalPayload implements Serializable {
        /** Explicit serial version, declared the same way as in {@code TestStage}. */
        private static final long serialVersionUID = 0L;

        /** Accumulated sum of the merged per-node payloads. */
        private final int value;

        /**
         * @param value Accumulated sum.
         */
        private GlobalPayload(int value) {
            this.value = value;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            return "GlobalPayload{value=" + this.value + '}';
        }
    }

    /**
     * Static bookkeeping written by the stage hooks and read by the test assertions.
     * Not instantiable; {@link #reset()} is called from the per-test set-up.
     */
    private static class StageStaticData {
        /** Set by the coordinator hook of the second stage. */
        private static final AtomicBoolean stage2Done = new AtomicBoolean(false);

        /** Set by the coordinator hook of the final stage. */
        private static final AtomicBoolean finalStageDone = new AtomicBoolean(false);

        /** Global payload values recorded by {@code TestStage.onStageDoneCrdHook}, in call order. */
        private static final List<Integer> mergedGlobalPayloads = new CopyOnWriteArrayList<>();

        /** No instances. */
        private StageStaticData() {
            // No-op.
        }

        /** Clears both flags and the recorded payloads. */
        private static void reset() {
            stage2Done.set(false);
            finalStageDone.set(false);

            mergedGlobalPayloads.clear();
        }
    }

    private static interface CrdHook
    extends Runnable,
    Serializable {
    }

    /**
     * Custom stage used by the test. Each execution sleeps for a random time, reports a fixed integer
     * payload, sums the per-node payloads into a {@code GlobalPayload} on merge, and records the
     * resulting global value when the stage-done hook is invoked.
     */
    public static class TestStage implements CustomStage {
        /** Serial version. */
        private static final long serialVersionUID = 0L;

        /** Exclusive upper bound of the random sleep in {@link #execute}, passed to {@code doSleep}. */
        private final int maxSleep;

        /** Optional callback run at the end of {@link #onStageDoneCrdHook}; may be {@code null}. */
        private final CrdHook crdHook;

        /**
         * Creates a stage without a callback.
         *
         * @param maxSleep Exclusive upper bound of the random sleep.
         */
        public TestStage(int maxSleep) {
            this(maxSleep, null);
        }

        /**
         * @param maxSleep Exclusive upper bound of the random sleep.
         * @param crdHook Callback to run from the stage-done hook, or {@code null}.
         */
        public TestStage(int maxSleep, CrdHook crdHook) {
            this.maxSleep = maxSleep;
            this.crdHook = crdHook;
        }

        /**
         * Sleeps for a random time below {@code maxSleep}, logs the stage context, checks that a
         * cancelled stage carries an error and reports the fixed payload as the stage result.
         *
         * @param stageContext Context of the stage being executed.
         * @return Always {@code true}.
         */
        public boolean execute(CustomStageContext stageContext) {
            long sleepTime = ThreadLocalRandom.current().nextInt(maxSleep);

            GridAbstractTest.doSleep(sleepTime);

            log.info("Executing stage: " + stageContext.stageNum()
                + ", previous stage cluster-wide payload: " + stageContext.previousStageClusterWidePayload()
                + ", previousStage=" + stageContext.previousStageNum());

            boolean cancelledStage = stageContext.stageType() == SnapshotOperationStage.CANCELLED;

            if (cancelledStage) {
                Object stageError = stageContext.error();

                SnapshotCustomOperationCoordinatorChangeDuringStageFinishTest.assertNotNull(stageError);
            }

            Object nodePayload = MAGIC_NUMBER;

            stageContext.stageResultPayload(nodePayload);

            return true;
        }

        /**
         * Adds the integer payload of a node to the global payload of the stage, retrying the
         * compare-and-set on the global payload reference until it succeeds.
         * A {@code null} payload is logged and otherwise ignored.
         *
         * @param stageContext Context of the stage.
         * @param nodeId ID of the node that produced the payload.
         * @param payload Payload of the node; expected to be an {@code Integer} when not {@code null}.
         */
        public void mergePayloadOnCrd(CustomStageContext stageContext, UUID nodeId, Object payload) {
            log.info("Merge payload on coordinator, stage " + stageContext.stageNum()
                + ", type=" + stageContext.stageType()
                + ", node=" + stageContext.ctx().grid().cluster().node(nodeId).consistentId()
                + ", payload=" + payload);

            if (payload == null) {
                return;
            }

            for (;;) {
                GlobalPayload current = (GlobalPayload)stageContext.globalPayload();

                int addend = (Integer)payload;
                int mergedValue = current == null ? addend : current.value + addend;

                GlobalPayload merged = new GlobalPayload(mergedValue);

                // Retry with a freshly read global payload when the reference changed in between.
                if (stageContext.globalPayloadRef().compareAndSet(current, merged)) {
                    log.info("Set new global payload to " + merged);

                    return;
                }
            }
        }

        /**
         * Records the value of the current global payload and runs the optional callback.
         *
         * @param stageContext Context of the finished stage.
         * @param cancelled Cancellation flag; only logged.
         * @return Always {@code true}.
         */
        public boolean onStageDoneCrdHook(CustomStageContext stageContext, boolean cancelled) {
            log.info("Crd hook of stage " + stageContext.stageNum()
                + ", type=" + stageContext.stageType()
                + ", cancelled=" + cancelled
                + ", global payload=" + stageContext.globalPayload());

            GlobalPayload finalPayload = (GlobalPayload)stageContext.globalPayload();

            StageStaticData.mergedGlobalPayloads.add(finalPayload.value);

            if (crdHook == null) {
                return true;
            }

            crdHook.run();

            return true;
        }
    }

    /**
     * Parameter of a single run: the flags handed to {@code FaultyStageFinishedCommunicationSpi.failing}
     * when the coordinator's communication SPI is created.
     */
    private static class TestData {
        /** Allowance flags; presumably one per non-coordinator node - TODO confirm against the SPI. */
        private final boolean[] allowance;

        /**
         * @param allowance Allowance flags; the array is stored as is.
         */
        private TestData(boolean[] allowance) {
            this.allowance = allowance;
        }

        /** {@inheritDoc} */
        @Override
        public String toString() {
            String flags = Arrays.toString(allowance);

            return "Node accessibility = " + flags;
        }

        /**
         * @return {@code true} if at least one allowance flag is set.
         */
        public boolean allowsAnyClusterWideFinishMessagesFromOriginalCrd() {
            int idx = 0;

            while (idx < allowance.length) {
                if (allowance[idx]) {
                    return true;
                }

                idx++;
            }

            return false;
        }
    }
}

