本教程操作环境:windows7系统、java10版,DELL G3电脑。
1.主要属性
//用于控制并发的锁 privatefinaltransientReentrantLocklock=newReentrantLock(); //优先级队列 privatefinalPriorityQueue<E>q=newPriorityQueue<E>(); //用于标记当前是否有线程在排队(仅用于取元素时) privateThreadleader=null; //条件,用于表示现在是否有可取的元素 privatefinalConditionavailable=lock.newCondition();
从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
2.延迟实例
下单完成,10分钟后,如果没有支付,发送消息,提醒支付。
发消息好解决,这里主要讨论延时的问题,Java里,有个DelayQueue,可以实现延时的功能。
publicclassTt{
privateDelayQueue<OrderInfo>messageQueue=newDelayQueue<OrderInfo>();//消息队列
publicstaticvoidmain(String[]args){
Ttt=newTt();
OrderInfooi=newOrderInfo("1",newDate().getTime()+3000);//加3000,表示延时3秒
OrderInfooi2=newOrderInfo("2",newDate().getTime()+3000);
t.messageQueue.offer(oi);//当接收到订单后,往队列里放一条消息
t.messageQueue.offer(oi2);
System.out.println(newDate().toString()+"启动,并加入了两条消息");//模拟接收到了两个订单
//这里是消费线程,随程序运行启动
t.handle();
}
privatevoidhandle(){
ThreadPoolHolder.getThreadPool().submit(newRunnable(){
@Override
publicvoidrun(){
while(true){
try{
OrderInfomessage=messageQueue.take();//take方法,有消息(延时时间<0的消息)取消息,没有,阻塞住。
//这里查一遍订单..符合需求发消息,不符合不发
System.out.println(newDate().toString()+".....发送消息id"+message.getId());
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
});
}
}
//消息体,必须实现Delayed接口
publicclassOrderInfoimplementsDelayed{
privateStringid;
publicOrderInfo(Stringid,longexcuteTime){
this.id=id;
this.excuteTime=excuteTime;
}
privatelongexcuteTime;//延迟时长,这个是必须的属性因为要按照这个判断延时时长。
publicStringgetId(){
returnid;
}
@Override
publiclonggetDelay(TimeUnitunit){
returnunit.convert(excuteTime,TimeUnit.NANOSECONDS)-unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS);
}
@Override
publicintcompareTo(Delayedo){
OrderInfomsg=(OrderInfo)o;
returnInteger.valueOf(this.id)>Integer.valueOf(msg.id)?1
:(Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0);
}
}
原文来自:https://www.py.cn
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END

















































暂无评论内容