/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.sql.engine.exec;

import com.github.benmanes.caffeine.cache.Caffeine;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.time.Clock;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.Debuggable;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.InternalSqlRowImpl;
import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleBoolean;
import org.apache.ignite.internal.sql.engine.InternalSqlRowSingleString;
import org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.SchemaAwareConverter;
import org.apache.ignite.internal.sql.engine.SqlOperationContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.AsyncDataCursor;
import org.apache.ignite.internal.sql.engine.exec.DclCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutablePlan;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.ExecutionDependencyResolver;
import org.apache.ignite.internal.sql.engine.exec.ExecutionId;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.LogicalRelImplementor;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException;
import org.apache.ignite.internal.sql.engine.exec.RemoteFragmentKey;
import org.apache.ignite.internal.sql.engine.exec.ResolvedDependencies;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.TxAttributes;
import org.apache.ignite.internal.sql.engine.exec.TxAwareAsyncCursor;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import org.apache.ignite.internal.sql.engine.exec.kill.KillCommand;
import org.apache.ignite.internal.sql.engine.exec.kill.KillCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentPrinter;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragments;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingParameters;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingUtils;
import org.apache.ignite.internal.sql.engine.exec.memory.MemoryContext;
import org.apache.ignite.internal.sql.engine.exec.memory.MemoryContextFactory;
import org.apache.ignite.internal.sql.engine.exec.memory.MemoryContextProvider;
import org.apache.ignite.internal.sql.engine.exec.memory.NoOpMemoryContext;
import org.apache.ignite.internal.sql.engine.exec.query.CopyHandler;
import org.apache.ignite.internal.sql.engine.exec.query.ShowHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode;
import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.externalize.RelJsonReader;
import org.apache.ignite.internal.sql.engine.message.ErrorMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryCloseMessage;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartRequestV2;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.prepare.CopyPlan;
import org.apache.ignite.internal.sql.engine.prepare.DclPlan;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.prepare.ShowPlan;
import org.apache.ignite.internal.sql.engine.prepare.copy.CopyCommand;
import org.apache.ignite.internal.sql.engine.prepare.copy.CopyLocationSelect;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionContext;
import org.apache.ignite.internal.sql.engine.tx.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.IteratorToDataCursorAdapter;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.gridgain.internal.security.context.SecurityContext;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

