在解析Netty源码时,在解析NioEventLoop 创建过程中,有一段这样的代码。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
今天就围绕着加粗代码进行分析 。
首先来看children是什么?从前面的代码children = new EventExecutor[nThreads],可以看出children是EventExecutor数组。

而EventExecutor和NioEventLoop是什么关系呢?请看下图 。

接着分析chooser = chooserFactory.newChooser(children);这一行代码,chooserFactory的默认值为DefaultEventExecutorChooserFactory类,在newChooser中使用不同的策略来获取NioEventLoop。
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 判断数组的长度是否是2的幂次方
if (isPowerOfTwo(executors.length)) {
// 如果是2的倍数,则使用PowerOfTwoEventExecutorChooser
// 选择器策略
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 如果不是2的幂次方,则使用GenericEventExecutorChooser选择器策略
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
DefaultEventExecutorChooserFactory类中两种策略执行结果一样,都是采用轮询方式,如果数组长度是2的幂次方,则用求&的方式来计算要取数组下标的索引值,如果不是2的幂次方,则用求余的方式来计算数组下标的索引值,为什么要区分一下呢? 因为求 & 的方式比求余的方式效率更高,如果数组长度不是2的倍数,则不能使用求 & 的方式 。其实像HashMap,MpscChunkedArrayQueue源码中都用到了求&的方式,如果阅读过相关源码的小伙伴肯定比较熟悉了,但如果还有小伙伴不明白,我们可以来看一个例子,假如数组的长度为14。

当idx.getAndIncrement()的值为16和18时,和17和19时,计算出索引位置为0,1,则存在数组索引冲突,因此不符合轮询的条件。而在DefaultEventExecutorChooserFactory中使用 PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser两种策略,主要是为了提高轮询效率 。
这段代码先放一边。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
先看后面的
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
这一段代码 。

因为NioEventLoop继承SingleThreadEventExecutor类,而SingleThreadEventExecutor类中实现了terminationFuture()方法。
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
public Future<?> terminationFuture() {
return terminationFuture;
}
从上面方法中得知,调用e.terminationFuture().addListener(terminationListener);这一行代码,实际上调用的是DefaultPromise的addListener()方法,进入addListener()方法 。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
// 使用同步的方式,避免并发
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
光从上面两个方法,看不出什么东西,好像也看不清DefaultPromise的意图。 进入DefaultFutureListeners类中 。
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
listeners[size] = l;
this.size = size + 1;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
listeners[-- size] = null;
this.size = size;
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
public int size() {
return size;
}
public int progressiveSize() {
return progressiveSize;
}
}
从DefaultFutureListeners代码中可以看出DefaultFutureListeners和GenericFutureListener的关系,在DefaultFutureListeners实例中有一个GenericFutureListener数组。从DefaultFutureListeners的构造方法中可以看出, 默认初始化GenericFutureListener数组长度为2,因此再来理解addListener0()方法的原理就很简单了。
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
如果向DefaultPromise添加一个listener,则用DefaultPromise的listener属性存储即可,如果向DefaultPromise添加两个listener,则用DefaultFutureListeners对象存储,但DefaultFutureListeners中listeners数组初始化长度为2,刚好存储添加的两个listener,如果向DefaultPromise添加的listener超过两个,则需要扩容DefaultFutureListeners的listener数组来存储了,扩容方式如上加粗代码,listener数组扩容为原来两倍,Netty这样做的原因,一方面是提升性能,另一方面为了节省存储空间吧。
既然listener已经存储好了,什么时候调用operationComplete()方法呢?接下来看DefaultPromise的setSuccess()和setFailure()方法。

