Dec. 6, 2008, 8:48 p.m.

Simple Named Job Deduplication

Our Problem

We would like content on our web site to be available in our search engine as soon as possible after it is saved. Our search engine is decentralized in that every front-end has a copy of the search index and searches locally. This architecture allows searches to scale horizontally quite well, but does so at the cost of simple index updates.

With a centralized search index, we could just push a modification into the central server and be done with it. With our architecture, we need a worker machine to build a search index and distribute it to all of the front-end machines.

Historically, we just had a cron job that'd occasionally rebuild and ship the index. Later, we started trying to keep track of what had changed and doing incremental updates.

Eventually, I figured out it'd be less work and faster if we just sent object changes into the job queue and had the index builder pick these little changes up and ship them to the web servers. This worked quite well for a while.

This became suboptimal when a bunch of content editors were rapidly making changes on a small development system with a couple of nodes running in VMware. The actual index distribution would just kill the machine due to IO on what ended up being the same disks.

The Idea

I wanted to keep the rapid update properties while trying to reduce IO. The obvious thing would be to try to aggregate multiple index updates into a single index distribution.

The Implementation

The first thing that had to be done, of course, was to break the job into two parts:

  1. The index update.
  2. The index distribution.

Now the trick is to ensure that for every index update, there is at least one index distribution without there being an index distribution for every update.

The easy way to do this is to give each job a "run after" date. This works very well for things such as index distribution. In this case, we've built an index and we want to make sure that the results of that index build make it to production. The job we queue will do that, but if these jobs back up behind each other, then any other distribution job that starts after the index build completed will do just as well.

So where do we track the timestamps so a job knows it doesn't need to run? Well, memcached ends up being a perfect place for this.

We give each job a name and a "run after" parameter, and store the "last run" timestamp in memcached under the job name. Really simple, and allows us to create as many of these jobs as we need.

Ruby code for doing this looks something like this:

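# Runs ob.method(*args) unless a run recorded under this job name started
# after `timestamp`. `cache` is assumed to be a memcached client that
# supports [] and []=.
def run_after_cb(name, timestamp, ob, method, *args)
  key = "jobts_#{name}"
  # A missing key or a cache error leaves last_run as nil or 0, so the job runs.
  last_run = cache[key] rescue 0
  if last_run && last_run > timestamp
    # Ignored -- log or something
  else
    # Take the timestamp before running: this run only covers work finished by now.
    started_at = Time.now.to_i
    ob.send(method, *args)
    begin
      cache[key] = started_at
    rescue
      # Can't record a new date (next job will run even if unnecessary)
    end
  end
end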
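
The other half of the arrangement is the producer: the index update finishes, then queues a distribution stamped with the time it finished. The following is only a sketch of that side. The names JOB_QUEUE and update_index_and_queue_dist, the job hash layout, and the in-memory array standing in for a real job queue are all assumptions made for illustration.

```ruby
# Hypothetical in-memory stand-in for a real job queue.
JOB_QUEUE = []

# Applies each change to the index (via the block), then queues one
# distribution job stamped with the time the update finished. Any
# distribution that starts after that time will include these changes.
def update_index_and_queue_dist(changes, queue = JOB_QUEUE, clock = -> { Time.now.to_i })
  changes.each { |change| yield change if block_given? }
  queue << { name: "index_dist", run_after: clock.call }
  queue.last
end

# Usage: the block is where the real index update would happen.
applied = []
job = update_index_and_queue_dist(%w[doc1 doc2], JOB_QUEUE, -> { 100 }) do |change|
  applied << change
end
```

A worker would later pop the job and hand its name and run_after values to something like run_after_cb.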

memcached is often the last thing I'd recommend for any sort of thing that isn't exactly a cache, but the semantics fit quite well here. This is treated as an optimization such that only when we know for sure that a distribution is redundant will we drop it.

Specifically, the index will be distributed under the following conditions:

  1. When the key is not in the cache (never seen, dropped, etc...)
  2. When the key is found, but the stored "last run" time is not later than the job's "run after" time.
  3. When any error occurs when trying to talk to memcached.
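
These conditions can be exercised without a memcached server. The sketch below is an illustration only: FakeCache is a hypothetical stand-in for a memcached client, and run_after is a variant of the function above that takes the cache and the current time as arguments (both assumptions, so the example stays self-contained).

```ruby
# Hypothetical stand-in for a memcached client: a Hash-like store that can
# be told to fail, to mimic a cache server that is unreachable.
class FakeCache
  attr_writer :broken

  def initialize
    @data = {}
    @broken = false
  end

  def [](key)
    raise IOError, "cache unavailable" if @broken
    @data[key]
  end

  def []=(key, value)
    raise IOError, "cache unavailable" if @broken
    @data[key] = value
  end
end

# Same decision as run_after_cb, with the cache and the start time passed in.
# Returns :ran or :skipped.
def run_after(cache, name, timestamp, now)
  key = "jobts_#{name}"
  last_run = begin
    cache[key]
  rescue StandardError
    nil                       # condition 3: a cache error means "run it"
  end
  return :skipped if last_run && last_run > timestamp
  yield                       # conditions 1 and 2: missing or stale entry
  begin
    cache[key] = now
  rescue StandardError
    # Could not record the run; the next job will simply run too.
  end
  :ran
end

cache = FakeCache.new
results = []
results << run_after(cache, "dist", 10, 12) { }   # key missing: runs
results << run_after(cache, "dist", 11, 13) { }   # last run 12 > 11: skipped
results << run_after(cache, "dist", 20, 21) { }   # last run 12 <= 20: runs
cache.broken = true
results << run_after(cache, "dist", 5, 22) { }    # cache error: runs
```

Only the second call is dropped, because that is the only case where the cache positively says a newer distribution has already started.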

Example

Imagine five index updates, each requiring a distribution, occurring in the following scenario:

[Figure: Dedup Example]

Shortly after completing the first index update, a distribution will start. Content updated in update 2 and update 3 will not be included in this distribution.

After dist 1 completes, dist 2 is ready to go for update 2. update 2 completed at t2 and the most recent distribution was recorded back around t1, which is earlier, so we start dist 2.

Because dist 2 begins at t4, it naturally includes the effects of update 2 and update 3, but not update 4, which had started but not yet completed when dist 2 began.

Now for a bit of imagination because I'm too lazy to draw this better.

Although it's not illustrated here, it should be clear that the next distribution would be dist 3 (queued by update 3 for updates after t3). That distribution would be dropped because the effects of update 3 have already been distributed by dist 2.

Next would be dist 4, which would have been queued from update 4. That one would not be dropped, but dist 5 would be, since dist 4 already carries the effects of update 5.

In this example, we distributed our index three times for five updates. In practice, this helps quite a bit -- especially when things start getting slow and the distributions are backing up anyway.
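
The scenario can be replayed as a small simulation. Everything numeric here is an assumption chosen to reproduce the outcome described above, not a reading of the original diagram: the update completion times, the fixed distribution duration, and a single worker that handles distributions in the order they were queued.

```ruby
# Assumed timeline: the completion time of each update, in arbitrary ticks.
UPDATE_DONE = { 1 => 1, 2 => 2, 3 => 3, 4 => 5, 5 => 6 }
DIST_DURATION = 3   # assumed: one distribution takes three ticks

# Replays the queued distributions in order on a single worker and returns
# [ran, dropped]: the update numbers whose distribution ran or was dropped.
def simulate(update_done, duration)
  last_run = nil      # plays the role of the timestamp kept in memcached
  worker_free_at = 0
  ran = []
  dropped = []
  update_done.each do |n, run_after|
    start = [worker_free_at, run_after].max   # wait for the worker and the update
    if last_run && last_run > run_after
      dropped << n                            # a later distribution already covered it
    else
      last_run = start                        # record the start time, as run_after_cb does
      worker_free_at = start + duration
      ran << n
    end
  end
  [ran, dropped]
end

ran, dropped = simulate(UPDATE_DONE, DIST_DURATION)
puts "ran: #{ran.inspect}, dropped: #{dropped.inspect}"
# prints: ran: [1, 2, 4], dropped: [3, 5]
```

With these assumed timings, dist 1, dist 2 and dist 4 run while dist 3 and dist 5 are dropped, which is the three-for-five result. Longer distributions or denser updates would cause more drops, which is why the saving grows when things are backing up.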
