package org.apache.ignite3.internal.eventlog.impl;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.ignite3.internal.eventlog.api.Event;
import org.apache.ignite3.internal.eventlog.api.Sink;
import org.apache.ignite3.internal.eventlog.config.schema.WebhookSinkRetryPolicyView;
import org.apache.ignite3.internal.eventlog.config.schema.WebhookSinkView;
import org.apache.ignite3.internal.eventlog.ser.EventSerializer;
import org.apache.ignite3.internal.logger.IgniteLogger;
import org.apache.ignite3.internal.logger.Loggers;
import org.apache.ignite3.internal.network.configuration.SslView;
import org.apache.ignite3.internal.network.ssl.KeystoreLoader;
import org.apache.ignite3.internal.rest.constants.HttpCode;
import org.apache.ignite3.internal.rest.constants.MediaType;
import org.apache.ignite3.internal.thread.NamedThreadFactory;
import org.apache.ignite3.internal.util.IgniteUtils;
import org.apache.ignite3.lang.ErrorGroups;
import org.apache.ignite3.lang.IgniteException;
import org.jetbrains.annotations.TestOnly;

/* loaded from: input_file:org/apache/ignite3/internal/eventlog/impl/WebhookSink.class */
/**
 * Event log sink that buffers events in a bounded in-memory queue and delivers them in batches
 * to the HTTP endpoint configured in {@link WebhookSinkView}.
 *
 * <p>Batches are sent from a dedicated single-threaded scheduled executor: periodically (every
 * {@code batchSendFrequencyMillis}) and on demand as soon as the queue holds at least
 * {@code batchSize} events. When the queue is full, the oldest buffered event is discarded to make
 * room for the new one.
 */
class WebhookSink implements Sink<WebhookSinkView> {
    private static final IgniteLogger LOG = Loggers.forClass(WebhookSink.class);

    /** HTTP status codes for which the same batch is sent again according to the retry policy. */
    private static final Set<Integer> RETRYABLE_STATUSES = Set.of(
            HttpCode.TOO_MANY_REQUESTS.code(),
            HttpCode.BAD_GATEWAY.code(),
            HttpCode.SERVICE_UNAVAILABLE.code(),
            HttpCode.GATEWAY_TIMEOUT.code()
    );

    private final WebhookSinkView cfg;
    private final EventSerializer serializer;
    private final Supplier<UUID> clusterIdSupplier;
    private final String nodeName;
    private final HttpClient client;

