A SQLite background job system

I wanted a nice way to handle things outside of the usual request / response flow, as some things can be handled asynchronously, to do that I need to create some form of background job / queue system.

Similar to the checkouts at the supermarket, jobs go into a queue and the workers take the oldest job in the queue (the one at the front) to process. This could be one worker, or if the queue tends to be large several workers.

The Queue

Other background job systems i've seen such as Rails Active Job tend to use Redis as their queue mechanism, but I was looking for something simpler, i'm already using a SQLite database for my application so why not create a second DB file (to avoid lock contention) for the jobs/queue?

The schema for the jobs table is quite simple:

CREATE TABLE IF NOT EXISTS jobs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  type TEXT,
  data TEXT,
  status TEXT DEFAULT 'pending',
  created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

When selecting a job I will tend to filter first by its status, workers are only interested in jobs that are currently 'pending' so at any point in time this would likely be a small selection of jobs. Next is to filter by 'type', as certain types are handled by different workers. Last the general order will be based on 'created_at', as I want to grab the oldest job which has therefore been waiting in the queue the longest.

The following index will help make the worker queries faster, avoiding having to scan the entire jobs table to execute a query.

CREATE INDEX IF NOT EXISTS idx_jobs_status_type_created_at ON jobs (status, type, created_at);

This is known as a composite index, as it includes a composition of several columns.

The Worker

The workers are quite simple, they run on a while loop with a timeout if no jobs are available (so not the spin the CPU in a tight loop for no reason).

The workers generally do three things to the queue.

They query the queue for the oldest job of their type that's pending.

They set that jobs status to 'processing', and after processing they set that jobs status to 'done'. If an error occurs during processing they can set that jobs status to 'failed'. Another process will periodically switch failed jobs back to pending, plus I can easily create a report on all failed jobs. What could be interesting is having a second table (errors) that links to failed jobs where I can store the stack trace and error information of a failed job, but i'll try that later.

With one worker, grabbing the oldest job and processing it is perfectly fine, however when multiple workers are trying to select their oldest job I saw multiple workers selecting the same job. This is because selecting the job, and updating it to 'processing' happen as separate commands at different times. There's a moment in between selecting and updating where another worker can also select the same job.

Unfortunately SQLite doesn't support the SELECT FOR UPDATE command which would do this in databases like PostgreSQL or MySQL. They're able to do that as they allow for row-level locking, where as SQLite only has table / database-level locking.

Luckily SQLite (and most SQL databases) have the concept of transactions, which allow you to group commands together which either all happen successfully, or not at all. This ensures data consistency, and avoids malformed data due to some commands executing and others not. I can use this to ensure that when I select the latest job its status can be set to 'processing' at the same time.

An example of the transaction I needed is:

BEGIN TRANSACTION;

SELECT * FROM jobs WHERE type = 'MY_JOB_TYPE' AND STATUS = 'pending' ORDER BY created_at LIMIT 1;

UPDATE jobs SET status = 'processing' WHERE id = ? AND status = 'pending';

COMMIT;

It's also possible to check a condition and ROLLBACK if needed instead of COMMIT for example:

await db.exec(`BEGIN TRANSACTION`);

try {
	const job = db.get(`SELECT * FROM jobs WHERE type = ? AND status = 'pending' ORDER BY created_at LIMIT 1`, type);

	if(job) {
		await db.run(`UPDATE jobs SET status = 'processing' WHERE id = ? AND status = 'pending'`, job.id);
		await db.exec(`COMMIT`);
		return job;
	}
	await db.exec(`ROLLBACK`);
} catch (error) {
  await db.exec(`ROLLBACK`);
}

Scaling to multiple workers

Ideally I want workers to scale up depending on the amount of jobs that need processing. To do this I simply run more instances of the worker program.

However, even with this transaction I started to see worker crashes, showing the error SQLITE_BUSY. To try and fix this I ran a few PRAGMA commands.

In SQLite, PRAGMA commands are a way to configure the database settings.

First was PRAGMA journal_mode = WAL; this turns on WAL mode (write ahead log) as appose to normal journal mode, which makes SQLite work better with concurrent readers.

