package org.apache.ignite.internal.configuration.storage;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;

/* loaded from: input_file:org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.class */
/**
 * {@link ConfigurationStorage} implementation that keeps configuration values in the Meta Storage.
 *
 * <p>Every configuration key is stored under the {@code "dst-cfg."} prefix. Each write additionally puts a dedicated
 * master key; the Meta Storage revision of that master key is what this class tracks as the configuration change id.
 */
public class DistributedConfigurationStorage implements ConfigurationStorage {
    /** Logger. */
    private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationStorage.class);

    /** Prefix prepended to every configuration key written to the Meta Storage. */
    private static final String DISTRIBUTED_PREFIX = "dst-cfg.";

    /** Key that is updated together with every configuration write; its revision is used as the change id. */
    private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");

    /** Prefix key used for range reads and for the prefix watch over all distributed configuration keys. */
    private static final ByteArray DST_KEYS_START_RANGE = new ByteArray(DISTRIBUTED_PREFIX);

    /** Meta Storage manager used for all reads, writes and watches. */
    private final MetaStorageManager metaStorageMgr;

    /** Listener registered through {@link #registerConfigurationListener}; {@code null} until registered. */
    private volatile ConfigurationStorageListener lsnr;

    /** Last known master key revision, set on recovery and advanced by the Meta Storage watch. */
    private volatile long changeId;

    /** Pool on which the recovery read is executed. */
    private final ExecutorService threadPool;

    /** Tracks futures handed out by this storage so that they can be cancelled on {@link #close()}. */
    private final InFlightFutures futureTracker = new InFlightFutures();

    /**
     * Creates the storage.
     *
     * @param str Value passed to the thread factory as the first naming argument (presumably the node name).
     * @param metaStorageManager Meta Storage manager.
     */
    public DistributedConfigurationStorage(String str, MetaStorageManager metaStorageManager) {
        this.metaStorageMgr = metaStorageManager;
        this.threadPool = Executors.newFixedThreadPool(4, NamedThreadFactory.create(str, "dst-cfg", LOG));
    }

    /**
     * Shuts the internal thread pool down (waiting up to 10 seconds) and then cancels all futures that are still
     * tracked as in-flight.
     */
    public void close() {
        IgniteUtils.shutdownAndAwaitTermination(threadPool, 10L, TimeUnit.SECONDS);

        futureTracker.cancelInFlightFutures();
    }

    /**
     * Reads all entries whose key starts with the given prefix from the Meta Storage.
     *
     * @param str Configuration key prefix (without the storage prefix).
     * @return Future with a map from configuration key (storage prefix stripped) to its deserialized value. The
     *         master key is never included. The future fails with whatever error the Meta Storage publisher reports.
     */
    public CompletableFuture<Map<String, ? extends Serializable>> readAllLatest(String str) {
        ByteArray rangePrefix = new ByteArray(DISTRIBUTED_PREFIX + str);

        final CompletableFuture<Map<String, ? extends Serializable>> resultFuture = new CompletableFuture<>();

        metaStorageMgr.prefix(rangePrefix).subscribe(new Flow.Subscriber<Entry>() {
            /** Values accumulated so far, keyed by configuration key without the storage prefix. */
            private final Map<String, Serializable> data = new HashMap<>();

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // No back-pressure is needed: request everything at once.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Entry entry) {
                byte[] key = entry.key();

                // The master key only carries the change id and is not a configuration value.
                if (Arrays.equals(key, MASTER_KEY.bytes())) {
                    return;
                }

                byte[] value = entry.value();

                assert !entry.tombstone();
                assert value != null;

                String dataKey = new String(key, StandardCharsets.UTF_8).substring(DISTRIBUTED_PREFIX.length());

                data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
            }

            @Override
            public void onError(Throwable th) {
                resultFuture.completeExceptionally(th);
            }

            @Override
            public void onComplete() {
                resultFuture.complete(data);
            }
        });

        return registerFuture(resultFuture);
    }

    /**
     * Reads the latest value of a single configuration key from the Meta Storage.
     *
     * @param str Configuration key (without the storage prefix).
     * @return Future with the deserialized value, or with {@code null} if the entry has no value. Any failure is
     *         rethrown as a {@link StorageException} that wraps the original error.
     */
    public CompletableFuture<Serializable> readLatest(String str) {
        ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + str);

        return metaStorageMgr.get(key)
                .thenApply(entry -> {
                    byte[] value = entry.value();

                    return value == null ? null : ConfigurationSerializationUtil.fromBytes(value);
                })
                .exceptionally(th -> {
                    throw new StorageException("Exception while reading data from Meta Storage", th);
                });
    }

    /**
     * Reads the whole distributed configuration from the local Meta Storage once its recovery has finished.
     * The read itself is executed on the internal thread pool.
     *
     * @return Future with the recovered data and its change id.
     * @throws StorageException Declared for interface compatibility; read failures are reported through the future.
     */
    public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
        CompletableFuture<Data> recoveredData = metaStorageMgr.recoveryFinishedFuture()
                .thenApplyAsync(revisions -> readDataOnRecovery0(revisions.revision()), threadPool);

        return registerFuture(recoveredData);
    }

    /**
     * Reads all non-tombstone configuration entries that are locally available at the given revision and records the
     * master key revision as the current change id.
     *
     * @param revision Meta Storage revision to read at.
     * @return Data with the configuration values and the master key revision ({@code 0} if no master key was found).
     * @throws StorageException If reading from or closing the cursor fails.
     */
    private Data readDataOnRecovery0(long revision) {
        Map<String, Serializable> data = new HashMap<>();
        long cfgRevision = 0;
        byte[] masterKey = MASTER_KEY.bytes();
        boolean sawMasterKey = false;

        // try-with-resources guarantees the cursor is closed even if iteration or deserialization fails.
        try (Cursor<Entry> entries = metaStorageMgr.prefixLocally(DST_KEYS_START_RANGE, revision)) {
            int prefixLength = DST_KEYS_START_RANGE.length();

            for (Entry entry : entries) {
                if (entry.tombstone()) {
                    continue;
                }

                byte[] key = entry.key();
                byte[] value = entry.value();

                assert value != null;

                // Only the first occurrence of the master key is treated as the change id holder.
                if (!sawMasterKey && Arrays.equals(masterKey, key)) {
                    sawMasterKey = true;
                    cfgRevision = entry.revision();

                    continue;
                }

                String dataKey = new String(key, prefixLength, key.length - prefixLength, StandardCharsets.UTF_8);

                data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
            }
        } catch (Exception e) {
            throw new StorageException("Exception reading data on recovery", e);
        }

        // Non-empty configuration must always come with a master key that has a positive revision.
        assert data.isEmpty() || cfgRevision > 0;

        changeId = cfgRevision;

        return new Data(data, cfgRevision);
    }

    /**
     * Writes the given changes to the Meta Storage in a single conditional invoke.
     *
     * <p>Entries with a {@code null} value are removed, all others are put. The master key is always put as part of
     * the same invoke. The invoke is conditioned on the master key not existing (when {@code j} is {@code 0}) or on
     * its revision being equal to {@code j}.
     *
     * @param map Changed values keyed by configuration key (without the storage prefix); {@code null} means removal.
     * @param j Change id the update is based on; must not be greater than the currently known change id.
     * @return Already completed {@code false} future if {@code j} is older than the known change id, otherwise the
     *         future returned by the Meta Storage invoke.
     */
    public CompletableFuture<Boolean> write(Map<String, ? extends Serializable> map, long j) {
        assert j <= changeId;
        assert lsnr != null : "Configuration listener must be initialized before write.";

        // The caller's view is stale: somebody has already updated the configuration.
        if (j < changeId) {
            return CompletableFutures.falseCompletedFuture();
        }

        List<Operation> operations = new ArrayList<>();

        for (Map.Entry<String, ? extends Serializable> change : map.entrySet()) {
            ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + change.getKey());

            if (change.getValue() != null) {
                operations.add(Operations.put(key, ConfigurationSerializationUtil.toBytes(change.getValue())));
            } else {
                operations.add(Operations.remove(key));
            }
        }

        // Touching the master key bumps its revision, which becomes the new change id.
        operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(j)));

        return metaStorageMgr.invoke(
                j == 0 ? Conditions.notExists(MASTER_KEY) : Conditions.revision(MASTER_KEY).eq(j),
                operations,
                List.of(Operations.noop())
        );
    }

    /**
     * Registers the single configuration listener and a Meta Storage prefix watch that feeds it.
     *
     * <p>On every watch event the changed entries (except the master key) are deserialized, the change id is advanced
     * to the master key revision, and the listener is notified.
     *
     * @param configurationStorageListener Listener to notify; may be registered only once.
     */
    public void registerConfigurationListener(final ConfigurationStorageListener configurationStorageListener) {
        assert lsnr == null;

        lsnr = configurationStorageListener;

        metaStorageMgr.registerPrefixWatch(DST_KEYS_START_RANGE, new WatchListener() {
            public CompletableFuture<Void> onUpdate(WatchEvent watchEvent) {
                // One of the events is expected to be the master key, hence "size - 1".
                Map<String, Serializable> changedValues = IgniteUtils.newHashMap(watchEvent.entryEvents().size() - 1);

                Entry masterKeyEntry = null;

                for (EntryEvent event : watchEvent.entryEvents()) {
                    Entry newEntry = event.newEntry();

                    if (Arrays.equals(newEntry.key(), MASTER_KEY.bytes())) {
                        masterKeyEntry = newEntry;
                    } else {
                        String dataKey = new String(newEntry.key(), StandardCharsets.UTF_8)
                                .substring(DISTRIBUTED_PREFIX.length());

                        // A missing value is propagated as null, i.e. the key has been removed.
                        Serializable value = newEntry.value() == null
                                ? null
                                : ConfigurationSerializationUtil.fromBytes(newEntry.value());

                        changedValues.put(dataKey, value);
                    }
                }

                // Every configuration update must include the master key.
                assert masterKeyEntry != null;

                long newChangeId = masterKeyEntry.revision();

                assert newChangeId > changeId;

                changeId = newChangeId;

                return configurationStorageListener.onEntriesChanged(new Data(changedValues, newChangeId));
            }

            public void onError(Throwable th) {
                LOG.warn("Meta storage listener issue", th);
            }
        });
    }

    /**
     * Returns the type of configuration served by this storage.
     *
     * @return Always {@link ConfigurationType#DISTRIBUTED}.
     */
    public ConfigurationType type() {
        return ConfigurationType.DISTRIBUTED;
    }

    /**
     * Reads the master key from the Meta Storage and returns its revision.
     *
     * @return Future with the revision of the master key entry.
     */
    public CompletableFuture<Long> lastRevision() {
        return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
    }

    /**
     * Reads the master key from the local Meta Storage, at the revision reported once recovery has finished, and
     * returns the revision of that entry.
     *
     * @return Future with the revision of the locally read master key entry.
     */
    public CompletableFuture<Long> localRevision() {
        return metaStorageMgr.recoveryFinishedFuture()
                .thenApply(revisions -> metaStorageMgr.getLocally(MASTER_KEY, revisions.revision()).revision());
    }

    /**
     * Registers the future in the in-flight tracker so that it is cancelled when the storage is closed.
     *
     * @param completableFuture Future to track.
     * @param <T> Future result type.
     * @return The same future instance.
     */
    private <T> CompletableFuture<T> registerFuture(CompletableFuture<T> completableFuture) {
        futureTracker.registerFuture(completableFuture);

        return completableFuture;
    }
}