当然,我们挑选setSuccess()看即可。 setFailure()方法和setSuccess()方法原理一样。
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
看源码多的小伙伴肯定一眼就知道这里又用到了AQS。
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
private volatile Object result;
设置result的值时,如果其值当前为为null或UNCANCELLABLE,是可以被设置成功的,默认情况下,result值为null。那什么情况下会为UNCANCELLABLE呢? 请看DefaultPromise中的setUncancellable()方法 。
public boolean setUncancellable() {
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}
将result的值设置为UNCANCELLABLE后有什么影响呢? 请看cancel()方法 。
public boolean cancel(boolean mayInterruptIfRunning) {
if (RESULT_UPDATER.get(this) == null &&
RESULT_UPDATER.compareAndSet(this, null, new CauseHolder(new CancellationException()))) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
当result的值必须为空时,cancel()方法才能调用成功,接着调用notifyListeners()方法,最终调用才Listener的operationComplete()方法 。如果 result的值被设置为UNCANCELLABLE后,cancel()方法将不会调用成功。 言归正传,继续看setValue0()的checkNotifyWaiters()方法 。
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
如果waiters大于0,则调用notifyAll()方法。 notifyAll()方法是Object类中的方法,就是唤醒所有等待线程的意思,接下来看waiters值何时修改的呢? 请看下图。

在DefaultPromise类中有两个方法,incWaiters()和decWaiters()方法,这两个方法对waiters值做了修改,何时调用waiters的值呢?能举个例子不? 当 ChannelFuture cf = bootstrap.bind(9000).sync(); 的sync()方法调用时。

也就是DefaultPromise的sync()方法被调用时。 waiters的值会++,同时会触发当前线程进入wait()。
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
因此在 setValue0() 方法的notifyListeners调用之前,先要唤醒正在等待的线程,接着调用notifyListeners()方法 。接下来进入notifyListeners()方法 。
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
大家可能会想,上述这段代码什么意思?先来看executor()方法。
protected EventExecutor executor() {
return executor;
}
executor的值为GlobalEventExecutor。

因此inEventLoop()方法实际上是调用GlobalEventExecutor的inEventLoop()方法 。
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
判断的依据就是thread是否是当前线程,那GlobalEventExecutor中的thread又在什么时候初始化的呢? 看DefaultPromise的safeExecute()方法 。
final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
private final AtomicBoolean started = new AtomicBoolean();
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
因为知道executor为GlobalEventExecutor,因此这里的execute方法实际为GlobalEventExecutor中的方法。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
addTask(task);
if (!inEventLoop()) {
startThread();
}
}
private void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
taskQueue.add(task);
}
private void startThread() {
// 保证同一时间,只有一个线程在执行消费任务操作
if (started.compareAndSet(false, true)) {
final Thread t = threadFactory.newThread(taskRunner);
// Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
// classloader.
// See:
// - https://github.com/netty/netty/issues/7290
// - https://bugs.openjdk.java.net/browse/JDK-7008595
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
t.setContextClassLoader(null);
return null;
}
});
// Set the thread before starting it as otherwise inEventLoop() may return false and so produce
// an assert error.
// See https://github.com/netty/netty/issues/4357
thread = t;
t.start();
}
}
大家发现没有,调用GlobalEventExecutor的execute方法,并没有直接执行任务的run()方法,而是将任务添加到taskQueue队列中,接着调用threadFactory的newThread()方法,创建一个新线程,新线程中执行TaskRunner任务,在TaskRunner的run()方法中,从队列中取出任务,并执行其run()方法 ,看 TaskRunner的源码 。
final class TaskRunner implements Runnable {
@Override
public void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
try {
// 执行任务的run()方法
task.run();
} catch (Throwable t) {
logger.warn("Unexpected exception from the global event executor: ", t);
}
if (task != quietPeriodTask) {
continue;
}
}
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
// Terminate if there is no task in the queue (except the noop task).
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// Mark the current thread as stopped.
// The following CAS must always success and must be uncontended,
// because only one thread should be running at the same time.
boolean stopped = started.compareAndSet(true, false);
assert stopped;
// Check if there are pending entries added by execute() or schedule*() while we do CAS above.
if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
// A) No new task was added and thus there's nothing to handle
// -> safe to terminate because there's nothing left to do
// B) A new thread started and handled all the new tasks.
// -> safe to terminate the new thread will take care the rest
break;
}
// There are pending tasks added again.
if (!started.compareAndSet(false, true)) {
// startThread() started a new thread and set 'started' to true.
// -> terminate this thread so that the new thread reads from taskQueue exclusively.
break;
}
// New tasks were added, but this worker was faster to set 'started' to true.
// i.e. a new worker thread was not started by startThread().
// -> keep this thread alive to handle the newly added entries.
}
}
}
}
上面这段代码还是令人费解的,先来看takeTask()方法 。
public Runnable takeTask() {
BlockingQueue<Runnable> taskQueue = this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
// 第一种情况,如果ScheduledFutureTask创建到代码执行到这一行为止,时间没有超过deadlineNanos,
// 则delayNanos = deadlineNanos - (currentTime - createTime)
// 第二种情况,如果ScheduledFutureTask是被移除了,并重新加入到scheduledTaskQueue中
// 则 delayNanos = deadlineNanos - (currentTime - 加入到scheduledTaskQueue中的时间)
long delayNanos = scheduledTask.delayNanos();
Runnable task;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
} else {
task = taskQueue.poll();
}
if (task == null) {
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
上面 long delayNanos = scheduledTask.delayNanos();这一行代码,先看delayNanos()的计算规则。
ScheduledFutureTask{
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
public long deadlineNanos() {
return deadlineNanos;
}
// ScheduledFutureTask实例创建时间
private static final long START_TIME = System.nanoTime();
// 当前时间减去创建时间
public static long nanoTime() {
return System.nanoTime() - START_TIME;
}
}
那scheduledTask的值是何时初始化的呢?


从上图中得知, deadlineNanos的值来源于ScheduledFutureTask的deadlineNanos()方法,且在GlobalEventExecutor中传入的值为1秒。 而deadlineNanos的值与ScheduledFutureTask START_TIME静态变量初始化,以及ScheduledFutureTask实例化时间有关。 他们之间的关系为。

假如SCHEDULE_QUIET_PERIOD_INTERVAL 的值为1秒,如果ScheduledFutureTask的START_TIME变量初始化到ScheduledFutureTask实例化花了0.5秒 ,则deadlineNanos的值为1.5秒。

其实 netty 这么做原因就是要保证 delayNanos 有充足的1秒时间,排除掉ScheduledFutureTask初始化的时间,可能是一种更加精准的考虑吧。 在addTask() 方法中将任务加入到taskQueue中。

根据上图第一种情况当执行到long delayNanos = scheduledTask.delayNanos(); 这一行代码的时间小于deadlineNanos时,和第二种情况,当执行到long delayNanos = scheduledTask.delayNanos(); 大于deadlineNanos时。

对应执行代码行如上图所示 。 Netty这么做的目的可能是。

addTask()方法,将任务添加到taskQueue队列中,和takeTask()方法从队列中获取任务,两者之间是异步的。 可能会存在taskQueue.poll()方法比taskQueue.add()方法先执行,因此这里设置一个deadlineNanos参数,来等待任务吧。
接下来看fetchFromScheduledTaskQueue()这个方法 。
public void fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
scheduledTask = pollScheduledTask(nanoTime);
}
}
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
// 如果超过了deadlineNanos时间还没有从taskQueue队列中获取任务
// 则将scheduledTask任务添加到taskQueue中
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
如果taskQueue中已经没有元素,则将scheduledTaskQueue队列中的元素添加到taskQueue队列中。

