Using Postgres as a task queue for rowdy Gophers

November 29, 2022
manav@ente.io

We love Postgres. It plays at the core (alongwith Go) of our conception of using "boring software" to make rock solid infrastructure.

But if all one has a hammer, everything starts looking like a nail 🤖

In this case, the nail that I wanted to hammer was keeping a queue of pending tasks that can then be picked up by a bunch of goroutines (or Gophers, as Rob Pike endearingly refers to them in his talk on how Concurrency is not Parallelism).

But we don't just have one server, we have multiple machines, so native Go communication mechanisms like channels wouldn't suffice to prevent these Gophers from stepping on each others toes; they're not in the same address space and can't see each other.

Plus, every once in a while the Docker containers that run our Go code will restart as we roll out updates. These Gophers would all sink alongwith the container, and we'd lose sight of what remains doing.

Enough of these gruesome metaphors of dying Gophers though (So that's where the name "Gru" comes from!). We needed a persistent queue. And the hammer I had was Postgres.

But turns out that SQL is not a great fit for keeping track of queues. Or at least, not in the general case. This is why there is a plethora of queuing solutions out there. From a simple Redis to a monstrous Kafka, a poison can be had of your choosing.

But I like my hammer! More seriously though, a corollary to the ideology of using boring software is to also keep the number of moving parts to a minimum. And the queue I wanted to implement was not for an arbitrary general case, it was for a more specific case.

So this is a story of what I had to do to meet that goal.


Instead of coming up with an hypothetical example to tell the story, I'll continue with a simplified version of the original problem I'd needed to solve. Some of the details might be superfluous, but in lieu of the extra details we'll get a more concrete and realistic illustration of where Postgres as a queue is a reasonable choice.

If you're just looking for the code to copy paste 😅, scroll down to the end to the part with UPDATE...SKIP LOCKED.

So, the problem statement.

We replicate encrypted user data across three cloud providers. Why three? We'd started with two – 1 hot, 1 cold. But after seeing the latencies for the cold one, we realized that just the cold one would result in a long delay in case of disaster recovery, so then we figured we needed another hot one.

So two hot buckets, and one cold one. And our client apps upload to the primary hot bucket.

Replication is then done by goroutines running on our servers, and is conceptually quite simple - download the file from the primary hot bucket (Backblaze "b2"), and upload it to the other two buckets ("wasabi", and Scaleway "scw").

There are many more details, like using object locks to ensure files cannot be deleted even in case of credential leaks. But more on all that in a separate post coming soon!

These goroutines need to know which files are pending replication. And since the replication can take time for larger files, so they also need to ensure that another Gopher doesn't come along at the same time and starts replicating the same file.

Also, it is possible that a Gopher might be able to upload a file to only one of the replicas. The buckets are in different clouds, and there can be transient failures in one of them. In such cases the Gopher should mark that it was uploaded in one of the places, and leave the second upload for some other Gopher to come back later and pick up, when the weather permits.

Finally, the server might restart in the middle of the upload, and eventually such uncompleted uploads should get picked up again.

Listing down the requirements:

  1. Keep track of which files need to be uploaded, and where.
  2. Lock a file that is currently being uploaded.
  3. Clean up stray locks on unexpected restarts.
  4. Do all of this scalably so that we can have many Gophers running in parallel.
  5. As much as possible, make it fast.

Quite a bit.

No bother. Let's start simple, and create a SQL table that suffices for 1 and 2.

CREATE TABLE object_copies (
    object_key TEXT PRIMARY KEY,
    b2         BIGINT,
    wasabi     BIGINT,
    scw        BIGINT,
    lock       BIGINT)

In S3 parlace, files are referred to as "objects", and are stored in bucket after being associated with unique keys. Thus, the table is named object_copies, and the primary key is object_key.

Then there are three fields, one for each of the replicas. We actually need just a boolean value to indicate whether or not the file has been yet added to that replica. But hey, let's splurge a bit and store the timestamp when we added them. Felt cute, might delete later.

Finally, there is a field (lock) to indicate whether the file is currently undergoing replication. This one needs to be a timestamp, a boolean won't suffice, because by having a timestamp we can go back and clean out stale entries that are too old (we'll see how in a bit).

