Implementing a custom Redis and in-memory bloom filter

In our email marketing and delivery products (GoDaddy Email Marketing and Mad Mimi) we deal with lots of data and work with some interesting data structures like bloom filters. We made an optimization that involved replacing an old bloom filter built in-memory and stored on Amazon S3 with a combination of a Redis bloom filter and an in-memory bloom filter. In this blog post we’ll go through the reasoning for this change as well as the details of the bloom filter implementation we landed on. Let’s first start with a brief introduction to bloom filters.

What is a bloom filter?

A Bloom filter is a space-efficient probabilistic data structure, designed to test whether an element is a member of a set. Because of its probabilistic nature, it can guess if an element is in a set with a certain precision or tell for sure if an element is not in a set. That is an important detail to design around as we’ll see later. If you’re curious about the math involved, check out this blog post for more details.

What is the real problem we are solving?

In our email delivery products, each plan places limit on the number of unique contacts our customers can send emails to in a billing cycle. An interesting abuse scenario happens when a customer uploads a list of email addresses, sends a campaign to that list, deletes the list, and then imports another list with different email addresses and sends another campaign. We call this scenario “deleting and replacing” and to prevent it we need to keep a history of contacts that have received emails in a billing cycle.

The naive solution

The naive solution would be to check against the history of delivered emails in a billing cycle. While that might work for smaller data sets, it causes a performance problem when dealing with billions of contacts. That is where the opportunity for using the bloom filter data structure emerges.

Initial bloom filter implementation

Initially, we used the C-implementation from bloomfilter-rb by building a bloom filter in-memory and uploading it to Amazon S3.

There were issues with this approach, the two most important ones being:

With the re-design, we need a solution that will solve these problems.

Bloom filter implementation

Our bloom filter will have as a dependency our User model. Let’s say the User model has three attributes: id, max_contacts and billing_cycle_started_at:

User = Struct.new(:id, :max_contacts, :billing_cycle_started_at)
user = User.new(1, 500, Time.new(2018, 8, 01, 10, 0, 0, 0))

Here is our bloom filter implementation:

require 'zlib'

class BloomFilter

  # http://www.igvita.com/2008/12/27/scalable-datasets-bloom-filters-in-ruby/
  # 10 bits for 1% error approximation
  # ~5 bits per 10 fold reduction in error approximation
  BITS_PER_ERROR_RATE = {
    1    => 10,
    0.1  => 15,
    0.01 => 20
  }
  HASH_FUNCTIONS_COEFICIENT = 0.7 # Math.log(2)

  attr_reader :error_rate

  def initialize(user, error_rate: )
    @user = user
    @error_rate = error_rate
  end

  def indexes_for(key)
    hash_functions.times.map { |i| Zlib.crc32("#{key.to_s.strip.downcase}:#{i+seed}") % size }
  end

  def hash_functions
    @hash_functions ||= (bits * HASH_FUNCTIONS_COEFICIENT).ceil.to_i
  end

  def seed
    @seed ||= since.to_i
  end

  def since
    @since ||= @user.billing_cycle_started_at
  end

  def size
    @size ||= bits * @user.max_contacts
  end

  def bits
    @bits ||= BITS_PER_ERROR_RATE.fetch(error_rate)
  end

  def fingerprint
    @fingerprint ||= [@user.id, @user.max_contacts, seed].join('.')
  end
end

The most important part of the bloom filter is the method that generates the indexes for a given key, indexes_for(key).

Here’s an example usage:

bloom_filter = BloomFilter.new(user, error_rate: 1)

bloom_filter.indexes_for('user1@example.com')
# [2872, 110, 3108, 2498, 4409, 751, 2861]

bloom_filter.indexes_for('user2@example.com')
# [3992, 2262, 1788, 1970, 3185, 4135, 4957]

As a hashing function we use CRC32 with a custom seed per user that is the billing_cycle_started_at and the number of hashing functions based on the error rate (in this example we use an error rate of 1%).

For the bloom filter to return consistent hashing indexes during a user’s billing cycle, the input parameters it depends on (error_rate, @user.billing_cycle_started_at and @user.max_contacts) should not change for the billing cycle until it gets reset. That is the fingerprint that, as we’ll see later, we’ll use as a redis key for the Redis bloom filter.

Redis bloom filter

Redis supports getbit and setbit operations for the String type that make the individual updates simple. There is a special data type for bloom filters called rebloom if you want to explore it, but here we’ll just use a standard Redis type.

Here is our Redis bloom filter implementation:

require 'redis'

class RedisBloomFilter
  MAX_TTL = 31 * 24 * 60 * 60 # max days in a month

  def initialize(user)
    @user = user
  end

  def insert(keys)
    existing_indexes = redis.pipelined do
      keys.each do |key|
        bloom.indexes_for(key).map { |index| redis.setbit(filter_key, index, 1) }
      end
    end

    new_keys_count = keys.length.times.count { |i|
      existing_indexes[i * bloom.hash_functions, bloom.hash_functions].include?(0)
    }

    total = redis.incrby(counter_key, new_keys_count)

    if total == new_keys_count
      redis.expire(filter_key, MAX_TTL.to_i)
      redis.expire(counter_key, MAX_TTL.to_i)
    end
  end

  def count
    redis.get(counter_key).to_i
  end

  def include?(key)
    values = redis.pipelined do
      bloom.indexes_for(key).map { |index| redis.getbit(filter_key, index) }
    end

    !values.include?(0)
  end

  def field
    redis.get(filter_key)
  end

  private

  def redis
    @redis ||= Redis.new
  end

  def bloom
    @bloom ||= BloomFilter.new(@user, error_rate: 1)
  end

  def filter_key
    @filter_key ||= "bloom:filter:#{key_suffix}"
  end

  def counter_key
    @counter_key ||= "bloom:counter:#{key_suffix}"
  end

  def key_suffix
    @key_suffix ||= bloom.fingerprint
  end
