/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.internal.cdc.core.exec;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite3.table.TableRowEvent;
import org.apache.ignite3.table.TableRowEventBatch;
import org.apache.ignite3.table.Tuple;
import org.gridgain.internal.cdc.api.sink.TableSink;

class CdcReplicationSubscriber
implements Flow.Subscriber<TableRowEventBatch<Map.Entry<Tuple, Tuple>>> {
    private final AtomicReference<Flow.Subscription> subscription = new AtomicReference();
    private final TableSink sink;

    CdcReplicationSubscriber(TableSink sink) {
        this.sink = sink;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.subscription.compareAndSet(null, subscription)) {
            subscription.request(Long.MAX_VALUE);
        }
    }

    @Override
    public void onNext(TableRowEventBatch<Map.Entry<Tuple, Tuple>> item) {
        ArrayList<Map.Entry<Tuple, Tuple>> entriesToInsert = new ArrayList<Map.Entry<Tuple, Tuple>>();
        ArrayList<Map.Entry<Tuple, Tuple>> entriesToUpdate = new ArrayList<Map.Entry<Tuple, Tuple>>();
        ArrayList<Tuple> entriesToDelete = new ArrayList<Tuple>();
        block5: for (TableRowEvent<Map.Entry<Tuple, Tuple>> event : item.rows()) {
            switch (event.type()) {
                case CREATED: {
                    entriesToInsert.add(event.entry());
                    continue block5;
                }
                case UPDATED: {
                    entriesToUpdate.add(event.entry());
                    continue block5;
                }
                case REMOVED: 
                case ARCHIVED: {
                    entriesToDelete.add(event.oldEntry().getKey());
                    continue block5;
                }
            }
            throw new IllegalStateException("Unexpected value: " + event.type());
        }
        this.sink.writeKvBatch(entriesToInsert);
        this.sink.updateKvBatch(entriesToUpdate);
        this.sink.removeBatch(entriesToDelete);
        this.sink.flush();
    }

    @Override
    public void onError(Throwable throwable) {
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub != null) {
            sub.cancel();
            this.sink.close();
        }
    }

    @Override
    public void onComplete() {
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub != null) {
            sub.cancel();
            this.sink.close();
        }
    }

    public void stop() {
        Flow.Subscription sub = this.subscription.getAndSet(null);
        if (sub != null) {
            sub.cancel();
            this.sink.close();
        }
    }
}