public class ExecutionServiceImpl<RowT>
implements ExecutionService,
LogicalTopologyEventListener,
Debuggable {
    /** Upper bound on the number of entries kept in {@link #physNodesCache}. */
    private static final int CACHE_SIZE = 1024;

    private static final IgniteLogger LOG = Loggers.forClass(ExecutionServiceImpl.class);

    private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();

    /** Single-row result carrying {@code true}; returned when a command reports that it was applied. */
    private static final List<InternalSqlRow> APPLIED_ANSWER = List.of(new InternalSqlRowSingleBoolean(true));

    /** Single-row result carrying {@code false}; returned when a command reports that it was not applied. */
    private static final List<InternalSqlRow> NOT_APPLIED_ANSWER = List.of(new InternalSqlRowSingleBoolean(false));

    /** Placeholder fragment description passed to the execution context of plans executed directly on this node. */
    private static final FragmentDescription DUMMY_DESCRIPTION = new FragmentDescription(0L, true, (Long2ObjectMap<ColocationGroup>)Long2ObjectMaps.emptyMap(), null, null, null);

    /**
     * Size-bounded cache of relational trees restored from their JSON form, keyed by catalog version and
     * fragment text. The explicit type witness on {@code build()} is required: without it the builder
     * produces a {@code ConcurrentMap<Object, Object>} that cannot be assigned to this field.
     */
    private final ConcurrentMap<FragmentCacheKey, IgniteRel> physNodesCache = Caffeine.newBuilder()
            .maximumSize(CACHE_SIZE)
            .<FragmentCacheKey, IgniteRel>build()
            .asMap();

    /** Source of execution tokens (its consumer is outside this excerpt; presumably {@code nextExecutionId} - TODO confirm). */
    private final AtomicInteger executionTokenGen = new AtomicInteger();

    private final MessageService messageService;

    /** Local cluster member, resolved once from the topology service in the constructor. */
    private final InternalClusterNode localNode;

    private final SqlSchemaManager sqlSchemaManager;

    private final QueryTaskExecutor taskExecutor;

    private final MappingService mappingService;

    private final RowHandler<RowT> handler;

    private final DdlCommandHandler ddlCmdHnd;

    private final DclCommandHandler dclCommandHandler;

    private final ShowHandler showHandler;

    private final CopyHandler copyHandler;

    private final ExecutionDependencyResolver dependencyResolver;

    private final ExecutableTableRegistry tableRegistry;

    private final ImplementorFactory<RowT> implementorFactory;

    /** Query managers of the executions currently known to this node, keyed by execution id. */
    private final Map<ExecutionId, DistributedQueryManager> queryManagerMap = new ConcurrentHashMap<>();

    /** Maximum time, in milliseconds, that {@link #stop()} waits for running queries to close. */
    private final long shutdownTimeout;

    private final ClockService clockService;

    private final KillCommandHandler killCommandHandler;

    private final Supplier<MemoryContextFactory<RowT>> memoryContextFactorySupplier;

    private final ExpressionFactory expressionFactory;

    /**
     * Creates the service. The constructor only stores its collaborators; the local node is read once
     * from the topology service.
     *
     * @param messageService Service used to register SQL message handlers.
     * @param topSrvc Topology service; supplies the local cluster member.
     * @param mappingService Service that maps plan fragments.
     * @param sqlSchemaManager Schema manager.
     * @param ddlCmdHnd Handler of DDL commands.
     * @param dclCommandHandler Handler of DCL commands.
     * @param showHandler Handler of SHOW commands.
     * @param copyHandler Handler of COPY commands.
     * @param taskExecutor Executor of query tasks.
     * @param handler Row handler.
     * @param tableRegistry Registry of executable tables.
     * @param dependencyResolver Resolver of execution dependencies.
     * @param implementorFactory Factory of relational tree implementors.
     * @param clockService Clock service.
     * @param killCommandHandler Handler of KILL commands.
     * @param expressionFactory Expression factory.
     * @param shutdownTimeout Time, in milliseconds, to wait for queries to close on stop.
     * @param memoryContextFactorySupplier Supplier of the memory context factory.
     */
    public ExecutionServiceImpl(
            MessageService messageService,
            TopologyService topSrvc,
            MappingService mappingService,
            SqlSchemaManager sqlSchemaManager,
            DdlCommandHandler ddlCmdHnd,
            DclCommandHandler dclCommandHandler,
            ShowHandler showHandler,
            CopyHandler copyHandler,
            QueryTaskExecutor taskExecutor,
            RowHandler<RowT> handler,
            ExecutableTableRegistry tableRegistry,
            ExecutionDependencyResolver dependencyResolver,
            ImplementorFactory<RowT> implementorFactory,
            ClockService clockService,
            KillCommandHandler killCommandHandler,
            ExpressionFactory expressionFactory,
            long shutdownTimeout,
            Supplier<MemoryContextFactory<RowT>> memoryContextFactorySupplier
    ) {
        // Cluster and messaging.
        this.localNode = topSrvc.localMember();
        this.messageService = messageService;
        this.clockService = clockService;

        // Planning, mapping and schema access.
        this.mappingService = mappingService;
        this.sqlSchemaManager = sqlSchemaManager;
        this.tableRegistry = tableRegistry;
        this.dependencyResolver = dependencyResolver;

        // Command handlers.
        this.ddlCmdHnd = ddlCmdHnd;
        this.dclCommandHandler = dclCommandHandler;
        this.showHandler = showHandler;
        this.copyHandler = copyHandler;
        this.killCommandHandler = killCommandHandler;

        // Execution machinery.
        this.taskExecutor = taskExecutor;
        this.handler = handler;
        this.implementorFactory = implementorFactory;
        this.expressionFactory = expressionFactory;
        this.memoryContextFactorySupplier = memoryContextFactorySupplier;
        this.shutdownTimeout = shutdownTimeout;
    }

    /**
     * Factory method that builds the service with an implementor factory producing
     * {@link LogicalRelImplementor} instances bound to the given mailbox registry, exchange service and
     * table function registry. All other arguments are passed to the constructor unchanged.
     *
     * @param <RowT> Row type.
     * @return New service instance.
     */
    public static <RowT> ExecutionServiceImpl<RowT> create(
            TopologyService topSrvc,
            MessageService msgSrvc,
            SqlSchemaManager sqlSchemaManager,
            DdlCommandHandler ddlCommandHandler,
            DclCommandHandler dclCommandHandler,
            ShowHandler showHandler,
            CopyHandler copyHandler,
            QueryTaskExecutor taskExecutor,
            RowHandler<RowT> handler,
            MailboxRegistry mailboxRegistry,
            ExchangeService exchangeSrvc,
            MappingService mappingService,
            ExecutableTableRegistry tableRegistry,
            ExecutionDependencyResolver dependencyResolver,
            TableFunctionRegistry tableFunctionRegistry,
            ClockService clockService,
            KillCommandHandler killCommandHandler,
            ExpressionFactory expressionFactory,
            long shutdownTimeout,
            Supplier<MemoryContextFactory<RowT>> memoryContextFactorySupplier
    ) {
        ImplementorFactory<RowT> relImplementorFactory = (ctx, deps) ->
                new LogicalRelImplementor<>(ctx, mailboxRegistry, exchangeSrvc, deps, tableFunctionRegistry);

        return new ExecutionServiceImpl<>(
                msgSrvc,
                topSrvc,
                mappingService,
                sqlSchemaManager,
                ddlCommandHandler,
                dclCommandHandler,
                showHandler,
                copyHandler,
                taskExecutor,
                handler,
                tableRegistry,
                dependencyResolver,
                relImplementorFactory,
                clockService,
                killCommandHandler,
                expressionFactory,
                shutdownTimeout,
                memoryContextFactorySupplier
        );
    }

    /**
     * Registers this service as the receiver of the SQL execution messages.
     *
     * <p>The numeric arguments are presumably message type ids whose symbolic constants were inlined
     * by the compiler (TODO confirm). Types {@code 0} and {@code 1000} are both routed to the
     * {@link QueryStartRequest} handler.
     */
    @Override
    public void start() {
        // Fragment start requests (two message types share one handler).
        messageService.register((sender, message) -> onMessage(sender, (QueryStartRequest) message), (short) 0);
        messageService.register((sender, message) -> onMessage(sender, (QueryStartRequest) message), (short) 1000);

        // Fragment start acknowledgements.
        messageService.register((sender, message) -> onMessage(sender, (QueryStartResponse) message), (short) 1);

        // Query close requests.
        messageService.register((sender, message) -> onMessage(sender, (QueryCloseMessage) message), (short) 5);

        // Remote execution errors.
        messageService.register((sender, message) -> onMessage(sender, (ErrorMessage) message), (short) 2);
    }

    /**
     * Exposes the DDL command handler this service was created with.
     *
     * @return DDL command handler.
     */
    @TestOnly
    public DdlCommandHandler ddlCommandHandler() {
        return ddlCmdHnd;
    }

    /**
     * Exposes the SHOW command handler this service was created with.
     *
     * @return SHOW command handler.
     */
    @TestOnly
    public ShowHandler showHandler() {
        return showHandler;
    }

    /**
     * Exposes the COPY command handler this service was created with.
     *
     * @return COPY command handler.
     */
    @TestOnly
    public CopyHandler copyHandler() {
        return copyHandler;
    }

    /**
     * Starts execution of a multi-step plan with this node acting as the coordinator.
     *
     * <p>A new {@code DistributedQueryManager} is registered under a fresh execution id, a transaction is
     * obtained from (or started through) the transaction context of the operation, and the resulting
     * data cursor is wrapped into a {@link TxAwareAsyncCursor}.
     *
     * @param operationContext Context of the operation; must carry a transaction context.
     * @param plan Plan to execute.
     * @return Future completed with the cursor, or completed exceptionally with the original failure
     *         after the transaction wrapper has been finalised.
     */
    private CompletableFuture<AsyncDataCursor<InternalSqlRow>> executeQuery(SqlOperationContext operationContext, MultiStepPlan plan) {
        ExecutionId executionId = nextExecutionId(operationContext.queryId());
        DistributedQueryManager queryManager = new DistributedQueryManager(executionId, localNode.name(), true, operationContext);

        DistributedQueryManager old = queryManagerMap.put(executionId, queryManager);

        assert old == null;

        QueryTransactionContext txContext = operationContext.txContext();

        assert txContext != null;

        boolean readOnly = plan.type().implicitTransactionReadOnlyMode();
        QueryTransactionWrapper txWrapper = txContext.getOrStartSqlManaged(readOnly, false, plan.hasCaches());
        InternalTransaction tx = txWrapper.unwrap();

        operationContext.notifyTxUsed(txWrapper);

        SqlQueryProcessor.PrefetchCallback prefetchCallback = queryManager.prefetchCallback;
        CompletableFuture<Void> prefetchFuture = prefetchCallback.prefetchFuture();

        // For DML the transaction wrapper is finalised as soon as the first page has been prefetched,
        // so "first page ready" also covers completion of that finalisation.
        CompletableFuture<Void> firstPageReady;
        if (plan.type() == SqlQueryType.DML) {
            firstPageReady = prefetchFuture.thenCompose(none -> txWrapper.finalise());
        } else {
            firstPageReady = prefetchFuture;
        }

        Predicate<String> nodeExclusionFilter = operationContext.nodeExclusionFilter();

        CompletableFuture<AsyncDataCursor<InternalSqlRow>> cursorFuture = queryManager.execute(tx, plan, nodeExclusionFilter)
                .thenApply(dataCursor -> new TxAwareAsyncCursor<InternalSqlRow>(
                        txWrapper,
                        dataCursor,
                        firstPageReady,
                        reason -> queryManager.close(reason),
                        operationContext::notifyError
                ));

        return cursorFuture
                .handle((cursor, error) -> {
                    if (error != null) {
                        // The cursor could not be created, so the wrapper is finalised with the failure here.
                        return txWrapper.finalise(error).<AsyncDataCursor<InternalSqlRow>>handle((none, finalizationErr) -> {
                            if (finalizationErr != null) {
                                error.addSuppressed(finalizationErr);
                            }

                            // Re-throw so that the returned future fails with the original error.
                            ExceptionUtils.sneakyThrow(error);

                            // Not reachable: the call above always throws.
                            return null;
                        });
                    }

                    return CompletableFuture.completedFuture(cursor);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Builds an operation context from the given attributes.
     *
     * @param queryId Query id.
     * @param timeZoneId Time zone of the session.
     * @param params Query parameters.
     * @param operationTime Operation time.
     * @param username User name, may be {@code null}.
     * @param topologyVersion Topology version, may be {@code null}.
     * @return Operation context.
     */
    private static SqlOperationContext createOperationContext(
            UUID queryId,
            ZoneId timeZoneId,
            Object[] params,
            HybridTimestamp operationTime,
            @Nullable String username,
            @Nullable Long topologyVersion
    ) {
        return SqlOperationContext.builder()
                .queryId(queryId)
                .parameters(params)
                .timeZoneId(timeZoneId)
                .operationTime(operationTime)
                .userName(username)
                .topologyVersion(topologyVersion)
                .build();
    }

    /**
     * Returns the relational tree for a serialized fragment, deserializing it on a cache miss.
     *
     * <p>The schema for the catalog version is resolved before the cache is consulted.
     *
     * @param catalogVersion Catalog version whose root schema is used for deserialization.
     * @param jsonFragment Fragment in JSON form.
     * @return Relational tree.
     */
    private IgniteRel relationalTreeFromJsonString(int catalogVersion, String jsonFragment) {
        SchemaPlus rootSchema = sqlSchemaManager.schemas(catalogVersion).root();

        FragmentCacheKey cacheKey = new FragmentCacheKey(catalogVersion, jsonFragment);

        return physNodesCache.computeIfAbsent(
                cacheKey,
                missing -> (IgniteRel) RelJsonReader.fromJson(rootSchema, missing.fragmentString)
        );
    }

    /**
     * Executes the plan with the routine that matches its query type.
     *
     * <p>DML and QUERY plans run directly when the plan is an {@link ExecutablePlan}; otherwise they are
     * expected to be a {@link MultiStepPlan} and are executed as a distributed query.
     *
     * @param plan Plan to execute.
     * @param operationContext Context of the operation.
     * @return Future completed with the cursor over the result.
     * @throws AssertionError If the query type is not one of the handled types.
     */
    @Override
    public CompletableFuture<AsyncDataCursor<InternalSqlRow>> executePlan(QueryPlan plan, SqlOperationContext operationContext) {
        switch (plan.type()) {
            case DML:
            case QUERY:
                if (!(plan instanceof ExecutablePlan)) {
                    assert plan instanceof MultiStepPlan : plan.getClass();

                    return executeQuery(operationContext, (MultiStepPlan) plan);
                }

                return CompletableFuture.completedFuture(executeExecutablePlan(operationContext, (ExecutablePlan) plan));

            case EXPLAIN:
                return CompletableFuture.completedFuture(executeExplain(operationContext, (ExplainPlan) plan));

            case DDL:
                return CompletableFuture.completedFuture(executeDdl(operationContext, (DdlPlan) plan));

            case DCL:
                return CompletableFuture.completedFuture(executeDcl(operationContext, (DclPlan) plan));

            case KILL:
                return CompletableFuture.completedFuture(executeKill((KillPlan) plan));

            case SHOW:
                return CompletableFuture.completedFuture(executeShow(operationContext, (ShowPlan) plan));

            case COPY:
                return executeCopy(operationContext, (CopyPlan) plan);

            default:
                throw new AssertionError("Unexpected query type: " + plan);
        }
    }

    /**
     * Executes a batch of DDL plans through a single call to the DDL command handler.
     *
     * <p>Once the handler completes, the listener is notified with the catalog time of the result and
     * one single-row cursor is produced per command, reporting whether that command was applied.
     *
     * @param batch Plans to execute; the order of the returned cursors matches the order of this list.
     * @param securityContext Security context (not used by this implementation).
     * @param activationTimeListener Listener notified with the catalog time of the result.
     * @return Future completed with one cursor per command.
     */
    @Override
    public CompletableFuture<List<AsyncDataCursor<InternalSqlRow>>> executeDdlBatch(List<DdlPlan> batch, SecurityContext securityContext, Consumer<HybridTimestamp> activationTimeListener) {
        List<CatalogCommand> commands = batch.stream().map(DdlPlan::command).collect(Collectors.toList());

        return ddlCmdHnd.handle(commands).thenApply(result -> {
            activationTimeListener.accept(HybridTimestamp.hybridTimestamp(result.getCatalogTime()));

            // Declared with the element type of the method's result; a list typed by the concrete
            // adapter class would not be assignable to it.
            List<AsyncDataCursor<InternalSqlRow>> cursors = new ArrayList<>(commands.size());

            for (int i = 0; i < commands.size(); i++) {
                List<InternalSqlRow> answer = result.isApplied(i) ? APPLIED_ANSWER : NOT_APPLIED_ANSWER;

                cursors.add(new IteratorToDataCursorAdapter<InternalSqlRow>(answer.iterator()));
            }

            return cursors;
        });
    }

    /**
     * Executes a COPY plan.
     *
     * <p>When the source of the command is a {@link CopyLocationSelect}, its plan is executed first and
     * the resulting cursor is attached to the location before the copy itself starts.
     *
     * @param operationContext Context of the operation.
     * @param plan COPY plan.
     * @return Future completed with the cursor over the result of the copy.
     */
    private CompletableFuture<AsyncDataCursor<InternalSqlRow>> executeCopy(SqlOperationContext operationContext, CopyPlan plan) {
        CopyCommand cmd = plan.copyCommand();

        if (!(cmd.from() instanceof CopyLocationSelect)) {
            return CompletableFuture.completedFuture(executeCopyHavingLocationCursorResolved(cmd, operationContext));
        }

        CopyLocationSelect selectSource = (CopyLocationSelect) cmd.from();

        return executePlan(selectSource.plan(), operationContext).thenApply(sourceCursor -> {
            selectSource.cursor((AsyncCursor<InternalSqlRow>) sourceCursor);

            return executeCopyHavingLocationCursorResolved(cmd, operationContext);
        });
    }

    /**
     * Passes the COPY command to the copy handler and exposes its result as a cursor.
     *
     * <p>A cancellation hook is registered that fails the handler's future with a
     * {@link QueryCancelledException} when it is invoked with the timeout flag set; an invocation
     * without the flag leaves the future untouched.
     *
     * @param cmd COPY command.
     * @param operationContext Context of the operation; must carry a cancellation object.
     * @return Cursor over the result of the command.
     */
    private AsyncDataCursor<InternalSqlRow> executeCopyHavingLocationCursorResolved(CopyCommand cmd, SqlOperationContext operationContext) {
        QueryCancel queryCancel = operationContext.cancel();

        assert queryCancel != null;

        CompletableFuture<Iterator<InternalSqlRow>> copyResult = copyHandler.handle(cmd);

        queryCancel.add(timeout -> {
            if (timeout) {
                copyResult.completeExceptionally(new QueryCancelledException("Query timeout"));
            }
        });

        return new IteratorToDataCursorAdapter<InternalSqlRow>(copyResult, Runnable::run);
    }

    /**
     * Passes the SHOW command of the plan to the show handler and exposes its result as a cursor.
     *
     * <p>A cancellation hook is registered that fails the handler's future with a
     * {@link QueryCancelledException} when it is invoked with the timeout flag set; an invocation
     * without the flag leaves the future untouched.
     *
     * @param operationContext Context of the operation; must carry a cancellation object.
     * @param plan SHOW plan.
     * @return Cursor over the result of the command.
     */
    private AsyncDataCursor<InternalSqlRow> executeShow(SqlOperationContext operationContext, ShowPlan plan) {
        QueryCancel queryCancel = operationContext.cancel();

        assert queryCancel != null;

        CompletableFuture<Iterator<InternalSqlRow>> showResult = showHandler.handle(plan.showCommand());

        queryCancel.add(timeout -> {
            if (timeout) {
                showResult.completeExceptionally(new QueryCancelledException("Query timeout"));
            }
        });

        return new IteratorToDataCursorAdapter<InternalSqlRow>(showResult, Runnable::run);
    }

    /**
     * Executes a plan that can run directly on this node, without fragment mapping.
     *
     * <p>The plan receives an execution context built around the local node and the placeholder
     * fragment description. A failure thrown synchronously by the plan is not propagated to the
     * caller; it is turned into a cursor backed by a failed future.
     *
     * @param operationContext Context of the operation; must carry a transaction context.
     * @param plan Plan to execute.
     * @return Cursor over the result, bound to the transaction wrapper of the operation.
     */
    private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(SqlOperationContext operationContext, ExecutablePlan plan) {
        ExecutionId executionId = nextExecutionId(operationContext.queryId());

        ExecutionContext<RowT> ectx = new ExecutionContext<RowT>(
                expressionFactory,
                taskExecutor,
                executionId,
                localNode,
                localNode.name(),
                localNode.id(),
                DUMMY_DESCRIPTION,
                handler,
                Commons.parametersMap(operationContext.parameters()),
                TxAttributes.dummy(),
                operationContext.timeZoneId(),
                NoOpMemoryContext.instance(),
                null,
                tableRegistry,
                -1,
                Clock.systemUTC(),
                operationContext.userName(),
                null
        );

        QueryTransactionContext txContext = operationContext.txContext();

        assert txContext != null;

        ExplainablePlan queryPlan = (ExplainablePlan) plan;
        boolean readOnly = queryPlan.type().implicitTransactionReadOnlyMode();
        QueryTransactionWrapper txWrapper = txContext.getOrStartSqlManaged(readOnly, true, plan.hasCaches());

        operationContext.notifyTxUsed(txWrapper);

        AsyncDataCursor<InternalSqlRow> dataCursor;

        try {
            dataCursor = plan.execute(ectx, txWrapper.unwrap(), tableRegistry);
        } catch (Throwable t) {
            dataCursor = new IteratorToDataCursorAdapter<InternalSqlRow>(CompletableFuture.failedFuture(t), Runnable::run);
        }

        // A bound method reference is used on purpose: 'dataCursor' is assigned in both the try and the
        // catch branch, so it is not effectively final and cannot be captured by a lambda.
        return new TxAwareAsyncCursor<InternalSqlRow>(
                txWrapper,
                dataCursor,
                dataCursor.onFirstPageReady(),
                dataCursor::cancelAsync,
                operationContext::notifyError
        );
    }

    /**
     * Executes a single DDL command.
     *
     * <p>When the handler completes, the observable time of the transaction context is advanced to the
     * catalog time of the result, and a single-row answer reports whether the command was applied.
     *
     * @param operationContext Context of the operation; must carry a transaction context.
     * @param plan DDL plan.
     * @return Cursor over the single-row answer.
     */
    private AsyncDataCursor<InternalSqlRow> executeDdl(SqlOperationContext operationContext, DdlPlan plan) {
        CompletableFuture<Iterator<InternalSqlRow>> answer = ddlCmdHnd.handle(plan.command()).thenApply(outcome -> {
            QueryTransactionContext txCtx = operationContext.txContext();

            assert txCtx != null;

            txCtx.updateObservableTime(HybridTimestamp.hybridTimestamp(outcome.getCatalogTime()));

            if (outcome.isApplied(0)) {
                return APPLIED_ANSWER.iterator();
            }

            return NOT_APPLIED_ANSWER.iterator();
        });

        return new IteratorToDataCursorAdapter<InternalSqlRow>(answer, Runnable::run);
    }

    /**
     * Executes a DCL command and reports, as a single-row answer, whether it was applied.
     *
     * @param operationContext Context of the operation (not used by this method).
     * @param plan DCL plan.
     * @return Cursor over the single-row answer.
     */
    private AsyncDataCursor<InternalSqlRow> executeDcl(SqlOperationContext operationContext, DclPlan plan) {
        CompletableFuture<Iterator<InternalSqlRow>> answer = dclCommandHandler.handle(plan.command()).thenApply(applied -> {
            if (applied) {
                return APPLIED_ANSWER.iterator();
            }

            return NOT_APPLIED_ANSWER.iterator();
        });

        return new IteratorToDataCursorAdapter<InternalSqlRow>(answer, Runnable::run);
    }

    /**
     * Executes a KILL command and reports, as a single-row answer, whether the target was cancelled.
     *
     * <p>Failures are unwrapped to their cause: an {@link IgniteInternalCheckedException} is wrapped into
     * an {@link IgniteInternalException} with a descriptive message, a {@link RuntimeException} is
     * re-thrown as is, and anything else is wrapped into an {@link IgniteInternalException}.
     *
     * @param plan KILL plan.
     * @return Cursor over the single-row answer.
     */
    private AsyncDataCursor<InternalSqlRow> executeKill(KillPlan plan) {
        KillCommand cmd = plan.command();

        CompletableFuture<Iterator<InternalSqlRow>> answer = killCommandHandler.handle(cmd)
                .thenApply(cancelled -> (cancelled ? APPLIED_ANSWER : NOT_APPLIED_ANSWER).iterator())
                .exceptionally(failure -> {
                    Throwable cause = ExceptionUtils.unwrapCause(failure);

                    if (cause instanceof IgniteInternalCheckedException) {
                        throw new IgniteInternalException(
                                ErrorGroups.Common.INTERNAL_ERR,
                                "Failed to execute KILL statement [command=" + cmd + ", err=" + cause.getMessage() + "]",
                                cause
                        );
                    }

                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }

                    throw new IgniteInternalException(ErrorGroups.Common.INTERNAL_ERR, cause);
                });

        return new IteratorToDataCursorAdapter<InternalSqlRow>(answer, Runnable::run);
    }

    /**
     * Executes an EXPLAIN plan and returns its textual output as a single-row cursor.
     *
     * <p>In {@code PLAN} mode the row holds the explanation of the wrapped plan. In {@code MAPPING} mode
     * the row holds the printed fragment mapping: a {@link MultiStepPlan} is mapped by the mapping
     * service, any other plan gets a single-node mapping onto the local node.
     *
     * @param operationContext Context of the operation.
     * @param plan EXPLAIN plan.
     * @return Cursor over the single-row answer.
     * @throws IllegalArgumentException If the mode of the plan is not supported.
     */
    private AsyncDataCursor<InternalSqlRow> executeExplain(SqlOperationContext operationContext, ExplainPlan plan) {
        switch (plan.mode()) {
            case PLAN: {
                InternalSqlRow row = new InternalSqlRowSingleString(plan.plan().explain());

                return new IteratorToDataCursorAdapter<InternalSqlRow>(List.of(row).iterator());
            }

            case MAPPING: {
                CompletableFuture<MappedFragments> mappedFragments;

                if (!(plan.plan() instanceof MultiStepPlan)) {
                    MappedFragment localMapping = MappingUtils.createSingleNodeMapping(localNode.name(), plan.plan().getRel());

                    mappedFragments = CompletableFuture.completedFuture(new MappedFragments(List.of(localMapping), 0L));
                } else {
                    QueryTransactionContext txContext = operationContext.txContext();

                    assert txContext != null;

                    // The read-only flag of an explicit transaction, when there is one, overrides the
                    // implicit mode derived from the plan type.
                    boolean readOnly = plan.type().implicitTransactionReadOnlyMode();
                    QueryTransactionWrapper explicitTx = txContext.explicitTx();

                    if (explicitTx != null) {
                        readOnly = explicitTx.unwrap().isReadOnly();
                    }

                    Predicate<String> nodeExclusionFilter = operationContext.nodeExclusionFilter();
                    MappingParameters mappingParameters = MappingParameters.create(operationContext.parameters(), readOnly, nodeExclusionFilter);

                    mappedFragments = mappingService.map((MultiStepPlan) plan.plan(), mappingParameters);
                }

                CompletableFuture<Iterator<InternalSqlRow>> rows = mappedFragments.thenApply(mapped -> {
                    String text = FragmentPrinter.fragmentsToString(false, mapped.fragments());
                    InternalSqlRow row = new InternalSqlRowSingleString(text);

                    return List.of(row).iterator();
                });

                return new IteratorToDataCursorAdapter<InternalSqlRow>(rows, Runnable::run);
            }

            default:
                throw new IllegalArgumentException("Unsupported mode: " + plan.mode());
        }
    }

    /**
     * Handles a request to start a fragment.
     *
     * <p>If the schema for the requested catalog version is already available, the fragment is
     * submitted on the calling thread; a future that is already done is not inspected for failure
     * here. Otherwise submission is deferred until the schema future completes and is then handed to
     * the task executor, or the failure of the future is reported through {@code handleError}.
     *
     * @param node Node that sent the request.
     * @param msg Request.
     */
    private void onMessage(InternalClusterNode node, QueryStartRequest msg) {
        assert node != null && msg != null;

        CompletableFuture<Void> schemaReady = sqlSchemaManager.schemaReadyFuture(msg.catalogVersion());

        if (!schemaReady.isDone()) {
            schemaReady.whenComplete((ignored, error) -> {
                if (error == null) {
                    taskExecutor.execute(msg.queryId(), msg.fragmentId(), () -> submitFragment(node, msg));
                } else {
                    handleError(error, node.name(), msg);
                }
            });

            return;
        }

        submitFragment(node, msg);
    }

    /**
     * Handles the acknowledgement of a fragment start. The message is ignored when no query manager
     * is registered for its execution id.
     *
     * @param node Node that sent the response.
     * @param msg Response.
     */
    private void onMessage(InternalClusterNode node, QueryStartResponse msg) {
        assert node != null && msg != null;

        ExecutionId executionId = new ExecutionId(msg.queryId(), msg.executionToken());
        DistributedQueryManager manager = queryManagerMap.get(executionId);

        if (manager == null) {
            return;
        }

        manager.acknowledgeFragment(node.name(), msg.fragmentId(), msg.error());
    }

    /**
     * Handles an error reported by a remote fragment: the error is converted into a
     * {@link RemoteFragmentExecutionException}, logged at debug level and passed to the query manager.
     * The message is ignored when no query manager is registered for its execution id.
     *
     * @param node Node that sent the message.
     * @param msg Error message.
     */
    private void onMessage(InternalClusterNode node, ErrorMessage msg) {
        assert node != null && msg != null;

        ExecutionId executionId = new ExecutionId(msg.queryId(), msg.executionToken());
        DistributedQueryManager manager = queryManagerMap.get(executionId);

        if (manager == null) {
            return;
        }

        RemoteFragmentExecutionException remoteError = new RemoteFragmentExecutionException(
                node.name(),
                msg.queryId(),
                msg.fragmentId(),
                msg.traceId(),
                msg.code(),
                msg.message()
        );

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Query remote fragment execution failed [nodeName={}, queryId={}, fragmentId={}, originalMessage={}]",
                    node.name(), remoteError.queryId(), remoteError.fragmentId(), remoteError.getMessage()
            );
        }

        manager.onError(remoteError);
    }

    /**
     * Handles a request to close a query by closing its manager with the {@code CANCEL} reason.
     * The message is ignored when no query manager is registered for its execution id.
     *
     * @param node Node that sent the message.
     * @param msg Close message.
     */
    private void onMessage(InternalClusterNode node, QueryCloseMessage msg) {
        assert node != null && msg != null;

        ExecutionId executionId = new ExecutionId(msg.queryId(), msg.executionToken());
        DistributedQueryManager manager = queryManagerMap.get(executionId);

        if (manager == null) {
            return;
        }

        manager.close(AsyncDataCursor.CancellationReason.CANCEL);
    }

    /**
     * Stops the service: closes every query manager that has a root fragment id and waits up to
     * {@code shutdownTimeout} milliseconds for all of them to finish.
     *
     * <p>A timeout, a cancellation or an interruption of the wait is logged as a warning and does not
     * fail the stop; on interruption the interrupt flag of the thread is restored. An
     * {@code ExecutionException} raised by the wait is not handled here and propagates to the caller.
     *
     * @throws Exception If waiting for the managers to close fails for a reason other than the above.
     */
    @Override
    public void stop() throws Exception {
        List<CompletableFuture<?>> closeFutures = new ArrayList<>();

        for (DistributedQueryManager manager : queryManagerMap.values()) {
            if (manager.rootFragmentId != null) {
                closeFutures.add(manager.close(AsyncDataCursor.CancellationReason.CANCEL));
            }
        }

        CompletableFuture<Void> allClosed = CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0]));

        try {
            allClosed.get(shutdownTimeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            String message = IgniteStringFormatter.format(
                    "SQL execution service could not be stopped within the specified timeout ({} ms).",
                    shutdownTimeout
            );

            LOG.warn(message + dumpDebugInfo() + ExecutionServiceImpl.dumpThreads());
        } catch (CancellationException e) {
            LOG.warn("The stop future was cancelled, going to proceed the stop procedure", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            LOG.warn("The stop future was interrupted, going to proceed the stop procedure", e);
        }
    }

    /**
     * Notifies every registered query manager that a node has left the logical topology.
     *
     * @param leftNode Node that left.
     * @param newTopology Topology snapshot after the node left; its version is passed to the managers.
     */
    public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
        for (DistributedQueryManager manager : queryManagerMap.values()) {
            manager.onNodeLeft(leftNode.name(), newTopology.version());
        }
    }

    /**
     * Collects the local fragments of every execution that belongs to the given query.
     *
     * @param queryId Query id.
     * @return Local fragments of the matching executions; empty if there are none.
     */
    @TestOnly
    public List<AbstractNode<?>> localFragments(UUID queryId) {
        List<AbstractNode<?>> fragments = new ArrayList<>();

        for (Map.Entry<ExecutionId, DistributedQueryManager> entry : queryManagerMap.entrySet()) {
            if (entry.getKey().queryId().equals(queryId)) {
                fragments.addAll(entry.getValue().localFragments());
            }
        }

        return fragments;
    }

    /**
     * Submits the fragment described by a start request to the query manager of its execution,
     * creating the manager if it does not exist yet.
     *
     * @param initiatorNode Node that sent the request.
     * @param msg Start request; the topology version is taken from it only if it is a
     *         {@link QueryStartRequestV2}, otherwise {@code null} is passed.
     */
    private void submitFragment(InternalClusterNode initiatorNode, QueryStartRequest msg) {
        DistributedQueryManager manager = getOrCreateQueryManager(initiatorNode.name(), msg);

        Long topologyVersion = null;

        if (msg instanceof QueryStartRequestV2) {
            topologyVersion = ((QueryStartRequestV2) msg).topologyVersion();
        }

        manager.submitFragment(
                initiatorNode,
                msg.catalogVersion(),
                msg.root(),
                msg.fragmentDescription(),
                msg.txAttributes(),
                topologyVersion
        );
    }

    /**
     * Reports a fragment start failure through the query manager of the corresponding execution.
     *
     * @param ex failure to report.
     * @param nodeName name of the node the failure is reported to; also used as the coordinator name
     *     if the manager has to be created.
     * @param msg request whose processing failed.
     */
    private void handleError(Throwable ex, String nodeName, QueryStartRequest msg) {
        DistributedQueryManager manager = this.getOrCreateQueryManager(nodeName, msg);
        long failedFragmentId = msg.fragmentDescription().fragmentId();
        manager.handleError(ex, nodeName, failedFragmentId);
    }

    /**
     * Returns the query manager registered for the execution referenced by the request, atomically
     * creating and registering a non-coordinator manager when there is none.
     *
     * @param coordinatorNodeName name recorded as the coordinator of a newly created manager.
     * @param msg fragment start request that identifies the execution and supplies the operation context data.
     * @return existing or freshly created manager.
     */
    private DistributedQueryManager getOrCreateQueryManager(String coordinatorNodeName, QueryStartRequest msg) {
        ExecutionId executionId = new ExecutionId(msg.queryId(), msg.executionToken());
        return this.queryManagerMap.computeIfAbsent(executionId, key -> {
            // User name and topology version exist only in the V2 request; they stay null otherwise.
            QueryStartRequestV2 v2 = msg instanceof QueryStartRequestV2 ? (QueryStartRequestV2)msg : null;
            String username = v2 == null ? null : v2.username();
            Long topologyVersion = v2 == null ? null : v2.topologyVersion();
            SqlOperationContext operationContext = ExecutionServiceImpl.createOperationContext(
                    key.queryId(),
                    ZoneId.of(msg.timeZoneId()),
                    msg.parameters(),
                    msg.operationTime(),
                    username,
                    topologyVersion);
            return new DistributedQueryManager(key, coordinatorNodeName, operationContext);
        });
    }

    /**
     * Renders the state of all registered query managers as text.
     *
     * @return the dump produced by {@link #dumpState}, or a fixed placeholder when the dump is empty.
     */
    @TestOnly
    public String dumpDebugInfo() {
        IgniteStringBuilder buffer = new IgniteStringBuilder();
        this.dumpState(buffer, "");
        if (buffer.length() <= 0) {
            return " No debug information available.";
        }
        return buffer.toString();
    }

    /**
     * Appends a human-readable description of every registered query manager to the given writer.
     *
     * <p>For each execution the dump contains: the execution id with its cancel/stop flags, the coordinator
     * node name, the state of the root node future (if the manager has one), the remote fragments whose
     * init-completion future is not done yet, the list of local fragments and, finally, the state tree of
     * each local fragment.
     *
     * @param writer destination of the dump.
     * @param indent indentation prefix of the top-level lines; nested lines use child indentation of it.
     */
    @TestOnly
    public void dumpState(IgniteStringBuilder writer, String indent) {
        String indent0 = Debuggable.childIndentation((String)indent);
        for (Map.Entry<ExecutionId, DistributedQueryManager> entry : this.queryManagerMap.entrySet()) {
            long fragmentId;
            List localFragments;
            ExecutionId executionId = entry.getKey();
            DistributedQueryManager mgr = entry.getValue();
            // Header line: execution id plus cancellation flag and completion state of the cancel future.
            writer.app(indent).app("Debug info for query: ").app((Object)executionId).app(" (canceled=").app(mgr.cancelled.get()).app(", stopped=").app(mgr.cancelFut.isDone()).app(")").nl();
            writer.app(indent0).app("Coordinator node: ").app(mgr.coordinatorNodeName).app(mgr.coordinator ? " (current node)" : "").nl();
            // The root future is null for managers created with coordinator == false.
            CompletableFuture rootNodeFut = mgr.root;
            if (rootNodeFut != null) {
                writer.app(indent0).app("Root node state: ");
                try {
                    // Non-blocking probe: null means the future has not been completed yet.
                    AsyncRootNode rootNode = rootNodeFut.getNow(null);
                    String state = rootNode == null ? "absent" : (rootNode.isClosed() ? "closed" : "opened");
                    writer.app(state);
                }
                catch (CompletionException ex) {
                    writer.app("completed exceptionally ").app('(').app((Object)ExceptionUtils.unwrapCause((Throwable)ex)).app(')');
                }
                catch (CancellationException ex) {
                    writer.app("canceled");
                }
                writer.nl();
            }
            writer.nl();
            // Remote fragments that are still awaiting init completion, ordered by fragment id.
            List initFragments = mgr.remoteFragmentInitCompletion.entrySet().stream().filter(entry0 -> !((CompletableFuture)entry0.getValue()).isDone()).map(Map.Entry::getKey).sorted(Comparator.comparingLong(RemoteFragmentKey::fragmentId)).collect(Collectors.toList());
            if (!initFragments.isEmpty()) {
                writer.app(indent0).app("Fragments awaiting init completion:").nl();
                String fragmentIndent = Debuggable.childIndentation((String)indent0);
                for (RemoteFragmentKey fragmentKey : initFragments) {
                    writer.app(fragmentIndent).app("id=").app(fragmentKey.fragmentId()).app(", node=").app(fragmentKey.nodeName());
                    writer.nl();
                }
                writer.nl();
            }
            // Nothing more to print for this execution when it has no local fragments.
            if ((localFragments = mgr.localFragments().stream().sorted(Comparator.comparingLong(n -> n.context().fragmentId())).collect(Collectors.toList())).isEmpty()) continue;
            writer.app(indent0).app("Local fragments:").nl();
            String fragmentIndent = Debuggable.childIndentation((String)indent0);
            // First pass: one summary line per local fragment.
            for (AbstractNode fragment : localFragments) {
                fragmentId = fragment.context().fragmentId();
                writer.app(fragmentIndent).app("id=").app(fragmentId).app(", state=").app(fragment.isClosed() ? "closed" : "opened").app(", canceled=").app(fragment.context().isCancelled()).app(", class=").app(fragment.getClass().getSimpleName());
                Long rootFragmentId = mgr.rootFragmentId;
                writer.app(rootFragmentId != null && rootFragmentId == fragmentId ? " (root)" : "");
                writer.nl();
            }
            writer.nl();
            // Second pass: the detailed state tree of each local fragment.
            for (AbstractNode fragment : localFragments) {
                fragmentId = fragment.context().fragmentId();
                writer.app(indent0).app("Fragment#").app(fragmentId).app(" tree:").nl();
                fragment.dumpState(writer, Debuggable.childIndentation((String)indent0));
                writer.nl();
            }
        }
    }

    /**
     * Produces a textual dump of all live threads of the current JVM.
     *
     * <p>Locked monitors and ownable synchronizers are included only when the JVM reports support for them.
     *
     * @return dump preceded by a "Dumping threads:" header.
     */
    private static String dumpThreads() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        boolean withMonitors = threadBean.isObjectMonitorUsageSupported();
        boolean withSynchronizers = threadBean.isSynchronizerUsageSupported();
        ThreadInfo[] threads = threadBean.dumpAllThreads(withMonitors, withSynchronizers);
        IgniteStringBuilder out = new IgniteStringBuilder();
        out.nl().nl().app("Dumping threads:").nl().nl();
        for (ThreadInfo thread : threads) {
            // Null entries are skipped rather than dereferenced.
            if (thread != null) {
                out.app(thread.toString()).nl();
            }
        }
        return out.toString();
    }

    /**
     * Creates an identifier for a new execution of the given query.
     *
     * @param queryId identifier of the query.
     * @return execution id built from the query id and the next value of the execution token generator.
     */
    private ExecutionId nextExecutionId(UUID queryId) {
        ExecutionId executionId = new ExecutionId(
                queryId,
                this.executionTokenGen.getAndIncrement());
        return executionId;
    }

    /**
     * Factory of {@link LogicalRelImplementor} instances.
     *
     * @param <RowT> row type handled by the created implementor.
     */
    @FunctionalInterface
    public static interface ImplementorFactory<RowT> {
        /**
         * Creates an implementor.
         *
         * @param var1 execution context the implementor is created for.
         * @param var2 resolved dependencies made available to the implementor.
         * @return new implementor.
         */
        public LogicalRelImplementor<RowT> create(ExecutionContext<RowT> var1, ResolvedDependencies var2);
    }

    /**
     * State of a single query execution (identified by an {@link ExecutionId}) on the local node.
     *
     * <p>The manager keeps the fragments executed locally, the futures that track initialisation of the
     * fragments sent to other nodes and, when created as a coordinator, the future of the root node.
     * It also owns the close/cancel procedure of the execution.
     */
    private class DistributedQueryManager {
        private final ExecutionId executionId;
        /** Whether this manager was created as the coordinator of the execution. */
        private final boolean coordinator;
        private final String coordinatorNodeName;
        private final SqlOperationContext ctx;
        /** Completed once the close procedure started by {@link #close} has finished. */
        private final CompletableFuture<Void> cancelFut = new CompletableFuture<>();
        private final SqlQueryProcessor.PrefetchCallback prefetchCallback = new SqlQueryProcessor.PrefetchCallback();
        /** Guards {@link #close} against being run more than once. */
        private final AtomicBoolean cancelled = new AtomicBoolean();
        /**
         * Init-completion future per (node, fragment) pair.
         *
         * <p>Entries are added under {@link #initMux}, but the map is read and iterated without that lock
         * (fragment acknowledgement, node-left handling, send-failure callbacks, state dumps), therefore a
         * concurrent map is required here.
         */
        private final Map<RemoteFragmentKey, CompletableFuture<Void>> remoteFragmentInitCompletion = new ConcurrentHashMap<>();
        private final Queue<AbstractNode<RowT>> localFragments = new ConcurrentLinkedQueue<>();
        /** Future of the root node; {@code null} when the manager is not a coordinator. */
        @Nullable
        private final CompletableFuture<AsyncRootNode<RowT, InternalSqlRow>> root;
        /** Mutex guarding registration of fragments in {@link #remoteFragmentInitCompletion}. */
        private final Object initMux = new Object();
        /** Identifier of the root fragment; assigned while fragments are being sent. */
        private volatile Long rootFragmentId = null;
        /** Topology version taken from the operation context and later replaced by the mapping's version. */
        @Nullable
        private volatile Long topologyVersion;
        private final MemoryContextFactory<RowT> memoryContextFactory;
        private final MemoryContextProvider<RowT> memoryContextProvider;

        /**
         * Creates a manager.
         *
         * @param executionId identifier of the execution.
         * @param coordinatorNodeName name of the coordinator node.
         * @param coordinator whether the local node coordinates the execution; only then a root future is created.
         * @param ctx operation context of the query.
         */
        private DistributedQueryManager(ExecutionId executionId, String coordinatorNodeName, boolean coordinator, SqlOperationContext ctx) {
            this.executionId = executionId;
            this.ctx = ctx;
            this.coordinator = coordinator;
            this.coordinatorNodeName = coordinatorNodeName;
            this.memoryContextFactory = ExecutionServiceImpl.this.memoryContextFactorySupplier.get();
            this.memoryContextProvider = this.memoryContextFactory.createMemoryContextProvider();
            if (coordinator) {
                CompletableFuture<AsyncRootNode<RowT, InternalSqlRow>> root = new CompletableFuture<>();
                // A failed root future means the query cannot proceed: cancel the whole execution.
                root.exceptionally(t -> {
                    this.close(AsyncDataCursor.CancellationReason.CANCEL);
                    return null;
                });
                this.root = root;
            } else {
                this.root = null;
            }
            this.topologyVersion = ctx.topologyVersion();
        }

        /** Creates a non-coordinator manager. */
        private DistributedQueryManager(ExecutionId executionId, String coordinatorNodeName, SqlOperationContext ctx) {
            this(executionId, coordinatorNodeName, false, ctx);
        }

        /** Returns an immutable snapshot of the fragments registered for local execution. */
        private List<AbstractNode<?>> localFragments() {
            return List.copyOf(this.localFragments);
        }

        /**
         * Builds a {@code QueryStartRequestV2} for the given fragment and sends it to the target node.
         *
         * @return future returned by the message service for the send operation.
         */
        private CompletableFuture<Void> sendFragment(String targetNodeName, String serialisedFragment, FragmentDescription desc, TxAttributes txAttributes, int catalogVersion, long topologyVersion) {
            QueryStartRequestV2 request = FACTORY.queryStartRequestV2().queryId(this.executionId.queryId()).executionToken(this.executionId.executionToken()).fragmentId(desc.fragmentId()).root(serialisedFragment).fragmentDescription(desc).parameters(this.ctx.parameters()).txAttributes(txAttributes).catalogVersion(catalogVersion).timeZoneId(this.ctx.timeZoneId().getId()).operationTime(this.ctx.operationTime()).timestamp(ExecutionServiceImpl.this.clockService.now()).username(this.ctx.userName()).topologyVersion(topologyVersion).build();
            return ExecutionServiceImpl.this.messageService.send(targetNodeName, request);
        }

        /**
         * Records that a node has answered the start request of a fragment.
         *
         * <p>If the answer carries an error, it is propagated to the root: a failure of the root fragment fails
         * the root future, any other failure is reported to the root node followed by cancellation of the
         * execution. In every case the init-completion future of the fragment is completed.
         *
         * @param nodeName node that answered.
         * @param fragmentId fragment the answer relates to.
         * @param ex error reported by the node, or {@code null} on success.
         */
        private void acknowledgeFragment(String nodeName, long fragmentId, @Nullable Throwable ex) {
            if (ex != null) {
                Long rootFragmentId0 = this.rootFragmentId;
                if (rootFragmentId0 != null && fragmentId == rootFragmentId0) {
                    this.root.completeExceptionally(ex);
                } else {
                    this.root.thenAccept(root -> {
                        root.onError(ex);
                        this.close(AsyncDataCursor.CancellationReason.CANCEL);
                    });
                }
            }
            this.remoteFragmentInitCompletion.get(new RemoteFragmentKey(nodeName, fragmentId)).complete(null);
        }

        /** Reports a remote fragment execution failure to the root node and cancels the execution. */
        private void onError(RemoteFragmentExecutionException ex) {
            this.root.thenAccept(root -> {
                root.onError((Throwable)((Object)ex));
                this.close(AsyncDataCursor.CancellationReason.CANCEL);
            });
        }

        /**
         * Fails the init-completion futures of all fragments that were sent to the node that has left.
         *
         * <p>The event is ignored when the topology version known to this manager is greater than the
         * version of the event.
         *
         * @param nodeName name of the node that has left.
         * @param topologyVersion topology version of the event.
         */
        private void onNodeLeft(String nodeName, long topologyVersion) {
            Long mappingTopologyVersion = this.topologyVersion;
            if (mappingTopologyVersion != null && mappingTopologyVersion > topologyVersion) {
                return;
            }
            this.remoteFragmentInitCompletion.entrySet().stream().filter(e -> nodeName.equals(((RemoteFragmentKey)e.getKey()).nodeName())).forEach(e -> ((CompletableFuture)e.getValue()).completeExceptionally((Throwable)((Object)new NodeLeftException(nodeName))));
        }

        /**
         * Builds the execution tree of a fragment, registers it as a local fragment and acknowledges the
         * start to the originating node.
         *
         * <p>A tree whose top node is not an {@code Outbox} is treated as the root fragment: it is wrapped into
         * an {@code AsyncRootNode}, prefetch is started and the root future is completed. For an
         * {@code Outbox} prefetch is triggered after the acknowledgement has been sent.
         *
         * @return future of sending the start response.
         */
        private CompletableFuture<Void> executeFragment(IgniteRel treeRoot, ResolvedDependencies deps, ExecutionContext<RowT> ectx) {
            String origNodeName = ectx.originatingNodeName();
            AbstractNode node = (AbstractNode)ExecutionServiceImpl.this.implementorFactory.create(ectx, deps).go(treeRoot);
            this.localFragments.add(node);
            if (!(node instanceof Outbox)) {
                SchemaAwareConverter<Object, Object> internalTypeConverter = TypeUtils.resultTypeConverter(treeRoot.getRowType());
                AsyncRootNode<Object, InternalSqlRow> rootNode = new AsyncRootNode<Object, InternalSqlRow>(ectx, ectx.rowHandler().factory(TypeUtils.convertStructuredType(treeRoot.getRowType())), node, inRow -> new InternalSqlRowImpl<Object>(inRow, ectx.rowHandler(), internalTypeConverter));
                node.onRegister(rootNode);
                rootNode.startPrefetch().whenCompleteAsync((res, err) -> this.prefetchCallback.onPrefetchComplete((Throwable)err), (Executor)ExecutionServiceImpl.this.taskExecutor);
                assert (this.root != null);
                this.root.complete(rootNode);
            }
            CompletableFuture<Void> sendingResult = ExecutionServiceImpl.this.messageService.send(origNodeName, FACTORY.queryStartResponse().queryId(ectx.queryId()).executionToken(ectx.executionToken()).fragmentId(ectx.fragmentId()).build());
            if (node instanceof Outbox) {
                ((Outbox)node).prefetch();
            }
            return sendingResult;
        }

        /** Creates the execution context of a fragment, backed by a fresh memory context. */
        private ExecutionContext<RowT> createContext(InternalClusterNode initiatorNode, FragmentDescription desc, TxAttributes txAttributes, @Nullable Long topologyVersion, MemoryContextFactory<RowT> memoryContextFactory, MemoryContextProvider<RowT> memoryContextProvider) {
            MemoryContext memCtx = memoryContextProvider.create();
            return new ExecutionContext(ExecutionServiceImpl.this.expressionFactory, ExecutionServiceImpl.this.taskExecutor, this.executionId, ExecutionServiceImpl.this.localNode, initiatorNode.name(), initiatorNode.id(), desc, ExecutionServiceImpl.this.handler, Commons.parametersMap(this.ctx.parameters()), txAttributes, this.ctx.timeZoneId(), memCtx, memoryContextFactory.createRowStorageFactory(memCtx, this.executionId, desc.fragmentId()), ExecutionServiceImpl.this.tableRegistry, -1, Clock.systemUTC(), this.ctx.userName(), topologyVersion);
        }

        /**
         * Deserialises a fragment, resolves its dependencies and starts its execution.
         *
         * <p>Any failure, synchronous or reported by the returned future, is forwarded to the initiator via
         * {@link #handleError}.
         */
        private void submitFragment(InternalClusterNode initiatorNode, int catalogVersion, String fragmentString, FragmentDescription desc, TxAttributes txAttributes, @Nullable Long topologyVersion) {
            try {
                ExecutionContext context = this.createContext(initiatorNode, desc, txAttributes, topologyVersion, this.memoryContextFactory, this.memoryContextProvider);
                IgniteRel treeRoot = ExecutionServiceImpl.this.relationalTreeFromJsonString(catalogVersion, fragmentString);
                ResolvedDependencies resolvedDependencies = ExecutionServiceImpl.this.dependencyResolver.resolveDependencies(List.of(treeRoot), catalogVersion);
                this.executeFragment(treeRoot, resolvedDependencies, context).exceptionally(ex -> {
                    this.handleError((Throwable)ex, initiatorNode.name(), desc.fragmentId());
                    return null;
                });
            }
            catch (Throwable ex2) {
                this.handleError(ex2, initiatorNode.name(), desc.fragmentId());
            }
        }

        /**
         * Sends a start response carrying the error to the initiator; if even that fails synchronously,
         * the execution is cancelled locally.
         */
        private void handleError(Throwable ex, String initiatorNode, long fragmentId) {
            LOG.debug("Unable to start query fragment", ex);
            try {
                ExecutionServiceImpl.this.messageService.send(initiatorNode, FACTORY.queryStartResponse().queryId(this.executionId.queryId()).executionToken(this.executionId.executionToken()).fragmentId(fragmentId).error(ex).build());
            }
            catch (Exception e) {
                LOG.info("Unable to send error message", (Throwable)e);
                this.close(AsyncDataCursor.CancellationReason.CANCEL);
            }
        }

        /**
         * Maps the plan onto the cluster, sends the resulting fragments and returns a cursor over the root node.
         *
         * <p>Read-only transactions are mapped with {@code mapOnBackups} enabled. Must be called on a
         * coordinator manager only.
         */
        private CompletableFuture<AsyncCursor<InternalSqlRow>> execute(InternalTransaction tx, MultiStepPlan multiStepPlan, @Nullable Predicate<String> nodeExclusionFilter) {
            assert (this.root != null);
            boolean mapOnBackups = tx.isReadOnly();
            MappingParameters mappingParameters = MappingParameters.create(this.ctx.parameters(), mapOnBackups, nodeExclusionFilter);
            return ((CompletableFuture)ExecutionServiceImpl.this.mappingService.map(multiStepPlan, mappingParameters).thenComposeAsync(mappedFragments -> this.sendFragments(tx, multiStepPlan, (MappedFragments)mappedFragments), (Executor)ExecutionServiceImpl.this.taskExecutor)).thenApply(this::wrapRootNode);
        }

        /**
         * Registers all mapped fragments and sends each of them to every node it is mapped to.
         *
         * <p>Steps: remember the mapping topology version; enlist partitions for read-write transactions;
         * under {@link #initMux} check for cancellation, remember the root fragment id and create an
         * init-completion future per (node, fragment); send the fragments. If any send fails, the method waits
         * for the init-completion futures, propagates the error to the root and completes exceptionally once
         * the execution is closed.
         *
         * @return future of the root node cursor.
         */
        private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(InternalTransaction tx, MultiStepPlan multiStepPlan, MappedFragments mappedFragmentList) {
            long topologyVersion = mappedFragmentList.topologyVersion();
            this.topologyVersion = topologyVersion;
            List<MappedFragment> mappedFragments = mappedFragmentList.fragments();
            assert (!CollectionUtils.nullOrEmpty(mappedFragments) && mappedFragments.get(0).fragment().rootFragment()) : mappedFragments;
            if (!tx.isReadOnly()) {
                for (MappedFragment mappedFragment : mappedFragments) {
                    this.enlistPartitions(mappedFragment, tx);
                }
            }
            synchronized (this.initMux) {
                QueryCancel queryCancel = this.ctx.cancel();
                if (queryCancel != null) {
                    queryCancel.throwIfCancelled();
                }
                for (MappedFragment mappedFragment : mappedFragments) {
                    if (mappedFragment.fragment().rootFragment()) {
                        assert (this.rootFragmentId == null);
                        this.rootFragmentId = mappedFragment.fragment().fragmentId();
                    }
                    for (String nodeName : mappedFragment.nodes()) {
                        this.remoteFragmentInitCompletion.put(new RemoteFragmentKey(nodeName, mappedFragment.fragment().fragmentId()), new CompletableFuture<>());
                    }
                }
            }
            TxAttributes attributes = TxAttributes.fromTx(tx);
            ArrayList<CompletableFuture<Void>> resultsOfFragmentSending = new ArrayList<CompletableFuture<Void>>();
            for (MappedFragment mappedFragment : mappedFragments) {
                Fragment fragment = mappedFragment.fragment();
                FragmentDescription fragmentDesc = new FragmentDescription(fragment.fragmentId(), !fragment.correlated(), mappedFragment.groupsBySourceId(), mappedFragment.target(), mappedFragment.sourcesByExchangeId(), mappedFragment.partitionPruningMetadata());
                for (String nodeName : mappedFragment.nodes()) {
                    CompletableFuture<Void> resultOfSending = this.sendFragment(nodeName, fragment.serialized(), fragmentDesc, attributes, multiStepPlan.catalogVersion(), topologyVersion);
                    resultOfSending.whenComplete((ignored, t) -> {
                        if (t == null) {
                            return;
                        }
                        // The request was not sent, so no acknowledgement will arrive: release the waiter now.
                        CompletableFuture<Void> completionFuture = this.remoteFragmentInitCompletion.get(new RemoteFragmentKey(nodeName, fragment.fragmentId()));
                        if (completionFuture != null) {
                            completionFuture.complete(null);
                        }
                    });
                    resultsOfFragmentSending.add(resultOfSending);
                }
            }
            return ((CompletableFuture)CompletableFutures.allOf(resultsOfFragmentSending).handle((ignoredVal, ignoredTh) -> {
                if (ignoredTh == null) {
                    return this.root;
                }
                Throwable error = Commons.deriveExceptionFromListOfFutures(resultsOfFragmentSending);
                assert (error != null);
                return ((CompletableFuture)CompletableFutures.allOf(this.remoteFragmentInitCompletion.values()).thenCompose(none -> {
                    // If the root is already completed, report the error to the root node and close explicitly.
                    if (!this.root.completeExceptionally(error)) {
                        this.root.thenAccept(root -> root.onError(error));
                        this.close(AsyncDataCursor.CancellationReason.CANCEL);
                    }
                    return this.cancelFut.thenRun(() -> ExceptionUtils.sneakyThrow((Throwable)error));
                })).thenCompose(none -> this.root);
            })).thenCompose(Commons::cast);
        }

        /**
         * Enlists into the transaction the partitions of every table scanned or modified by the fragment.
         *
         * <p>Does nothing for fragments that reference no tables.
         */
        private void enlistPartitions(final MappedFragment mappedFragment, final InternalTransaction tx) {
            if (mappedFragment.fragment().tables().isEmpty()) {
                return;
            }
            new IgniteRelShuttle(){

                @Override
                public IgniteRel visit(IgniteIndexScan rel) {
                    this.enlist(rel);
                    return super.visit(rel);
                }

                @Override
                public IgniteRel visit(IgniteTableScan rel) {
                    this.enlist(rel);
                    return super.visit(rel);
                }

                @Override
                public IgniteRel visit(IgniteTableModify rel) {
                    this.enlist(rel);
                    return super.visit(rel);
                }

                /** Assigns the commit partition and enlists every assigned partition of the table. */
                private void enlist(IgniteTable table, Int2ObjectMap<NodeWithConsistencyToken> assignments) {
                    int tableId = table.id();
                    int zoneId = table.zoneId();
                    assert (!table.useSecondaryStorage());
                    if (assignments.isEmpty()) {
                        return;
                    }
                    int partsCnt = assignments.size();
                    // The commit partition index is drawn at random from [0, number of assignments).
                    if (!tx.external()) {
                        tx.assignCommitPartition(new ZonePartitionId(zoneId, ThreadLocalRandom.current().nextInt(partsCnt)));
                    } else {
                        tx.assignCommitPartition(ZonePartitionId.NOT_EXISTING);
                    }
                    for (Int2ObjectMap.Entry partWithToken : assignments.int2ObjectEntrySet()) {
                        ZonePartitionId replicationGroupId = new ZonePartitionId(zoneId, partWithToken.getIntKey());
                        NodeWithConsistencyToken assignment = (NodeWithConsistencyToken)partWithToken.getValue();
                        tx.enlist(replicationGroupId, tableId, assignment.name(), assignment.enlistmentConsistencyToken());
                    }
                }

                /** Looks up the colocation group of the relation's source and enlists its assignments. */
                private void enlist(SourceAwareIgniteRel rel) {
                    IgniteTable table = (IgniteTable)rel.getTable().unwrap(IgniteTable.class);
                    ColocationGroup colocationGroup = (ColocationGroup)mappedFragment.groupsBySourceId().get(rel.sourceId());
                    Int2ObjectMap<NodeWithConsistencyToken> assignments = colocationGroup.assignments();
                    this.enlist(table, assignments);
                }
            }.visit(mappedFragment.fragment().root());
        }

        /**
         * Starts the close procedure of the execution; only the first call has an effect.
         *
         * <p>On a coordinator the root node is closed first, then fragments are closed after their
         * initialisation has been awaited. Otherwise a close message is sent to the coordinator and the
         * local fragments are closed. When the procedure ends, successfully or not, the manager is removed
         * from the registry, {@link #cancelFut} is completed, the contexts of the local fragments are
         * cancelled and the memory context provider is closed. The procedure itself runs on the task executor.
         *
         * @param reason why the execution is being closed.
         * @return future completed when the close procedure has finished.
         */
        private CompletableFuture<Void> close(AsyncDataCursor.CancellationReason reason) {
            if (!this.cancelled.compareAndSet(false, true)) {
                return this.cancelFut;
            }
            CompletableFuture<Void> start = new CompletableFuture<Void>();
            CompletionStage stage = this.coordinator ? ((CompletableFuture)start.thenCompose(ignored -> this.closeRootNode(reason))).thenCompose(ignored -> this.awaitFragmentInitialisationAndClose()) : ((CompletableFuture)start.thenCompose(ignored -> ExecutionServiceImpl.this.messageService.send(this.coordinatorNodeName, FACTORY.queryCloseMessage().queryId(this.executionId.queryId()).executionToken(this.executionId.executionToken()).build()))).thenCompose(ignored -> this.closeLocalFragments());
            ((CompletableFuture)((CompletableFuture)stage).whenComplete((r, e) -> {
                if (e != null) {
                    Throwable ex = ExceptionUtils.unwrapCause((Throwable)e);
                    LOG.warn("Fragment closing processed with errors: [queryId={}]", ex, new Object[]{this.ctx.queryId()});
                }
                ExecutionServiceImpl.this.queryManagerMap.remove(this.executionId);
                this.cancelFut.complete(null);
            })).whenComplete((r, t) -> {
                this.localFragments.forEach(f -> f.context().cancel());
                this.memoryContextProvider.close();
            });
            // Kick off the pipeline asynchronously so that the caller's thread does not run it.
            start.completeAsync(() -> null, ExecutionServiceImpl.this.taskExecutor);
            return this.cancelFut;
        }

        /**
         * Submits, for every local fragment, a task that closes the fragment and cleans up its context.
         *
         * <p>The first failure of the two steps is reported to the fragment through {@code onError}; a failure
         * of the clean-up that follows a failed close is attached to the first one as suppressed.
         *
         * @return future completed when all submitted tasks have completed.
         */
        private CompletableFuture<Void> closeLocalFragments() {
            List<CompletableFuture<?>> localFragmentCompletions = new ArrayList<>();
            for (AbstractNode<RowT> node : this.localFragments) {
                assert (!node.context().isCancelled()) : "node context is cancelled, but node still processed";
                localFragmentCompletions.add(node.context().submit(() -> {
                    Throwable th = null;
                    try {
                        node.close();
                    }
                    catch (Throwable th0) {
                        th = th0;
                    }
                    try {
                        node.context().cleanUp();
                    }
                    catch (Throwable th0) {
                        if (th == null) {
                            th = th0;
                        } else if (th != th0) {
                            // A throwable must never suppress itself: addSuppressed would throw.
                            th.addSuppressed(th0);
                        }
                    }
                    if (th != null) {
                        node.onError(th);
                    }
                }, node::onError));
            }
            return CompletableFuture.allOf(localFragmentCompletions.toArray(new CompletableFuture[0]));
        }

        /**
         * For every node that received fragments, waits until all its fragments have finished initialisation
         * (successfully or not) and then closes them: locally for the local node, via a close message otherwise.
         *
         * @return future completed when all per-node close operations have completed.
         */
        private CompletableFuture<Void> awaitFragmentInitialisationAndClose() {
            Map<String, List<CompletableFuture<Void>>> requestsPerNode = new HashMap<>();
            synchronized (this.initMux) {
                for (Map.Entry<RemoteFragmentKey, CompletableFuture<Void>> entry : this.remoteFragmentInitCompletion.entrySet()) {
                    requestsPerNode.computeIfAbsent(entry.getKey().nodeName(), key -> new ArrayList<>()).add(entry.getValue());
                }
            }
            List<CompletableFuture<?>> cancelFuts = new ArrayList<>();
            for (Map.Entry<String, List<CompletableFuture<Void>>> entry : requestsPerNode.entrySet()) {
                String nodeId = entry.getKey();
                // handle(...) swallows initialisation failures: fragments must be closed in any case.
                cancelFuts.add(CompletableFuture.allOf(entry.getValue().toArray(new CompletableFuture[0])).handle((none2, t) -> null).thenCompose(ignored -> {
                    if (ExecutionServiceImpl.this.localNode.name().equals(nodeId)) {
                        return this.closeLocalFragments();
                    }
                    return ExecutionServiceImpl.this.messageService.send(nodeId, FACTORY.queryCloseMessage().queryId(this.executionId.queryId()).executionToken(this.executionId.executionToken()).build());
                }));
            }
            return CompletableFuture.allOf(cancelFuts.toArray(new CompletableFuture[0]));
        }

        /**
         * Closes the root node, or fails the root future if the root node has not been created yet.
         *
         * <p>Unless the reason is {@code CLOSE}, an existing root node is first notified with a
         * {@code QueryCancelledException}.
         *
         * @param closeReason why the root is being closed; selects the message of the exception.
         * @return future of closing the root node, or a completed future if the root future has failed.
         */
        private CompletableFuture<Void> closeRootNode(AsyncDataCursor.CancellationReason closeReason) {
            assert (this.root != null);
            String message = closeReason == AsyncDataCursor.CancellationReason.TIMEOUT ? "Query timeout" : "The query was cancelled while executing.";
            if (!this.root.isDone()) {
                this.root.completeExceptionally((Throwable)((Object)new QueryCancelledException(message)));
            }
            if (!this.root.isCompletedExceptionally()) {
                AsyncRootNode node = this.root.getNow(null);
                if (closeReason != AsyncDataCursor.CancellationReason.CLOSE) {
                    node.onError((Throwable)((Object)new QueryCancelledException(message)));
                }
                return node.closeAsync();
            }
            return CompletableFutures.nullCompletedFuture();
        }

        /**
         * Wraps the root cursor so that the execution is closed when the cursor is closed explicitly or when
         * a fetched batch reports that there are no more rows.
         */
        private AsyncCursor<InternalSqlRow> wrapRootNode(final AsyncCursor<InternalSqlRow> cursor) {
            return new AsyncCursor<InternalSqlRow>(){

                public CompletableFuture<AsyncCursor.BatchedResult<InternalSqlRow>> requestNextAsync(int rows) {
                    CompletableFuture<AsyncCursor.BatchedResult<InternalSqlRow>> fut = cursor.requestNextAsync(rows);
                    fut.thenAccept(batch -> {
                        if (!batch.hasMore()) {
                            DistributedQueryManager.this.close(AsyncDataCursor.CancellationReason.CLOSE);
                        }
                    });
                    return fut;
                }

                public CompletableFuture<Void> closeAsync() {
                    return DistributedQueryManager.this.close(AsyncDataCursor.CancellationReason.CLOSE);
                }
            };
        }
    }

    /**
     * Immutable key made of a catalog version and the serialized form of a fragment.
     *
     * <p>Two keys are equal when both components are equal.
     */
    private static class FragmentCacheKey {
        private final int catalogVersion;
        private final String fragmentString;

        /**
         * Creates a key.
         *
         * @param catalogVersion catalog version component.
         * @param fragmentString serialized fragment component.
         */
        FragmentCacheKey(int catalogVersion, String fragmentString) {
            this.catalogVersion = catalogVersion;
            this.fragmentString = fragmentString;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            FragmentCacheKey other = (FragmentCacheKey)o;
            // Compare the cheap integer component first.
            if (catalogVersion != other.catalogVersion) {
                return false;
            }
            return fragmentString.equals(other.fragmentString);
        }

        @Override
        public int hashCode() {
            return Objects.hash(catalogVersion, fragmentString);
        }
    }
}

