package org.gridgain.grid.kernal.processors.dr;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.UUID;
import org.gridgain.grid.GridException;
import org.gridgain.grid.GridMessageListener;
import org.gridgain.grid.GridNode;
import org.gridgain.grid.dr.GridDrReceiveHubConfiguration;
import org.gridgain.grid.dr.GridDrSendHubConfiguration;
import org.gridgain.grid.dr.GridDrSendRemoteConfiguration;
import org.gridgain.grid.kernal.GridKernalContext;
import org.gridgain.grid.kernal.GridNodeAttributes;
import org.gridgain.grid.kernal.managers.communication.GridIoPolicy;
import org.gridgain.grid.kernal.processors.GridProcessorAdapter;
import org.gridgain.grid.kernal.processors.cache.dr.GridCacheDrHandler;
import org.gridgain.grid.kernal.processors.dr.messages.internal.GridDrInternalRequest;
import org.gridgain.grid.kernal.processors.dr.messages.internal.GridDrInternalRequestEntry;
import org.gridgain.grid.kernal.processors.dr.messages.internal.GridDrInternalResponse;
import org.gridgain.grid.lang.GridPredicate;
import org.gridgain.grid.lang.utils.GridConcurrentHashSet;
import org.gridgain.grid.typedef.F;
import org.gridgain.grid.typedef.X;
import org.gridgain.grid.typedef.internal.CU;
import org.gridgain.grid.typedef.internal.U;
import org.gridgain.grid.util.GridBusyLock;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/grid/kernal/processors/dr/GridDrProcessor.class */
public class GridDrProcessor<K, V> extends GridProcessorAdapter {
    private final GridBusyLock busyLock;
    private final Collection<String> registeredCaches;
    private GridDrReceiveHub<K, V> rcvHub;
    private GridDrSendHub<K, V> sndHub;
    static final /* synthetic */ boolean $assertionsDisabled;

    public GridDrProcessor(GridKernalContext gridKernalContext) {
        super(gridKernalContext);
        this.busyLock = new GridBusyLock();
        this.registeredCaches = new GridConcurrentHashSet();
    }

    @Override // org.gridgain.grid.kernal.processors.GridProcessorAdapter, org.gridgain.grid.kernal.GridComponent
    public void start() throws GridException {
        byte dataCenterId = this.ctx.config().getDataCenterId();
        if (!$assertionsDisabled && dataCenterId < 0) {
            throw new AssertionError();
        }
        if (this.ctx.config().getReplicationReceiveHubConfiguration() != null) {
            if (dataCenterId == 0) {
                throw new GridException("Data center ID should be positive if receive hub is configured [dataCenterId=0]");
            }
            validateReceiveHubConfig(this.ctx.config().getReplicationReceiveHubConfiguration());
            this.rcvHub = new GridDrReceiveHub<>(this.ctx);
            this.rcvHub.start();
        }
        if (this.ctx.config().getReplicationSendHubConfiguration() != null) {
            if (dataCenterId == 0) {
                throw new GridException("Data center ID should be positive if send hub is configured [dataCenterId=0]");
            }
            validateSendHubConfig(this.ctx.config().getReplicationSendHubConfiguration());
            this.sndHub = new GridDrSendHub<>(this.ctx);
            this.sndHub.start();
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Started replication processor.");
        }
    }

    @Override // org.gridgain.grid.kernal.processors.GridProcessorAdapter, org.gridgain.grid.kernal.GridComponent
    public void onKernalStart() throws GridException {
        if (this.ctx.config().getReplicationSendHubConfiguration() != null) {
            Iterator<GridNode> it = this.ctx.discovery().remoteNodes().iterator();
            while (it.hasNext()) {
                checkSendHub(it.next());
            }
        }
    }

    @Override // org.gridgain.grid.kernal.processors.GridProcessorAdapter, org.gridgain.grid.kernal.GridComponent
    public void onKernalStop(boolean z, boolean z2) {
        this.busyLock.block();
        this.ctx.io().removeMessageListener(CU.replicationTopicSend());
        Iterator<String> it = this.registeredCaches.iterator();
        while (it.hasNext()) {
            this.ctx.io().removeMessageListener(CU.replicationTopicReceive(it.next()));
        }
        if (this.ctx.config().getReplicationReceiveHubConfiguration() != null && this.rcvHub != null) {
            this.rcvHub.stop(z, z2);
        }
        if (this.ctx.config().getReplicationSendHubConfiguration() != null && this.sndHub != null) {
            this.sndHub.stop(z, z2);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Stopped replication processor.");
        }
    }

