归档
使用示例
- https://github.com/zengxf/small-frame-demo/blob/master/multi-thread/reactive-test/reactor-demo/src/main/java/cn/zxf/reactor_demo/jdk/PubSubTest.java
JDK 版本
openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)
原理
关键类
java.util.concurrent.SubmissionPublisher
// Excerpt of SubmissionPublisher: the state fields and the two constructors quoted in these notes.
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {
// Head of the linked list of current subscriptions (chained through BufferedSubscription.next).
BufferedSubscription<T> clients;
// Held by subscribe() and doOffer() while they read or modify the subscription list.
final ReentrantLock lock;
// When true (and closedException is null), subscribe() completes a new subscription immediately.
volatile boolean closed;
// Set to true by the first subscribe() call.
boolean subscribed;
// Thread that made the first subscribe() call; doOffer() clears it when a different thread offers.
Thread owner;
// When non-null, subscribe() reports it to a new subscription through onError.
volatile Throwable closedException;
// Handed to every BufferedSubscription, which uses it to run its ConsumerTask.
final Executor executor;
// Handed to every BufferedSubscription; how it is used is not shown in this excerpt.
final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
// Upper bound for each subscription's buffer, as returned by roundCapacity().
final int maxBufferCapacity;
// Delegates to the full constructor with ASYNC_POOL, Flow.defaultBufferSize() and no handler.
public SubmissionPublisher() {
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
}
// Full constructor; the elided lines are not shown in these notes.
public SubmissionPublisher(
Executor executor, int maxBufferCapacity,
BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
) {
...
this.lock = new ReentrantLock();
this.executor = executor;
this.onNextHandler = handler;
// The requested capacity is passed through roundCapacity() before being stored.
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
}
}
订阅
java.util.concurrent.SubmissionPublisher
// Registers a subscriber: wraps it in a BufferedSubscription and appends that to the clients
// list, unless the publisher is already closed or an equal subscriber is already registered.
// Throws NullPointerException when subscriber is null.
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
ReentrantLock lock = this.lock;
int max = maxBufferCapacity;
// The initial buffer length is the smaller of max and INITIAL_CAPACITY.
Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription = new BufferedSubscription<T>(
subscriber, executor, onNextHandler, array, max
);
lock.lock();
try {
// The first subscribe() call records the calling thread as owner.
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
// Walk the list: b is the current node, pred the last node that stays linked.
for (BufferedSubscription<T> b = clients, pred = null;;) {
// Reached the end of the list without finding a duplicate.
if (b == null) {
Throwable ex;
subscription.onSubscribe();
// Publisher already closed: finish the new subscription instead of linking it.
if ((ex = closedException) != null)
subscription.onError(ex);
else if (closed)
subscription.onComplete();
// Otherwise link it as the head (empty list) or after the last kept node.
else if (pred == null)
clients = subscription;
else
pred.next = subscription;
break;
}
BufferedSubscription<T> next = b.next;
// Unlink subscriptions that report themselves closed.
if (b.isClosed()) {
b.next = null;
if (pred == null)
clients = next;
else
pred.next = next;
}
// An equal subscriber is already registered: the error goes to the existing
// subscription b, and the new subscription is never linked.
else if (subscriber.equals(b.subscriber)) {
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
}
else
pred = b;
b = next;
}
} finally {
lock.unlock();
}
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
// Per-subscriber subscription holding that subscriber's item buffer (fields and constructor only).
static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {
// Wait time in nanoseconds; awaitSpace() stores its nanos argument here before blocking.
long timeout;
// Index of the next slot to take from; advanced by consume().
int head;
// Index of the next slot to fill; advanced by offer().
int tail;
// Largest length the buffer array may grow to (checked in offer()).
final int maxCapacity;
// Control/status bits; other methods test flags such as REQS, ACTIVE, RUN, CLOSED, ERROR, COMPLETE.
volatile int ctl;
// Item buffer, indexed with (index & (length - 1)).
Object[] array;
final Subscriber<? super T> subscriber;
final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
// Executor that tryStart() submits the ConsumerTask to.
Executor executor;
// Next subscription in the publisher's clients list.
BufferedSubscription<T> next;
// Stores the arguments; the initial array is allocated by the caller (see subscribe()).
BufferedSubscription(
Subscriber<? super T> subscriber,
Executor executor,
BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler,
Object[] array,
int maxBufferCapacity
) {
this.subscriber = subscriber;
this.executor = executor;
this.onNextHandler = onNextHandler;
this.array = array;
this.maxCapacity = maxBufferCapacity;
}
}
提交数据
java.util.concurrent.SubmissionPublisher
// Publishes an item by delegating to doOffer with a Long.MAX_VALUE timeout and no onDrop handler.
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
// Common implementation behind submit(): offers the item to every subscription while holding
// the publisher lock. Abridged; the "..." lines are elided in these notes.
// Throws NullPointerException when item is null.
private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
ReentrantLock lock = this.lock;
lock.lock();
try {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
// unowned is true when the caller is not the recorded owner thread; once a different
// thread offers, the recorded owner is cleared.
if ((unowned = ((o = owner) != t)) && o != null)
owner = null;
// No subscribers: only remember whether the publisher is closed.
if (b == null)
complete = closed;
else {
complete = false;
boolean cleanMe = false;
BufferedSubscription<T> retries = null, rtail = null, next;
// Offer the item to each subscription in list order.
do {
next = b.next;
int stat = b.offer(item, unowned);
...
} while ((b = next) != null);
...
}
} finally {
lock.unlock();
}
...
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
// Tries to append one item to this subscription's buffer, then passes the result to startOnOffer.
// stat is n (the item count including the new one) when the item was added, and 0 otherwise.
final int offer(T item, boolean unowned) {
Object[] a;
int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
// i is the target slot; the mask assumes a power-of-two length (presumably ensured by
// roundCapacity -- confirm). n is the item count if this item is added.
int t = tail, i = t & (cap - 1), n = t + 1 - head;
if (cap > 0) {
boolean added;
// Buffer full but still below maxCapacity: grow it and insert.
if (n >= cap && cap < maxCapacity)
added = growAndOffer(item, a, t);
// Full at maximum size, or caller is not the owner thread: insert only if the slot is null.
else if (n >= cap || unowned)
added = QA.compareAndSet(a, i, null, item);
// Owner thread with free space: store with release semantics, no CAS.
else {
QA.setRelease(a, i, item);
added = true;
}
if (added) {
tail = t + 1;
stat = n;
}
}
return startOnOffer(stat);
}
// Starts the consumer task after an offer when needed, then returns stat (abridged).
final int startOnOffer(int stat) {
int c;
// Act only when REQS is set and ACTIVE is clear; then OR in RUN | ACTIVE and call tryStart()
// if the value returned by getAndBitwiseOrCtl (presumably the previous ctl -- confirm)
// has neither RUN nor CLOSED set.
if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
tryStart();
...
return stat;
}
// Wraps this subscription in a ConsumerTask and submits it to the executor, if one is set.
// The handler after the try block is elided in these notes.
final void tryStart() {
try {
Executor e;
ConsumerTask<T> task = new ConsumerTask<T>(this);
if ((e = executor) != null)
e.execute(task);
} ...
}
// Consumer loop run by ConsumerTask.run(): takes buffered items and hands them to the subscriber
// until an error or completion ends the subscription (abridged; exits in elided branches not shown).
final void consume() {
Subscriber<? super T> s;
if ((s = subscriber) != null) {
// NOTE(review): presumably delivers onSubscribe if not yet done -- body not shown here.
subscribeOnOpen(s);
long d = demand;
for (int h = head, t = tail;;) {
int c, taken; boolean empty;
// ERROR bit set in ctl: close the subscription with an error and stop.
if (((c = ctl) & ERROR) != 0) {
closeOnError(s, null);
break;
}
// Took some items: advance head and deduct them from the outstanding demand.
else if ((taken = takeItems(s, d, h)) > 0) {
head = h += taken;
d = subtractDemand(taken);
}
...
// tail has not moved since the last read.
else if (t == (t = tail)) {
// Buffer empty and COMPLETE set: signal completion and stop.
if ((empty = (t == h)) && (c & COMPLETE) != 0) {
closeOnComplete(s);
break;
}
...
}
}
}
}
// Takes a batch of items starting at index h and passes each to consumeNext.
// d is the current demand; the return value k is the number of loop iterations completed.
final int takeItems(Subscriber<? super T> s, long d, int h) {
Object[] a;
int k = 0, cap;
if ((a = array) != null && (cap = a.length) > 0) {
// m is the index mask; b caps one batch at (m >>> 3) + 1 items.
int m = cap - 1, b = (m >>> 3) + 1;
// Never take more than the demand d allows.
int n = (d < (long)b) ? (int)d : b;
for (; k < n; ++h, ++k) {
// Atomically read the slot and reset it to null.
Object x = QA.getAndSet(a, h & m, null);
...
// Stop the batch when consumeNext returns false.
else if (!consumeNext(s, x))
break;
}
}
return k;
}
java.util.concurrent.SubmissionPublisher.ConsumerTask
// Task submitted to the executor by tryStart(); running it runs the subscription's consume() loop.
static final class ConsumerTask<T> extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask
{
// The subscription whose buffer this task consumes.
final BufferedSubscription<T> consumer;
ConsumerTask(BufferedSubscription<T> consumer) {
this.consumer = consumer;
}
...
public final void run() { consumer.consume(); }
}
关闭
背压
- 看代码或调试时,没发现 `publisher` 暂停的代码,可用 JConsole 查看线程栈
...
java.base@17/jdk.internal.misc.Unsafe.park(Native Method)
... .locks.LockSupport.park(LockSupport.java:211)
... .SubmissionPublisher$BufferedSubscription.block(SubmissionPublisher.java:1495)
... .ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
... .ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
... .SubmissionPublisher$BufferedSubscription.awaitSpace(SubmissionPublisher.java:1462)
...
java.util.concurrent.SubmissionPublisher
// Second, more heavily abridged view of doOffer, focused on the retry path that can block.
private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
int lag = 0;
...
try {
Thread t = Thread.currentThread(), o;
if ((unowned = ((o = owner) != t)) && o != null)
...
// When retries were queued or cleanMe was set (by elided code), continue in retryOffer.
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
} ...
...
}
// Processes the subscriptions queued for retry, waiting for buffer space on each when
// nanos > 0 (abridged; what happens after the wait is elided).
private int retryOffer(
T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop,
BufferedSubscription<T> retries, int lag,
boolean cleanMe
) {
// The retry chain is linked through nextRetry; each node is detached as it is visited.
for (BufferedSubscription<T> r = retries; r != null;) {
BufferedSubscription<T> nextRetry = r.nextRetry;
r.nextRetry = null;
// With a positive timeout, wait for space; this is where the publishing thread can park.
if (nanos > 0L)
r.awaitSpace(nanos);
...
}
...
return lag;
}
java.util.concurrent.SubmissionPublisher.BufferedSubscription
// Excerpt of BufferedSubscription showing how a publishing thread waits for buffer space
// and how the consumer side wakes it up.
static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {
// Waits up to nanos for isReleasable() to become true.
final void awaitSpace(long nanos) {
if (!isReleasable()) {
// NOTE(review): presumably lets the caller help run pending tasks before blocking -- confirm.
ForkJoinPool.helpAsyncBlocker(executor, this);
if (!isReleasable()) {
timeout = nanos;
try {
// Reaches block() below, per the thread stack quoted earlier in these notes.
ForkJoinPool.managedBlock(this);
} ...
}
}
}
// ManagedBlocker callback: loops until isReleasable() returns true (abridged).
@Override
public final boolean block() {
...
while (!isReleasable()) {
...
// Register the current thread as the waiter so signalWaiter() can unpark it.
else if (waiter == null)
waiter = Thread.currentThread();
...
// Park until unparked; the loop then re-checks isReleasable().
else
LockSupport.park(this);
}
...
}
// Excerpt of takeItems: signals the waiter when the waiting flag is non-zero.
final int takeItems(Subscriber<? super T> s, long d, int h) {
...
if (waiting != 0)
signalWaiter();
...
}
// Clears the waiting flag and unparks the registered waiter thread, if any.
final void signalWaiter() {
Thread w;
waiting = 0;
if ((w = waiter) != null)
LockSupport.unpark(w);
}
}