本教程操作环境:windows7系统、java10版,DELL G3电脑。
1.transfer概念
进行匹配交换数据,SynchronousQueue内部使用Transferer来交换元素。
(1) 传入元素e,是生产者(put方法),
(2) 传入null,是消费者(take方法)。
2.使用场景
(1)当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。
(2)如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。
3.实例
//TransferStack.transfer()方法 Etransfer(Ee,booleantimed,longnanos){ SNodes=null;//constructed/reusedasneeded //根据e是否为null决定是生产者还是消费者 intmode=(e==null)?REQUEST:DATA; //自旋+CAS,熟悉的套路,熟悉的味道 for(;;){ //栈顶元素 SNodeh=head; //栈顶没有元素,或者栈顶元素跟当前元素是一个模式的 //也就是都是生产者节点或者都是消费者节点 if(h==null||h.mode==mode){//emptyorsame-mode //如果有超时而且已到期 if(timed&&nanos<=0){//can'twait //如果头节点不为空且是取消状态 if(h!=null&&h.isCancelled()) //就把头节点弹出,并进入下一次循环 casHead(h,h.next);//popcancellednode else //否则,直接返回null(超时返回null) returnnull; }elseif(casHead(h,s=snode(s,e,h,mode))){ //入栈成功(因为是模式相同的,所以只能入栈) //调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到 SNodem=awaitFulfill(s,timed,nanos); //如果m等于s,说明取消了,那么就把它清除掉,并返回null if(m==s){//waitwascancelled clean(s); //被取消了返回null returnnull; } //到这里说明匹配到元素了 //因为从awaitFulfill()里面出来要不被取消了要不就匹配到了 //如果头节点不为空,并且头节点的下一个节点是s //就把头节点换成s的下一个节点 //也就是把h和s都弹出了 //也就是把栈顶两个元素都弹出了 if((h=head)!=null&&h.next==s) casHead(h,s.next);//helps'sfulfiller //根据当前节点的模式判断返回m还是s中的值 return(E)((mode==REQUEST)?m.item:s.item); } }elseif(!isFulfilling(h.mode)){//trytofulfill //到这里说明头节点和当前节点模式不一样 //如果头节点不是正在撮合中 //如果头节点已经取消了,就把它弹出栈 if(h.isCancelled())//alreadycancelled casHead(h,h.next);//popandretry elseif(casHead(h,s=snode(s,e,h,FULFILLING|mode))){ //头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配 //且s成为了新的头节点,它的状态是正在撮合中 for(;;){//loopuntilmatchedorwaitersdisappear SNodem=s.next;//miss'smatch //如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了 //就清空栈并跳出内部循环,到外部循环再重新入栈判断 if(m==null){//allwaitersaregone casHead(s,null);//popfulfillnode s=null;//usenewnodenexttime break;//restartmainloop } SNodemn=m.next; //如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s if(m.tryMatch(s)){ casHead(s,mn);//popbothsandm //返回撮合结果 return(E)((mode==REQUEST)?m.item:s.item); }else//lostmatch //尝试撮合失败,说明m已经先一步被其它线程撮合了 //就协助清除它 s.casNext(m,mn);//helpunlink } } }else{//helpafulfiller //到这里说明当前节点和头节点模式不一样 //且头节点是正在撮合中 SNodem=h.next;//mish'smatch if(m==null)//waiterisgone //如果m为null,说明m已经被其它线程先一步撮合了 casHead(h,null);//popfulfillingnode else{ SNodemn=m.next; //协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s if(m.tryMatch(h))//helpmatch //将栈顶的两个元素弹出后,再让s重新入栈 casHead(h,mn);//popbothhandm else//lostmatch //尝试撮合失败,说明m已经先一步被其它线程撮合了 //就协助清除它 h.casNext(m,mn);//helpunlink } } } } //三个参数:需要等待的节点,是否需要超时,超时时间 SNodeawaitFulfill(SNodes,booleantimed,longnanos){ //到期时间 finallongdeadline=timed?System.nanoTime()+nanos:0L; //当前线程 Threadw=Thread.currentThread(); //自旋次数 intspins=(shouldSpin(s)? (timed?maxTimedSpins:maxUntimedSpins):0); for(;;){ //当前线程中断了,尝试清除s if(w.isInterrupted()) s.tryCancel(); //检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s) SNodem=s.match; //如果匹配到了,直接返回m if(m!=null) returnm; //如果需要超时 if(timed){ //检查超时时间如果小于0了,尝试清除s nanos=deadline-System.nanoTime(); if(nanos<=0L){ s.tryCancel(); continue; } } if(spins>0) //如果还有自旋次数,自旋次数减一,并进入下一次自旋 spins=shouldSpin(s)?(spins-1):0; //后面的elseif都是自旋次数没有了 elseif(s.waiter==null) //如果s的waiter为null,把当前线程注入进去,并进入下一次自旋 s.waiter=w;//establishwaitersocanparknextiter elseif(!timed) //如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素 LockSupport.park(this); elseif(nanos>spinForTimeoutThreshold) //如果允许超时且还有剩余时间,就阻塞相应时间 LockSupport.parkNanos(this,nanos); } } //SNode里面的方向,调用者m是s的下一个节点 //这时候m节点的线程应该是阻塞状态的 booleantryMatch(SNodes){ //如果m还没有匹配者,就把s作为它的匹配者 if(match==null&& UNSAFE.compareAndSwapObject(this,matchOffset,null,s)){ Threadw=waiter; if(w!=null){//waitersneedatmostoneunpark waiter=null; //唤醒m中的线程,两者匹配完毕 LockSupport.unpark(w); } //匹配到了返回true returntrue; } //可能其它线程先一步匹配了m,返回其是否是s returnmatch==s; }
原文来自:https://www.py.cn
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容