/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.shaded.org.apache.ignite.internal.client;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.gridgain.shaded.org.apache.ignite.client.ClientOperationType;
import org.gridgain.shaded.org.apache.ignite.client.IgniteClientConfiguration;
import org.gridgain.shaded.org.apache.ignite.client.IgniteClientConnectionException;
import org.gridgain.shaded.org.apache.ignite.client.RetryPolicy;
import org.gridgain.shaded.org.apache.ignite.internal.client.ChannelValidator;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientChannel;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientChannelConfiguration;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientChannelFactory;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientFutureUtils;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientMetricSource;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientTransactionInflights;
import org.gridgain.shaded.org.apache.ignite.internal.client.ClientUtils;
import org.gridgain.shaded.org.apache.ignite.internal.client.HostAndPort;
import org.gridgain.shaded.org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
import org.gridgain.shaded.org.apache.ignite.internal.client.InetAddressResolver;
import org.gridgain.shaded.org.apache.ignite.internal.client.PayloadReader;
import org.gridgain.shaded.org.apache.ignite.internal.client.PayloadWriter;
import org.gridgain.shaded.org.apache.ignite.internal.client.RetryPolicyContextImpl;
import org.gridgain.shaded.org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.gridgain.shaded.org.apache.ignite.internal.client.io.netty.NettyClientConnectionMultiplexer;
import org.gridgain.shaded.org.apache.ignite.internal.close.ManuallyCloseable;
import org.gridgain.shaded.org.apache.ignite.internal.hlc.HybridTimestamp;
import org.gridgain.shaded.org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.gridgain.shaded.org.apache.ignite.internal.logger.IgniteLogger;
import org.gridgain.shaded.org.apache.ignite.internal.thread.NamedThreadFactory;
import org.gridgain.shaded.org.apache.ignite.internal.util.CompletableFutures;
import org.gridgain.shaded.org.apache.ignite.internal.util.ExceptionUtils;
import org.gridgain.shaded.org.apache.ignite.internal.util.IgniteUtils;
import org.gridgain.shaded.org.apache.ignite.lang.ErrorGroups;
import org.gridgain.shaded.org.apache.ignite.lang.IgniteException;
import org.gridgain.shaded.org.apache.ignite.network.ClusterNode;
import org.gridgain.shaded.org.jetbrains.annotations.Nullable;