    public void registerSendHub(final GridDrSendHub gridDrSendHub) {
        this.ctx.io().addMessageListener(CU.replicationTopicSend(), new GridMessageListener() { // from class: org.gridgain.grid.kernal.processors.dr.GridDrProcessor.1
            static final /* synthetic */ boolean $assertionsDisabled;

            @Override // org.gridgain.grid.GridMessageListener
            public void onMessage(UUID uuid, Object obj) {
                if (!(obj instanceof GridDrInternalRequest)) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError("Unexpected message type.");
                    }
                } else {
                    if (GridDrProcessor.this.log.isDebugEnabled()) {
                        GridDrProcessor.this.log.debug("Received internal replication request message [sourceNodeId=" + uuid + ", msg=" + obj + ']');
                    }
                    GridDrInternalRequest gridDrInternalRequest = (GridDrInternalRequest) obj;
                    gridDrSendHub.onReplicationRequest(uuid, gridDrInternalRequest.id(), gridDrInternalRequest.cacheName(), gridDrInternalRequest.dataCenterIds(), gridDrInternalRequest.entries(), gridDrInternalRequest.entryCount());
                }
            }

            static {
                $assertionsDisabled = !GridDrProcessor.class.desiredAssertionStatus();
            }
        }, new GridPredicate[0]);
    }

    public void registerHandler(String str, final GridCacheDrHandler gridCacheDrHandler) {
        this.registeredCaches.add(CU.mask(str));
        this.ctx.io().addMessageListener(CU.replicationTopicReceive(str), new GridMessageListener() { // from class: org.gridgain.grid.kernal.processors.dr.GridDrProcessor.2
            static final /* synthetic */ boolean $assertionsDisabled;

            @Override // org.gridgain.grid.GridMessageListener
            public void onMessage(UUID uuid, Object obj) {
                if (!(obj instanceof GridDrInternalResponse)) {
                    if (!$assertionsDisabled) {
                        throw new AssertionError("Unexpected message type.");
                    }
                } else {
                    if (GridDrProcessor.this.log.isDebugEnabled()) {
                        GridDrProcessor.this.log.debug("Received internal replication response message [sourceNodeId=" + uuid + ", msg=" + obj + ']');
                    }
                    GridDrInternalResponse gridDrInternalResponse = (GridDrInternalResponse) obj;
                    gridCacheDrHandler.onReplicationResponse(gridDrInternalResponse.id(), uuid, gridDrInternalResponse.error());
                }
            }

            static {
                $assertionsDisabled = !GridDrProcessor.class.desiredAssertionStatus();
            }
        }, new GridPredicate[0]);
    }

    public void sendReplicationRequest(UUID uuid, long j, String str, @Nullable Collection<Byte> collection, Collection<GridDrInternalRequestEntry> collection2, int i) throws GridException {
        if (this.busyLock.enterBusy()) {
            try {
                this.ctx.io().send(uuid, CU.replicationTopicSend(), new GridDrInternalRequest(j, str, collection, collection2, i), GridIoPolicy.PUBLIC_POOL);
                this.busyLock.leaveBusy();
            } catch (Throwable th) {
                this.busyLock.leaveBusy();
                throw th;
            }
        }
    }

    public void sendReplicationResponse(UUID uuid, String str, long j, @Nullable Throwable th) {
        if (this.busyLock.enterBusy()) {
            try {
                try {
                    this.ctx.io().send(uuid, CU.replicationTopicReceive(str), new GridDrInternalResponse(j, th), GridIoPolicy.PUBLIC_POOL);
                    this.busyLock.leaveBusy();
                } catch (GridException e) {
                    U.error(this.log, "Failed to send replication response message to the node: " + uuid, e);
                    this.busyLock.leaveBusy();
                }
            } catch (Throwable th2) {
                this.busyLock.leaveBusy();
                throw th2;
            }
        }
    }

    private void validateReceiveHubConfig(GridDrReceiveHubConfiguration gridDrReceiveHubConfiguration) throws GridException {
        if (gridDrReceiveHubConfiguration.getLocalInboundHost() != null) {
            try {
                InetAddress.getByName(gridDrReceiveHubConfiguration.getLocalInboundHost());
            } catch (UnknownHostException e) {
                throw new GridException("Configuration parameter 'localHost' cannot be resolved to local address.");
            }
        }
        assertParameter(gridDrReceiveHubConfiguration.getLocalInboundPort() >= 0 || gridDrReceiveHubConfiguration.getLocalInboundPort() <= 65535, "localPort >= 0 || localPort <= 65535");
        assertParameter(gridDrReceiveHubConfiguration.getWriteTimeout() >= 0, "writeTimeout >= 0");
        assertParameter(gridDrReceiveHubConfiguration.getIdleTimeout() >= 0, "idleTimeout >= 0");
        assertParameter(gridDrReceiveHubConfiguration.getSelectorCount() > 0, "selectorCount > 0");
        assertParameter(gridDrReceiveHubConfiguration.getWorkerThreads() > 0, "workerThreads > 0");
        assertParameter(gridDrReceiveHubConfiguration.getMessageQueueLimit() >= 0, "messageQueueLimit >= 0");
        assertParameter(gridDrReceiveHubConfiguration.getFlushFrequency() > 0, "keyValueFlushFrequency > 0");
    }

    private void validateSendHubConfig(GridDrSendHubConfiguration gridDrSendHubConfiguration) throws GridException {
        assertParameter(gridDrSendHubConfiguration.getCacheNames() != null, "cacheNames cannot be null");
        assertParameter(gridDrSendHubConfiguration.getCacheNames().length > 0, "cacheNames must contain at least one entry");
        HashSet hashSet = new HashSet(gridDrSendHubConfiguration.getCacheNames().length, 1.0f);
        for (String str : gridDrSendHubConfiguration.getCacheNames()) {
            if (!hashSet.add(str)) {
                throw new GridException("Configuration parameter 'cacheNames' cannot have duplicates: " + str);
            }
        }
        assertParameter(gridDrSendHubConfiguration.getHealthCheckFrequency() > 0, "healthCheckFrequency > 0");
        assertParameter(gridDrSendHubConfiguration.getReadTimeout() >= 0, "readTimeout >= 0");
        assertParameter(gridDrSendHubConfiguration.getSystemRequestTimeout() > 0, "systemRequestTimeout > 0");
        assertParameter(gridDrSendHubConfiguration.getReconnectOnFailureTimeout() >= 0, "reconnectOnFailureTimeout >= 0");
        assertParameter(gridDrSendHubConfiguration.getMaxQueueSize() >= 0, "maxQueueSize >= 0");
        assertParameter(gridDrSendHubConfiguration.getMaxErrors() >= 0, "maxErrors >= 0");
        assertParameter(gridDrSendHubConfiguration.getMaxFailedConnectAttempts() >= 0, "maxFailedConnectAttempts >= 0");
        GridDrSendRemoteConfiguration[] remoteDataCenters = gridDrSendHubConfiguration.getRemoteDataCenters();
        assertParameter(remoteDataCenters != null, "replicas cannot be null");
        assertParameter(remoteDataCenters.length > 0, "replicas must contain at least one entry");
        HashSet hashSet2 = new HashSet(remoteDataCenters.length, 1.0f);
        for (GridDrSendRemoteConfiguration gridDrSendRemoteConfiguration : remoteDataCenters) {
            if (gridDrSendRemoteConfiguration.getDataCenterId() <= 0) {
                throw new GridException("Replica configuration parameter 'dataCenterId' should be positive.");
            }
            if (F.eq(Byte.valueOf(gridDrSendRemoteConfiguration.getDataCenterId()), Byte.valueOf(this.ctx.config().getDataCenterId()))) {
                throw new GridException("Replica configuration parameter 'dataCenterId' cannot be the same as send hub data center ID.");
            }
            if (!hashSet2.add(Byte.valueOf(gridDrSendRemoteConfiguration.getDataCenterId()))) {
                throw new GridException("Replica configuration parameter 'dataCenterId' is not unique across all replicas defined within the send hub: " + ((int) gridDrSendRemoteConfiguration.getDataCenterId()));
            }
            if (gridDrSendRemoteConfiguration.getLocalOutboundHost() != null) {
                try {
                    InetAddress.getByName(gridDrSendRemoteConfiguration.getLocalOutboundHost());
                } catch (UnknownHostException e) {
                    throw new GridException("Replica configuration parameter 'localHost' cannot be resolved to local address: " + gridDrSendRemoteConfiguration.getLocalOutboundHost());
                }
            }
            if (gridDrSendRemoteConfiguration.getReceiveHubLoadBalancingPolicy() == null) {
                throw new GridException("Replica configuration parameter 'getReceiveHubLoadBalancingPolicy' cannot be null.");
            }
            if (gridDrSendRemoteConfiguration.getReceiveHubAddresses() == null) {
                throw new GridException("Replica configuration parameter 'addresses' cannot be null.");
            }
            if (gridDrSendRemoteConfiguration.getReceiveHubAddresses().length == 0) {
                throw new GridException("Replica configuration parameter 'addresses' must have at least one network address defined.");
            }
            HashSet hashSet3 = new HashSet(gridDrSendRemoteConfiguration.getReceiveHubAddresses().length, 1.0f);
            String[] receiveHubAddresses = gridDrSendRemoteConfiguration.getReceiveHubAddresses();
            int length = receiveHubAddresses.length;
            for (int i = 0; i < length; i++) {
                String str2 = receiveHubAddresses[i];
                if (str2.endsWith(":")) {
                    str2 = str2.substring(0, str2.length() - 1);
                }
                if (str2.indexOf(58) >= 0) {
                    StringTokenizer stringTokenizer = new StringTokenizer(str2, ":");
                    if (stringTokenizer.countTokens() != 2) {
                        throw new GridException("Replica address cannot be parsed: " + str2);
                    }
                    String nextToken = stringTokenizer.nextToken();
                    String nextToken2 = stringTokenizer.nextToken();
                    try {
                        InetAddress.getByName(nextToken);
                        try {
                            int parseInt = Integer.parseInt(nextToken2);
                            if (parseInt < 0 || parseInt > 65535) {
                                throw new GridException("Replica address has invalid port: " + str2);
                            }
                        } catch (NumberFormatException e2) {
                            throw new GridException("Replica address has invalid port: " + str2);
                        }
                    } catch (UnknownHostException e3) {
                        throw new GridException("Replica address host name cannot be resolved: " + gridDrSendRemoteConfiguration.getLocalOutboundHost());
                    }
                } else {
                    try {
                        InetAddress.getByName(str2);
                    } catch (UnknownHostException e4) {
                        throw new GridException("Replica address host name cannot be resolved: " + gridDrSendRemoteConfiguration.getLocalOutboundHost());
                    }
                }
                if (!hashSet3.add(str2)) {
                    throw new GridException("Replica address is not unique within the replica: " + str2);
                }
            }
        }
    }

    private void checkSendHub(GridNode gridNode) throws GridException {
        GridDrSendHubAttributes gridDrSendHubAttributes = (GridDrSendHubAttributes) gridNode.attribute(GridNodeAttributes.ATTR_REPLICATION_SND_HUB);
        GridDrSendHubAttributes gridDrSendHubAttributes2 = (GridDrSendHubAttributes) this.ctx.discovery().localNode().attribute(GridNodeAttributes.ATTR_REPLICATION_SND_HUB);
        if (gridDrSendHubAttributes != null) {
            if (F.eqArray(gridDrSendHubAttributes2.cacheNames(), gridDrSendHubAttributes.cacheNames(), false, false)) {
                if (!gridDrSendHubAttributes2.replicasIgnore().equals(gridDrSendHubAttributes.replicasIgnore())) {
                    throw new GridException("All replication send hubs should have same ignore settings [locIgnore=" + gridDrSendHubAttributes2.replicasIgnore() + ", rmtIgnore=" + gridDrSendHubAttributes.ignoreList() + ", rmtNodeId=" + gridNode.id() + ']');
                }
                return;
            }
            HashSet hashSet = new HashSet();
            Collections.addAll(hashSet, gridDrSendHubAttributes.cacheNames());
            Collections.addAll(hashSet, gridDrSendHubAttributes2.cacheNames());
            if (hashSet.size() != gridDrSendHubAttributes.cacheNames().length + gridDrSendHubAttributes2.cacheNames().length) {
                throw new GridException("Replication send hub configuration mismatch: two send hubs share the same cache name(s), but the whole cache name lists are different [localCacheNames=" + Arrays.toString(gridDrSendHubAttributes2.cacheNames()) + ", remoteCacheNames=" + Arrays.toString(gridDrSendHubAttributes.cacheNames()) + ", remoteNode=" + gridNode.id() + ']');
            }
        }
    }

    @Override // org.gridgain.grid.kernal.processors.GridProcessorAdapter, org.gridgain.grid.kernal.GridComponent
    public void printMemoryStats() {
        X.println(">>>", new Object[0]);
        X.println(">>> Replication processor memory stats [grid=" + this.ctx.gridName() + ']', new Object[0]);
        if (this.rcvHub != null) {
            this.rcvHub.printMemoryStats();
        }
        if (this.sndHub != null) {
            this.sndHub.printMemoryStats();
        }
    }

    static {
        $assertionsDisabled = !GridDrProcessor.class.desiredAssertionStatus();
    }
}
