Oracle/Hibernate de-queuing
or how to use SELECT … FOR UPDATE SKIP LOCKED without any ROWNUM or FETCH FIRST ROWS, but rather with scroll() and setFetchSize(), in order to process a job queue with multiple threads
This is a common problem: you have a queue of events that you want to process. For example, an application stores the e-mails to send, and a background job reads them, sends each e-mail, and updates the status from ‘to-do’ to ‘done’ when successful. There is specialized message queue software, but a SQL table can be preferred, especially when the queuing is done by the database application: the same database means the same Atomicity, Consistency, and Durability. And you want this robustness because you don’t want to miss an e-mail to send, and you don’t want to send duplicate e-mails.
But you can see that I’ve mentioned only the ACD of the ACID properties. You want something special about Isolation here, because normal isolation would mean that you lock the rows that you process. But then your de-queuing cannot be multi-threaded, because all the other threads will wait on those locks. Oracle has FOR UPDATE SKIP LOCKED exactly for that purpose: you don’t wait, you don’t stop, you just skip the locked rows. It was documented only fairly recently (11g, if I remember well), but it has been there for a long time for internal use by Advanced Queuing.
In my opinion, even if it is documented we should take it with care, especially:
- use it for what it is designed for — an AQ-like case
- keep it simple and don’t tweak the limitations
If you want to get an idea about the limitations, a Google search should find these two links:
Jonathan Lewis:
AskTOM:
In Jonathan Lewis’s blog, you can see the problem when you want to use ROWNUM so that multiple threads can each dequeue a small subset of rows to process. ROWNUM is evaluated before the FOR UPDATE, so two concurrent jobs will read the same rows. The first thread will take all of them and the second one will discard them. What you want is to limit the rows fetched after verifying that they are not locked by another session.
In the second link, Connor McDonald shows how to do it properly: not limiting the query result, but only fetching the first rows. Connor’s example is in PL/SQL. My post here is to show the correct way from Hibernate.
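The difference between the two approaches can be sketched with a toy in-memory model. This is not Oracle or Hibernate code: the class and method names (LimitVersusSkipSketch, limitThenSkip, skipThenLimit) are made up for illustration, the list stands for the rows with flag=0, and the set stands for the row locks held by all sessions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: no database involved, names are hypothetical.
class LimitVersusSkipSketch {

    // Models "ROWNUM <= n ... FOR UPDATE SKIP LOCKED":
    // the first n rows are chosen first, the locked ones are skipped afterwards.
    static List<Integer> limitThenSkip(List<Integer> rows, Set<Integer> locked, int n) {
        List<Integer> result = new ArrayList<>();
        for (Integer id : rows.subList(0, Math.min(n, rows.size()))) {
            if (locked.add(id)) {          // add() returns false when the row is already locked
                result.add(id);
            }
        }
        return result;
    }

    // Models an unrestricted FOR UPDATE SKIP LOCKED cursor from which only n rows are fetched:
    // locked rows are skipped first, and reading stops once n rows have been obtained.
    static List<Integer> skipThenLimit(List<Integer> rows, Set<Integer> locked, int n) {
        List<Integer> result = new ArrayList<>();
        for (Integer id : rows) {
            if (result.size() == n) {
                break;
            }
            if (locked.add(id)) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        Set<Integer> locked = new HashSet<>();
        System.out.println(limitThenSkip(rows, locked, 5)); // thread 1: [1, 2, 3, 4, 5]
        System.out.println(limitThenSkip(rows, locked, 5)); // thread 2: []

        locked = new HashSet<>();
        System.out.println(skipThenLimit(rows, locked, 5)); // thread 1: [1, 2, 3, 4, 5]
        System.out.println(skipThenLimit(rows, locked, 5)); // thread 2: [6, 7, 8, 9, 10]
    }
}
```

In the first variant the second thread ends up with nothing to do although rows 6 to 10 are free, which is the behavior we want to avoid.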
Then you will add ‘Vlad’ to your Google search and find:
Vlad Mihalcea shows how to do it in general, but the implementation of SKIP LOCKED is very different from one database to the other. And, as explained above, with Oracle we should not use ‘ROWNUM<’, ‘FETCH FIRST ROWS’, or ‘row_number()’ to limit the result of a SKIP LOCKED query. It can be tempting to mix FOR UPDATE SKIP LOCKED with ROWNUM for row limitation, ORDER BY, and maybe even DBMS_RANDOM. But that combines too many non-deterministic operations, which is very dangerous, especially for something as critical as job de-queuing.
The good example
I’ve built a small example with a simple MESSAGE table with an id and a flag. The rows are inserted with flag=0 and each thread will update the flag with its thread number.
Here is my Message class:
class Message {
    @Id
    @GeneratedValue(strategy=GenerationType.IDENTITY)
    private Long id;
    private Integer flag;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public Integer getFlag() {
        return flag;
    }
    public void setFlag(Integer flag) {
        this.flag = flag;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Message)) return false;
        Message message = (Message) o;
        return Objects.equals(getId(), message.getId());
    }
    @Override
    public int hashCode() {
        return Objects.hash(getId());
    }
}
And here is my de-queuing code:
Session s = sf.openSession();   // sf is the SessionFactory
Transaction x = s.beginTransaction();
ScrollableResults i = s.createQuery("from HibernateSkipLocked$Message where flag=0")
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint("javax.persistence.lock.timeout", LockOptions.SKIP_LOCKED)
    .setFetchSize(5)
    .scroll(ScrollMode.FORWARD_ONLY);
int limit = 5;
// the limit is tested first so that no sixth i.next() triggers another fetch call
while (limit-- > 0 && i.next()) {
    Message message = (Message) i.get(0);
    try { Thread.sleep(1000); } catch (Exception e) {}   // simulate some processing time
    message.setFlag(thread);   // thread is the number of the current thread
    System.out.println(message.getId() + " -> " + message.getFlag() + " " + new Timestamp(System.currentTimeMillis()));
}
x.commit();
s.close();
And here are the what/why…
This will generate the FOR UPDATE:
.setLockMode(LockModeType.PESSIMISTIC_WRITE)
This will generate the SKIP LOCKED:
.setHint("javax.persistence.lock.timeout",LockOptions.SKIP_LOCKED)
This will fetch 5 rows maximum for each fetch call (as my goal is to process 5 rows I don’t want the cursor to go further — as all fetched rows will be locked):
.setFetchSize(5)
This will get a scrollable cursor on the result. Do not use .list() because then all rows will be read and locked by the first thread and the next thread will have nothing to process:
.scroll(ScrollMode.FORWARD_ONLY)
This reads only 5 rows from the result:
int limit=5;
while( limit-->0 && i.next() )
That, combined with the fetch size, ensures that we do one fetch call that finds 5 unlocked rows, locks them, and returns them. We then read and process those rows and stop (close the cursor) without another fetch call. This is why the limit is tested before i.next(): a sixth i.next() would typically trigger a second fetch call and lock 5 more rows.
If you are not familiar with the LockModeType and LockOptions here is the query generated by Hibernate:
select
hibernates0_.id as col_0_0_
from
message hibernates0_
where
hibernates0_.flag=0 for update
skip locked
The simpler the better here: FOR UPDATE SKIP LOCKED.
Here is a screenshot of the result, with my System.out.println in yellow showing the ids updated by thread 1 ( -> 1 ) and thread 2 ( ->2 ) at the same time.
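For readers who want to see the same mechanics without Hibernate, here is a minimal plain-JDBC sketch. It assumes the MESSAGE table described above and an already opened java.sql.Connection. The class and method names (JdbcDequeueSketch, readBatch, dequeue) are made up for illustration, and this is not the code that produced the results shown in this post.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-JDBC equivalent of the Hibernate de-queuing above.
class JdbcDequeueSketch {

    // No ROWNUM, no FETCH FIRST: the limit is applied by how many rows we fetch.
    static final String DEQUEUE_SQL =
        "select id from message where flag = 0 for update skip locked";
    static final String MARK_SQL =
        "update message set flag = ? where id = ?";

    // Reads at most batchSize ids. The size is tested before rs.next()
    // so that next() is never called once the batch is complete.
    static List<Long> readBatch(ResultSet rs, int batchSize) throws SQLException {
        List<Long> ids = new ArrayList<>();
        while (ids.size() < batchSize && rs.next()) {
            ids.add(rs.getLong(1));
        }
        return ids;
    }

    // Locks and reads one batch, marks the rows with the worker number, and commits.
    static List<Long> dequeue(Connection con, int batchSize, int worker) throws SQLException {
        con.setAutoCommit(false);              // the row locks must live until our commit
        List<Long> ids;
        try (PreparedStatement select = con.prepareStatement(DEQUEUE_SQL)) {
            select.setFetchSize(batchSize);    // one fetch call returns at most batchSize rows
            try (ResultSet rs = select.executeQuery()) {
                ids = readBatch(rs, batchSize);
            }
        }
        try (PreparedStatement update = con.prepareStatement(MARK_SQL)) {
            for (Long id : ids) {
                update.setInt(1, worker);
                update.setLong(2, id);
                update.executeUpdate();
            }
        }
        con.commit();                          // releases the row locks
        return ids;
    }
}
```

The fetch size and the batch size are deliberately the same value, so that a single fetch call provides exactly the rows that will be processed.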
Now let’s show what happens if we don’t do that correctly
The bad example without SKIP LOCKED
Without SKIP LOCKED, the first thread locks the rows and the second one waits on those locks when it tries to read them. The threads end up serialized:
ScrollableResults i = s.createQuery("from HibernateSkipLocked$Message where flag=0").setLockMode(LockModeType.PESSIMISTIC_WRITE).setFetchSize(5).scroll(ScrollMode.FORWARD_ONLY);
The ugly example with list() instead of scroll()
There’s a common misconception that the result is fully read when the query is executed. That’s not the normal behavior of the database. When there’s no blocking operation (like GROUP BY or ORDER BY), the rows are read only when needed. Yes, SQL RDBMS does lazy reads! Except when you explicitly read all rows to put them in a list, with list(). In that case, the first thread locks all rows, even if it processes only 5 of them, and the second thread then skips all of them:
java.util.Iterator i = s.createQuery("from HibernateSkipLocked$Message where flag=0").setLockMode(LockModeType.PESSIMISTIC_WRITE).setHint("javax.persistence.lock.timeout",LockOptions.SKIP_LOCKED).setFetchSize(5).list().iterator();
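To make the difference between a lazy read and list() concrete, here is a toy in-memory model of a SKIP LOCKED cursor. It is only a sketch under simplifying assumptions, not Oracle or Hibernate code: rows are locked when a fetch call returns them, one fetch call returns at most fetchSize rows, and the class name LazyCursorSketch is made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Toy model only: one instance stands for one session's cursor.
class LazyCursorSketch {
    private final Iterator<Integer> source;   // candidate rows (flag=0)
    private final Set<Integer> locked;        // row locks shared by all sessions
    private final int fetchSize;
    private final List<Integer> buffer = new ArrayList<>();
    int fetchCalls = 0;

    LazyCursorSketch(List<Integer> rows, Set<Integer> locked, int fetchSize) {
        this.source = rows.iterator();
        this.locked = locked;
        this.fetchSize = fetchSize;
    }

    // One fetch call: skips locked rows, locks and buffers up to fetchSize free rows.
    private void fetch() {
        fetchCalls++;
        while (buffer.size() < fetchSize && source.hasNext()) {
            Integer id = source.next();
            if (locked.add(id)) {             // false means locked by another session: skip
                buffer.add(id);
            }
        }
    }

    // Lazy read: a fetch call happens only when the buffer is empty. Returns null at the end.
    Integer next() {
        if (buffer.isEmpty()) {
            fetch();
        }
        return buffer.isEmpty() ? null : buffer.remove(0);
    }

    // Eager read, like list(): drains the cursor, so every free row gets locked.
    List<Integer> list() {
        List<Integer> all = new ArrayList<>();
        for (Integer id = next(); id != null; id = next()) {
            all.add(id);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int id = 1; id <= 20; id++) rows.add(id);

        Set<Integer> locked = new HashSet<>();
        LazyCursorSketch lazy = new LazyCursorSketch(rows, locked, 5);
        for (int k = 0; k < 5; k++) lazy.next();
        System.out.println(locked.size() + " rows locked by the lazy reader");   // 5

        locked = new HashSet<>();
        new LazyCursorSketch(rows, locked, 5).list();
        System.out.println(locked.size() + " rows locked by the eager reader");  // 20
    }
}
```

In this model, a thread that reads 5 rows lazily leaves 15 rows for the others, while a thread that calls list() leaves none, whatever the number of rows it processes afterwards.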
Order By
You may want to process rows in order. That should be rare, because the goal of multi-threaded de-queuing is to process quickly, so the order does not really matter. If you need first-in-first-out, then maybe you don’t want to multi-thread. In that case, add an ORDER BY but remove the SKIP LOCKED.
Be very careful if you add an ORDER BY to the HQL query above. You will see a warning, and the SELECT is separated from the FOR UPDATE SKIP LOCKED:
WARN: HHH000444: Encountered request for locking however dialect reports that database prefers locking be done in a separate select (follow-on locking); results will be locked after initial query executes
Hibernate:
select
hibernates0_.id as id1_0_,
hibernates0_.flag as flag2_0_
from
message hibernates0_
where
hibernates0_.flag=0
order by
hibernates0_.id
Hibernate:
select
id
from
message
where
id =? for update
skip locked
But then, that defeats the goal of multi-threading because we want to skip in the initial SELECT.
The idea is then to fall back to a native query, here doing a last-in-first-out:
ScrollableResults i = s.createNativeQuery("select * from message where flag=0 order by id desc for update skip locked",Message.class).setFetchSize(5).scroll(ScrollMode.FORWARD_ONLY);
Here is the result where thread 2 has taken ids 14 to 10 and thread 1 then got ids 9 to 5:
A few additional thoughts
While talking about the order, maybe you want an index on the flag that you use in the predicate. That’s useful if you don’t delete the processed rows but just update them, because after a while a full table scan will have to read a big history of rows before finding those to process. The exception is when you can partition so that the processed rows are moved to another partition. If you delete the processed rows instead, they leave space for the new inserts, and that’s really what we want here.
You can also plan the shuffling at the time of insertion. For example, add a round-robin number to each row (from a modulo or a cycling sequence) and assign a batch number to each thread.
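As a minimal sketch of that idea, assuming a batch number derived from a modulo of the id: the class and method names (RoundRobinBatchSketch, batchOf, rowsFor) and the idea of storing the result in a dedicated column are hypothetical, not something used in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of a round-robin batch number computed at insert time.
class RoundRobinBatchSketch {

    // Batch number to store with the row: the id modulo the number of threads.
    static int batchOf(long id, int threads) {
        return (int) Math.floorMod(id, (long) threads);
    }

    // The rows that one worker would see if it filtered on its own batch number.
    static List<Long> rowsFor(int worker, int threads, List<Long> ids) {
        List<Long> mine = new ArrayList<>();
        for (Long id : ids) {
            if (batchOf(id, threads) == worker) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        System.out.println(rowsFor(0, 2, ids)); // [2, 4, 6]
        System.out.println(rowsFor(1, 2, ids)); // [1, 3, 5]
    }
}
```

Each thread then works on a disjoint set of rows, at the price of fixing the number of threads when the rows are inserted.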
Another consideration to keep in mind with queuing tables: the statistics. The queue is volatile by nature, so you will probably prefer to lock the statistics. And if an unexpected de-queuing outage has made the queue grow larger than normal, then think about a SHRINK operation to lower the high water mark.
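The index, statistics, and shrink ideas above could look like the following statements, here simply run through JDBC. This is a sketch under assumptions: the index name message_flag_ix and the class name QueueMaintenanceSketch are made up, the table is the MESSAGE table of this post in the current schema, and whether each statement is appropriate depends on your workload (for example, statistics should be representative before you lock them, and SHRINK SPACE needs row movement and a suitable tablespace).

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

// Hypothetical maintenance helper for the MESSAGE queue table.
class QueueMaintenanceSketch {

    // One-time setup: index the predicate column and lock the table statistics.
    static List<String> setupStatements() {
        return Arrays.asList(
            "create index message_flag_ix on message(flag)",
            "begin dbms_stats.lock_table_stats(user, 'MESSAGE'); end;");
    }

    // Occasional operation after the queue has grown abnormally: lower the high water mark.
    static List<String> shrinkStatements() {
        return Arrays.asList(
            "alter table message enable row movement",
            "alter table message shrink space");
    }

    // Runs the given statements one by one on an open connection.
    static void run(Connection con, List<String> statements) throws SQLException {
        try (Statement st = con.createStatement()) {
            for (String sql : statements) {
                st.execute(sql);
            }
        }
    }
}
```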
Updated 21-JUN-2019
You can see, when looking at my Java code style, that I’m not a developer. So please read the feedback from Vlad Mihalcea:
I’ve updated this post to use scroll() instead of iterate(), and the nice thing is that it works with native queries as well.