/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.dr.fst;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerListener;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.gridgain.grid.internal.processors.dr.fst.Batch;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferTask;
import org.gridgain.grid.internal.processors.dr.fst.StateTransferWorker;

/**
 * Owns a fixed set of {@link StateTransferWorker}s together with the executor they are submitted to.
 * <p>
 * Workers are created eagerly in the constructor, submitted by {@link #start()}, and dropped from the
 * tracked set either when their submission fails or when the worker reports that it has stopped.
 */
public class DrStateTransferWorkerPool {
    /** Executor the workers are submitted to. */
    private final ExecutorService exec;

    /** Logger handed to {@code U.join(...)} when waiting for workers. */
    private final IgniteLogger log;

    /** Workers currently tracked by this pool. */
    private final Collection<GridWorker> workers = new GridConcurrentHashSet<>();

    /**
     * Creates the executor and {@code poolSize} workers. Nothing is submitted until {@link #start()} is called.
     *
     * @param ctx Kernal context; supplies the Ignite instance name and the workers registry.
     * @param poolSize Number of workers to create; also passed twice to the executor constructor.
     * @param log Logger.
     * @param started Latch passed through to every created worker.
     * @param taskQ Task queue passed through to every created worker.
     */
    public DrStateTransferWorkerPool(GridKernalContext ctx, int poolSize, IgniteLogger log, CountDownLatch started, BlockingQueue<StateTransferTask<Batch>> taskQ) {
        this.log = log;

        String instanceName = ctx.igniteInstanceName();
        BlockingQueue<Runnable> execQueue = new LinkedBlockingQueue<>();

        exec = new IgniteThreadPoolExecutor("dr-state-transfer-pool", instanceName, poolSize, poolSize, 60000L, execQueue);

        // A single listener instance is shared by all workers; it forwards to the workers registry.
        GridWorkerListener listener = new StateTransferWorkerListener(ctx.workersRegistry());

        int created = 0;

        while (created < poolSize) {
            workers.add(new StateTransferWorker(ctx, started, taskQ, listener));

            created++;
        }
    }

    /**
     * Submits every tracked worker to the executor.
     * <p>
     * If a submission fails, that worker is removed from the tracked set and the failure is propagated:
     * an {@link Error} is rethrown as is, anything else is wrapped into an {@link IgniteException}.
     *
     * @throws IgniteException If a worker could not be submitted for a reason other than an {@link Error}.
     */
    public void start() {
        for (GridWorker worker : workers) {
            try {
                exec.execute(worker);
            }
            catch (Error err) {
                workers.remove(worker);

                throw err;
            }
            catch (Throwable t) {
                workers.remove(worker);

                throw new IgniteException("Failed to execute worker due to runtime exception.", t);
            }
        }
    }

    /**
     * Shuts the executor down and then cancels all tracked workers.
     */
    public void shutdown() {
        exec.shutdown();

        U.cancel(workers);
    }

    /**
     * Joins all tracked workers via {@code U.join(...)}.
     */
    public void awaitTermination() {
        U.join(workers, log);
    }

    /**
     * Listener that forwards every callback to a wrapped listener and additionally
     * removes a stopped worker from the pool's tracked set.
     */
    private class StateTransferWorkerListener
    implements GridWorkerListener {
        /** Listener that receives all forwarded callbacks. */
        private final GridWorkerListener target;

        /**
         * @param delegate Listener to forward callbacks to.
         */
        StateTransferWorkerListener(GridWorkerListener delegate) {
            target = delegate;
        }

        /** Forwards the callback unchanged. */
        @Override
        public void onStarted(GridWorker worker) {
            target.onStarted(worker);
        }

        /** Forwards the callback unchanged. */
        @Override
        public void onIdle(GridWorker worker) {
            target.onIdle(worker);
        }

        /** Stops tracking the worker in the enclosing pool first, then forwards the callback. */
        @Override
        public void onStopped(GridWorker worker) {
            workers.remove(worker);

            target.onStopped(worker);
        }
    }
}