The second was PRAGMA synchronous = 1; This adjusts how strict SQLite is at flushing to disk during transactions, it generally makes transactions run faster.

The last was PRAGMA busy_timeout = 5000; which sets the wait timeout to 5 seconds. This tells SQLite if the table is locked by another transaction retry the command again internally, if after 5 seconds you still can't run the command due to the table still being locked, then return a SQLITE_BUSY error.

Weirdly, even with this setting applied I was getting errors come back instantly. After a quick Google I found this fantastic answer from Igor Tandetnik on Stack Overflow that explains why busy_timeout was being ignored.

https://stackoverflow.com/questions/30438595/sqlite3-ignores-sqlite3-busy-timeout#30440711

It was due to deadlocking!

When the workers attempt to grab a job from the queue they first do a SELECT which is a read operation, and then attempt to do an UPDATE which is a write operation, this happens within the same transaction as seen earlier.

The answer above explains that BEGIN TRANSACTION assumes a read only lock until it attempts to do a write operation, it then tries to promote its lock to a write lock, but to do that it needs to wait for all current reads to complete. As we're doing a read and then a write in the same transaction SQLite detects that there's no way for the reads of other workers to complete, as they would also want to upgrade to a write lock, so it returns an error straight away.

This is why it was ignoring the busy timeout value.

To fix this I needed to specify that the transaction will almost always need a write lock, and to do that I changed the first command to:

BEGIN IMMEDIATE TRANSACTION;

After this the busy timeout worked as expected and even with 12 workers all calling the database at the same time I saw zero busy errors, and due to transactions multiple workers all successfully picked up unique jobs.

Setup and code

Here's a full example of the code using NodeJS.

// worker.js
import sqlite3 from 'sqlite3';
import { open } from 'sqlite';

const db = await open({
  filename: './jobs.db',
  driver: sqlite3.Database
});

await db.exec(`
  PRAGMA journal_mode = WAL;
  PRAGMA synchronous = 1;
  PRAGMA busy_timeout = 5000;

  CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    type TEXT,
    data TEXT,
    status TEXT DEFAULT 'pending',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
  );

  CREATE INDEX IF NOT EXISTS idx_jobs_status_type_created_at ON jobs (status, type, created_at);
`);


async function getAndMarkJobAsProcessing(type) {
  await db.exec(`BEGIN IMMEDIATE TRANSACTION`);
  try {
	const job = await db.get(`SELECT * FROM jobs WHERE status = 'pending' AND type = ? ORDER BY created_at LIMIT 1`, type);
	if (job) {
		await db.run(`UPDATE jobs SET status = 'processing' WHERE id = ? AND status = 'pending'`, job.id);
		await db.exec(`COMMIT`);
		return job;
	}
	await db.exec(`ROLLBACK`);
  } catch (error) {
    await db.exec(`ROLLBACK`);
  }
}

async function markJobAsDone(id) {
  await db.run(`UPDATE jobs SET status = 'done' WHERE id = ?`, id);
}

async function markJobAsFailed(id) {
  await db.run(`UPDATE jobs SET status = 'failed' WHERE id = ?`, id);
}


// run the loop
while(true) {
  const job = await getAndMarkJobAsProcessing(`MY_JOB_TYPE`);
  
  if (job) {
	  console.log(`Processing job ${job.id}, ${job.type}, ${job.data}`);
	  try {
      // Add code to process the job...
      markJobAsDone(job.id);
      console.log(`Marking job ${job.id} as Done`);
    } catch(error) {
      markJobAsFailed(job.id);
      console.log(`Marking job ${job.id} as Failed`);
    }
  } else {
    // No pending jobs, wait 1 second before checking again
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
}

The idea behind this was to create a simple background job processing queue that would allow for processing to happen asynchronously from the usual request / response flow of an HTTP request, with separate worker processing running along side the main application able to pick up and process jobs independently.

This setup runs on a single machine, potentially in different docker containers, but separate processes would do, so long as they're all attached to the same volume in order to talk to the SQLite jobs.db file.

I'll continue to make improvements to this and see how far I can push it going forward.