`
rainsilence
  • 浏览: 158839 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

用Java实现Promise的难点总结

阅读更多

自Promise被纳入ECMAScript6标准后,各大浏览器几乎都实现了Promise标准。

以下是ecmascript6里promise的典型用法,为了解决一个线程的任务完成后,再去执行另一个任务。

var promise = new Promise(function(resolve, reject) {
      $.ajax({
           success: function() {
                 resolve();
           },
           failure: function() {
                 reject();
           }
      })
});

promise.then(function() {
       //调用了resolve或者reject之后
});

 

Java实现Promise相比较ECMAScript6来说,有些不同。

Javascript不管怎么样都是单线程的,即使ajax在浏览器内核层面表现出多线程,但是执行到js端还是单线程的。而在java上实现promise机制则复杂的多。具体表现在:

1)即使在上一个线程执行任务后,下一个任务可以在同一个线程中执行,但是加入任务的操作不得不是同步的

Netty4源码:io.netty.util.concurrent.DefaultPromise

    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }

        if (isDone()) {
            notifyLateListener(listener);
            return this;
        }
        //加入时必须同步
        synchronized (this) {
            if (!isDone()) {
                if (listeners == null) {
                    listeners = listener;
                } else {
                    if (listeners instanceof DefaultFutureListeners) {
                        ((DefaultFutureListeners) listeners).add(listener);
                    } else {
                        final GenericFutureListener<? extends Future<V>> firstListener =
                                (GenericFutureListener<? extends Future<V>>) listeners;
                        listeners = new DefaultFutureListeners(firstListener, listener);
                    }
                }
                return this;
            }
        }

        notifyLateListener(listener);
        return this;
    }

 

2)在多个平行任务(线程)结束后,再去执行一个任务的case (Promise.all)的时候,判断任务是否结束的计数操作也不得不是同步的。

Netty源码:io.netty.channel.group.DefaultChannelGroupFuture

    private final ChannelFutureListener childListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            boolean success = future.isSuccess();
            boolean callSetDone;
            // 判断所有的task是否已经结束也必须同步
            synchronized (DefaultChannelGroupFuture.this) {
                if (success) {
                    successCount ++;
                } else {
                    failureCount ++;
                }

                callSetDone = successCount + failureCount == futures.size();
                assert successCount + failureCount <= futures.size();
            }

            if (callSetDone) {
                if (failureCount > 0) {
                    List<Map.Entry<Channel, Throwable>> failed =
                            new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
                    for (ChannelFuture f: futures.values()) {
                        if (!f.isSuccess()) {
                            failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
                        }
                    }
                    setFailure0(new ChannelGroupException(failed));
                } else {
                    setSuccess0();
                }
            }
        }
    };

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics