java中SynchronousQueue的核心方法

本教程操作环境:windows7系统、java10版,DELL G3电脑。

1.transfer概念

进行匹配交换数据,SynchronousQueue内部使用Transferer来交换元素。

(1) 传入元素e,是生产者(put方法),

(2) 传入null,是消费者(take方法)。

2.使用场景

(1)当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。

(2)如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。

3.实例

//TransferStack.transfer()方法
Etransfer(Ee,booleantimed,longnanos){
SNodes=null;//constructed/reusedasneeded
//根据e是否为null决定是生产者还是消费者
intmode=(e==null)?REQUEST:DATA;
//自旋+CAS,熟悉的套路,熟悉的味道
for(;;){
//栈顶元素
SNodeh=head;
//栈顶没有元素,或者栈顶元素跟当前元素是一个模式的
//也就是都是生产者节点或者都是消费者节点
if(h==null||h.mode==mode){//emptyorsame-mode
//如果有超时而且已到期
if(timed&&nanos<=0){//can'twait
//如果头节点不为空且是取消状态
if(h!=null&&h.isCancelled())
//就把头节点弹出,并进入下一次循环
casHead(h,h.next);//popcancellednode
else
//否则,直接返回null(超时返回null)
returnnull;
}elseif(casHead(h,s=snode(s,e,h,mode))){
//入栈成功(因为是模式相同的,所以只能入栈)
//调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到
SNodem=awaitFulfill(s,timed,nanos);
//如果m等于s,说明取消了,那么就把它清除掉,并返回null
if(m==s){//waitwascancelled
clean(s);
//被取消了返回null
returnnull;
}

//到这里说明匹配到元素了
//因为从awaitFulfill()里面出来要不被取消了要不就匹配到了

//如果头节点不为空,并且头节点的下一个节点是s
//就把头节点换成s的下一个节点
//也就是把h和s都弹出了
//也就是把栈顶两个元素都弹出了
if((h=head)!=null&&h.next==s)
casHead(h,s.next);//helps'sfulfiller
//根据当前节点的模式判断返回m还是s中的值
return(E)((mode==REQUEST)?m.item:s.item);
}
}elseif(!isFulfilling(h.mode)){//trytofulfill
//到这里说明头节点和当前节点模式不一样
//如果头节点不是正在撮合中

//如果头节点已经取消了,就把它弹出栈
if(h.isCancelled())//alreadycancelled
casHead(h,h.next);//popandretry
elseif(casHead(h,s=snode(s,e,h,FULFILLING|mode))){
//头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配
//且s成为了新的头节点,它的状态是正在撮合中
for(;;){//loopuntilmatchedorwaitersdisappear
SNodem=s.next;//miss'smatch
//如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了
//就清空栈并跳出内部循环,到外部循环再重新入栈判断
if(m==null){//allwaitersaregone
casHead(s,null);//popfulfillnode
s=null;//usenewnodenexttime
break;//restartmainloop
}
SNodemn=m.next;
//如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
if(m.tryMatch(s)){
casHead(s,mn);//popbothsandm
//返回撮合结果
return(E)((mode==REQUEST)?m.item:s.item);
}else//lostmatch
//尝试撮合失败,说明m已经先一步被其它线程撮合了
//就协助清除它
s.casNext(m,mn);//helpunlink
}
}
}else{//helpafulfiller
//到这里说明当前节点和头节点模式不一样
//且头节点是正在撮合中

SNodem=h.next;//mish'smatch
if(m==null)//waiterisgone
//如果m为null,说明m已经被其它线程先一步撮合了
casHead(h,null);//popfulfillingnode
else{
SNodemn=m.next;
//协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s
if(m.tryMatch(h))//helpmatch
//将栈顶的两个元素弹出后,再让s重新入栈
casHead(h,mn);//popbothhandm
else//lostmatch
//尝试撮合失败,说明m已经先一步被其它线程撮合了
//就协助清除它
h.casNext(m,mn);//helpunlink
}
}
}
}

//三个参数:需要等待的节点,是否需要超时,超时时间
SNodeawaitFulfill(SNodes,booleantimed,longnanos){
//到期时间
finallongdeadline=timed?System.nanoTime()+nanos:0L;
//当前线程
Threadw=Thread.currentThread();
//自旋次数
intspins=(shouldSpin(s)?
(timed?maxTimedSpins:maxUntimedSpins):0);
for(;;){
//当前线程中断了,尝试清除s
if(w.isInterrupted())
s.tryCancel();

//检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)
SNodem=s.match;
//如果匹配到了,直接返回m
if(m!=null)
returnm;

//如果需要超时
if(timed){
//检查超时时间如果小于0了,尝试清除s
nanos=deadline-System.nanoTime();
if(nanos<=0L){
s.tryCancel();
continue;
}
}
if(spins>0)
//如果还有自旋次数,自旋次数减一,并进入下一次自旋
spins=shouldSpin(s)?(spins-1):0;

//后面的elseif都是自旋次数没有了
elseif(s.waiter==null)
//如果s的waiter为null,把当前线程注入进去,并进入下一次自旋
s.waiter=w;//establishwaitersocanparknextiter
elseif(!timed)
//如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素
LockSupport.park(this);
elseif(nanos>spinForTimeoutThreshold)
//如果允许超时且还有剩余时间,就阻塞相应时间
LockSupport.parkNanos(this,nanos);
}
}

//SNode里面的方向,调用者m是s的下一个节点
//这时候m节点的线程应该是阻塞状态的
booleantryMatch(SNodes){
//如果m还没有匹配者,就把s作为它的匹配者
if(match==null&&
UNSAFE.compareAndSwapObject(this,matchOffset,null,s)){
Threadw=waiter;
if(w!=null){//waitersneedatmostoneunpark
waiter=null;
//唤醒m中的线程,两者匹配完毕
LockSupport.unpark(w);
}
//匹配到了返回true
returntrue;
}
//可能其它线程先一步匹配了m,返回其是否是s
returnmatch==s;
}

原文来自:https://www.py.cn

© 版权声明
THE END
喜欢就支持一下吧
点赞9 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容