Scalability is often considered easy to achieve these days: Heroku and AWS can give us the extra capacity to scale our apps with a few clicks, and we are good to go. But true scalability lies in code. One common way to keep response times fast is to delegate time-consuming tasks to one or more background workers, which can run in parallel and take the heavy lifting off the web process.

What we usually overlook is the load these job workers have to handle. At our organization, we enqueue background jobs from a lot of after_save callbacks. Once the schema gets complex, with objects saving their children, it becomes hard to keep track of how many jobs even a small change can trigger. Often, many of these jobs are duplicates, enqueued because the same object gets saved through different associations.
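To make the duplication concrete, here is a plain-Ruby simulation (no Rails involved). The Order and LineItem models, the RecalculateTotalsJob name, the `enqueue_job` helper and the `QUEUE` array are all hypothetical: saving the parent also saves its children, and every save "enqueues" the same job for the same order, the way chained after_save callbacks can.

```ruby
# Hypothetical in-memory queue; each entry is [job class name, arguments].
QUEUE = []

# Hypothetical stand-in for SomeJob.perform_later(*args).
def enqueue_job(job_class, *args)
  QUEUE << [job_class, args]
end

# Parent model: saving it saves its children, then runs its own "after_save".
class Order
  attr_reader :id, :items

  def initialize(id)
    @id = id
    @items = []
  end

  def save
    items.each(&:save)
    enqueue_job("RecalculateTotalsJob", id) # parent's after_save
  end
end

# Child model whose "after_save" also refreshes the parent order.
class LineItem
  def initialize(order)
    @order = order
  end

  def save
    enqueue_job("RecalculateTotalsJob", @order.id) # child's after_save
  end
end

order = Order.new(42)
3.times { order.items << LineItem.new(order) }
order.save

puts QUEUE.size      # 4 jobs enqueued
puts QUEUE.uniq.size # 1 distinct job
```

One save of an order with three line items produces four identical jobs, and only one of them does useful work.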

How do we solve this? Tracking down all those callbacks and preventing the extra jobs one by one can't be easy, right? The solution, as it turned out, was much simpler: a custom job adapter that discards duplicate jobs and keeps only one copy of each. With this simple adapter, we cut our job counts by more than 50%. How did we build it? Let's check out the original Delayed Job adapter first. It defines two class methods, enqueue and enqueue_at, each of which wraps the serialized job in a JobWrapper and hands it to Delayed::Job.enqueue (enqueue_at also passes a run_at time).

Great, looks simple. All we need is to override enqueue and enqueue_at so that they look for a duplicate job and skip enqueueing if one exists. If you study the format of the handler column in the delayed_jobs table, finding duplicates is straightforward: the serialized handler contains the job class and the arguments it was enqueued with, so those are all we need to check. Here's an example of how simply this can be implemented.

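# Instance methods on the job (e.g. in a shared base job class), since they use
# ActiveJob's `arguments`. Note that ILIKE is PostgreSQL-specific.
def already_queued?
  # Only count jobs no worker has picked up yet and that have not permanently failed.
  query = Delayed::Job.where("handler ILIKE ?", "%#{self.class.name}%")
                      .where(locked_at: nil, locked_by: nil, failed_at: nil)
  query = update_query_for_arguments(query, arguments)
  query.exists?
end

# Narrow the query with one ILIKE condition per argument. Records are matched
# by their GlobalID (e.g. gid://app/User/1), everything else by its to_s.
# Bound parameters (?) keep argument values from being injected into the SQL.
def update_query_for_arguments(query, arguments)
  arguments.each do |a|
    query = query.where("handler ILIKE ?", "%#{a.try(:to_global_id) || a}%")
  end
  query
end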
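The check above relies on the job class and each argument appearing verbatim in the serialized handler. Here is a plain-Ruby sketch of that idea, assuming a simplified handler: `fake_handler` and `handler_matches?` are hypothetical helpers, and real handlers carry more keys (such as job_id) than this stand-in.

```ruby
require "yaml"

# Hypothetical, simplified stand-in for a delayed_jobs.handler value. Real
# handlers are YAML for ActiveJob's JobWrapper and carry more keys, such as job_id.
def fake_handler(job_class, arguments)
  {
    "job_data" => {
      "job_class" => job_class,
      "queue_name" => "default",
      "arguments" => arguments
    }
  }.to_yaml
end

# Case-insensitive substring check, mimicking `handler ILIKE '%needle%'`
# once for the class name and once per argument.
def handler_matches?(handler, job_class, arguments)
  needles = [job_class] + arguments.map(&:to_s)
  needles.all? { |needle| handler.downcase.include?(needle.downcase) }
end

rows = [
  fake_handler("MyJob", ["gid://app/User/1"]),
  fake_handler("OtherJob", ["gid://app/User/1"]),
  fake_handler("MyJob", ["gid://app/User/10"])
]

puts handler_matches?(rows[0], "MyJob", ["gid://app/User/1"]) # true
puts handler_matches?(rows[1], "MyJob", ["gid://app/User/1"]) # false: other class
puts handler_matches?(rows[0], "MyJob", ["gid://app/User/2"]) # false: other record
puts handler_matches?(rows[2], "MyJob", ["gid://app/User/1"]) # true: substring match
```

The last line shows a limit of plain substring matching: User/1 also matches User/10, so this kind of check can occasionally treat a different job as a duplicate. Whether that matters depends on your jobs.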

I have kept update_query_for_arguments deliberately simple, but it can be extended to support array and hash arguments if you need them.
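One way to do that extension, sketched under the assumption that matching every nested key and value as a separate substring is good enough for your jobs: a hypothetical `search_fragments` helper walks arrays and hashes recursively and returns the strings to look for in the handler.

```ruby
# Hypothetical helper: expand one job argument into the substrings to look for
# in the handler, walking arrays and hashes recursively. A real version would
# still convert records with to_global_id before calling to_s.
def search_fragments(arg)
  case arg
  when Array
    arg.flat_map { |element| search_fragments(element) }
  when Hash
    arg.flat_map { |key, value| [key.to_s] + search_fragments(value) }
  else
    [arg.to_s]
  end
end

p search_fragments([1, "a"])               # ["1", "a"]
p search_fragments({ "ids" => [3, 4] })    # ["ids", "3", "4"]
p search_fragments([{ "page" => 2 }, [5]]) # ["page", "2", "5"]
```

Inside update_query_for_arguments, each fragment would then become its own `query.where("handler ILIKE ?", "%#{fragment}%")` condition.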

Now for our shiny new adapter, which automatically discards incoming duplicate jobs.

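module ActiveJob
  module QueueAdapters
    # Drop-in replacement for the Delayed Job adapter that skips a job when an
    # identical one is already waiting in the delayed_jobs table.
    class UniqDelayedJobAdapter < ActiveJob::QueueAdapters::DelayedJobAdapter
      # Rails 4.2 adapters expose enqueue/enqueue_at as class methods. In Rails 5+
      # they are instance methods, so drop the `class << self` wrapper there.
      class << self
        def enqueue(job)
          # Jobs that don't define already_queued? are enqueued as usual.
          return if job.try(:already_queued?)
          super
        end

        def enqueue_at(job, timestamp)
          return if job.try(:already_queued?)
          super
        end
      end
    end
  end
end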

Then enable the adapter in a convenient place (e.g. application.rb or one of the environment-specific files under config/environments) and you are good to go:

config.active_job.queue_adapter = :uniq_delayed_job  

Now you can enqueue jobs without worrying about duplicates eating up your workers' time.

MyJob.perform_later(my_arg) # Enqueued MyJob to delayed_jobs  
MyJob.perform_later(my_arg) # No longer enqueues another job unless the previous one had already started processing
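The rule behind those two calls can be modeled in plain Ruby. In this sketch (hypothetical class and method names, no database), a job counts as a duplicate only while an identical one is still waiting and unlocked, mirroring the locked_at/locked_by check in already_queued?.

```ruby
# Hypothetical in-memory model of the de-duplicating adapter. A locked entry
# stands for a job that a worker has already picked up.
QueuedJob = Struct.new(:job_class, :arguments, :locked)

class InMemoryUniqQueue
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  # True if an identical job (same class, same arguments) is waiting unlocked.
  def already_queued?(job_class, arguments)
    jobs.any? { |j| !j.locked && j.job_class == job_class && j.arguments == arguments }
  end

  def enqueue(job_class, *arguments)
    return if already_queued?(job_class, arguments)
    jobs << QueuedJob.new(job_class, arguments, false)
  end

  # Simulates a worker locking the oldest unlocked job.
  def start_next
    job = jobs.find { |j| !j.locked }
    job.locked = true if job
    job
  end
end

queue = InMemoryUniqQueue.new
queue.enqueue("MyJob", 1)
queue.enqueue("MyJob", 1) # skipped: an identical job is still waiting
puts queue.jobs.size      # 1
queue.start_next          # a worker picks the job up
queue.enqueue("MyJob", 1) # enqueued again: the first one is already running
puts queue.jobs.size      # 2
```

Letting a new copy through once the first one is locked matters: the running job may have read its data before the latest change, so one follow-up run is still needed.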