上述 pollScheduledTask()方法有点像优先级队列,因为peek()方法取队头元素,但元素并没有弹出队列,如果时间还未达到,是不能从优先级队列中取出元素的,因此在pollScheduledTask()方法中,加了一行scheduledTask.deadlineNanos() <= nanoTime作为条件,如果时间还未达到,返回的是空,只有时间达到了,才能从scheduledTaskQueue中取到元素。
在fetchFromScheduledTaskQueue()方法中写了一个while()循环,只要scheduledTaskQueue中有元素满足条件,则都加到taskQueue队列中,请看下面例子。

自己写了一个MyGlobalEventExecutor继承GlobalEventExecutor,当然在netty源码中GlobalEventExecutor是final类型的,为了方便测试,我修改了netty源码相关内容,重新编译了netty源码,因此在我的代码中是可以实现的。 在GlobalEventExecutor的构造函数中, 向scheduledTaskQueue中添加了一个quietPeriodTaskNew任务。在GlobalEventExecutor构造函数中也添加了一个quietPeriodTask任务。因此scheduledTaskQueue中已经有两个ScheduledFutureTask任务。

测试例子
public static void main(String[] args) throws Exception {
GlobalEventExecutor INSTANCE = new MyGlobalEventExecutor();
Promise<String> promise = new DefaultPromise<>(INSTANCE);
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
Object s = future.get();
System.out.println(new Date() + "listner1---promise的future返回值:" + s);
}
});
promise.setSuccess(Thread.currentThread().getName() + " promise set ");
}
打断点查看循环情况