    /** Bounded buffer of events waiting to be sent; capacity is {@code cfg.queueSize()}. */
    private final BlockingQueue<Event> events;

    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("eventlog-webhook-sink", LOG));

    /**
     * Wall-clock time of the last completed send attempt (or of the last check that found the
     * queue empty). Written on the sender thread and read through {@link #getLastSendMillis()}
     * from other threads, hence {@code volatile}.
     */
    private volatile long lastSendMillis;

    /**
     * Creates the sink, builds the HTTP client and schedules the periodic batch sender.
     *
     * @param webhookSinkView Sink configuration (endpoint, queue/batch sizes, retry policy, SSL).
     * @param eventSerializer Serializer used to turn a batch of events into the request body.
     * @param supplier Supplier of the cluster ID sent in the {@code X-SINK-CLUSTER-ID} header.
     * @param str Node name sent in the {@code X-SINK-NODE-NAME} header.
     */
    public WebhookSink(WebhookSinkView webhookSinkView, EventSerializer eventSerializer, Supplier<UUID> supplier, String str) {
        this.cfg = webhookSinkView;
        this.serializer = eventSerializer;
        this.clusterIdSupplier = supplier;
        this.nodeName = str;
        this.events = new LinkedBlockingQueue<>(webhookSinkView.queueSize());
        this.client = configureClient(webhookSinkView);
        this.executorService.scheduleAtFixedRate(this::tryToSendBatch, webhookSinkView.batchSendFrequencyMillis(), webhookSinkView.batchSendFrequencyMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the sink: asks the sender thread to flush whatever is still buffered, shuts the
     * executor down (waiting up to one second) and discards any events that could not be sent.
     */
    @Override
    public void stop() {
        // Guard against a repeated stop(): submitting to a shut down executor would be rejected.
        if (!this.executorService.isShutdown() && !this.events.isEmpty()) {
            // Force the flush: the regular path skips partial batches when the last send was recent,
            // which would silently drop them here because the queue is cleared below.
            this.executorService.execute(() -> sendPending(true));
        }
        IgniteUtils.shutdownAndAwaitTermination(this.executorService, 1L, TimeUnit.SECONDS);
        this.events.clear();
    }

    /**
     * Buffers the event for delivery. If the queue is full, the oldest buffered events are
     * dropped until the new one fits. Once at least {@code batchSize} events are buffered an
     * immediate send is requested on the sender thread.
     *
     * @param event Event to deliver.
     */
    @Override
    public void write(Event event) {
        while (!this.events.offer(event)) {
            // Queue is full: evict the head (oldest event) and try again.
            this.events.poll();
        }
        if (this.events.size() >= this.cfg.batchSize()) {
            this.executorService.execute(this::tryToSendBatch);
        }
    }

    /** Returns the live queue of events that have not been sent yet. */
    @TestOnly
    BlockingQueue<Event> getEvents() {
        return this.events;
    }

    /**
     * Returns the {@link System#currentTimeMillis()} value recorded after the most recent send
     * attempt, or after the most recent sender run that found the queue empty.
     */
    public long getLastSendMillis() {
        return this.lastSendMillis;
    }

    /**
     * Sends one batch to the webhook, retrying according to the configured retry policy.
     *
     * <p>An attempt is retried when the request throws or when the response status is one of
     * {@link #RETRYABLE_STATUSES}. Any other status ends the loop and is treated as delivered.
     * If the thread is interrupted, retrying stops immediately and the interrupt flag is kept set.
     * A warning is logged when the batch could not be delivered.
     *
     * @param collection Batch of events to send.
     */
    private void sendInternal(Collection<Event> collection) {
        WebhookSinkRetryPolicyView retryPolicy = this.cfg.retryPolicy();
        String lastFailure = null;
        int attempt = 0;
        do {
            if (attempt > 0 && !backoff(retryPolicy, attempt)) {
                lastFailure = "interrupted while waiting to retry";
                break;
            }
            try {
                HttpResponse<String> response = this.client.send(
                        createRequest(this.serializer.serialize(collection)),
                        HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)
                );
                int status = response.statusCode();
                if (!RETRYABLE_STATUSES.contains(status)) {
                    LOG.trace("Successfully send events to webhook [name={}, eventCount={}]", this.cfg.endpoint(), collection.size());
                    return;
                }
                LOG.trace("Failed to send events to webhook, will retry attempt [name={}, retry={}, statusCode={}]", this.cfg.endpoint(), attempt, status);
                lastFailure = "HTTP status " + status;
            } catch (InterruptedException e) {
                // The sender thread is being shut down: restore the flag and stop retrying.
                Thread.currentThread().interrupt();
                lastFailure = "interrupted while sending";
                break;
            } catch (Throwable e) {
                // Deliberately broad: anything escaping from a scheduled task would cancel the periodic sender.
                LOG.trace("Failed to send events to webhook, will retry attempt [name={}, retry={}]", this.cfg.endpoint(), attempt, e);
                lastFailure = e.getMessage() != null ? e.getMessage() : e.toString();
            }
            attempt++;
        } while (attempt < retryPolicy.maxAttempts());
        LOG.warn("Failed to send events to webhook [name={}, eventCount={} lastError={}]", this.cfg.endpoint(), collection.size(), lastFailure);
    }

    /**
     * Sleeps before the given retry attempt. The delay is
     * {@code initBackoffMillis * backoffMultiplier^attempt}, capped at {@code maxBackoffMillis}.
     *
     * @param retryPolicy Retry policy configuration.
     * @param attempt Number of attempts already made (greater than zero).
     * @return {@code true} if the full delay elapsed, {@code false} if the thread was interrupted
     *         (the interrupt flag is restored in that case).
     */
    private static boolean backoff(WebhookSinkRetryPolicyView retryPolicy, int attempt) {
        long delayMillis = Math.min(
                (long) (retryPolicy.initBackoffMillis() * Math.pow(retryPolicy.backoffMultiplier(), attempt)),
                retryPolicy.maxBackoffMillis()
        );
        try {
            Thread.sleep(delayMillis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Regular sender task: sends full batches, and partial batches only when more than
     * {@code batchSendFrequencyMillis} has passed since the last send.
     */
    private void tryToSendBatch() {
        sendPending(false);
    }

    /**
     * Drains the queue in chunks of at most {@code batchSize} events and sends each chunk.
     *
     * @param force When {@code true}, partial batches are sent regardless of how recently the
     *         previous send happened; when {@code false}, a partial batch is left in the queue
     *         until the send interval has elapsed.
     */
    private void sendPending(boolean force) {
        if (this.events.isEmpty()) {
            this.lastSendMillis = System.currentTimeMillis();
            return;
        }
        Collection<Event> batch = new ArrayList<>(this.cfg.batchSize());
        // Stop early once interrupted: further sends would fail immediately anyway.
        while (!this.events.isEmpty() && !Thread.currentThread().isInterrupted()) {
            boolean partialBatch = this.events.size() < this.cfg.batchSize();
            boolean sentRecently = System.currentTimeMillis() - this.lastSendMillis <= this.cfg.batchSendFrequencyMillis();
            if (!force && partialBatch && sentRecently) {
                return;
            }
            this.events.drainTo(batch, this.cfg.batchSize());
            sendInternal(batch);
            batch.clear();
            this.lastSendMillis = System.currentTimeMillis();
        }
    }

    /**
     * Builds the POST request carrying the serialized batch, with JSON content type and the
     * cluster ID / node name identification headers.
     *
     * @param bArr Serialized batch used as the request body.
     */
    private HttpRequest createRequest(byte[] bArr) {
        return HttpRequest.newBuilder(URI.create(this.cfg.endpoint()))
                .header("Content-Type", MediaType.APPLICATION_JSON)
                .header("X-SINK-CLUSTER-ID", String.valueOf(this.clusterIdSupplier.get()))
                .header("X-SINK-NODE-NAME", this.nodeName)
                .POST(HttpRequest.BodyPublishers.ofByteArray(bArr))
                .build();
    }

    /**
     * Creates the HTTP client, installing a custom SSL context when SSL is enabled in the
     * sink configuration.
     *
     * @param webhookSinkView Sink configuration.
     */
    private static HttpClient configureClient(WebhookSinkView webhookSinkView) {
        HttpClient.Builder newBuilder = HttpClient.newBuilder();
        if (webhookSinkView.ssl().enabled()) {
            newBuilder.sslContext(createClientSslContext(webhookSinkView.ssl(), createTrustManagerFactory(webhookSinkView.ssl())));
        }
        return newBuilder.build();
    }

    /**
     * Creates a trust manager factory initialized from the configured trust store.
     *
     * @param sslView SSL configuration.
     * @throws IgniteException With {@code SSL_CONFIGURATION_ERR} if the trust store cannot be loaded.
     */
    private static TrustManagerFactory createTrustManagerFactory(SslView sslView) {
        try {
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustManagerFactory.init(KeystoreLoader.load(sslView.trustStore()));
            return trustManagerFactory;
        } catch (NoSuchFileException e) {
            // Same message as for a missing key store, so both misconfigurations read alike.
            throw new IgniteException(ErrorGroups.Common.SSL_CONFIGURATION_ERR, String.format("File %s not found", e.getMessage()), e);
        } catch (IOException | GeneralSecurityException e) {
            throw new IgniteException(ErrorGroups.Common.SSL_CONFIGURATION_ERR, e);
        }
    }

    /**
     * Creates a TLS context that uses the configured key store for client authentication and the
     * given trust manager factory for server verification.
     *
     * @param sslView SSL configuration.
     * @param trustManagerFactory Initialized trust manager factory.
     * @throws IgniteException With {@code SSL_CONFIGURATION_ERR} if the key store cannot be loaded
     *         or the context cannot be initialized.
     */
    private static SSLContext createClientSslContext(SslView sslView, TrustManagerFactory trustManagerFactory) {
        try {
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(KeystoreLoader.load(sslView.keyStore()), sslView.keyStore().password().toCharArray());
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
            return sslContext;
        } catch (NoSuchFileException e) {
            throw new IgniteException(ErrorGroups.Common.SSL_CONFIGURATION_ERR, String.format("File %s not found", e.getMessage()), e);
        } catch (IOException | GeneralSecurityException e) {
            throw new IgniteException(ErrorGroups.Common.SSL_CONFIGURATION_ERR, e);
        }
    }
}