end

The RedisBloomFilter uses the BloomFilter implementation to produce the indexes that it manipulates in Redis. It also implements a counter of how many unique elements are added to the filter by increasing the count when it detects a unique insert. Using an error rate of 1% for the bloom filter means that the count can be for 1% lower than the actual count and in our case that is totally fine as we allow for a bigger grace overage to customer plans. It uses redis pipelined that sends operations in batch to avoid latency and improve performance by about 5-6 times. It also sets a TTLs on the keys to expire them after a month and it exposes the field for the in-memory filter.

Here’s an example usage:

redis_bloom_filter = RedisBloomFilter.new(user)

redis_bloom_filter.insert(['user1@example.com', 'user2@example.com'])

redis_bloom_filter.count
# => 2

redis_bloom_filter.include?('user1@example.com')
# => true

redis_bloom_filter.include?('user2@example.com')
# => true

redis_bloom_filter.include?('user3@example.com')
# => false

In-memory Bloom filter

With the Redis implementation we solved half of the problem. We have a way to concurrently and quickly add elements to the bloom filter in Redis, but we still need a way to check if a bloom filter could accept a given set of elements without actually inserting the elements in the filter. This is useful when we want to prevent a list import before importing the list or stop a campaign from sending before starting it.

To achieve that, we need an in-memory filter that we can initialize with the state of the Redis bloom filter and bitarray can help us with that. We have an important PR that changes the storage representation i.e. the bits order in bitarray to match the way Redis stores them internally and a way to initialize a bitarray with a given field. To test it, you can fetch the BitArray that includes that patch from here.

Here is the implementation of the in-memory bloom filter:

class TemporaryBloomFilter

  def initialize(user)
    @user = user
    @bloom = BloomFilter.new(@user, error_rate: 1)
    @redis_filter = RedisBloomFilter.new(@user)
    @count = @redis_filter.count
  end

  def count
    @count
  end

  def insert(keys)
    keys.each do |key|
      previous_indexes = @bloom.indexes_for(key).map { |index|
        value = bit_array[index]
        bit_array[index] = 1
        value
      }
      @count += 1 if previous_indexes.include?(0)
    end
  end

  def include?(key)
    !@bloom.indexes_for(key).map { |index| bit_array[index] }.include?(0)
  end

  def over_limit?
    plan_over_limit_count > 0
  end

  def plan_over_limit_count
    @count - @user.plan_contacts
  end

  private

  def bit_array
    @bit_array ||= prepare_bit_array
  end

  def prepare_bit_array
    field = @redis_filter.field.to_s
    current_field_length = field.length
    max_field_length = (@bloom.size / 8 + 1)

    if current_field_length < max_field_length
      field += "\0" * (max_field_length - current_field_length)
    end

    BitArray.new(@bloom.size, field)
  end
end

And an example usage:

temporary_bloom_filter = TemporaryBloomFilter.new(user)

temporary_bloom_filter.insert(['user3@example.com', 'user4@example.com', 'user5@example.com'])

temporary_bloom_filter.count
# => 5

temporary_bloom_filter.include?('user5@example.com')
# => true

temporary_bloom_filter.include?('user6@example.com')
# => false

Performance

Ruby’s in-memory implementation is few times slower than the C-implementation in bloomfilter-rb, but still fast enough as it can process 1 million items in 5-10 seconds both calculating hash functions and doing BitArray inserts.

total_items = 1_000_000
t1 = Time.now
bf = BloomFilter.new(user, error_rate: 1)
ba = BitArray.new(total_items)
total_items.times do |i|
  bf.indexes_for("user#{i}@example.com").each do |j|
    ba[j] = true
  end
end
t2 = Time.now
puts t2-t1

# => 7.485282645

Redis performance is pretty solid as well. It can handle around 70-80k operations per second and when using pipelined mode for our batches of 350, we get 5-6 times more operations:

$ redis-benchmark -q -n 100000 -P 350
PING_INLINE: 373134.31 requests per second
PING_BULK: 421940.94 requests per second
SET: 369003.69 requests per second
GET: 396825.38 requests per second
INCR: 344827.59 requests per second
LPUSH: 362318.84 requests per second
LPOP: 389105.06 requests per second
SADD: 353356.91 requests per second
SPOP: 361010.81 requests per second
LPUSH (needed to benchmark LRANGE): 370370.34 requests per second
LRANGE_100 (first 100 elements): 61050.06 requests per second
LRANGE_300 (first 300 elements): 17494.75 requests per second
LRANGE_500 (first 450 elements): 11043.62 requests per second
LRANGE_600 (first 600 elements): 7965.59 requests per second
MSET (10 keys): 202839.75 requests per second

Conclusion

This custom implementation of a bloom filter turned out pretty solid and robust in our production environment. We have a Kibana dashboard monitoring the bloom filter updates over time giving us much better insights than our previous implementation.


Author