/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.example.streaming;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.catalog.ColumnType;
import org.apache.ignite.catalog.definitions.ColumnDefinition;
import org.apache.ignite.catalog.definitions.TableDefinition;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.table.DataStreamerReceiver;
import org.apache.ignite.table.DataStreamerReceiverContext;
import org.apache.ignite.table.DataStreamerReceiverDescriptor;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;

/**
 * Example that streams a small batch of in-memory "transaction" tuples through a
 * {@link DataStreamerReceiver} and prints every tuple the receiver returns.
 *
 * <p>The example connects to {@code 127.0.0.1:10800}, creates a table named
 * {@code tx_dummy} with a single integer primary-key column {@code id}, and calls
 * {@code streamData} on that table's record view. Nothing in this file writes rows
 * to the table explicitly; presumably the table only serves as the streaming target
 * used to route items by key (TODO confirm against the Ignite data streamer docs).
 */
public class DistributedComputeWithReceiverExample {
    /**
     * Runs the example end to end: builds the source data, starts the streamer,
     * publishes every item and waits for the streamer future to complete.
     *
     * @param arg command-line arguments (unused)
     */
    public static void main(String[] arg) {
        try (IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build()) {
            // Nine source items with txId 1..9 (the upper bound of range() is exclusive).
            List<Tuple> sourceData = IntStream.range(1, 10)
                    .mapToObj(i -> Tuple.create().set("txId", i).set("txData", "{some-json-data}"))
                    .collect(Collectors.toList());

            // Type arguments mirror FraudDetectorReceiver: <payload item, receiver argument, result item>.
            DataStreamerReceiverDescriptor<Tuple, Void, Tuple> desc = DataStreamerReceiverDescriptor
                    .builder(FraudDetectorReceiver.class)
                    .build();

            TableDefinition txDummyTableDef = TableDefinition.builder("tx_dummy")
                    .columns(ColumnDefinition.column("id", ColumnType.INTEGER))
                    .primaryKey("id")
                    .build();
            Table dummyTable = client.catalog().createTable(txDummyTableDef);

            // The key tuple carries the source item's txId under the table's "id" column name.
            Function<Tuple, Tuple> keyFunc = sourceItem -> Tuple.create().set("id", sourceItem.value("txId"));
            // The receiver gets the source item itself as its payload.
            Function<Tuple, Tuple> payloadFunc = Function.identity();

            Flow.Subscriber<Tuple> resultSubscriber = new Flow.Subscriber<Tuple>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Ask for everything up front; this example applies no back-pressure to results.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Tuple item) {
                    System.out.println("Transaction processed: " + item);
                }

                @Override
                public void onError(Throwable throwable) {
                    System.err.println("Error during streaming: " + throwable.getMessage());
                }

                @Override
                public void onComplete() {
                    System.out.println("Streaming completed.");
                }
            };

            CompletableFuture<Void> streamerFut;
            // The publisher is closed at the end of this try block, i.e. after the last submit()
            // and before join() below.
            try (SubmissionPublisher<Tuple> publisher = new SubmissionPublisher<>()) {
                streamerFut = dummyTable.recordView().streamData(
                        publisher,
                        desc,
                        keyFunc,
                        payloadFunc,
                        null, // receiver argument: FraudDetectorReceiver declares it as Void
                        resultSubscriber,
                        null  // no explicit streamer options are passed
                );

                for (Tuple item : sourceData) {
                    publisher.submit(item);
                }
            }

            // Block until the streamer future completes; join() rethrows a failure unchecked.
            streamerFut.join();
        }
    }

    /**
     * Receiver that tags every tuple of an incoming page with a random {@code fraudRisk}
     * value and returns the tagged tuples as its result.
     */
    private static class FraudDetectorReceiver
            implements DataStreamerReceiver<Tuple, Void, Tuple> {
        /**
         * Processes one page of tuples.
         *
         * @param page tuples to process
         * @param ctx receiver context (unused)
         * @param arg receiver argument (unused)
         * @return an already-completed future holding one result tuple per input tuple, in page order
         */
        @Override
        public CompletableFuture<List<Tuple>> receive(List<Tuple> page, DataStreamerReceiverContext ctx, Void arg) {
            List<Tuple> results = new ArrayList<>(page.size());
            for (Tuple tx : page) {
                results.add(detectFraud(tx));
            }
            return CompletableFuture.completedFuture(results);
        }

        /**
         * Sets a {@code fraudRisk} field on the given tuple to a {@link Math#random()} value,
         * i.e. a placeholder score in {@code [0.0, 1.0)} rather than a real fraud check.
         *
         * @param txInfo tuple to tag
         * @return the value returned by {@code txInfo.set(...)}
         */
        private static Tuple detectFraud(Tuple txInfo) {
            double fraudRisk = Math.random();
            return txInfo.set("fraudRisk", fraudRisk);
        }
    }
}