我觉得fetchFromScheduledTaskQueue() 方法的while()循环应该是考虑到这种情况吧,如果获取到正常任务,则调用其run()方法,最终调用的就是notifyListenersNow()方法。

关于notifyListenersNow()方法的实现逻辑,后面来分析。先分析,如果取到的任务是ScheduledFutureTask,其run()方法的实现逻辑。
public void run() {
assert executor().inEventLoop();
try {
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = task.call();
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = nanoTime() - p;
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
正常情况下,executor()并没有被关闭也没有被取消,因此isCancelled() && executor().isShutdown() 都为false,则只和periodNanos相关了,如果periodNanos = 0 ,则ScheduledFutureTask将从scheduledTaskQueue中移除掉,如果 periodNanos 不等于0 , 则重新计算 deadlineNanos 的值,并将原ScheduledFutureTask从taskQueue中移动到scheduledTaskQueue中。 当然deadlineNanos的计算也分两种情况,
第一种情况,当periodNanos 大于0 ,假设为5秒,ScheduledFutureTask 初始化开始时间为 18:00:00,初始化耗时为1秒,ScheduledFutureTask对象创建时间为18:00:01秒,delay 时间为1秒(也就是GlobalEventExecutor 的 SCHEDULE_QUIET_PERIOD_INTERVAL参数默认为1 秒),则deadlineNanos = 2 秒,如果periodNanos为5秒,则第一次调用ScheduledFutureTask的run方法后,deadlineNanos = 7 秒,如果在18:00:07秒之前调用GlobalEventExecutor的takeTask()方法 。 则会走下图中的1 的情况,如果在18:00:07秒之后调用takeTask()方法,则会走下图中2的情况。
takeTask方法图

第二种情况,当periodNanos 小于0 ,假设为-5秒,ScheduledFutureTask 初始化开始时间为 18:00:00,初始化耗时为1秒,ScheduledFutureTask对象创建时间为18:00:01秒,delay 时间为1秒(也就是GlobalEventExecutor 的 SCHEDULE_QUIET_PERIOD_INTERVAL参数默认为1 秒),则deadlineNanos = 2 秒,如果执行到下图中的

deadlineNanos = nanoTime() - p;这一行代码的时间为18:00:08秒,则deadlineNanos = 8 + 5 = 13,也就是说在18:00:13秒之前,会执行takeTask方法图的第一种情况,否则会执行takeTask方法图第二种情况。 这可能是代码上的含义,从字面意义上来说呢?periodNanos不为0的情况,如果当前线程从队列中取不到任务,当前线程则会退出循环,当其他线程向taskQueue中添加任务,则又会调用TaskRunner的run()方法,此时又会调用taskTask()方法从taskQueue中取元素,如果从队列中获取元素的时间小于deadlineNanos,则会调用调用taskTask()方法的这一行代码task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); 方法来获取队列中的任务,如果大于deadlineNanos,则会调用taskTask()方法的这一行代码task = taskQueue.poll();来获取队列中的任务 。
第三种情况,如果periodNanos等于0时,会出现什么情况呢?

为什么必须将父类中构造函数GlobalEventExecutor添加的scheduledTaskQueue().add(quietPeriodTask);这一行代码移除掉呢?如果不移除掉。

当scheduledTaskQueue中只有我们手动添加的quietPeriodTaskNew时,而quietPeriodTaskNew的periodNanos为0。

为什么呢? 因为在scheduledTask的run()方法中,走了下面这一行代码。

并没有将ScheduledFutureTask添加到scheduledTaskQueue中,则peekScheduledTask()方法中获取到的scheduledTaskQueue队列中的任务为空,因此进入一直等待。 要测试效果也很容易。

分析完safeExecute()方法后,再来分析notifyListeners()方法 。

在notifyListeners()方法中,上面加红框代码,是什么意思呢? 依然还是来看一个例子。

在GenericFutureListener的operationComplete()方法中,又加入新的Listener,此时新加入的Listener的operationComplete方法依然被调用。

InternalThreadLocalMap在之前的博客 Netty源码性能分析 - ThreadLocal PK FastThreadLocal 分析过,也可以直接将其当成ThreadLocal来看,但有一点还是在疑问? 想了很久没有想明白 。
public class TestPromise2 {
public static void main(String[] args) throws Exception {
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
EventLoop next = loopGroup.next();
Promise<String> promise = (Promise) next.terminationFuture();
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
Object s = future.get();
System.out.println(new Date() + "listner1---promise的future返回值:" + s);
((DefaultPromise) future).addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
Object s = future.get();
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
System.out.println(new Date() + "listner1 inner ---promise的future返回值:" + s + ", stackDepth = " + stackDepth);
((DefaultPromise) future).addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
Object s = future.get();
System.out.println(new Date() + "listner1 inner inner ---promise的future返回值:" + s + ", stackDepth ="+stackDepth);
}
});
}
});
}
});
promise.setSuccess("promise set ");//设置返回结果 并且通知所有Listeners执行回调方法operationComplete方法
}
}

