/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.dr;

import io.netty.channel.Channel;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.CompletableFutures;
import org.gridgain.dr.configuration.DrReceiverServerView;
import org.gridgain.internal.dr.DrEntryConverter;
import org.gridgain.internal.dr.DrMessageHandler;
import org.gridgain.internal.dr.DrReceiverMetadataManager;
import org.gridgain.internal.dr.DrReceiverServer;
import org.gridgain.internal.dr.DrUpdate;
import org.gridgain.internal.dr.DrUpdateHandler;
import org.gridgain.internal.dr.DrUtils;
import org.gridgain.internal.dr.MarshallerContext;
import org.gridgain.internal.dr.binary.BinaryMetadata;
import org.gridgain.internal.dr.binary.BinaryObject;
import org.gridgain.internal.dr.common.GridCacheRawVersionedEntry;
import org.gridgain.internal.dr.mapping.TupleColumnWriter;
import org.gridgain.internal.dr.mapping.TupleColumnWriterFactory;
import org.gridgain.internal.dr.mapping.TupleColumnWriterFactoryImpl;
import org.gridgain.internal.dr.messages.DrExternalBatchRequest;
import org.gridgain.internal.dr.messages.DrExternalMetadataRequest;
import org.gridgain.internal.dr.metrics.DrBatchEventListener;
import org.gridgain.internal.dr.metrics.DrReceiverMetricsManager;
import org.jetbrains.annotations.TestOnly;

public class DrReceiver {
    private static final IgniteLogger LOG = Loggers.forClass(DrReceiverServer.class);
    private final DrReceiverServer server;
    private final MarshallerContext marshallerContext;
    private final DrReceiverMetadataManager metadataManager;
    private final TupleColumnWriterFactory fieldWriterFactory;
    private final DrUpdateHandler drUpdateHandler;
    private final DrReceiverMetricsManager metricsManager;

    @TestOnly
    public DrReceiver(DrReceiverServerView cfg, DrUpdateHandler updateHandler) {
        this(cfg, updateHandler, new TupleColumnWriterFactoryImpl(ZoneId.systemDefault(), Map.of()));
    }

    public DrReceiver(DrReceiverServerView cfg, DrUpdateHandler updateHandler, TupleColumnWriterFactory fieldWriterFactory) {
        Objects.requireNonNull(cfg);
        this.metricsManager = new DrReceiverMetricsManager(cfg);
        this.marshallerContext = new MarshallerContext();
        this.metadataManager = new DrReceiverMetadataManager(this.marshallerContext.binaryContext());
        this.drUpdateHandler = updateHandler;
        this.fieldWriterFactory = fieldWriterFactory;
        this.server = new DrReceiverServer(cfg, new DrMessageHandler(cfg){

            @Override
            protected void onMetadataRequest(Channel channel, DrExternalMetadataRequest req) {
                DrReceiver.this.processMetadataRequest(req);
            }

            @Override
            protected CompletableFuture<Void> onBatchRequest(Channel channel, DrExternalBatchRequest req) {
                DrBatchEventListener cacheMetrics = DrReceiver.this.metricsManager.getOrCreateCacheMetrics(req.dataCenterId(), req.cacheName());
                cacheMetrics.onReceived(req.entryCount(), req.dataSize());
                List<GridCacheRawVersionedEntry> entries = DrUtils.unmarshalEntries(req);
                return DrReceiver.this.processEntries(req.dataCenterId(), req.cacheName(), entries).whenComplete((r, t) -> cacheMetrics.onProcessed(req.dataSize()));
            }
        });
    }

    private void processMetadataRequest(DrExternalMetadataRequest req) {
        req.binaryMetadata().forEach(meta -> this.metadataManager.registerMetadata((BinaryMetadata)meta.value()));
    }

    private CompletableFuture<Void> processEntries(byte datacenterId, String cacheName, List<GridCacheRawVersionedEntry> entries) {
        TupleColumnWriter tupleFieldWriter = this.fieldWriterFactory.getWriter(cacheName);
        DrEntryConverter converter = new DrEntryConverter(this.marshallerContext.binaryContext(), tupleFieldWriter);
        ArrayList<DrUpdate> drEntries = new ArrayList<DrUpdate>(entries.size());
        for (GridCacheRawVersionedEntry entry : entries) {
            entry.unmarshal(this.marshallerContext.binaryMarshaller());
            if (this.hasMissedMetadataFor(entry)) {
                return this.waitMissedMetadataFor(entry).thenCompose(ignore -> this.processEntries(datacenterId, cacheName, entries));
            }
            drEntries.add(converter.convert(entry));
        }
        return this.drUpdateHandler.applyUpdates(tupleFieldWriter.tableName(), drEntries);
    }

    private CompletableFuture<?> waitMissedMetadataFor(GridCacheRawVersionedEntry entry) {
        int typeId;
        Object key = entry.key();
        Object value = entry.value();
        CompletionStage metadataReadyFuture = CompletableFutures.nullCompletedFuture();
        if (key instanceof BinaryObject) {
            typeId = ((BinaryObject)key).typeId();
            LOG.debug("Awaiting metadata for binary key object: typeId={}", typeId);
            metadataReadyFuture = metadataReadyFuture.thenCompose(ignore -> this.metadataManager.metadataAsync(typeId));
        }
        if (value instanceof BinaryObject) {
            typeId = ((BinaryObject)value).typeId();
            LOG.debug("Awaiting metadata for binary key object: typeId={}", typeId);
            metadataReadyFuture = metadataReadyFuture.thenCompose(ignore -> this.metadataManager.metadataAsync(typeId));
        }
        return metadataReadyFuture;
    }

    private boolean hasMissedMetadataFor(GridCacheRawVersionedEntry entry) {
        Object key = entry.key();
        Object value = entry.value();
        return key instanceof BinaryObject && this.metadataManager.metadata(((BinaryObject)key).typeId()) == null || value instanceof BinaryObject && this.metadataManager.metadata(((BinaryObject)value).typeId()) == null;
    }

    public void start() {
        this.metricsManager.start();
        this.server.start();
    }

    public void stop() {
        if (this.server != null) {
            this.server.stop();
        }
        if (this.metadataManager != null) {
            this.metadataManager.stop();
        }
        this.metricsManager.stop();
    }

    @TestOnly
    public DrReceiverMetadataManager metadataManager() {
        return this.metadataManager;
    }
}

