/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.ignite.client.IgniteClientAuthenticator;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ClientChannelConfiguration;
import org.apache.ignite.internal.client.ClientClusterNode;
import org.apache.ignite.internal.client.ClientDelayedAckException;
import org.apache.ignite.internal.client.ClientMetricSource;
import org.apache.ignite.internal.client.ClientSchemaVersionMismatchException;
import org.apache.ignite.internal.client.ClientTimeoutWorker;
import org.apache.ignite.internal.client.ClientTransactionInflights;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.PayloadReader;
import org.apache.ignite.internal.client.PayloadWriter;
import org.apache.ignite.internal.client.ProtocolContext;
import org.apache.ignite.internal.client.io.ClientConnection;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
import org.apache.ignite.internal.client.io.ClientMessageHandler;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.HandshakeExtension;
import org.apache.ignite.internal.client.proto.HandshakeUtils;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.future.timeout.TimeoutObject;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.FastTimestamps;
import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;

class TcpClientChannel
implements ClientChannel,
ClientMessageHandler,
ClientConnectionStateHandler {
    private static final ProtocolVersion DEFAULT_VERSION = ProtocolVersion.LATEST_VER;
    private static final BitSet SUPPORTED_FEATURES = ProtocolBitmaskFeature.featuresAsBitSet(EnumSet.of(ProtocolBitmaskFeature.USER_ATTRIBUTES, new ProtocolBitmaskFeature[]{ProtocolBitmaskFeature.TABLE_GET_REQS_USE_QUALIFIED_NAME, ProtocolBitmaskFeature.TX_DIRECT_MAPPING, ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB, ProtocolBitmaskFeature.COMPUTE_TASK_ID, ProtocolBitmaskFeature.TX_DELAYED_ACKS, ProtocolBitmaskFeature.TX_PIGGYBACK, ProtocolBitmaskFeature.TX_ALLOW_NOOP_ENLIST, ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS, ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING, ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS, ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT, ProtocolBitmaskFeature.CQ_EVENT_TYPE, ProtocolBitmaskFeature.CQ_PARTITION_SCAN_STATUS, ProtocolBitmaskFeature.CQ_SKIP_OLD_ENTRIES}));
    private static final long MIN_RECOMMENDED_HEARTBEAT_INTERVAL = 500L;
    private final ClientChannelConfiguration cfg;
    private final ClientMetricSource metrics;
    private volatile ProtocolContext protocolCtx;
    private volatile ClientConnection sock;
    private final AtomicLong reqId = new AtomicLong(1L);
    private final ConcurrentMap<Long, TimeoutObjectImpl> pendingReqs = new ConcurrentHashMap<Long, TimeoutObjectImpl>();
    private final Map<Long, CompletableFuture<PayloadInputChannel>> notificationHandlers = new ConcurrentHashMap<Long, CompletableFuture<PayloadInputChannel>>();
    private final Consumer<Long> assignmentChangeListener;
    private final Consumer<Long> observableTimestampListener;
    private final ClientTransactionInflights inflights;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final Executor asyncContinuationExecutor;
    private final long connectTimeout;
    private final long heartbeatTimeout;
    private final long operationTimeout;
    private volatile Timer heartbeatTimer;
    private final IgniteLogger log;
    private volatile long lastSendMillis;
    private volatile long lastReceiveMillis;
    private volatile boolean tcpConnectionEstablished;

    private TcpClientChannel(ClientChannelConfiguration cfg, ClientMetricSource metrics, Consumer<Long> assignmentChangeListener, Consumer<Long> observableTimestampListener, ClientTransactionInflights inflights) {
        TcpClientChannel.validateConfiguration(cfg);
        this.cfg = cfg;
        this.metrics = metrics;
        this.assignmentChangeListener = assignmentChangeListener;
        this.observableTimestampListener = observableTimestampListener;
        this.inflights = inflights;
        this.log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class);
        this.asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null ? ForkJoinPool.commonPool() : cfg.clientConfiguration().asyncContinuationExecutor();
        this.connectTimeout = cfg.clientConfiguration().connectTimeout();
        this.heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout();
        this.operationTimeout = cfg.clientConfiguration().operationTimeout();
    }

    private CompletableFuture<ClientChannel> initAsync(ClientConnectionMultiplexer connMgr) {
        return ((CompletableFuture)((CompletableFuture)connMgr.openAsync(this.cfg.getAddress(), this, this).thenCompose(s -> {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection established [remoteAddress=" + s.remoteAddress() + "]", new Object[0]);
            }
            this.tcpConnectionEstablished = true;
            ClientTimeoutWorker.INSTANCE.registerClientChannel(this, this.cfg.clientConfiguration());
            this.sock = s;
            return this.handshakeAsync(DEFAULT_VERSION);
        })).whenComplete((res, err) -> {
            if (err != null) {
                this.close();
            }
        })).thenApplyAsync(unused -> {
            if (this.protocolCtx != null) {
                this.heartbeatTimer = this.initHeartbeat(this.cfg.clientConfiguration().heartbeatInterval());
            }
            return this;
        }, this.asyncContinuationExecutor);
    }

    static CompletableFuture<ClientChannel> createAsync(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr, ClientMetricSource metrics, Consumer<Long> assignmentChangeListener, Consumer<Long> observableTimestampListener, ClientTransactionInflights inflights) {
        return new TcpClientChannel(cfg, metrics, assignmentChangeListener, observableTimestampListener, inflights).initAsync(connMgr);
    }

    @Override
    public void close() {
        this.close(null, true);
    }

    private void close(@Nullable Throwable cause, boolean graceful) {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (cause != null && (cause instanceof TimeoutException || cause.getCause() instanceof TimeoutException)) {
            this.metrics.connectionsLostTimeoutIncrement();
        } else if (!graceful) {
            this.metrics.connectionsLostIncrement();
        }
        Timer timer = this.heartbeatTimer;
        if (timer != null) {
            timer.cancel();
        }
        for (TimeoutObjectImpl timeoutObjectImpl : this.pendingReqs.values()) {
            if (this.tcpConnectionEstablished && this.lastReceiveMillis == 0L) {
                timeoutObjectImpl.future().completeExceptionally(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed, cluster might not have been initialised", this.endpoint(), cause));
                continue;
            }
            timeoutObjectImpl.future().completeExceptionally(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed", this.endpoint(), cause));
        }
        for (CompletableFuture completableFuture : this.notificationHandlers.values()) {
            try {
                completableFuture.completeExceptionally(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed", this.endpoint(), cause));
            }
            catch (Throwable throwable) {}
        }
        if (this.sock != null) {
            try {
                this.sock.close();
            }
            catch (Throwable t) {
                this.log.warn("Failed to close the channel [remoteAddress=" + this.cfg.getAddress() + "]: " + t.getMessage(), t);
            }
        }
    }

    @Override
    public void onMessage(ByteBuf buf) {
        this.lastReceiveMillis = System.currentTimeMillis();
        try (ClientMessageUnpacker unpacker = new ClientMessageUnpacker(buf);){
            this.processNextMessage(unpacker);
        }
        catch (Throwable t) {
            this.close(t, false);
        }
    }

    @Override
    public void onDisconnected(@Nullable Exception e) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection closed [remoteAddress=" + this.cfg.getAddress() + "]", new Object[0]);
        }
        this.close(e, false);
    }

    @Override
    public <T> CompletableFuture<T> serviceAsync(int opCode, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader, boolean expectNotifications) {
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace("Sending request [opCode=" + opCode + ", remoteAddress=" + this.cfg.getAddress() + "]", new Object[0]);
            }
            long id = this.reqId.getAndIncrement();
            CompletableFuture<PayloadInputChannel> notificationFut = null;
            if (expectNotifications) {
                notificationFut = new CompletableFuture<PayloadInputChannel>();
                this.notificationHandlers.put(id, notificationFut);
            }
            return this.send(opCode, id, payloadWriter, payloadReader, notificationFut, this.operationTimeout);
        }
        catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    private <T> CompletableFuture<T> send(int opCode, long id, @Nullable PayloadWriter payloadWriter, @Nullable PayloadReader<T> payloadReader, @Nullable CompletableFuture<PayloadInputChannel> notificationFut, long timeout) {
        if (this.closed()) {
            throw new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Channel is closed", this.endpoint());
        }
        CompletableFuture<ClientMessageUnpacker> fut = new CompletableFuture<ClientMessageUnpacker>();
        this.pendingReqs.put(id, new TimeoutObjectImpl(timeout, fut));
        this.metrics.requestsActiveIncrement();
        PayloadOutputChannel payloadCh = new PayloadOutputChannel(this, new ClientMessagePacker(this.sock.getBuffer()), id);
        boolean expectedException = false;
        try {
            ClientMessagePacker req = payloadCh.out();
            req.packInt(opCode);
            req.packLong(id);
            if (payloadWriter != null) {
                payloadWriter.accept(payloadCh);
            }
            this.write(req).addListener(f -> {
                if (!f.isSuccess()) {
                    String msg = "Failed to send request async [id=" + id + ", op=" + opCode + ", remoteAddress=" + this.cfg.getAddress() + "]";
                    IgniteClientConnectionException ex = new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, msg, this.endpoint(), f.cause());
                    fut.completeExceptionally(ex);
                    this.log.warn(msg + "]: " + f.cause().getMessage(), new Object[0]);
                    this.pendingReqs.remove(id);
                    this.metrics.requestsActiveDecrement();
                    this.onDisconnected(ex);
                } else {
                    this.metrics.requestsSentIncrement();
                    Runnable action = payloadCh.onSentAction();
                    if (action != null) {
                        this.asyncContinuationExecutor.execute(action);
                    }
                }
            });
            if (PublicApiThreading.executingSyncPublicApi() && !ClientOp.isBatch(opCode)) {
                try {
                    ClientMessageUnpacker unpacker2 = fut.join();
                    return CompletableFuture.completedFuture(this.complete(payloadReader, notificationFut, unpacker2));
                }
                catch (Throwable t) {
                    expectedException = true;
                    throw (RuntimeException)ExceptionUtils.sneakyThrow(ViewUtils.ensurePublicException(t));
                }
            }
            CompletableFuture resFut = new CompletableFuture();
            fut.handle((unpacker, err) -> {
                this.completeAsync(payloadReader, notificationFut, (ClientMessageUnpacker)unpacker, (Throwable)err, resFut);
                return null;
            });
            return resFut;
        }
        catch (Throwable t) {
            if (expectedException) {
                throw (RuntimeException)ExceptionUtils.sneakyThrow(t);
            }
            this.log.warn("Failed to send request sync [id=" + id + ", op=" + opCode + ", remoteAddress=" + this.cfg.getAddress() + "]: " + t.getMessage(), t);
            payloadCh.close();
            this.pendingReqs.remove(id);
            this.metrics.requestsActiveDecrement();
            throw (RuntimeException)ExceptionUtils.sneakyThrow(ViewUtils.ensurePublicException(t));
        }
    }

    private <T> void completeAsync(@Nullable PayloadReader<T> payloadReader, @Nullable CompletableFuture<PayloadInputChannel> notificationFut, ClientMessageUnpacker unpacker, @Nullable Throwable err, CompletableFuture<T> resFut) {
        if (err != null) {
            assert (unpacker == null) : "unpacker must be null if err is not null";
            try {
                this.asyncContinuationExecutor.execute(() -> resFut.completeExceptionally(ViewUtils.ensurePublicException(err)));
            }
            catch (Throwable execError) {
                execError.addSuppressed(err);
                resFut.completeExceptionally(ViewUtils.ensurePublicException(execError));
            }
            return;
        }
        try {
            this.asyncContinuationExecutor.execute(() -> {
                try {
                    resFut.complete(this.complete(payloadReader, notificationFut, unpacker));
                }
                catch (Throwable t) {
                    resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
                }
            });
        }
        catch (Throwable execErr) {
            unpacker.close();
            resFut.completeExceptionally(ViewUtils.ensurePublicException(execErr));
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Nullable
    private <T> T complete(@Nullable PayloadReader<T> payloadReader, @Nullable CompletableFuture<PayloadInputChannel> notificationFut, ClientMessageUnpacker unpacker) {
        try (ClientMessageUnpacker clientMessageUnpacker = unpacker;){
            if (payloadReader != null) {
                T t = payloadReader.apply(new PayloadInputChannel(this, unpacker, notificationFut));
                return t;
            }
            T t = null;
            return t;
        }
        catch (Throwable e) {
            this.log.error("Failed to deserialize server response [remoteAddress=" + this.cfg.getAddress() + "]: " + e.getMessage(), e);
            throw new IgniteException(ErrorGroups.Client.PROTOCOL_ERR, "Failed to deserialize server response: " + e.getMessage(), e);
        }
    }

    private void processNextMessage(ClientMessageUnpacker unpacker) throws IgniteException {
        Throwable err;
        if (this.protocolCtx == null) {
            TcpClientChannel.completeRequestFuture(((TimeoutObjectImpl)this.pendingReqs.remove(-1L)).future(), unpacker);
            return;
        }
        Long resId = unpacker.unpackLong();
        int flags = unpacker.unpackInt();
        this.handlePartitionAssignmentChange(flags, unpacker);
        this.handleObservableTimestamp(unpacker);
        Throwable throwable = err = ResponseFlags.getErrorFlag(flags) ? TcpClientChannel.readError(unpacker) : null;
        if (ResponseFlags.getNotificationFlag(flags)) {
            this.handleNotification(resId, unpacker, err);
            return;
        }
        TimeoutObjectImpl pendingReq = (TimeoutObjectImpl)this.pendingReqs.remove(resId);
        if (pendingReq == null) {
            this.log.error("Unexpected response ID [remoteAddress=" + this.cfg.getAddress() + "]: " + resId, new Object[0]);
            throw new IgniteClientConnectionException(ErrorGroups.Client.PROTOCOL_ERR, String.format("Unexpected response ID [%s]", resId), this.endpoint());
        }
        this.metrics.requestsActiveDecrement();
        if (err == null) {
            this.metrics.requestsCompletedIncrement();
            TcpClientChannel.completeRequestFuture(pendingReq.future(), unpacker);
        } else {
            this.metrics.requestsFailedIncrement();
            this.notificationHandlers.remove(resId);
            pendingReq.future().completeExceptionally(err);
        }
    }

    private void handleObservableTimestamp(ClientMessageUnpacker unpacker) {
        long observableTimestamp = unpacker.unpackLong();
        this.observableTimestampListener.accept(observableTimestamp);
    }

    private void handlePartitionAssignmentChange(int flags, ClientMessageUnpacker unpacker) {
        if (ResponseFlags.getPartitionAssignmentChangedFlag(flags)) {
            if (this.log.isInfoEnabled()) {
                this.log.info("Partition assignment change notification received [remoteAddress=" + this.cfg.getAddress() + "]", new Object[0]);
            }
            long maxStartTime = unpacker.unpackLong();
            this.assignmentChangeListener.accept(maxStartTime);
        }
    }

    private void handleNotification(long id, ClientMessageUnpacker unpacker, @Nullable Throwable err) {
        CompletableFuture<PayloadInputChannel> handler = this.notificationHandlers.remove(id);
        if (handler == null) {
            if (err != null) {
                if (err instanceof ClientDelayedAckException) {
                    ClientDelayedAckException err0 = (ClientDelayedAckException)err;
                    this.inflights.removeInflight(err0.txId(), new TransactionException(err0.code(), err0.getMessage(), err0.getCause()));
                }
                return;
            }
            UUID txId = unpacker.unpackUuid();
            this.inflights.removeInflight(txId, err);
            return;
        }
        this.completeNotificationFuture(handler, unpacker, err);
    }

    private static Throwable readError(ClientMessageUnpacker unpacker) {
        UUID traceId = unpacker.unpackUuid();
        int code = unpacker.unpackInt();
        String errClassName = unpacker.unpackString();
        String errMsg = unpacker.tryUnpackNil() ? null : unpacker.unpackString();
        IgniteException causeWithStackTrace = unpacker.tryUnpackNil() ? null : new IgniteException(traceId, code, unpacker.unpackString());
        int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
        int expectedSchemaVersion = -1;
        long[] sqlUpdateCounters = null;
        UUID txId = null;
        for (int i = 0; i < extSize; ++i) {
            String key = unpacker.unpackString();
            if (key.equals("expected-schema-ver")) {
                expectedSchemaVersion = unpacker.unpackInt();
                continue;
            }
            if (key.equals("sql-update-counters")) {
                sqlUpdateCounters = unpacker.unpackLongArray();
                continue;
            }
            if (key.equals("delayed-ack")) {
                txId = unpacker.unpackUuid();
                continue;
            }
            unpacker.skipValues(1);
        }
        if (txId != null) {
            return new ClientDelayedAckException(traceId, code, errMsg, txId, causeWithStackTrace);
        }
        if (sqlUpdateCounters != null) {
            errMsg = errMsg != null ? errMsg : "SQL batch execution error";
            return new SqlBatchException(traceId, code, sqlUpdateCounters, errMsg, causeWithStackTrace);
        }
        if (code == ErrorGroups.Table.SCHEMA_VERSION_MISMATCH_ERR) {
            if (expectedSchemaVersion == -1) {
                return new IgniteException(traceId, ErrorGroups.Client.PROTOCOL_ERR, "Expected schema version is not specified in error extension map.", causeWithStackTrace);
            }
            return new ClientSchemaVersionMismatchException(traceId, code, errMsg, expectedSchemaVersion, causeWithStackTrace);
        }
        try {
            Class<?> errCls = Class.forName(errClassName);
            return ExceptionUtils.copyExceptionWithCause(errCls, traceId, code, errMsg, causeWithStackTrace);
        }
        catch (ClassNotFoundException classNotFoundException) {
            return new IgniteException(traceId, code, errClassName + ": " + errMsg, causeWithStackTrace);
        }
    }

    @Override
    public boolean closed() {
        return this.closed.get();
    }

    @Override
    public ProtocolContext protocolContext() {
        return this.protocolCtx;
    }

    @Override
    public ClientTransactionInflights inflights() {
        return this.inflights;
    }

    private static void validateConfiguration(ClientChannelConfiguration cfg) {
        String error = null;
        InetSocketAddress addr = cfg.getAddress();
        if (addr == null) {
            error = "At least one Ignite server node must be specified in the Ignite client configuration";
        }
        if (error != null) {
            throw new IllegalArgumentException(error);
        }
    }

    private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver) throws IgniteClientConnectionException {
        CompletableFuture<ClientMessageUnpacker> fut = new CompletableFuture<ClientMessageUnpacker>();
        this.pendingReqs.put(-1L, new TimeoutObjectImpl(this.connectTimeout, fut));
        this.handshakeReqAsync(ver).addListener(f -> {
            if (!f.isSuccess()) {
                fut.completeExceptionally(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Failed to send handshake request", this.endpoint(), f.cause()));
            }
        });
        CompletableFuture resFut = new CompletableFuture();
        fut.handle((unpacker, err) -> {
            this.completeAsync(r -> this.handshakeRes(r.in()), null, (ClientMessageUnpacker)unpacker, (Throwable)err, resFut);
            return null;
        });
        return resFut.exceptionally(err -> {
            if (ExceptionUtils.unwrapRootCause(err) instanceof TimeoutException) {
                this.metrics.handshakesFailedTimeoutIncrement();
                throw new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Handshake timeout", this.endpoint(), (Throwable)err);
            }
            this.metrics.handshakesFailedIncrement();
            throw new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Handshake error", this.endpoint(), (Throwable)err);
        });
    }

    private ChannelFuture handshakeReqAsync(ProtocolVersion proposedVer) {
        this.sock.send(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
        ClientMessagePacker req = new ClientMessagePacker(this.sock.getBuffer());
        req.packInt(proposedVer.major());
        req.packInt(proposedVer.minor());
        req.packInt(proposedVer.patch());
        req.packInt(2);
        HandshakeUtils.packFeatures(req, SUPPORTED_FEATURES);
        IgniteClientAuthenticator authenticator = this.cfg.clientConfiguration().authenticator();
        if (authenticator != null) {
            Map<HandshakeExtension, Object> extensions = Map.of(HandshakeExtension.AUTHENTICATION_TYPE, authenticator.type(), HandshakeExtension.AUTHENTICATION_IDENTITY, authenticator.identity(), HandshakeExtension.AUTHENTICATION_SECRET, authenticator.secret());
            HandshakeUtils.packExtensions(req, extensions);
        } else {
            HandshakeUtils.packExtensions(req, Map.of());
        }
        return this.write(req);
    }

    @Nullable
    private Object handshakeRes(ClientMessageUnpacker unpacker) {
        try {
            ProtocolVersion srvVer = new ProtocolVersion(unpacker.unpackShort(), unpacker.unpackShort(), unpacker.unpackShort());
            if (!unpacker.tryUnpackNil()) {
                throw (RuntimeException)ExceptionUtils.sneakyThrow(TcpClientChannel.readError(unpacker));
            }
            long serverIdleTimeout = unpacker.unpackLong();
            UUID clusterNodeId = unpacker.unpackUuid();
            String clusterNodeName = unpacker.unpackString();
            InetSocketAddress addr = this.sock.remoteAddress();
            ClientClusterNode clusterNode = new ClientClusterNode(clusterNodeId, clusterNodeName, new NetworkAddress(addr.getHostName(), addr.getPort()));
            int clusterIdsLen = unpacker.unpackInt();
            if (clusterIdsLen <= 0) {
                throw new IgniteClientConnectionException(ErrorGroups.Client.PROTOCOL_ERR, "Unexpected cluster ids count: " + clusterIdsLen, this.endpoint());
            }
            ArrayList<UUID> clusterIds = new ArrayList<UUID>(clusterIdsLen);
            for (int i = 0; i < clusterIdsLen; ++i) {
                clusterIds.add(unpacker.unpackUuid());
            }
            String clusterName = unpacker.unpackString();
            long observableTimestamp = unpacker.unpackLong();
            this.observableTimestampListener.accept(observableTimestamp);
            byte major = unpacker.unpackByte();
            byte minor = unpacker.unpackByte();
            byte maintenance = unpacker.unpackByte();
            Byte patch = unpacker.unpackByteNullable();
            String preRelease = unpacker.unpackStringNullable();
            IgniteProductVersion nodeProductVersion = new IgniteProductVersion(major, minor, maintenance, patch, preRelease);
            BitSet serverFeatures = HandshakeUtils.unpackFeatures(unpacker);
            HandshakeUtils.unpackExtensions(unpacker);
            BitSet mutuallySupportedFeatures = HandshakeUtils.supportedFeatures(SUPPORTED_FEATURES, serverFeatures);
            EnumSet<ProtocolBitmaskFeature> features = ProtocolBitmaskFeature.enumSet(mutuallySupportedFeatures);
            this.protocolCtx = new ProtocolContext(srvVer, features, serverIdleTimeout, clusterNode, clusterIds, clusterName, nodeProductVersion);
            return null;
        }
        catch (Throwable e) {
            this.log.warn("Failed to handle handshake response [remoteAddress=" + this.cfg.getAddress() + "]: " + e.getMessage(), e);
            throw e;
        }
    }

    private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
        this.lastSendMillis = System.currentTimeMillis();
        ByteBuf buf = packer.getBuffer();
        return this.sock.send(buf);
    }

    private Timer initHeartbeat(long configuredInterval) {
        long heartbeatInterval = this.getHeartbeatInterval(configuredInterval);
        Timer timer = new Timer("tcp-client-channel-heartbeats-" + this.hashCode());
        timer.schedule((TimerTask)new HeartbeatTask(heartbeatInterval), heartbeatInterval, heartbeatInterval);
        return timer;
    }

    private long getHeartbeatInterval(long configuredInterval) {
        long serverIdleTimeoutMs = this.protocolCtx.serverIdleTimeout();
        if (serverIdleTimeoutMs <= 0L) {
            return configuredInterval;
        }
        long recommendedHeartbeatInterval = serverIdleTimeoutMs / 3L;
        if (recommendedHeartbeatInterval < 500L) {
            recommendedHeartbeatInterval = 500L;
        }
        return Math.min(configuredInterval, recommendedHeartbeatInterval);
    }

    public String toString() {
        return S.toString(TcpClientChannel.class.getSimpleName(), "remoteAddress", (Object)this.sock.remoteAddress(), false);
    }

    @Override
    public String endpoint() {
        return this.cfg.getAddress().toString();
    }

    private static void completeRequestFuture(CompletableFuture<ClientMessageUnpacker> fut, ClientMessageUnpacker unpacker) {
        unpacker.retain();
        try {
            if (!fut.complete(unpacker)) {
                unpacker.close();
            }
        }
        catch (Throwable t) {
            unpacker.close();
            throw t;
        }
    }

    private void completeNotificationFuture(CompletableFuture<PayloadInputChannel> fut, ClientMessageUnpacker unpacker, @Nullable Throwable err) {
        if (err != null) {
            this.asyncContinuationExecutor.execute(() -> fut.completeExceptionally(err));
            return;
        }
        unpacker.retain();
        try {
            this.asyncContinuationExecutor.execute(() -> {
                try {
                    if (!fut.complete(new PayloadInputChannel(this, unpacker, null))) {
                        unpacker.close();
                    }
                }
                catch (Throwable e) {
                    unpacker.close();
                    this.log.error("Failed to handle server notification [remoteAddress=" + this.cfg.getAddress() + "]: " + e.getMessage(), e);
                }
            });
        }
        catch (Throwable t) {
            unpacker.close();
            throw t;
        }
    }

    void checkTimeouts(long now) {
        for (Map.Entry req : this.pendingReqs.entrySet()) {
            TimeoutObject timeoutObject = (TimeoutObject)req.getValue();
            if (timeoutObject == null || timeoutObject.endTime() <= 0L || now <= timeoutObject.endTime()) continue;
            Object fut = timeoutObject.future();
            ((CompletableFuture)fut).completeExceptionally(new TimeoutException());
        }
    }

    private static class TimeoutObjectImpl
    implements TimeoutObject<CompletableFuture<ClientMessageUnpacker>> {
        private final long endTime;
        private final CompletableFuture<ClientMessageUnpacker> fut;

        private TimeoutObjectImpl(long timeout, CompletableFuture<ClientMessageUnpacker> fut) {
            this.endTime = timeout > 0L ? FastTimestamps.coarseCurrentTimeMillis() + timeout : 0L;
            this.fut = fut;
        }

        @Override
        public long endTime() {
            return this.endTime;
        }

        @Override
        public CompletableFuture<ClientMessageUnpacker> future() {
            return this.fut;
        }
    }

    private class HeartbeatTask
    extends TimerTask {
        private final long interval;

        HeartbeatTask(long interval) {
            this.interval = interval;
        }

        @Override
        public void run() {
            try {
                if (System.currentTimeMillis() - TcpClientChannel.this.lastSendMillis > this.interval) {
                    CompletableFuture fut = TcpClientChannel.this.serviceAsync(1, null, null, false);
                    if (TcpClientChannel.this.heartbeatTimeout > 0L) {
                        fut.orTimeout(TcpClientChannel.this.heartbeatTimeout, TimeUnit.MILLISECONDS).exceptionally(e -> {
                            if (e instanceof TimeoutException) {
                                long lastResponseAge = System.currentTimeMillis() - TcpClientChannel.this.lastReceiveMillis;
                                if (lastResponseAge < TcpClientChannel.this.heartbeatTimeout) {
                                    return null;
                                }
                                TcpClientChannel.this.log.warn("Heartbeat timeout, closing the channel [remoteAddress=" + TcpClientChannel.this.cfg.getAddress() + "]", new Object[0]);
                                TcpClientChannel.this.close(new IgniteClientConnectionException(ErrorGroups.Client.CONNECTION_ERR, "Heartbeat timeout", TcpClientChannel.this.endpoint(), (Throwable)e), false);
                            }
                            return null;
                        });
                    }
                }
            }
            catch (Throwable e2) {
                TcpClientChannel.this.log.warn("Failed to send heartbeat [remoteAddress=" + TcpClientChannel.this.cfg.getAddress() + "]: " + e2.getMessage(), e2);
            }
        }
    }
}