从上例子中,我嵌套了两层,但是threadLocals.futureListenerStackDepth()的值始终是0,并没有随着嵌套的层数增加,而增加。为什么源码中stackDepth < MAX_LISTENER_STACK_DEPTH) 要做这个限制呢?

为什么打印两次stackDepth 的值都是0,感兴趣的小伙伴可以去打断点试试 。 还是解释一下吧。


因此在operationComplete()方法的内部调用addListener()并没有我要想像中的,继续调用notifyListenersNow()中的notifyListeners0()方法,而是发现notifyingListeners为true,则直接返回了。 相应的futureListenerStackDepth值也被设置为原来的值, 所以嵌套调用的逻辑就是







我弄了很多的图片,主要想通过打断点的过程来分析notifyListenersNow()方法的执行过程,notifyListenersNow()这个方法看上去简单,但包含的情况还是很多的,因此有兴趣的小伙伴,可以自己写例子去分析为好 。 notifyListener0()方法的内部就是直接调用回调方法operationComplete()了。
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
总结
关于DefaultPromise 源码解析又告一段落了,在分析DefaultPromise 源码过程中,心情也是一波三折, 有些东西,感觉自己懂了,但是一测试,发现又不是那么回事,可能还是测试用例不全面,随着Netty源码的分析深入,我相信这些问题都会得到解决,这篇博客中也有一些疑问没有解决,可能在后面再遇到类似问题,再来补全博客了吧。 下一篇博客见。
源码地址:
https://github.com/quyixiao/test_netty.git
https://gitee.com/quyixiao/netty-netty-4.1.38.Final.git


