Given this table, a Gopher can find the next file that needs replication by using the following query. This'll select rows where one or more of the copies hasn't been added yet.

SELECT object_key FROM object_copies
WHERE wasabi IS NULL OR scw IS NULL

The value of the field for the primary hot storage (in this case b2) doesn't matter since it'll always be there - the code that gets the row into this table in the first place will ensure that.

But it is not redundant. For the purpose of keeping this post short I'll not go further into it, but having it there allows us to swap primary / secondary replicas, add entirely new replicas with different cloud providers in the future, and even allows the possiblity of selective replication, since all the replicas are symmetrically reflected in the schema.

A Gopher is quite small and can't carry multiple files, and the previous query would give the poor soul all the burden in the world. So let's modify the query to only find one item by adding a LIMIT clause.

SELECT object_key FROM object_copies
WHERE wasabi IS NULL OR scw IS NULL
LIMIT 1

When we get an item, we also want to set the lock field to tell other Gophers to keep away from this one. Let's do that by wrapping our query in an update.

UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE wasabi IS NULL OR scw IS NULL
    LIMIT 1)

That's not quite correct though. We should also ignore rows that've already been locked.

UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE (wasabi IS NULL OR scw IS NULL) AND lock IS NULL
    LIMIT 1)

Ah, but that's still not correct.

Two Gophers might run the same query at the same time, grab the same row before the first one can notice that the other one has also updated the lock. You see, despite it's appearance to us as a nested query, for the the database these are effectively two sequential operations - a SELECT followed by an UPDATE. That is, roughly speaking, this is what Postgres sees:

BEGIN TRANSACTION;

SELECT object_key FROM ...;

UPDATE object_copies SET lock = ...;

COMMIT TRANSACTION;

And if two Gophers were to run this at the same time, we could end up with an interleaving like this:

SELECT 1 -- Gopher 1
SELECT 2 -- Gopher 2 also selects the same row
UPDATE 1
UPDATE 2 -- And both update the same row

What we want here is a way to tell Postgres to "lock" the row it selects for the duration of the transaction.

This is a different "lock" than our field. What I'm talking of here is a database row level lock that Postgres can place on a row to prevent other transactions from touching it. It is is different from our conceptual, application level lock that we place using the lock column.

I guess I could rename the field to gopher_lock to make it less confusing, but I doubt my colleagues will find my humor funny when they look back at this code 😶‍🌫️ so let's keep calling our field lock.

SQL provides for such row-level locking with the FOR UPDATE hint we can pass to our SELECT statement. This will cause the rows that are selected to also be locked for the duration of the transaction, as if they'd been updated not just selected.

UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE (wasabi IS NULL OR scw IS NULL) AND lock IS NULL
    LIMIT 1
    FOR UPDATE)

Some of you might be wondering whether this transaction that I'm speaking of, if that is in the room with us?

After all, we didn't write the BEGIN and END TRANSACTION anywhere?

Indeed, it is there. Postgres wraps our UPDATE ... (SELECT ...) in an implicitly generated transaction when we run that command (or any command / query for that matter).

With the row level lock, the select + update will be "atomic", and is now functionally correct.

But it has a more insidiuous problem now - Lock contention.

Imagine we have 30 Gophers working in parallel. At any moment in time there will be multiple of them that'll be running this statement to find the next task. Given the deterministic nature of the queries, they'll all end up with wanting the same row, but only one of them will get it, and rest will all sit there idle, twiddling their thumbs. Gopher thumbs are quite small and not really comfortable to twiddle either.

One might imagine (as I had) that this problem can be solved, if not in theory at least in practice, by having some form of randomness in the order in which rows are returned. So on a big enough table, each Gopher will end up getting a different row first instead of contending over the same row.

Unfortunately, there isn't a magical ORDER BY RANDOM in SQL.

There is a seemingly similar ORDER BY RANDOM(), but that is unusably inefficient - it always does a full table scan, generating a random number for each row, sorts the full table that way, and then returns it. Disk grindingly CPU meltingly inefficient.

That said, there are workarounds to achieve a random ordering. But they're just that, workarounds, I didn't find any of them satisfying, especially given that there is a simpler way.

