Scalability is often seen as easy to achieve these days: Heroku and AWS can give us the extra power to scale our apps with a few clicks, and we are good to go. But true scalability lies in the code. One common approach to achieving fast response times is to delegate time-consuming tasks to a group of background workers that can work in parallel, taking the heavy lifting off the web process.
What we usually overlook is the load these job workers need to handle. At our organization, we enqueue background jobs from a lot of after_save callbacks. Once you reach a complex schema with objects saving their children, it’s hard to keep track of how many jobs even a small change can trigger. And much of the time, many of these jobs are duplicates, caused by the same object being saved through different code paths.
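As a hypothetical illustration (these model and job names are made up for this post), consider line items that each reschedule a recalculation of their parent order when saved:

class Order < ActiveRecord::Base
  has_many :line_items
end

class LineItem < ActiveRecord::Base
  belongs_to :order

  # Every save of a line item schedules a recalculation of its order.
  after_save { RecalculateOrderTotalsJob.perform_later(order) }
end

An update that touches three line items of the same order enqueues RecalculateOrderTotalsJob three times, even though a single run would do.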
How do we solve this? It can’t be easy to track all those callbacks and prevent multiple jobs, right? The solution, as it turned out, was much simpler: a custom job adapter that knocks away the duplicates, keeping only one copy of each job. With this simple adapter we were able to cut our job counts by more than 50%. How did we build it? Let’s check out the original delayed job adapter first:
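(The adapter below is reproduced from the Rails 4.2-era activejob source, where adapter methods were still class methods; consider it a close paraphrase, as details vary slightly between Rails versions.)

module ActiveJob
  module QueueAdapters
    class DelayedJobAdapter
      class << self
        def enqueue(job)
          Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name)
        end

        def enqueue_at(job, timestamp)
          Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, run_at: Time.at(timestamp))
        end
      end

      class JobWrapper
        attr_accessor :job_data

        def initialize(job_data)
          @job_data = job_data
        end

        def perform
          Base.execute(job_data)
        end
      end
    end
  end
end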
Great, looks simple. All we need to do is override the enqueue and enqueue_at methods to find duplicate jobs and skip enqueueing them. If you study the format of the handler column in the delayed_jobs table, finding duplicates is pretty easy: all we need to check for is the job class and the passed arguments.
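For a job enqueued through ActiveJob, the handler column holds a YAML dump of the JobWrapper shown above, roughly like this (the job id and GlobalID are placeholders, and some fields are omitted):

--- !ruby/object:ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper
job_data:
  job_class: MyJob
  job_id: aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
  queue_name: default
  arguments:
  - _aj_globalid: gid://my-app/MyRecord/1

Both the job class name and the GlobalIDs of the arguments appear as plain substrings, so a case-insensitive pattern match is enough. Here’s how simply the check can be implemented.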
def already_queued?
  # Look for an identical job that no worker has picked up yet,
  # matching the job class name inside the serialized handler.
  query = Delayed::Job.where("handler ilike '%#{self.class}%'")
                      .where(locked_at: nil, locked_by: nil)
  query = update_query_for_arguments(query, arguments)
  query.any?
end
def update_query_for_arguments(query, arguments)
  # Narrow the match further: each argument (its GlobalID for
  # ActiveRecord objects, the raw value otherwise) must also
  # appear in the handler.
  arguments.each do |a|
    query = query.where("handler ilike '%#{a.try(:to_global_id) || a}%'")
  end
  query
end
I have kept update_query_for_arguments very simple, but it can be extended to support array and hash arguments if you need them, as sketched below.
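For instance, a recursive variant could walk into nested collections before matching; here is an untested sketch, not from our production code:

def update_query_for_arguments(query, arguments)
  arguments.each do |a|
    case a
    when Array
      query = update_query_for_arguments(query, a)
    when Hash
      # Matching on values is a heuristic; keys could be matched too.
      query = update_query_for_arguments(query, a.values)
    else
      query = query.where("handler ilike '%#{a.try(:to_global_id) || a}%'")
    end
  end
  query
end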
Now, time for our shiny new adapter that automatically discards duplicate jobs as they come in.
module ActiveJob
  module QueueAdapters
    class UniqDelayedJobAdapter < ActiveJob::QueueAdapters::DelayedJobAdapter
      class << self
        # Keep handles on the inherited class methods so we can fall
        # through to them once the duplicate check passes.
        alias super_enqueue enqueue
        alias super_enqueue_at enqueue_at

        def enqueue(job)
          return if job.try(:already_queued?)
          super_enqueue(job)
        end

        def enqueue_at(job, timestamp)
          return if job.try(:already_queued?)
          super_enqueue_at(job, timestamp)
        end
      end
    end
  end
end
Finally, enable the adapter in a convenient location (e.g. application.rb or one of the environment-specific initializers) and you are good to go; the :uniq_delayed_job symbol resolves to our ActiveJob::QueueAdapters::UniqDelayedJobAdapter by ActiveJob’s naming convention:
config.active_job.queue_adapter = :uniq_delayed_job
Now you can enqueue jobs without worrying about duplicates eating up your workers’ hours.
MyJob.perform_later(my_arg) # Enqueued MyJob to delayed_jobs
MyJob.perform_later(my_arg) # No longer enqueues another job unless the previous one had already started processing
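For completeness, here is one plausible way to wire everything together; the post leaves the home of already_queued? open, so the module name below is hypothetical:

# Hypothetical concern collecting the two methods defined earlier.
module SkipsDuplicateEnqueues
  def already_queued?
    # ... as defined above ...
  end

  def update_query_for_arguments(query, arguments)
    # ... as defined above ...
  end
end

class MyJob < ActiveJob::Base
  include SkipsDuplicateEnqueues
  queue_as :default

  def perform(my_arg)
    # the actual work
  end
end

Because the adapter calls job.try(:already_queued?), jobs that don’t include the module simply skip the duplicate check and enqueue as usual.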