Elasticsearch を最近メインで使うようになり強力なデータストアだなぁと実感する日々ですが、大量のドキュメントを検索し、ページング処理をしたい時に、ちょっと面倒くさいなと思ったので、ActiveRecord の find_each
メソッドのような使い勝手を enum_for
を使って再現してみました。
利用している Gem は elasticsearch-persistence
です。
ちなみに、Rails で Elasticsearch を気軽に使うための公式ライブラリはいくつかあります。
-
elasticsearch-model
: ActiveRecord のモデルとの統合をしてくれる -
elasticsearch-rails
: Rails インテグレーション系のサポート -
elasticsearch-persistence
: モデルやテーブル関係なく、Elasticsearch の永続層を提供してくれる
今回は、データは Elasticsearch のみにあるので、ActiveRecord のモデル統合や Rails インテグレーションは必要ないので elasticsearch-persistence
のみを利用しました。
Elasticsearch のページング
Elasticsearch では 1 クエリあたり最大 10,000 件のドキュメントを返します。それ以降は、ページング処理が必要になります。ページングの方法はいくつかありますが、今回はシンプルに search_after
を使います。もし検索時点でのデータ整合性を保ちたい場合は Point In Time API を利用するといいでしょう。
リポジトリクラスを作る
elasticsearch-persistence
gem は version6 より前は ActiveRecord と Repository パターンをサポートしていたようですが、今は Repository パターンのみをサポート推奨しているようです。
確かに永続層を提供してくれる elasticsearch-persistence
gem の場合、ActiveRecord と密になるよりは Repository パターンで切り出して、モデルは別クラスで作った方が使い勝手が良さそうです。
こんな感じで Repository クラスとモデルクラスを作ってあげるだけで、Elasticsearch のドキュメントを Artist クラスオブジェクトとして自動的に変換してくれます。便利ですね。Repository の詳しい扱い方については上記のブログ記事を参照してください。
class ArtistRepository
include Elasticsearch::Persistence::Repository
include Elasticsearch::Persistence::Repository::DSL
klass Artist
end
class Artist
attr_accessor :name
def to_hash
{
id: id,
name: @name
}
end
end
find_each 風にしてみる
前置きが長くなりましたが、ここからページング処理を find_each
風に扱えるようにします。いきなり実装コードです。
class ArtistRepository
include Elasticsearch::Persistence::Repository
include Elasticsearch::Persistence::Repository::DSL
klass Artist
BATCH_SIZE = 10000
def find_each(query:)
if block_given?
find_in_batches(query: query) do |records|
records.each { |record| yield record }
end
else
enum_for(:find_each, query: query)
end
end
def find_in_batches(query:)
unless block_given?
return to_enum(:find_in_batches, query: query)
end
search_after = {}
# ページングでデータがなくなるまで取り続ける
loop do
params = {
query: query,
size: BATCH_SIZE,
sort: [
{ name: 'asc' } # 名前順にソートしている
]
}.merge(search_after)
batch = search(params)
yield batch
break if batch.size < BATCH_SIZE
# 今取得したドキュメントの最後の名前を search_after パラメータに指定する
search_after = { search_after: [batch.last.email] }
end
end
end
ちょっと長くなりましたが、これでこのように使えます。
query = { bool: { must: { match: { foo: 'bar' } } } }
ArtistRepository.new.find_each(query: query) to |artist|
puts artist.name
end
もし、10000 件以上のドキュメントが存在していても、自動で search_after
パラメータを指定してページングをしてくれます。便利ですね。find_each
風のメソッドを作るのに yield
/ enum_for
/ enum_to
を使いました。Ruby はこの手の処理を簡単に書けるので楽ですね。
Elasticsearch 関係なくもっとシンプルなサンプルコードはこちらです
class Batches
BATCH_SIZE = 2
def initialize
@data = (1..10).to_a
end
def find_each
if block_given?
find_in_batches do |records|
records.each { |record| yield record }
end
else
enum_for(:find_each)
end
end
def find_in_batches
unless block_given?
return to_enum(:find_in_batches)
end
@start = 0
loop do
batch = @data[@start, BATCH_SIZE]
yield batch
@start += BATCH_SIZE
break if batch.size < BATCH_SIZE
end
end
end
# 直接イテレート
Batches.new.find_each do |record|
puts record
end
# Enumerator オブジェクト受け取ってからイテレート
result = Batches.new.find_each
result.each do |record|
puts record
end