There are a lot of implementations out there, but one I would like to describe is using pure JDK concurrent framework classes: DelayedQueue and Delayed interface.
Let me start with simple (and empty) interface which defines the work item. I am skipping the implementation details like properties and methods as those are not important.
1 2 3 4 5 | package com.example.delayed; public interface WorkItem { // Some properties and methods here } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | package com.example.delayed; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class PostponedWorkItem implements Delayed { private final long origin; private final long delay; private final WorkItem workItem; public PostponedWorkItem( final WorkItem workItem, final long delay ) { this .origin = System.currentTimeMillis(); this .workItem = workItem; this .delay = delay; } @Override public long getDelay( TimeUnit unit ) { return unit.convert( delay - ( System.currentTimeMillis() - origin ), TimeUnit.MILLISECONDS ); } @Override public int compareTo( Delayed delayed ) { if ( delayed == this ) { return 0 ; } if ( delayed instanceof PostponedWorkItem ) { long diff = delay - ( ( PostponedWorkItem )delayed ).delay; return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? - 1 : 1 ) ); } long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) ); return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? - 1 : 1 ) ); } } |
Now, we are mostly done! To complete the example, let's make sure that same work item won't be submitted twice to the work queue by implementing equals and hashCode (implemenation is pretty trivial and should not require any comments).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public class PostponedWorkItem implements Delayed { ... @Override public int hashCode() { final int prime = 31 ; int result = 1 ; result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() ); return result; } @Override public boolean equals( Object obj ) { if ( this == obj ) { return true ; } if ( obj == null ) { return false ; } if ( !( obj instanceof PostponedWorkItem ) ) { return false ; } final PostponedWorkItem other = ( PostponedWorkItem )obj; if ( workItem == null ) { if ( other.workItem != null ) { return false ; } } else if ( !workItem.equals( other.workItem ) ) { return false ; } return true ; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | package com.example.delayed; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; public class WorkItemScheduler { private final long delay = 2000 ; // 2 seconds private final BlockingQueue< PostponedWorkItem > delayed = new DelayQueue< PostponedWorkItem >(); public void addWorkItem( final WorkItem workItem ) { final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay ); if ( !delayed.contains( postponed )) { delayed.offer( postponed ); } } public void process() { final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >(); delayed.drainTo( expired ); for ( final PostponedWorkItem postponed: expired ) { // Do some real work here with postponed.getWorkItem() } } } |
Enjoy!
3 comments:
I can't see how this example will work. In the compareTo method of the PostponedWorkItem class how does the instanceOf check produce the right result? If I schedule two WorkItem instances with a 20 second delay, but 10 seconds apart, they will both pass that check and be reported as equal even though they clearly aren't. Can someone help me by pointing out what I'm missing?
Thanks a lot for the very valid point. Your understanding is absolutely correct and implementation should work as you commented. But the use case described in this post targets following goal: usually, delay is the same for all work items and same work shouldn't be scheduled more then once within this delay. Examples could be: expensive calculations (as there is already calculation scheduled, no need to schedule another one), sending e-mails, deleting files ... It's a kind of throttling if you wish. Thank you.
The class is currently used ? What for? When? If the class was allowed to use which was the reason
Post a Comment