Now is the moment to introduce the hero of our story -- SKIP LOCKED.

SKIP LOCKED is a special hint we can give to Postgres to tell it to just move on if it finds a row that has been locked by someone else. This normally wouldn't make sense for database queries (since it will give an inconsistent view of the data), but it is exactly the thing we need if we're trying ot use the database as a queue. In fact, the queue use case is the reason why this hint was introduced in the first place.

Armed with this bit of trickery, let us chant our spell again.

UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE (wasabi IS NULL OR scw IS NULL) AND lock IS NULL
    LIMIT 1
    FOR UPDATE SKIP LOCKED)

Beautiful. There is one small garnish we need to add - when we run this query, our Gopher would also need to know the object_key and which of the two replicas needs syncing. For this, we can add a RETURNING clause to the `UPDATE to return all relevant fields:

UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE (wasabi IS NULL OR scw IS NULL) AND lock IS NULL
    LIMIT 1
    FOR UPDATE SKIP LOCKED)
RETURNING object_key, wasabi, scw

Our query is now complete.

In case you're wondering where all of this magic is documented, stop (wondering).

It is all there in the excellent PostgreSQL documentation.

We do need help from other sources to connect the various parts sometimes, e.g. this post about SKIP LOCKED was very helpful to me. But the details are all there in the Postgres documentation itself, so RTFM.


So are we done here?

Let's look back at our original list of requirements:

  1. Keep track of which files need to be uploaded, and where.
  2. Lock a file that is currently being uploaded.
  3. Clean up stray locks on unexpected restarts.
  4. Do all of this scalably so that we can have many Gophers running in parallel.
  5. As much as possible, make it fast.

Items 1 and 2 are done, and so is 4.

This post is already too long, so I won't delve into details for #3, but I'll give a brief overview.

  1. Clean up stray locks on unexpected restarts.

One approach to handle this cleanup would be to obviate the need for a cleanup (duh). We can do that by explicitly starting a transaction in which our UPDATE/SELECT statement runs, but keep the transaction running as we do the replication. This way, if the replication were to not complete because of some reason, the transaction would never be committed and the next Gopher that comes around will find the same row in its original state, and pick it up.

In fact, this way, we wouldn't need the lock field at all.

That said, for now I've gone with an external cleanup. There is a periodic "cron" goroutine that clears out any entries where the value of the lock field is older than a day. Seems to be working fine for now, but I can foresee myself revisiting this some day and switching to the cleaner(!) first approach.

Update: We did indeed end up deploying a variant of this that uses a long running transaction instead of using the lock field. Hopefully we'll get time to do a follow up post with details of that some time in the future, but meanwhile I felt it prudent to mention that here. Using the long running transaction ended up being both lesser code, and also conceptually cleaner. To avoid confusion, I've only added this update paragraph, but have kept the original post unchanged otherwise.


So let us focus on requirement 5 for the rest of the post.

  1. As much as possible, make it fast.

As with anything to do with databases, there is a straightforward answer on how to achieve that – add an index 😬

Hell, let's add two 🥶

CREATE INDEX ON object_copies (wasabi) WHERE wasabi IS NULL;
CREATE INDEX ON object_copies (scw) WHERE scw IS NULL;

This is not the only possibility - there are other indicies we could've used too. For example, we could've added a single combined indexed with the same WHERE clause as our actual SELECT query.

But by adding separate indexes, we allow for the possibility of future new columns for new replicas being added to the same table, without invalidating the existing indexes. The newer columns will get their own indexes.

Another practical reason is that we have reporting dashboards that show the number of unreplicated files in each replica, and having these separate indexes per replica make those reporting queries fast.

One thing to note here is the WHERE clause in the CREATE INDEX.

The idea behind that being that usually there are only going to be a small number of new files (relative to the total number of files) that are not replicated yet. So by creating an index on WHERE wasabi is NULL / WHERE scw is NULL, we don't waste Postgres' time (and disk) asking it to keep an index on rows that have already been replicated.

So does this work? Let's ask Postgres!

EXPLAIN
UPDATE object_copies SET lock = now_utc_micro_seconds()
WHERE object_key = (
    SELECT object_key FROM object_copies
    WHERE (wasabi IS NULL OR scw IS NULL) AND lock IS NULL
    LIMIT 1 FOR UPDATE SKIP LOCKED)
RETURNING object_key, wasabi, scw

This is our final query as before, just wrapped in an EXPLAIN, requesting Postgres to tell us about what it's doing behind the scenes.

                                    QUERY PLAN
---------------------------------------------------------------------------------------
 Update on object_copies  (...)
   InitPlan 1 (returns $1)
     ->  Limit  (...)
           ->  LockRows  (...)
                 ->  Seq Scan on object_copies object_copies_1  (...)
                       Filter: (((wasabi IS NULL) OR (scw IS NULL)) AND (lock IS NULL))
   ->  Index Scan using object_copies_pkey on object_copies  (...)
         Index Cond: (object_key = $1)

From the Seq Scan, we can see that Postgres did not use our newly created indices. Hmm.

At this point come to mind the two excellent tips provided by pgMustard in their clearly titled post - Why is Postgres not using my index?! (the exclamation mark is my own 🤓)

There are two main reasons that Postgres will not use an index. Either it can’t use the index, or it doesn’t think using the index will be faster.

Their post goes on two show how you can differentiate between the two reasons.

Let's start by ruling out reason 1. This is easy - we tell Postgres to imagine a world in which sequential scans are very costly

SET enable_seqscan = off

and then run our EXPLAIN again

                                     QUERY PLAN
--------------------------------------------------------------------------------------
 Update on object_copies  (...)
   InitPlan 1 (returns $1)
     ->  Limit  (...)
           ->  LockRows  (...)
                 ->  Bitmap Heap Scan on object_copies object_copies_1  (...)
                       Recheck Cond: ((wasabi IS NULL) OR (scw IS NULL))
                       Filter: (lock IS NULL)
                       ->  BitmapOr  (...)
                             ->  Bitmap Index Scan on object_copies_wasabi_idx  (...)
th=0)
                                   Index Cond: (wasabi IS NULL)
                             ->  Bitmap Index Scan on object_copies_scw_idx  (...)
0)
                                   Index Cond: (scw IS NULL)
   ->  Index Scan using object_copies_pkey on object_copies  (...)
         Index Cond: (object_key = $1)

It used our index! We don't need to understand all the details of what the QUERY PLAN is telling us, but we can already see it is using Bitmap Index Scan and Bitmap Heap Scan and other fancy stuff instead of a plain old sequential scan.

If you need to understand more of what the query plan is trying to tell us, the Postgres docs for EXPLAIN are a great resource.

So the reason Postgres wasn't using our indicies earlier was because it doesn’t think using the index will be faster. Why could that be?

There isn't any data in the table! Let us add some data. First, let us reset enable_seqscan,

SET enable_seqscan = on

Then let us add some rows to the table,

INSERT into object_copies (object_key)
SELECT generate_series(1, (10^6)::int)::text

and run the EXPLAIN again.

Unfortunately, this time we'll find that it went back to using a sequential scan.

If you're not seeing the results that we're discussing, then try to run

ANALYZE

and re-run the EXPLAIN.

ANALYZE causes Postgres to refresh its statistics about what the table contains.

Maybe we need to add more data. What's a million rows anyways, a table for ants?

But no, that's not the issue here. In addition to the amount of data, Postgres also considers the distribution of data. Currently, all our rows have all both wasabi and scw set to NULL, so using the indicies we'd created wouldn't help in any way.

Let us try to modify the data in the table to reflect the sort of scenario we'll face in production -- most rows have been replicated, but here and there are a few files that are left.

UPDATE object_copies SET wasabi = 1, scw = 1;
UPDATE object_copies SET wasabi = NULL WHERE mod(object_key::int, 41) = 0;
UPDATE object_copies SET scw = NULL WHERE mod(object_key::int, 61) = 0;

Now let us run EXPLAIN again.

Indeed, this time it uses the indices!

So with that, our work here is done.


Hopefully this was useful to you. And if not useful, at least you found my bumbling around to the final solution enjoyable.

Till next time, happy hacking!