/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.core.exec;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite3.internal.continuousquery.ContinuousQueryEventWatermark;
import org.apache.ignite3.internal.continuousquery.TableRowEventImpl;
import org.apache.ignite3.internal.lang.IgniteInternalException;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.table.ContinuousQueryOptions;
import org.apache.ignite3.table.KeyValueView;
import org.apache.ignite3.table.Table;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.Tuple;
import org.gridgain.internal.cdc.api.sink.TableSink;
import org.gridgain.internal.cdc.core.replication.CdcReplicationProgressMappings;
import org.jetbrains.annotations.Nullable;

/**
 * Continuous-query subscriber that forwards table row events to a {@link TableSink} and, after
 * each batch has been flushed to the sink, stores the batch watermark in the replication
 * progress table under the (replication name, table name) key.
 *
 * <p>The subscriber is started with {@link #startCq(Table, byte[])} and released through
 * {@link #close()}, {@link #onError(Throwable)} or {@link #onComplete()}, all of which delegate
 * to the same one-shot stop routine.
 */
class CdcReplicationSubscriber implements Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>>, AutoCloseable {
    /** Active subscription; {@code null} before {@code onSubscribe} and after the subscriber was stopped. */
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference<>();

    /** Destination that receives the replicated changes. */
    private final TableSink sink;

    /** Replication name; first component of the progress key. */
    private final String replicationName;

    /** Table name; second component of the progress key. */
    private final String tableName;

    /** Table in which the last processed watermark is stored. */
    private final Table replicationsProgressTable;

    // Both views are assigned by one thread (onSubscribe / startCq) and read by stop(), which may run
    // on a different thread via close(); volatile makes the assignments visible there.
    /** Key-value view over the progress table; created when the subscription is accepted. */
    private volatile KeyValueView<Tuple, Tuple> replicationsProgressKv;

    /** Key-value view over the source table; non-null once {@link #startCq} has been called. */
    private volatile KeyValueView<Tuple, Tuple> sourceIgniteKv;

    /**
     * Creates a subscriber. No resources are opened until {@link #startCq} / {@code onSubscribe}.
     *
     * @param sink Sink that receives inserted, updated and removed entries.
     * @param replicationName Replication name used in the progress key.
     * @param tableName Table name used in the progress key.
     * @param replicationsProgressTable Table where watermarks are persisted.
     */
    CdcReplicationSubscriber(TableSink sink, String replicationName, String tableName, Table replicationsProgressTable) {
        this.sink = sink;
        this.replicationName = replicationName;
        this.tableName = tableName;
        this.replicationsProgressTable = replicationsProgressTable;
    }

    /**
     * Accepts the first subscription and requests an unbounded number of batches. Any subscription
     * offered while another one is still active is cancelled, as the reactive-streams contract
     * requires, instead of being left dangling.
     *
     * @param subscription Subscription offered by the publisher.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription.compareAndSet(null, subscription)) {
            this.replicationsProgressKv = this.replicationsProgressTable.keyValueView();
            subscription.request(Long.MAX_VALUE);
        } else {
            // An active subscription already exists: release the extra one.
            subscription.cancel();
        }
    }

    /**
     * Splits the batch into inserts, updates and removals, writes them to the sink, flushes the
     * sink and then saves the batch watermark.
     *
     * <p>NOTE(review): events are grouped by type and applied as inserts, then updates, then
     * removals, so the relative order of different operations inside one batch is not preserved.
     * Confirm that this is acceptable for the sink.
     *
     * @param item Batch of row events.
     * @throws IllegalStateException If an event has a type this subscriber does not handle.
     */
    @Override
    public void onNext(TableRowEventBatch<Map.Entry<Tuple, Tuple>> item) {
        List<Map.Entry<Tuple, Tuple>> entriesToInsert = new ArrayList<>();
        List<Map.Entry<Tuple, Tuple>> entriesToUpdate = new ArrayList<>();
        List<Tuple> entriesToDelete = new ArrayList<>();

        for (TableRowEvent<Map.Entry<Tuple, Tuple>> event : item.rows()) {
            switch (event.type()) {
                case CREATED:
                    entriesToInsert.add(event.entry());
                    break;
                case UPDATED:
                    entriesToUpdate.add(event.entry());
                    break;
                case REMOVED:
                case ARCHIVED:
                    // Removals only need the key, which is taken from the previous entry.
                    entriesToDelete.add(event.oldEntry().getKey());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.type());
            }
        }

        this.sink.writeKvBatch(entriesToInsert);
        this.sink.updateKvBatch(entriesToUpdate);
        this.sink.removeBatch(entriesToDelete);
        this.sink.flush();

