/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client.table;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.util.CompletableFutures;

public class WriteBehindService {
    private final IgniteLogger log;
    private final AtomicBoolean isStarted = new AtomicBoolean();
    private final Queue<CompletableFuture<Void>> queue = new LinkedBlockingQueue<CompletableFuture<Void>>();
    private final int writeBehindParallelOperations;

    public WriteBehindService(IgniteClientConfiguration cfg) {
        this.log = ClientUtils.logger(cfg, WriteBehindService.class);
        this.writeBehindParallelOperations = Optional.ofNullable(cfg.cacheConfiguration()).map(ClientCacheConfiguration::cacheWriteBehindParallelOperations).orElse(1024);
        this.isStarted.compareAndSet(false, true);
    }

    public CompletableFuture<Void> enqueue(CompletableFuture<Void> fut) {
        if (!this.isStarted.get()) {
            return fut;
        }
        if (fut.isDone()) {
            return fut;
        }
        this.cleanup();
        if (this.queue.size() >= this.writeBehindParallelOperations) {
            return fut;
        }
        this.queue.add(fut);
        return CompletableFutures.nullCompletedFuture();
    }

    public CompletableFuture<Void> stopAsync() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Waiting for write behind service...", new Object[0]);
        }
        this.isStarted.set(false);
        return this.queue.isEmpty() ? CompletableFutures.nullCompletedFuture() : CompletableFuture.allOf((CompletableFuture[])this.queue.toArray(CompletableFuture[]::new)).thenAccept(unused -> {
            assert (this.queue.stream().allMatch(CompletableFuture::isDone));
            this.queue.clear();
        });
    }

    private void cleanup() {
        CompletableFuture<Void> head = this.queue.peek();
        while (head != null && head.isDone()) {
            this.queue.poll();
            head = this.queue.peek();
        }
    }
}

