/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.network.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.configuration.AckView;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;

public class InboundRecoveryHandler
extends ChannelInboundHandlerAdapter {
    public static final String NAME = "inbound-recovery-handler";
    private final RecoveryDescriptor descriptor;
    private final NetworkMessagesFactory factory;
    private final long syncAckThreshold;
    private final long postponeAckMillis;
    private boolean scheduleAcknowledgement = true;
    private long lastSentReceivedCount = 0L;

    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory, AckView ackCfg) {
        this.descriptor = descriptor;
        this.factory = factory;
        this.syncAckThreshold = ackCfg.syncAckThreshold();
        this.postponeAckMillis = ackCfg.postponeAckMillis();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NetworkMessage message = (NetworkMessage)msg;
        if (message instanceof AcknowledgementMessage) {
            AcknowledgementMessage ackMessage = (AcknowledgementMessage)msg;
            long receivedMessages = ackMessage.receivedMessages();
            this.descriptor.acknowledge(receivedMessages);
        } else if (message.needAck()) {
            this.sendOrScheduleAcknowledgement(ctx);
        }
        super.channelRead(ctx, message);
    }

    private void sendOrScheduleAcknowledgement(ChannelHandlerContext ctx) {
        long receiveCnt = this.descriptor.onReceive();
        if (receiveCnt - this.lastSentReceivedCount > this.syncAckThreshold) {
            this.sendAcknowledgement(ctx, receiveCnt);
        } else if (this.scheduleAcknowledgement) {
            this.scheduleAcknowledgement = false;
            ctx.channel().eventLoop().schedule(() -> {
                this.scheduleAcknowledgement = true;
                this.sendAcknowledgement(ctx, this.descriptor.receivedCount());
            }, this.postponeAckMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void sendAcknowledgement(ChannelHandlerContext ctx, long receiveCnt) {
        AcknowledgementMessage ackMsg = this.factory.acknowledgementMessage().receivedMessages(receiveCnt).build();
        ctx.channel().writeAndFlush(new OutNetworkObject(ackMsg, Collections.emptyList()));
        this.lastSentReceivedCount = ackMsg.receivedMessages();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.descriptor.release(ctx);
        super.channelInactive(ctx);
    }
}

