package org.apache.ignite.internal.sql.engine.exec;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.class */
public class ExchangeServiceImpl implements ExchangeService {
    private static final IgniteLogger LOG = Loggers.forClass(ExchangeServiceImpl.class);
    private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
    private final MailboxRegistry mailboxRegistry;
    private final MessageService messageService;
    private final ClockService clockService;

    public ExchangeServiceImpl(MailboxRegistry mailboxRegistry, MessageService messageService, ClockService clockService) {
        this.mailboxRegistry = mailboxRegistry;
        this.messageService = messageService;
        this.clockService = clockService;
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void start() {
        this.messageService.register((clusterNode, networkMessage) -> {
            onMessage(clusterNode, (QueryBatchRequestMessage) networkMessage);
        }, (short) 4);
        this.messageService.register((clusterNode2, networkMessage2) -> {
            onMessage(clusterNode2, (QueryBatchMessage) networkMessage2);
        }, (short) 3);
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public CompletableFuture<Void> sendBatch(String str, ExecutionId executionId, long j, long j2, int i, boolean z, List<BinaryTupleMessage> list) {
        return this.messageService.send(str, FACTORY.queryBatchMessage().queryId(executionId.queryId()).executionToken(executionId.executionToken()).fragmentId(j).exchangeId(j2).batchId(i).last(z).rows(list).timestamp(this.clockService.now()).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public CompletableFuture<Void> request(String str, ExecutionId executionId, long j, long j2, int i, @Nullable SharedState sharedState) {
        return this.messageService.send(str, FACTORY.queryBatchRequestMessage().queryId(executionId.queryId()).executionToken(executionId.executionToken()).fragmentId(j).exchangeId(j2).amountOfBatches(i).sharedState(sharedState).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public CompletableFuture<Void> sendError(String str, ExecutionId executionId, long j, Throwable th) {
        TraceableException unwrapCause = ExceptionUtils.unwrapCause(th);
        if (!(unwrapCause instanceof TraceableException)) {
            TraceableException igniteInternalException = new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, th);
            unwrapCause = igniteInternalException;
            LOG.info(IgniteStringFormatter.format("Failed to execute query fragment: traceId={}, executionId={}, fragmentId={}", new Object[]{unwrapCause.traceId(), executionId, Long.valueOf(j)}), igniteInternalException);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug(IgniteStringFormatter.format("Failed to execute query fragment: traceId={}, executionId={}, fragmentId={}", new Object[]{unwrapCause.traceId(), executionId, Long.valueOf(j)}), th);
        }
        return this.messageService.send(str, FACTORY.errorMessage().queryId(executionId.queryId()).executionToken(executionId.executionToken()).fragmentId(j).traceId(unwrapCause.traceId()).code(unwrapCause.code()).message(unwrapCause.getMessage()).build());
    }

    private void onMessage(ClusterNode clusterNode, QueryBatchRequestMessage queryBatchRequestMessage) {
        CompletableFuture<Outbox<?>> outbox = this.mailboxRegistry.outbox(new ExecutionId(queryBatchRequestMessage.queryId(), queryBatchRequestMessage.executionToken()), queryBatchRequestMessage.exchangeId());
        Consumer<? super Outbox<?>> consumer = outbox2 -> {
            try {
                SharedState sharedState = queryBatchRequestMessage.sharedState();
                if (sharedState != null) {
                    outbox2.onRewindRequest(clusterNode.name(), sharedState, queryBatchRequestMessage.amountOfBatches());
                } else {
                    outbox2.onRequest(clusterNode.name(), queryBatchRequestMessage.amountOfBatches());
                }
            } catch (Throwable th) {
                outbox2.onError(th);
                throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, "Unexpected exception", th);
            }
        };
        if (outbox.isDone()) {
            consumer.accept(outbox.join());
        } else {
            outbox.thenAccept(consumer);
        }
    }

    private void onMessage(ClusterNode clusterNode, QueryBatchMessage queryBatchMessage) {
        ExecutionId executionId = new ExecutionId(queryBatchMessage.queryId(), queryBatchMessage.executionToken());
        Inbox<?> inbox = this.mailboxRegistry.inbox(executionId, queryBatchMessage.exchangeId());
        if (inbox == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stale batch message received: [nodeName={}, executionId={}, fragmentId={}, exchangeId={}, batchId={}]", new Object[]{clusterNode.name(), executionId, Long.valueOf(queryBatchMessage.fragmentId()), Long.valueOf(queryBatchMessage.exchangeId()), Integer.valueOf(queryBatchMessage.batchId())});
                return;
            }
            return;
        }
        try {
            inbox.onBatchReceived(clusterNode.name(), queryBatchMessage.batchId(), queryBatchMessage.last(), queryBatchMessage.rows());
        } catch (Throwable th) {
            inbox.onError(th);
            if (th instanceof IgniteException) {
                return;
            }
            LOG.warn("Unexpected exception", th);
        }
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void stop() {
    }
}