        // The watermark is stored only after the sink has been flushed.
        this.saveWatermark(CdcReplicationSubscriber.watermarkAsBytes(item));
    }

    /**
     * Stores the watermark for this replication/table pair. The put is issued with a {@code null}
     * transaction argument.
     *
     * <p>NOTE(review): a {@code null} watermark is stored as an empty byte array, which replaces any
     * previously stored value. Confirm that this is the intended behavior for batches without a
     * watermark.
     *
     * @param watermarkBytes Serialized watermark, or {@code null} if the batch provided none.
     */
    private void saveWatermark(byte @Nullable [] watermarkBytes) {
        byte[] bytesToStore = watermarkBytes == null ? new byte[]{} : watermarkBytes;

        this.replicationsProgressKv.put(
            null,
            CdcReplicationProgressMappings.keyTuple(this.replicationName, this.tableName),
            CdcReplicationProgressMappings.valueTuple(bytesToStore)
        );
    }

    /**
     * Extracts the serialized watermark of the last event in the batch.
     *
     * @param item Batch of row events.
     * @return Watermark bytes of the last event, or {@code null} if the batch is empty or its last
     *     event is not a {@link TableRowEventImpl}.
     */
    private static byte @Nullable [] watermarkAsBytes(TableRowEventBatch<Map.Entry<Tuple, Tuple>> item) {
        List<TableRowEvent<Map.Entry<Tuple, Tuple>>> rows = item.rows();
        if (rows.isEmpty()) {
            return null;
        }

        TableRowEvent<Map.Entry<Tuple, Tuple>> lastEvent = rows.get(rows.size() - 1);
        if (lastEvent instanceof TableRowEventImpl) {
            return ((TableRowEventImpl)lastEvent).watermarkBytes();
        }

        return null;
    }

    /**
     * Stops the subscriber when the publisher signals a failure.
     *
     * <p>NOTE(review): the throwable itself is neither logged nor propagated here; consider
     * reporting it through the project's logging facility.
     *
     * @param throwable Failure reported by the publisher.
     */
    @Override
    public void onError(Throwable throwable) {
        this.stop();
    }

    /** Stops the subscriber when the publisher signals completion. */
    @Override
    public void onComplete() {
        this.stop();
    }

    /**
     * Cancels the subscription and closes the sink and both key-value views.
     *
     * @throws Exception Declared by {@link AutoCloseable}; failures surface as
     *     {@link IgniteInternalException}.
     */
    @Override
    public void close() throws Exception {
        this.stop();
    }

    /**
     * Starts the continuous query on the given table with this object as the subscriber.
     *
     * @param table Source table to observe.
     * @param lastWatermarkBytes Serialized watermark to resume from, or {@code null}.
     * @throws IgniteInternalException If the subscriber was already started, or if creating the view
     *     or starting the query fails.
     */
    void startCq(Table table, byte @Nullable [] lastWatermarkBytes) {
        // The guard stays outside the try block so that its exception is thrown as is rather than
        // being wrapped into a second IgniteInternalException that hides the message in the cause.
        if (this.sourceIgniteKv != null) {
            throw new IgniteInternalException("CDC Replication subscription is already started for " + this.replicationName);
        }

        try {
            this.sourceIgniteKv = table.keyValueView();
            this.sourceIgniteKv.queryContinuously(this, CdcReplicationSubscriber.createCqOptions(lastWatermarkBytes));
        }
        catch (Exception ex) {
            throw new IgniteInternalException(ex);
        }
    }

    /**
     * Builds continuous-query options that carry the given watermark.
     *
     * @param lastWatermarkBytes Serialized watermark, or {@code null}.
     * @return Options with the watermark set, or {@code null} if no bytes were given or
     *     {@code ContinuousQueryEventWatermark.fromBytes} returned {@code null}.
     */
    @Nullable
    private static ContinuousQueryOptions createCqOptions(byte @Nullable [] lastWatermarkBytes) {
        if (lastWatermarkBytes == null) {
            return null;
        }

        ContinuousQueryEventWatermark watermark = ContinuousQueryEventWatermark.fromBytes(lastWatermarkBytes);
        if (watermark == null) {
            return null;
        }

        return ContinuousQueryOptions.builder().watermark(watermark).build();
    }

    /**
     * Cancels the active subscription and closes the sink and both key-value views. Does nothing if
     * there is no active subscription, so repeated calls are no-ops.
     *
     * <p>The views are closed even if closing the sink fails. The first failure becomes the cause of
     * the thrown exception and a later one is attached to it as suppressed, so neither is lost.
     *
     * @throws IgniteInternalException If closing the sink or the views fails.
     */
    private void stop() {
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub == null) {
            return;
        }

        sub.cancel();

        Exception failure = null;

        try {
            this.sink.close();
        }
        catch (Exception e) {
            failure = e;
        }

        try {
            IgniteUtils.closeAll(this.replicationsProgressKv, this.sourceIgniteKv);
        }
        catch (Exception e) {
            if (failure == null) {
                failure = e;
            }
            else if (failure != e) {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw new IgniteInternalException(failure);
        }
    }
}