public final class ReliableChannel
implements AutoCloseable {
    private final ClientChannelFactory chFactory;
    private final ClientMetricSource metrics;
    private volatile List<ClientChannelHolder> channels;
    private volatile int defaultChIdx = -1;
    private final AtomicInteger curChIdx = new AtomicInteger();
    private final IgniteClientConfigurationImpl clientCfg;
    private final Map<String, ClientChannelHolder> nodeChannelsByName = new ConcurrentHashMap<String, ClientChannelHolder>();
    private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
    private volatile boolean closed;
    private final ArrayList<Runnable> chFailLsnrs = new ArrayList();
    private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
    private final ClientConnectionMultiplexer connMgr;
    private final IgniteLogger log;
    private volatile String[] prevHostAddrs;
    private final AtomicLong partitionAssignmentTimestamp = new AtomicLong();
    private final HybridTimestampTracker observableTimeTracker;
    private final AtomicReference<UUID> clusterId = new AtomicReference();
    @Nullable
    private ScheduledExecutorService streamerFlushExecutor;
    private final Executor asyncContinuationExecutor;
    private final ClientTransactionInflights inflights;
    private final InetAddressResolver addressResolver;
    private volatile CompletableFuture<Void> scheduledReResolveAddressesFuture;
    @Nullable
    private final ChannelValidator channelValidator;

    ReliableChannel(ClientChannelFactory chFactory, IgniteClientConfigurationImpl clientCfg, ClientMetricSource metrics, HybridTimestampTracker observableTimeTracker, @Nullable ChannelValidator channelValidator) {
        this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
        this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
        this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
        this.metrics = metrics;
        this.observableTimeTracker = Objects.requireNonNull(observableTimeTracker, "observableTime");
        this.channelValidator = channelValidator;
        this.connMgr = new NettyClientConnectionMultiplexer(metrics);
        this.connMgr.start(clientCfg);
        this.inflights = new ClientTransactionInflights();
        this.addressResolver = Objects.requireNonNullElse(clientCfg.addressResolver(), InetAddressResolver.DEFAULT);
        this.asyncContinuationExecutor = clientCfg.asyncContinuationExecutor() == null ? ForkJoinPool.commonPool() : clientCfg.asyncContinuationExecutor();
    }

    @Override
    public synchronized void close() throws Exception {
        this.closed = true;
        @Nullable CompletableFuture<Void> fut = this.scheduledReResolveAddressesFuture;
        if (fut != null && !fut.isDone()) {
            fut.cancel(true);
        }
        List<ClientChannelHolder> holders = this.channels;
        ArrayList<ManuallyCloseable> closeables = new ArrayList<ManuallyCloseable>();
        if (holders != null) {
            for (ClientChannelHolder hld : holders) {
                closeables.add(hld::close);
            }
        }
        closeables.add(this.connMgr::stop);
        closeables.add(() -> IgniteUtils.shutdownAndAwaitTermination(this.streamerFlushExecutor, 10L, TimeUnit.SECONDS));
        IgniteUtils.closeAllManually(closeables);
    }

    public ClientMetricSource metrics() {
        return this.metrics;
    }

    public List<ClusterNode> connections() {
        ArrayList<ClusterNode> res = new ArrayList<ClusterNode>(this.channels.size());
        for (ClientChannelHolder holder : this.nodeChannelsByName.values()) {
            ClientChannel ch;
            CompletableFuture<ClientChannel> chFut = holder.chFut;
            if (chFut == null || (ch = ClientFutureUtils.getNowSafe(chFut)) == null || ch.closed()) continue;
            res.add(ch.protocolContext().clusterNode());
        }
        return res;
    }

    public IgniteClientConfiguration configuration() {
        return this.clientCfg;
    }

    public HybridTimestampTracker observableTimestamp() {
        return this.observableTimeTracker;
    }

    public UUID clusterId() {
        return this.clusterId.get();
    }

    public List<ClientChannel> channels() {
        ArrayList<ClientChannel> res = new ArrayList<ClientChannel>(this.channels.size());
        for (ClientChannelHolder holder : this.nodeChannelsByName.values()) {
            ClientChannel ch;
            CompletableFuture<ClientChannel> chFut = holder.chFut;
            if (chFut == null || (ch = ClientFutureUtils.getNowSafe(chFut)) == null || ch.closed()) continue;
            res.add(ch);
        }
        return res;
    }

    public <T> CompletableFuture<T> serviceAsync(int opCode, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader, @Nullable String preferredNodeName, @Nullable RetryPolicy retryPolicyOverride, boolean expectNotifications) {
        return ClientFutureUtils.doWithRetryAsync(() -> this.getChannelAsync(preferredNodeName).thenCompose(ch -> this.serviceAsyncInternal(opCode, payloadWriter, payloadReader, expectNotifications, (ClientChannel)ch)), ctx -> this.shouldRetry(opCode, (ClientFutureUtils.RetryContext)ctx, retryPolicyOverride));
    }

    public <T> CompletableFuture<T> serviceAsync(int opCode, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader, Supplier<CompletableFuture<ClientChannel>> channelResolver, @Nullable RetryPolicy retryPolicyOverride, boolean expectNotifications) {
        return ClientFutureUtils.doWithRetryAsync(() -> ((CompletableFuture)channelResolver.get()).thenCompose(ch -> this.serviceAsyncInternal(opCode, payloadWriter, payloadReader, expectNotifications, (ClientChannel)ch)), ctx -> this.shouldRetry(opCode, (ClientFutureUtils.RetryContext)ctx, retryPolicyOverride));
    }

    public <T> CompletableFuture<T> serviceAsync(ToIntFunction<ClientChannel> opCodeFunc, int retryOpType, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader) {
        return ClientFutureUtils.doWithRetryAsync(() -> this.getChannelAsync(null).thenCompose(ch -> {
            int opCode = opCodeFunc.applyAsInt((ClientChannel)ch);
            return this.serviceAsyncInternal(opCode, payloadWriter, payloadReader, false, (ClientChannel)ch);
        }), ctx -> this.shouldRetry(retryOpType, (ClientFutureUtils.RetryContext)ctx, null));
    }

    public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader) {
        return this.serviceAsync(opCode, payloadWriter, payloadReader, (String)null, null, false);
    }

    public <T> CompletableFuture<T> serviceAsync(int opCode, PayloadReader<T> payloadReader) {
        return this.serviceAsync(opCode, null, payloadReader, (String)null, null, false);
    }

    private <T> CompletableFuture<T> serviceAsyncInternal(int opCode, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader, boolean expectNotifications, ClientChannel ch) {
        return ch.serviceAsync(opCode, payloadWriter, payloadReader, expectNotifications).whenComplete((res, err) -> {
            if (err != null && ReliableChannel.unwrapConnectionException(err) != null) {
                this.onChannelFailure(ch);
            }
        });
    }

    public CompletableFuture<ClientChannel> getChannelAsync(@Nullable String preferredNodeName) {
        ClientChannelHolder holder;
        if (preferredNodeName != null && (holder = this.nodeChannelsByName.get(preferredNodeName)) != null) {
            return holder.getOrCreateChannelAsync().thenCompose(ch -> {
                if (ch != null) {
                    return CompletableFuture.completedFuture(ch);
                }
                return this.getDefaultChannelAsync();
            });
        }
        ClientChannel nextCh = this.getNextChannelWithoutReconnect();
        if (nextCh != null) {
            return CompletableFuture.completedFuture(nextCh);
        }
        return this.getDefaultChannelAsync();
    }

    private static Map<InetSocketAddress, Integer> parsedAddresses(InetAddressResolver addressResolver, String[] addrs) {
        if (addrs == null || addrs.length == 0) {
            throw new IgniteException(ErrorGroups.Client.CONFIGURATION_ERR, "Empty addresses");
        }
        ArrayList<HostAndPort> parsedAddrs = new ArrayList<HostAndPort>(addrs.length);
        for (String a : addrs) {
            parsedAddrs.add(HostAndPort.parse(a, 10800, "Failed to parse Ignite server address"));
        }
        HashMap<InetSocketAddress, Integer> map = IgniteUtils.newHashMap(parsedAddrs.size());
        for (HostAndPort addr : parsedAddrs) {
            try {
                for (InetSocketAddress sockAddr : addressResolver.getAllByName(addr.host(), addr.port())) {
                    map.merge(sockAddr, 1, Integer::sum);
                }
            }
            catch (UnknownHostException e) {
                InetSocketAddress sockAddr;
                sockAddr = InetSocketAddress.createUnresolved(addr.host(), addr.port());
                map.merge(sockAddr, 1, Integer::sum);
            }
        }
        return map;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollCurrentChannel(ClientChannelHolder hld) {
        this.curChannelsGuard.writeLock().lock();
        try {
            int idx = this.defaultChIdx;
            List<ClientChannelHolder> holders = this.channels;
            ClientChannelHolder dfltHld = holders.get(idx);
            if (dfltHld == hld) {
                this.defaultChIdx = ++idx >= holders.size() ? 0 : idx;
            }
        }
        finally {
            this.curChannelsGuard.writeLock().unlock();
        }
    }

    private void onChannelFailure(ClientChannel ch) {
        this.onChannelFailure(this.channels.get(this.defaultChIdx), ch);
    }

    private void onChannelFailure(ClientChannelHolder hld, @Nullable ClientChannel ch) {
        this.chFailLsnrs.forEach(Runnable::run);
        this.rollCurrentChannel(hld);
        this.asyncContinuationExecutor.execute(this::reResolveAddresses);
    }

    public void addChannelFailListener(Runnable chFailLsnr) {
        this.chFailLsnrs.add(chFailLsnr);
    }

    public ClientTransactionInflights inflights() {
        return this.inflights;
    }

    private boolean shouldStopChannelsReinit() {
        return this.scheduledChannelsReinit.get() || this.closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized boolean initChannelHolders() {
        List<ClientChannelHolder> holders = this.channels;
        this.scheduledChannelsReinit.set(false);
        Map<InetSocketAddress, Integer> newAddrs = null;
        if (this.clientCfg.addressesFinder() != null) {
            Object[] hostAddrs = this.clientCfg.addressesFinder().getAddresses();
            if (hostAddrs.length == 0) {
                throw new IgniteException(ErrorGroups.Client.CONFIGURATION_ERR, "Empty addresses");
            }
            if (!Arrays.equals(hostAddrs, this.prevHostAddrs)) {
                newAddrs = ReliableChannel.parsedAddresses(this.addressResolver, (String[])hostAddrs);
                this.prevHostAddrs = hostAddrs;
            }
        } else {
            newAddrs = ReliableChannel.parsedAddresses(this.addressResolver, this.clientCfg.addresses());
        }
        if (newAddrs == null) {
            return true;
        }
        HashMap<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<InetSocketAddress, ClientChannelHolder>();
        HashSet<InetSocketAddress> allAddrs = new HashSet<InetSocketAddress>(newAddrs.keySet());
        if (holders != null) {
            for (ClientChannelHolder h : holders) {
                curAddrs.put(h.chCfg.getAddress(), h);
                allAddrs.add(h.chCfg.getAddress());
            }
        }
        ArrayList<ClientChannelHolder> reinitHolders = new ArrayList<ClientChannelHolder>();
        int dfltChannelIdx = -1;
        ClientChannelHolder currDfltHolder = null;
        int idx = this.defaultChIdx;
        if (idx != -1) {
            currDfltHolder = holders.get(idx);
        }
        for (InetSocketAddress addr : allAddrs) {
            int i;
            ClientChannelHolder hld;
            if (this.shouldStopChannelsReinit()) {
                return false;
            }
            if (!newAddrs.containsKey(addr)) {
                ((ClientChannelHolder)curAddrs.get(addr)).close();
                continue;
            }
            if (!curAddrs.containsKey(addr)) {
                hld = new ClientChannelHolder(new ClientChannelConfiguration(this.clientCfg, addr));
                for (i = 0; i < newAddrs.get(addr); ++i) {
                    reinitHolders.add(hld);
                }
                continue;
            }
            hld = (ClientChannelHolder)curAddrs.get(addr);
            for (i = 0; i < newAddrs.get(addr); ++i) {
                reinitHolders.add(hld);
            }
            if (hld != currDfltHolder) continue;
            dfltChannelIdx = reinitHolders.size() - 1;
        }
        if (dfltChannelIdx == -1) {
            dfltChannelIdx = 0;
        }
        this.curChannelsGuard.writeLock().lock();
        try {
            this.channels = reinitHolders;
            this.defaultChIdx = dfltChannelIdx;
        }
        finally {
            this.curChannelsGuard.writeLock().unlock();
        }
        return true;
    }

    CompletableFuture<ClientChannel> channelsInitAsync() {
        if (!this.initChannelHolders()) {
            return CompletableFutures.nullCompletedFuture();
        }
        CompletableFuture<ClientChannel> fut = this.getDefaultChannelAsync();
        fut.thenAccept(unused -> {
            ForkJoinPool.commonPool().submit(this::initAllChannelsAsync);
            this.scheduleNextReResolveAddresses();
        });
        return fut;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    private ClientChannel getNextChannelWithoutReconnect() {
        this.curChannelsGuard.readLock().lock();
        try {
            int startIdx = this.curChIdx.incrementAndGet();
            for (int i = 0; i < this.channels.size(); ++i) {
                ClientChannel ch;
                int nextIdx = Math.abs(startIdx + i) % this.channels.size();
                ClientChannelHolder hld = this.channels.get(nextIdx);
                ClientChannel clientChannel = ch = hld == null ? null : hld.getNow();
                if (ch == null) continue;
                ClientChannel clientChannel2 = ch;
                return clientChannel2;
            }
        }
        finally {
            this.curChannelsGuard.readLock().unlock();
        }
        return null;
    }

    private CompletableFuture<ClientChannel> getDefaultChannelAsync() {
        return ClientFutureUtils.doWithRetryAsync(() -> {
            ClientChannelHolder hld;
            this.curChannelsGuard.readLock().lock();
            try {
                hld = this.channels.get(this.defaultChIdx);
            }
            finally {
                this.curChannelsGuard.readLock().unlock();
            }
            return hld.getOrCreateChannelAsync();
        }, ctx -> this.shouldRetry(ClientOperationType.CHANNEL_CONNECT, (ClientFutureUtils.RetryContext)ctx, null));
    }

    private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext ctx, @Nullable RetryPolicy retryPolicyOverride) {
        ClientOperationType opType = ClientUtils.opCodeToClientOperationType(opCode);
        boolean res = this.shouldRetry(opType, ctx, retryPolicyOverride);
        if (this.log.isDebugEnabled()) {
            if (res) {
                this.log.debug("Retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt + ", lastError=" + ctx.lastError() + "]", new Object[0]);
            } else {
                this.log.debug("Not retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt + ", lastError=" + ctx.lastError() + "]", new Object[0]);
            }
        }
        if (res) {
            this.metrics.requestsRetriedIncrement();
        }
        return res;
    }

    private boolean shouldRetry(@Nullable ClientOperationType opType, ClientFutureUtils.RetryContext ctx, @Nullable RetryPolicy retryPolicyOverride) {
        RetryPolicy plc;
        Throwable err = ctx.lastError();
        if (err == null) {
            return opType == ClientOperationType.CHANNEL_CONNECT && ctx.attempt < 16;
        }
        IgniteClientConnectionException exception = ReliableChannel.unwrapConnectionException(err);
        if (exception == null) {
            return false;
        }
        if (exception.code() == ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR) {
            return false;
        }
        if (opType == null) {
            return ctx.attempt < 16;
        }
        RetryPolicy retryPolicy = plc = retryPolicyOverride != null ? retryPolicyOverride : this.clientCfg.retryPolicy();
        if (plc == null) {
            return false;
        }
        RetryPolicyContextImpl retryPolicyContext = new RetryPolicyContextImpl(this.clientCfg, opType, ctx.attempt, exception);
        return plc.shouldRetry(retryPolicyContext);
    }

    private void initAllChannelsAsync() {
        List<ClientChannelHolder> holders = this.channels;
        ArrayList<CompletableFuture<ClientChannel>> futs = new ArrayList<CompletableFuture<ClientChannel>>(holders.size());
        for (ClientChannelHolder hld : holders) {
            if (this.closed) {
                return;
            }
            try {
                futs.add(hld.getOrCreateChannelAsync());
            }
            catch (Exception e) {
                this.logFailedEstablishConnection(hld, e);
            }
        }
        long interval = this.clientCfg.backgroundReconnectInterval();
        if (interval > 0L && !this.closed) {
            CompletableFuture.allOf((CompletableFuture[])futs.toArray(CompletableFuture[]::new)).whenCompleteAsync((res, err) -> this.initAllChannelsAsync(), CompletableFuture.delayedExecutor(interval, TimeUnit.MILLISECONDS));
        }
    }

    private void onObservableTimestampReceived(long newTs) {
        this.observableTimeTracker.update(HybridTimestamp.nullableHybridTimestamp(newTs));
    }

    private void onPartitionAssignmentChanged(long timestamp) {
        long old = this.partitionAssignmentTimestamp.getAndUpdate(curTs -> Math.max(curTs, timestamp));
        if (timestamp > old) {
            this.asyncContinuationExecutor.execute(this::reResolveAddresses);
        }
    }

    public long partitionAssignmentTimestamp() {
        return this.partitionAssignmentTimestamp.get();
    }

    public synchronized ScheduledExecutorService streamerFlushExecutor() {
        if (this.streamerFlushExecutor == null) {
            this.streamerFlushExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("client-data-streamer-flush-" + this.hashCode(), ClientUtils.logger(this.clientCfg, ReliableChannel.class)));
        }
        return this.streamerFlushExecutor;
    }

    @Nullable
    private static IgniteClientConnectionException unwrapConnectionException(Throwable err) {
        if (!((err = ExceptionUtils.unwrapCause(err)) instanceof IgniteClientConnectionException)) {
            return null;
        }
        return (IgniteClientConnectionException)err;
    }

    private void logFailedEstablishConnection(ClientChannelHolder ch, Throwable err) {
        String logMessage = "Failed to establish connection to {}: {}";
        if (ReliableChannel.isLogFailedEstablishConnectionExceptionStackTrace(err)) {
            this.log.warn(logMessage, err, ch.chCfg.getAddress(), err.getMessage());
        } else {
            this.log.info(logMessage, ch.chCfg.getAddress(), err.getMessage());
        }
    }

    private static boolean isLogFailedEstablishConnectionExceptionStackTrace(Throwable err) {
        return !ExceptionUtils.hasCauseOrSuppressed(err, "Connection refused", ConnectException.class);
    }

    private void scheduleNextReResolveAddresses() {
        if (this.closed) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Skipping scheduling re-resolve of addresses since channel is closed", new Object[0]);
            }
            return;
        }
        long interval = this.clientCfg.backgroundReResolveAddressesInterval();
        if (interval > 0L) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Scheduling next re-resolve of addresses in {} ms", interval);
            }
            this.scheduledReResolveAddressesFuture = CompletableFuture.runAsync(this::reResolveAddresses, CompletableFuture.delayedExecutor(interval, TimeUnit.MILLISECONDS, this.asyncContinuationExecutor));
        }
    }

    private void reResolveAddresses() {
        CompletableFuture<Void> fut = this.scheduledReResolveAddressesFuture;
        if (this.closed || fut != null && !fut.cancel(false)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Skipping re-resolve of addresses since another re-resolve is already running or channel is closed", new Object[0]);
            }
            return;
        }
        if (this.scheduledChannelsReinit.compareAndSet(false, true)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Re-resolving addresses and re-initializing channel holders", new Object[0]);
            }
            this.initChannelHolders();
        }
        this.scheduleNextReResolveAddresses();
    }

    class ClientChannelHolder {
        private final ClientChannelConfiguration chCfg;
        @Nullable
        private volatile CompletableFuture<ClientChannel> chFut;
        private volatile ClusterNode serverNode;
        private volatile boolean close;

        private ClientChannelHolder(ClientChannelConfiguration chCfg) {
            this.chCfg = chCfg;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private CompletableFuture<ClientChannel> getOrCreateChannelAsync() {
            if (this.close) {
                return CompletableFuture.failedFuture(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed", this.chCfg.getAddress().toString()));
            }
            CompletionStage<ClientChannel> chFut0 = this.chFut;
            if (this.isFutureInProgressOrDoneAndChannelOpen((CompletableFuture<ClientChannel>)chFut0)) {
                return chFut0;
            }
            ClientChannelHolder clientChannelHolder = this;
            synchronized (clientChannelHolder) {
                if (this.close) {
                    return CompletableFuture.failedFuture(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed", this.chCfg.getAddress().toString()));
                }
                chFut0 = this.chFut;
                if (this.isFutureInProgressOrDoneAndChannelOpen((CompletableFuture<ClientChannel>)chFut0)) {
                    return chFut0;
                }
                CompletableFuture<ClientChannel> createFut = ReliableChannel.this.chFactory.create(this.chCfg, ReliableChannel.this.connMgr, ReliableChannel.this.metrics, x$0 -> ReliableChannel.this.onPartitionAssignmentChanged((long)x$0), x$0 -> ReliableChannel.this.onObservableTimestampReceived((long)x$0), ReliableChannel.this.inflights);
                chFut0 = createFut.thenApply(ch -> {
                    if (ReliableChannel.this.channelValidator != null) {
                        ReliableChannel.this.channelValidator.validate(ch.protocolContext());
                    }
                    UUID currentClusterId = ch.protocolContext().clusterId();
                    UUID oldClusterId = ReliableChannel.this.clusterId.compareAndExchange(null, currentClusterId);
                    List<UUID> validClusterIds = ch.protocolContext().clusterIds();
                    if (oldClusterId != null && !validClusterIds.contains(oldClusterId)) {
                        try {
                            ch.close();
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                        String clusterIdsString = validClusterIds.stream().map(UUID::toString).collect(Collectors.joining(", "));
                        throw new IgniteClientConnectionException(ErrorGroups.Client.CLUSTER_ID_MISMATCH_ERR, "Cluster ID mismatch: expected=" + oldClusterId + ", actual=" + clusterIdsString, ch.endpoint());
                    }
                    ClusterNode newNode = ch.protocolContext().clusterNode();
                    ReliableChannel.this.nodeChannelsByName.put(newNode.name(), this);
                    ClusterNode oldServerNode = this.serverNode;
                    if (oldServerNode != null && !oldServerNode.id().equals(newNode.id())) {
                        ReliableChannel.this.nodeChannelsByName.remove(oldServerNode.name(), this);
                    }
                    this.serverNode = newNode;
                    return ch;
                });
                ((CompletableFuture)chFut0).exceptionally(err -> {
                    this.closeChannel();
                    ReliableChannel.this.onChannelFailure(this, null);
                    ReliableChannel.this.logFailedEstablishConnection(this, (Throwable)err);
                    return null;
                });
                this.chFut = chFut0;
                return chFut0;
            }
        }

        @Nullable
        private ClientChannel getNow() {
            if (this.close) {
                return null;
            }
            CompletableFuture<ClientChannel> f = this.chFut;
            if (f == null) {
                return null;
            }
            ClientChannel ch = ClientFutureUtils.getNowSafe(f);
            if (ch == null || ch.closed()) {
                return null;
            }
            return ch;
        }

        private synchronized void closeChannel() {
            CompletableFuture<ClientChannel> ch0 = this.chFut;
            if (ch0 != null) {
                ch0.thenAccept(c -> {
                    try {
                        c.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                });
                ClusterNode oldServerNode = this.serverNode;
                if (oldServerNode != null) {
                    this.rollNodeChannelsByName();
                }
                this.chFut = null;
            }
        }

        void close() {
            this.close = true;
            ClusterNode oldServerNode = this.serverNode;
            if (oldServerNode != null) {
                this.rollNodeChannelsByName();
            }
            this.closeChannel();
        }

        private boolean isFutureInProgressOrDoneAndChannelOpen(@Nullable CompletableFuture<ClientChannel> f) {
            if (f == null || f.isCompletedExceptionally()) {
                return false;
            }
            if (!f.isDone()) {
                return true;
            }
            ClientChannel ch = ClientFutureUtils.getNowSafe(f);
            return ch != null && !ch.closed();
        }

        private void rollNodeChannelsByName() {
            List<ClientChannelHolder> holders = ReliableChannel.this.channels;
            for (ClientChannelHolder h : holders) {
                if (h == this || h.serverNode == null || !Objects.equals(this.serverNode.id(), h.serverNode.id())) continue;
                ReliableChannel.this.nodeChannelsByName.put(h.serverNode.name(), h);
                return;
            }
            ReliableChannel.this.nodeChannelsByName.remove(this.serverNode.name(), this);
        }
    }
}

