Package org.gridgain.internal.dcr.table
Class ReplicationSubscriber
java.lang.Object
org.gridgain.internal.dcr.table.ReplicationSubscriber
- All Implemented Interfaces:
Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
public class ReplicationSubscriber
extends Object
implements Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
Continuous query subscriber that pipes replication events into a DataStreamer.
Backpressure flow:
- ContinuousQuery is requested one batch at a time.
- Events from the batch are enqueued in pendingItems after flush-point and schema checks, then submitted to the publisher from the same CQ thread.
- When the DataStreamer is slow and demand drops to zero, DcrStreamerPublisher.trySubmit(T) returns false and onNext(TableRowEventBatch) returns without requesting the next CQ batch, effectively pausing the ContinuousQuery.
- When the DataStreamer recovers and calls request(n), DcrStreamerPublisher fires the onDemandAvailable callback, which calls requestMoreFromCq(), resuming CQ delivery. The next onNext(TableRowEventBatch) drains any buffered items before processing new ones.
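The flow above can be modelled with plain java.util.concurrent.Flow types. The following is a minimal single-threaded sketch, not the actual implementation: DemandGatedPublisher, PausingSubscriber and ListUpstream are hypothetical stand-ins for DcrStreamerPublisher, ReplicationSubscriber and the ContinuousQuery, and the flush-point and schema checks are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

public class BackpressureSketch {
    /** Hypothetical stand-in for DcrStreamerPublisher: accepts items only while demand is positive. */
    static final class DemandGatedPublisher<T> {
        final List<T> delivered = new ArrayList<>();
        private long demand;
        private Runnable onDemandAvailable = () -> { };

        void setOnDemandAvailable(Runnable callback) { onDemandAvailable = callback; }

        boolean trySubmit(T item) {
            if (demand == 0) return false; // downstream is saturated
            demand--;
            delivered.add(item);
            return true;
        }

        /** Called by the downstream consumer (the DataStreamer in the text). */
        void request(long n) {
            boolean wasPaused = demand == 0;
            demand += n;
            if (wasPaused) onDemandAvailable.run(); // wake the paused subscriber
        }
    }

    /** Hypothetical subscriber: asks for one batch at a time and pauses when trySubmit fails. */
    static final class PausingSubscriber<T> implements Flow.Subscriber<List<T>> {
        private final Queue<T> pendingItems = new ArrayDeque<>();
        private final DemandGatedPublisher<T> publisher;
        private Flow.Subscription upstream;

        PausingSubscriber(DemandGatedPublisher<T> publisher) {
            this.publisher = publisher;
            publisher.setOnDemandAvailable(() -> { if (upstream != null) upstream.request(1); });
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);
        }

        @Override public void onNext(List<T> batch) {
            pendingItems.addAll(batch); // leftovers from a paused batch stay at the head
            while (!pendingItems.isEmpty()) {
                if (!publisher.trySubmit(pendingItems.peek())) return; // pause: no request(1)
                pendingItems.remove();
            }
            upstream.request(1); // everything was accepted, ask for the next batch
        }

        @Override public void onError(Throwable throwable) { }
        @Override public void onComplete() { }
    }

    /** Hypothetical upstream: delivers one batch per unit of demand, on the caller's thread. */
    static final class ListUpstream<T> implements Flow.Subscription {
        private final Iterator<List<T>> batches;
        private final Flow.Subscriber<List<T>> subscriber;
        private long requested;
        private boolean emitting;
        private boolean cancelled;

        ListUpstream(List<List<T>> batches, Flow.Subscriber<List<T>> subscriber) {
            this.batches = batches.iterator();
            this.subscriber = subscriber;
        }

        @Override public void request(long n) {
            if (cancelled) return;
            requested += n;
            if (emitting) return; // re-entrant call: the outer loop picks up the new demand
            emitting = true;
            try {
                while (!cancelled && requested > 0 && batches.hasNext()) {
                    requested--;
                    subscriber.onNext(batches.next());
                }
            } finally {
                emitting = false;
            }
        }

        @Override public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        DemandGatedPublisher<Integer> publisher = new DemandGatedPublisher<>();
        PausingSubscriber<Integer> subscriber = new PausingSubscriber<>(publisher);
        List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3, 4), List.of(5));

        subscriber.onSubscribe(new ListUpstream<>(batches, subscriber));
        System.out.println(publisher.delivered); // [] (no demand yet, upstream is paused)
        publisher.request(3);
        System.out.println(publisher.delivered); // [1, 2, 3] (item 4 stays buffered)
        publisher.request(10);
        System.out.println(publisher.delivered); // [1, 2, 3, 4, 5]
    }
}
```

In this sketch a buffered item is only drained by the next onNext call, mirroring the last bullet above; if the upstream has no further batch to deliver, the leftover stays in pendingItems.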
Thread-safety: pendingItems and flushPointReached are accessed only from
the thread that delivers onNext(TableRowEventBatch) calls (the CQ thread or, if the CQ delivers
synchronously on the caller of request(1), the cqRequestExecutor thread).
All calls to cqSubscription.request(1) are dispatched via cqRequestExecutor,
so the DataStreamer thread never becomes the onNext delivery thread and can never
create a concurrent onNext invocation.
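The effect of routing request(1) through an executor can be illustrated with a small sketch. It assumes a hypothetical upstream that delivers onNext synchronously on whichever thread calls request(); the class and method names here are illustrative and not part of the documented API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class RequestDispatchSketch {
    /** Returns the name of the thread that ran onNext when request(1) goes through the executor. */
    static String deliveryThreadName(Executor cqRequestExecutor) {
        CompletableFuture<String> deliveryThread = new CompletableFuture<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription subscription) { }
            @Override public void onNext(String item) {
                deliveryThread.complete(Thread.currentThread().getName());
            }
            @Override public void onError(Throwable throwable) { }
            @Override public void onComplete() { }
        };

        // Hypothetical synchronous upstream: delivers on the thread that calls request().
        Flow.Subscription syncUpstream = new Flow.Subscription() {
            @Override public void request(long n) { subscriber.onNext("event"); }
            @Override public void cancel() { }
        };

        // The demand callback hands request(1) to the executor instead of calling it inline.
        cqRequestExecutor.execute(() -> syncUpstream.request(1));
        return deliveryThread.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "cq-request"));
        try {
            // Dispatched: onNext runs on the executor thread, never on the caller.
            System.out.println("dispatched: " + deliveryThreadName(executor));
            // Inline (Runnable::run): onNext runs on the calling thread itself.
            System.out.println("inline:     " + deliveryThreadName(Runnable::run));
        } finally {
            executor.shutdown();
        }
    }
}
```

With a single-threaded executor, every synchronous delivery triggered by request(1) lands on that one thread, which is why the calling thread cannot start a second, concurrent onNext.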
-
Constructor Summary
Constructors
Constructor - Description
ReplicationSubscriber(TableManager tableManager, SchemaSyncPolicy<Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>> schemaSyncPolicy, @Nullable Instant flushPoint, CompletableFuture<Void> finishFuture, DcrMetricSource metricSource, org.apache.ignite.internal.hlc.ClockService clockService, Executor cqRequestExecutor) - Constructor.
-
Method Summary
Modifier and Type - Method - Description
void - flushPoint(Instant flushPoint)
void - onComplete()
void - onError(Throwable)
void - onNext(org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>> sourceEventBatch)
void - onSubscribe(Flow.Subscription subscription)
void - stop() - Stop replication subscriber.
-
Constructor Details
-
ReplicationSubscriber
public ReplicationSubscriber(TableManager tableManager, SchemaSyncPolicy<Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>> schemaSyncPolicy, @Nullable Instant flushPoint, CompletableFuture<Void> finishFuture, DcrMetricSource metricSource, org.apache.ignite.internal.hlc.ClockService clockService, Executor cqRequestExecutor)
Constructor.
- Parameters:
tableManager - Table manager.
schemaSyncPolicy - Schema sync policy.
flushPoint - Flush point.
finishFuture - Finish future.
metricSource - Metric source.
clockService - Clock service.
cqRequestExecutor - Executor used to dispatch cqSubscription.request(1) calls, decoupling the DataStreamer thread from CQ delivery.
-
-
Method Details
-
flushPoint
-
onSubscribe
- Specified by:
onSubscribe in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
-
onNext
public void onNext(org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple, org.apache.ignite.table.Tuple>> sourceEventBatch)
- Specified by:
onNext in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
-
onError
- Specified by:
onError in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
-
onComplete
public void onComplete()
- Specified by:
onComplete in interface Flow.Subscriber<org.apache.ignite.table.TableRowEventBatch<Map.Entry<org.apache.ignite.table.Tuple,org.apache.ignite.table.Tuple>>>
-
stop
public void stop()
Stop replication subscriber.
-