本教程操作环境:windows7系统、java10版,DELL G3电脑。
1.主要属性
//用于控制并发的锁 privatefinaltransientReentrantLocklock=newReentrantLock(); //优先级队列 privatefinalPriorityQueue<E>q=newPriorityQueue<E>(); //用于标记当前是否有线程在排队(仅用于取元素时) privateThreadleader=null; //条件,用于表示现在是否有可取的元素 privatefinalConditionavailable=lock.newCondition();
从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
2.延迟实例
下单完成,10分钟后,如果没有支付,发送消息,提醒支付。
发消息好解决,这里主要讨论延时的问题,Java里,有个DelayQueue,可以实现延时的功能。
publicclassTt{ privateDelayQueue<OrderInfo>messageQueue=newDelayQueue<OrderInfo>();//消息队列 publicstaticvoidmain(String[]args){ Ttt=newTt(); OrderInfooi=newOrderInfo("1",newDate().getTime()+3000);//加3000,表示延时3秒 OrderInfooi2=newOrderInfo("2",newDate().getTime()+3000); t.messageQueue.offer(oi);//当接收到订单后,往队列里放一条消息 t.messageQueue.offer(oi2); System.out.println(newDate().toString()+"启动,并加入了两条消息");//模拟接收到了两个订单 //这里是消费线程,随程序运行启动 t.handle(); } privatevoidhandle(){ ThreadPoolHolder.getThreadPool().submit(newRunnable(){ @Override publicvoidrun(){ while(true){ try{ OrderInfomessage=messageQueue.take();//take方法,有消息(延时时间<0的消息)取消息,没有,阻塞住。 //这里查一遍订单..符合需求发消息,不符合不发 System.out.println(newDate().toString()+".....发送消息id"+message.getId()); }catch(InterruptedExceptione){ e.printStackTrace(); } } } }); } }
//消息体,必须实现Delayed接口 publicclassOrderInfoimplementsDelayed{ privateStringid; publicOrderInfo(Stringid,longexcuteTime){ this.id=id; this.excuteTime=excuteTime; } privatelongexcuteTime;//延迟时长,这个是必须的属性因为要按照这个判断延时时长。 publicStringgetId(){ returnid; } @Override publiclonggetDelay(TimeUnitunit){ returnunit.convert(excuteTime,TimeUnit.NANOSECONDS)-unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS); } @Override publicintcompareTo(Delayedo){ OrderInfomsg=(OrderInfo)o; returnInteger.valueOf(this.id)>Integer.valueOf(msg.id)?1 :(Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0); } }
原文来自:https://www.py.cn
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容