/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.table.distributed.storage;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.continuousquery.ContinuousQueryScanResult;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.util.Pair;
import org.jetbrains.annotations.Nullable;

public class ContinuousQueryResponseHandler
implements NetworkMessageHandler {
    private static final IgniteLogger LOG = Loggers.forClass(ContinuousQueryResponseHandler.class);
    private final ConcurrentHashMap<Long, CompletableFuture<ContinuousQueryScanResult<BinaryRow>>> pendingRequests = new ConcurrentHashMap();
    private final AtomicLong requestIdGenerator = new AtomicLong();

    Pair<CompletableFuture<ContinuousQueryScanResult<BinaryRow>>, Long> addPendingRequest() {
        long requestId = this.requestIdGenerator.incrementAndGet();
        CompletableFuture fut = new CompletableFuture();
        this.pendingRequests.put(requestId, fut);
        return new Pair(fut, (Object)requestId);
    }

    void removePendingRequest(long requestId) {
        this.pendingRequests.remove(requestId);
    }

    public void onReceived(NetworkMessage message, InternalClusterNode sender, @Nullable Long correlationId) {
        if (correlationId != null || !(message instanceof ReplicaResponse)) {
            return;
        }
        ReplicaResponse response = (ReplicaResponse)message;
        if (!(response.result() instanceof ContinuousQueryScanResult)) {
            return;
        }
        ContinuousQueryScanResult result = (ContinuousQueryScanResult)response.result();
        CompletableFuture<ContinuousQueryScanResult<BinaryRow>> fut = this.pendingRequests.remove(result.requestId());
        if (fut != null) {
            if (result.error() != null) {
                fut.completeExceptionally(result.error());
            } else {
                fut.complete((ContinuousQueryScanResult<BinaryRow>)result);
            }
        } else {
            LOG.warn("Received unexpected ContinuousQueryScanResult: {}", new Object[]{response});
        }
    }
}

