Elasticsearch-rails

Here is a general overview of my understanding of, and experience with, elasticsearch-rails, a Ruby wrapper for indexing and searching data in Elasticsearch.


Overview of elasticsearch-rails

Elasticsearch is a search server based on Lucene. It provides a distributed, multitenant-capable full-text search engine with a RESTful web interface and schema-free JSON documents. Elasticsearch is developed in Java.

Apache Lucene is a free, open-source information-retrieval library, originally written in Java but ported to numerous other languages. At the core of Lucene's logical architecture is the idea of a document containing fields of text. This flexibility allows Lucene's API to be independent of the file format. Text from PDFs, HTML, Microsoft Word, and OpenDocument documents, as well as many others (except images), can all be indexed as long as their textual information can be extracted.

Shay Banon, creator of Elasticsearch and its predecessor Compass, released the first version of Elasticsearch in February 2010. Since then it has been adopted by countless companies, and the company behind it has raised over $100mm from investors such as NEA, Benchmark, and Index Ventures.

[Indicative companies using elasticsearch]

General Architecture

Definitions: (source: the elasticsearch glossary)

Cluster: A cluster consists of one or more nodes which share the same cluster name. Each cluster has a single master node which is chosen automatically by the cluster and which can be replaced if the current master node fails.

Node: A node is a running instance of elasticsearch which belongs to a cluster. Multiple nodes can be started on a single server for testing purposes, but usually you should have one node per server. At startup, a node will use unicast (or multicast, if specified) to discover an existing cluster with the same cluster name and will try to join that cluster.

Shard: A shard is a single Lucene instance. It is a low-level “worker” unit which is managed automatically by elasticsearch. An index is a logical namespace which points to primary and replica shards. Other than defining the number of primary and replica shards that an index should have, you never need to refer to shards directly. Instead, your code should deal only with an index. Elasticsearch distributes shards amongst all nodes in the cluster, and can move shards automatically from one node to another in the case of node failure, or the addition of new nodes.

Index: An index is like a database in a relational database. It has a mapping which defines multiple types. An index is a logical namespace which maps to one or more primary shards and can have zero or more replica shards.

Mapping: A mapping is like a schema definition in a relational database. Each index has a mapping, which defines each type within the index, plus a number of index-wide settings. A mapping can either be defined explicitly, or it will be generated automatically when a document is indexed.
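To see a mapping in practice, you can ask the cluster for one directly. A minimal sketch using the elasticsearch-ruby client (the episodes_development index name is an assumption; any existing index works):

require 'elasticsearch'

client = Elasticsearch::Client.new(url: 'http://localhost:9200')
# returns the explicit or auto-generated mapping for the index, e.g.
# {"episodes_development"=>{"mappings"=>{"episodes"=>{"properties"=>{...}}}}}
client.indices.get_mapping(index: 'episodes_development')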

Setting up elasticsearch (using Homebrew):

brew install elasticsearch
brew info elasticsearch
<path-to-elasticsearch>/bin/elasticsearch
curl http://localhost:9200
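If the server is up, the curl command returns a small JSON document with the node name, version info, and the tagline "You Know, for Search".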

Setting up ES with Rails

Gemfile

gem 'elasticsearch-model'
gem 'elasticsearch-rails'
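Run bundle install afterwards. elasticsearch-model provides the indexing, searching, and callback plumbing for ActiveRecord models, while elasticsearch-rails adds Rails-specific extras such as rake tasks and request instrumentation.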


The Client

Elasticsearch-rails ultimately depends on Faraday as its HTTP client library, through the lower-level elasticsearch-ruby gem. To initialize a Rails Elasticsearch client, assign a client built with a valid ES server URL to Elasticsearch::Model.client in an initializer.

config/initializers/elasticsearch.rb

# urls are in elasticsearch.yml in config and loaded into rails environment
case Rails.env
when 'staging'
  Elasticsearch::Model.client = Elasticsearch::Client.new(url: ENV['ES_STAGING_URL'], log: true)
when 'production'
  Elasticsearch::Model.client = Elasticsearch::Client.new(url: ENV['ES_PRODUCTION_URL'], log: true)
else
  Elasticsearch::Model.client = Elasticsearch::Client.new(url: 'http://localhost:9200', log: true)
end
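A quick way to confirm the client is wired up correctly is from a rails console (ping and cluster.health are both part of the stock elasticsearch-ruby API):

Elasticsearch::Model.client.ping
# => true when the server is reachable

Elasticsearch::Model.client.cluster.health
# => {"cluster_name"=>"elasticsearch", "status"=>"yellow", ...}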

The Module

app/models/concerns/searchable.rb

module Searchable
  extend ActiveSupport::Concern

  included do
    include Elasticsearch::Model
    # include Elasticsearch::Model::Callbacks if @@callbackable

    index_name "#{self.table_name}_#{Rails.env}"
    document_type self.table_name

    def self.search(query, options = {})
      # Must override spree / ransack #search method
      __elasticsearch__.search(query, options)
    end

    # convenience wrappers, so callers need not touch __elasticsearch__ directly
    def index_document(options={})
      __elasticsearch__.index_document(options)
    end

    def delete_document(options={})
      # swallow the 404 raised when deleting a document that was never indexed
      __elasticsearch__.delete_document(options) rescue nil
    end

    def as_indexed_json(options={})
      # serialize only the attributes that appear in the mapping
      klass = self.class
      es_attributes = klass.mappings.to_hash[klass.document_type.to_sym][:properties].keys
      attributes.symbolize_keys.slice(*es_attributes).as_json
    end
  end
end
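With the concern included, as_indexed_json sends Elasticsearch only the attributes present in the mapping. A hedged example against the Episode model defined below (internal_notes is a made-up column for illustration):

episode = Episode.new(title: 'Intro', summary: 'First episode', internal_notes: 'private')
episode.as_indexed_json
# => {"title"=>"Intro", "summary"=>"First episode", "published_at"=>nil}
# internal_notes is dropped because it does not appear in the mapping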

The Model

app/models/episode.rb

class Episode < ActiveRecord::Base
  include Searchable

  settings do
    mappings do
      indexes :title, type: 'string', boost: 5000
      indexes :summary, analyzer: 'snowball'
      indexes :published_at, type: 'date', index: 'not_analyzed'
    end
  end

  after_save :update_or_remove_index

  def update_or_remove_index
    # delete_document already swallows the 404 raised for never-indexed records
    visible ? index_document : delete_document
  end
end

Boost: Increases the relevancy weight of a document or field, at index or query time.
Analyzer: Specifies how text in an indexed (analyzed) field is broken into terms when a document is indexed, and how query strings are processed at search time (this gets into the nitty-gritty details of Lucene).

Note: mark a field index: 'not_analyzed' when you need exact matches; its stored value is not tokenized, so queries must match it verbatim.
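For instance, a match query against the analyzed summary field goes through the snowball analyzer (so "running" can match "run"), while an exact-value term filter belongs on the not_analyzed published_at field. A sketch using the ES 1.x query DSL:

# analyzed: the query string is stemmed before matching
Episode.search(query: { match: { summary: 'running' } })

# not_analyzed: the term filter must match the stored value verbatim
Episode.search(query: { filtered: { filter: { term: { published_at: '2015-01-01' } } } })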

Indexer

lib/indexer.rb

class Indexer
  include Elasticsearch::Model

  class << self
    def perform(model)
      # controlling the 'knowledge' of what should be imported in separate class
      case model.table_name
      when Article.table_name
        Article.import force: true, scope: 'published'
      when Episode.table_name
        Episode.import force: true, scope: 'visible'
      else
        raise ArgumentError, "Invalid argument, please pass a valid model (Article or Episode)"
      end
    end
  end
end

Importing Data

Initially, the records must be mass-imported. We put this into rake tasks wrapped in a begin/rescue block so any errors get logged. Note that if the mapping or as_indexed_json changes significantly, the only way to update the index is to recreate it: import force: true deletes and recreates the index before importing.

lib/tasks/elasticsearch.rake

namespace :elasticsearch do
  namespace :import do
    CLIENT = Elasticsearch::Model.client

    task :all => [:episodes, :articles]

    task :episodes => :environment do
      episode_task = Proc.new { Indexer.perform(Episode) }
      invoke_task &episode_task
      puts "Done importing Episodes"
    end

    task :articles => :environment do
      article_task = Proc.new { Indexer.perform(Article) }
      invoke_task &article_task
      puts "Done importing Articles"
    end

    def invoke_task &block
      begin
        puts "starting import..."
        yield
        puts "Done successfully!"
      rescue StandardError => e
        # rescue StandardError, not Exception, so interrupts can still propagate
        puts "Failure!"
        puts e.message, e.backtrace
        Rails.logger.error("#{e.message}\n#{e.backtrace.join("\n")}")
      end
    end
  end
end
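The tasks are then invoked like any other rake task:

bundle exec rake elasticsearch:import:all
bundle exec rake elasticsearch:import:episodes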

Workflow Example

Episode.import force: true # force: true deletes and creates the index with new mapping
Episode.search("*")
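A sketch of working with the response object (results and records are the standard elasticsearch-model response readers):

response = Episode.search("rails")
response.results.total          # total number of hits
response.results.first._score   # relevancy score of the top hit
response.records.to_a           # the matching ActiveRecord objects, loaded in one query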

Debugging Queries with Marvel
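Marvel ships with the Sense console, a browser UI for running raw query DSL against the cluster, which makes it easy to iterate on a query before translating it into Ruby. The guide and preloaded snippet below cover exact-value (term filter) searches: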

http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_finding_exact_values.html
http://localhost:9200/_plugin/marvel/sense/index.html?load_from=http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/snippets/080_Structured_Search/05_Term_number.json

Searching

app/services/search_service.rb

class SearchService  
  class << self

    def search(query_or_payload, models=[Episode, Article], options={})
      models.map! { |model| classify(model) } 
      search_proc = Proc.new { Elasticsearch::Model.search(query_or_payload, models, {size: 100}) }
      invoke_search &search_proc
    end

    def find_all_published_at(date)
      raise ArgumentError, 'Must pass a Date argument' unless date.is_a?(Date)

      query = { filter: { range: { published_at: { gte: date, lt: date + 1.day } } } }.to_json
      search_proc = Proc.new { Episode.search(query, size: 100) }
      response = invoke_search &search_proc
      response.is_a?(Array) ? [] : response.records.to_a
    end

    private

    def classify(model)
      # accept a class or a table-name string, and reject non-indexed models
      ar_model = model.is_a?(Class) ? model : find_by_table(model)
      raise "This is not an indexed model" unless ar_model.respond_to?(:__elasticsearch__)
      ar_model
    end

    def invoke_search &block
      begin
        yield
      rescue StandardError => e
        Rails.logger.error("ELASTICSEARCH ERROR: #{e.message}")
        Rails.logger.error(e.backtrace.join("\n"))
        return []
      end
    end

  end
end
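Callers never touch __elasticsearch__ directly:

SearchService.search('ruby')                     # searches Episode and Article
SearchService.search('ruby', [Episode])          # restrict to a single model
SearchService.find_all_published_at(Date.today)  # => an array of Episodes (or [])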

RSpec Testing

# Gemfile
group :test do
  gem 'elasticsearch-extensions'
end

# spec/spec_helper.rb (or rails_helper.rb)
require 'elasticsearch/extensions/test/cluster'

RSpec.configure do |config|
  # Snipped other config.
  config.before :each, elasticsearch: true do
    Elasticsearch::Extensions::Test::Cluster.start(port: 9200) unless Elasticsearch::Extensions::Test::Cluster.running?
  end

  config.after :suite do
    Elasticsearch::Extensions::Test::Cluster.stop(port: 9200) if Elasticsearch::Extensions::Test::Cluster.running?
  end
end

describe 'Searching for a user', elasticsearch: true do
  before do
    # Create and destroy Elasticsearch indexes
    # between tests to eliminate test pollution
    User.__elasticsearch__.create_index! index: User.index_name

    # There are two options for how you create your objects
    # 1. Create your objects here and they should be synchronised 
    # through the Elasticsearch::Model callbacks
    User.create!
    # 2. Call import on the model which should reindex 
    # anything you've "let!"
    User.import

    # Sleeping here to allow Elasticsearch test cluster
    # to index the objects we created
    sleep 1
  end

  after do
    User.__elasticsearch__.client.indices.delete index: User.index_name
  end
end
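A hedged example spec that could sit inside the describe block above (the name attribute and query are assumptions about the User model):

it 'finds the indexed user' do
  response = User.search(query: { match: { name: 'jane' } })
  expect(response.records.to_a).not_to be_empty
end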