/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.example.streaming;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.example.streaming.Account;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;

/**
 * Example that uses the client data streamer to put records into a single table and then
 * remove them again.
 *
 * <p>The example connects to {@code 127.0.0.1:10800}, creates the {@code ACCOUNTS} table if it
 * does not exist, streams {@link #ACCOUNTS_COUNT} {@link Account} records into it, prints what
 * the table contains, streams removals for the same keys, prints the result of looking the
 * keys up again, and finally drops the table.
 */
public class SingleTableDataStreamerExample {
    /** Number of account records that are streamed into, and then removed from, the table. */
    private static final int ACCOUNTS_COUNT = 10;

    /**
     * Runs the example against a node listening on {@code 127.0.0.1:10800}.
     *
     * @param arg Command line arguments; not used.
     */
    public static void main(String[] arg) {
        try (IgniteClient client = IgniteClient.builder().addresses("127.0.0.1:10800").build()) {
            System.out.println("Creating Accounts table");
            client.sql().execute("CREATE TABLE IF NOT EXISTS ACCOUNTS (id INT PRIMARY KEY, name VARCHAR(255), balance BIGINT, active BOOLEAN);");

            RecordView<Account> view = client.tables().table("Accounts").recordView(Account.class);

            streamAccountDataPut(view);
            verifyPut(view);

            streamAccountDataRemove(view);
            verifyRemove(view);

            System.out.println("Dropping Accounts table.");
            client.sql().execute("DROP TABLE IF EXISTS ACCOUNTS;");
        }
    }

    /**
     * Builds the streamer options shared by the put and remove phases.
     *
     * @return Options with page size 1000, one parallel operation per partition, an auto flush
     *         interval of 1000 and a retry limit of 16.
     */
    private static DataStreamerOptions streamerOptions() {
        return DataStreamerOptions.builder()
                .pageSize(1000)
                .perPartitionParallelOperations(1)
                .autoFlushInterval(1000)
                .retryLimit(16)
                .build();
    }

    /**
     * Streams {@link #ACCOUNTS_COUNT} accounts with ids {@code 0..ACCOUNTS_COUNT-1}, a random
     * balance below 100000 and a random active flag into the table as put operations.
     *
     * @param view Record view of the accounts table.
     */
    private static void streamAccountDataPut(RecordView<Account> view) {
        CompletableFuture<Void> streamerFut;

        // Closing the publisher (end of the try block) signals completion to its subscribers.
        try (SubmissionPublisher<DataStreamerItem<Account>> publisher = new SubmissionPublisher<>()) {
            streamerFut = view.streamData(publisher, streamerOptions());

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                Account entry = new Account(i, "name" + i, rnd.nextLong(100000L), rnd.nextBoolean());

                publisher.submit(DataStreamerItem.of(entry));
            }
        }

        // Block until the future returned by the streamer completes.
        streamerFut.join();
    }

    /**
     * Streams removal items for the accounts with ids {@code 0..ACCOUNTS_COUNT-1}.
     *
     * @param view Record view of the accounts table.
     */
    private static void streamAccountDataRemove(RecordView<Account> view) {
        CompletableFuture<Void> streamerFut;

        // Closing the publisher (end of the try block) signals completion to its subscribers.
        try (SubmissionPublisher<DataStreamerItem<Account>> publisher = new SubmissionPublisher<>()) {
            streamerFut = view.streamData(publisher, streamerOptions());

            for (int i = 0; i < ACCOUNTS_COUNT; i++) {
                // Only the id is set on the record that identifies what to remove.
                Account entry = new Account(i);

                publisher.submit(DataStreamerItem.removed(entry));
            }
        }

        // Block until the future returned by the streamer completes.
        streamerFut.join();
    }

    /**
     * Looks up every streamed id and prints either the stored record or a "Missing" line.
     *
     * @param view Record view of the accounts table.
     */
    private static void verifyPut(RecordView<Account> view) {
        System.out.println("=== Table data after PUT ===");

        for (int i = 0; i < ACCOUNTS_COUNT; i++) {
            // A single lookup per key; a null result is reported as a missing record.
            Account account = view.get(null, new Account(i));

            if (account == null) {
                System.out.printf("Missing id=%d%n", i);
                continue;
            }

            System.out.printf("Found: id=%d, name=%s, balance=%d, active=%b%n",
                    account.getId(), account.getName(), account.getBalance(), account.isActive());
        }
    }

    /**
     * Fetches all streamed ids in one call and prints, per position, whether a record came back.
     *
     * @param view Record view of the accounts table.
     */
    private static void verifyRemove(RecordView<Account> view) {
        System.out.println("=== Table data after REMOVE ===");

        List<Account> keys = IntStream.range(0, ACCOUNTS_COUNT)
                .mapToObj(Account::new)
                .collect(Collectors.toList());

        List<Account> records = view.getAll(null, keys);

        // Keys are built in id order starting at 0, so the position is printed as the id.
        // NOTE(review): this relies on getAll returning results in key order - confirm.
        for (int i = 0; i < records.size(); i++) {
            System.out.printf("id=%d exists? %b%n", i, records.get(i) != null);
        }
    }
}

