Class ReplicationSubscriber

java.lang.Object
org.gridgain.internal.dcr.table.ReplicationSubscriber
All Implemented Interfaces:
Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>

public class ReplicationSubscriber extends Object implements Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
Continuous query subscriber that pipes replication events into a DataStreamer.

Backpressure flow:

  1. ContinuousQuery is requested one batch at a time.
  2. Events from the batch are enqueued in pendingItems after flush-point and schema checks, then submitted to the publisher from the same CQ thread.
  3. When the DataStreamer is slow and demand drops to zero, DcrStreamerPublisher.trySubmit(T) returns false and onNext(org.apache.ignite.table.TableRowEventBatch<java.util.Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>>) returns without requesting the next CQ batch, effectively pausing the ContinuousQuery.
  4. When the DataStreamer recovers and calls request(n), DcrStreamerPublisher fires the onDemandAvailable callback which calls requestMoreFromCq(), resuming CQ delivery. The next onNext(org.apache.ignite.table.TableRowEventBatch<java.util.Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>>) drains any buffered items before processing new ones.

Thread-safety: pendingItems and flushPointReached are accessed only from the thread that delivers onNext(org.apache.ignite.table.TableRowEventBatch<java.util.Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>>) calls (the CQ thread, or – if the CQ delivers synchronously on the caller of request(1) – the cqRequestExecutor thread). All calls to cqSubscription.request(1) are dispatched via cqRequestExecutor, so the DataStreamer thread never becomes the onNext delivery thread and can never create a concurrent onNext invocation.

  • Constructor Details

    • ReplicationSubscriber

      public ReplicationSubscriber(TableManager tableManager, SchemaSyncPolicy<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>> schemaSyncPolicy, @Nullable @Nullable Instant flushPoint, CompletableFuture<Void> finishFuture, DcrMetricSource metricSource, org.apache.ignite.internal.hlc.ClockService clockService, Executor cqRequestExecutor)
      Constructor.
      Parameters:
      tableManager - Table manager.
      schemaSyncPolicy - Schema sync policy.
      flushPoint - Flush point.
      finishFuture - Finish future.
      metricSource - Metric source.
      clockService - Clock service.
      cqRequestExecutor - Executor used to dispatch cqSubscription.request(1) calls, decoupling the DataStreamer thread from CQ delivery.
  • Method Details

    • flushPoint

      public void flushPoint(Instant flushPoint)
    • onSubscribe

      public void onSubscribe(Flow.Subscription subscription)
      Specified by:
      onSubscribe in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
    • onNext

      public void onNext(org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>> sourceEventBatch)
      Specified by:
      onNext in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
    • onError

      public void onError(Throwable throwable)
      Specified by:
      onError in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
    • onComplete

      public void onComplete()
      Specified by:
      onComplete in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
    • stop

      public void stop()
      Stop replication subscriber.