まっしろけっけ

めもてきなやーつ

Sidekiq について基本と1年半運用してのあれこれ

はじめに

実際に運用していた時に非同期にしていた主な処理は下記のようなものがあります。

  • iOS Android の push 通知の送信処理
  • ログの作成
  • 様々な外部 API の呼び出し
  • 非同期で更新しても問題ないデータの更新

Sidekiq is なに

sidekiqは非同期処理を実現する gem
他にも Ruby で非同期処理を実現できる有名な gem には
resque や delayed_job 等がある。

sidekiq.org

Enterprise版等もありますが、
今回はOSS版を使用している前提でのお話しです。

他の非同期処理が可能な gem との簡単な比較

FAQ · mperham/sidekiq Wiki · GitHub
この内容は結構真実を語っていることを最近知った

Sidekiq
  • Redis
  • マルチスレッド
  • リトライ処理あり
  • おしゃれなダッシュボード
Resque
  • Redis
  • ジョブごとにフォーク
  • リトライ処理なし
  • Sidekiqに比べると簡素なダッシュボード
Delayed Job
  • DB(専用テーブル作成)
  • リトライ処理あり
  • delay method が便利
  • 基本的にDBを使うので導入簡単

実装例

Redis は既にセットアップ済みとする

sidekiq を追加

$ vi Gemfile
+ gem 'sidekiq'

$ bundle install

config を追加

$ vi config/sidekiq.yml
:verbose: true
:pidfile: ./tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
:concurrency: 10 # worker process 数
:queues: # 処理するキュー名
  - default
  - user

env による設定の変更方法

production:
  :concurrency: 25
staging:
  :concurrency: 15

下記のように記述することで priority が設定できる

:queues:
  - [user, 2]
  - [default, 1]

worker file を作成

# UserWorker を作成
$ bundle exec rails g sidekiq:worker User
      create  app/workers/user_worker.rb
      create  test/workers/user_worker_test.rb

user_worker を編集
※今回は例としてわかりやすい処理にしています。

$ vim app/workers/user_worker.rb
- def perform(*args)
- # Do something
+ sidekiq_options queue: :user # キュー名指定がない場合は default になる
+ def perform(id, name)
+   user = User.find(id)
+   user.update(name: name)

キューを積む

$ bundle exec rails c
irb> UserWorker.perform_async(1, "test")
=> "f212bd8bae56c79467494ec8"
irb> User.find(1) # この状態ではまだ record が更新されていない
  User Load (0.3ms)  SELECT  `users`.* FROM `users` WHERE `users`.`id` = 1 LIMIT 1
=> #<User id: 1, name: "1", created_at: "2015-10-11 13:13:39", updated_at: "2015-10-11 13:13:50">

Sidekiq を起動する

$ bundle exec sidekiq -C config/sidekiq.yml -d
# ちょっと時間をおいて
$ bundle exec rails c
irb> User.find(1) # name が更新されていることが確認できる
  User Load (0.3ms)  SELECT  `users`.* FROM `users` WHERE `users`.`id` = 1 LIMIT 1
=> #<User id: 1, name: "test", created_at: "2015-10-11 13:13:39", updated_at: "2015-10-11 13:17:14">

これで簡単な実装例はおしまいです。

Sidekiq の enqueue の解説

Sidekiq が Redis にどのように queue を追加するかを解説する。
Sidekiq は使う method によって使う Redis のデータ型が違う

perform_asyncの場合

「セット型」と「リスト型」を使う。

1.セット型で queues という key で各ワーカーのキュー名を member として登録する
(実装例の場合は member に user を指定する)

2.リストの先頭に queue:キュー名 をキーにして string を追加する。
(実装例の場合は queue:user というキー名になる)

3.string は { class: worker class 名, arg: perform_asyncの引数配列, retry: retry機能が有効かのフラグ, queue: キュー名, jid: SecureRandom.hex(12) で生成された job id, created_at: 作成日, enqueued_at: キューに追加した時間 } この Hash を json にしたもの

以下が実装例での string の例

# Hash
{ class: "UserWorker", args: [1, "test"], retry: true, queue: "user", jid: 6284ea2d3637756fc121b8f7, created_at: 1444571406.7752862, enqueued_at: 1444571406.7754705 }

# Json化
"{\"class\":\"UserWorker\",\"args\":[1,\"test\"],\"retry\":true,\"queue\":\"user\",\"jid\":\"6284ea2d3637756fc121b8f7\",\"created_at\":1444571406.7752862,\"enqueued_at\":1444571406.7754705}"

perform_in の場合

「ソート済みセット型」を使う

1. ソート済みセット型で schedule という key で score に 実行時間を指定して member を登録する

2. member は { class: worker class 名, arg: perform_asyncの引数配列, retry: retry機能が有効かのフラグ, queue: キュー名, jid: SecureRandom.hex(12) で生成された job id, created_at: 作成日 } この Hash を json にしたもの

以下が実装例での member の例

# Hash
{ class: "UserWorker", args: [1, "test"], retry: true, queue: "user", jid: 6284ea2d3637756fc121b8f7, created_at: 1444571406.7752862 }

# Json化
"{\"class\":\"UserWorker\",\"args\":[1,\"test\"],\"retry\":true,\"queue\":\"user\",\"jid\":\"6284ea2d3637756fc121b8f7\",\"created_at\":1444571406.7752862}"

Redis について

Sidekiq を使う上で欠かせない Redis ですが、今回の記事で Redis のことまで深く説明すると
だいぶ長い記事になるので別記事として後日公開します。

工夫した点

ここからが本題の知見と言ってもいいかなという部分になります。

1. Redis の構成

当時開発していた Rails のアプリケーションでは、
session, sidekiq, cache に Redis を使用していました。
それぞれ別々の Redis にデータを保存していました。

別々にした理由は下記

  • Redis はオンメモリ型のKVSなので容量的な制約が大きい(実メモリの約半分程度)
  • 複数の用途で使い容量が設定によっては消えて欲しくないものが消える危険
  • 1つの Redis で運用をし後々分割しようと思った際にデータを分けるのが面倒
構成

レプリケーションを組み,Redis Sentinelで自動フェイルオーバーするように設定を行う。
アプリケーション側は下記の記事で書きましたが, gem の redis-sentinel を導入

shiro-16.hatenablog.com

redis-sentinel を導入すると Rails は Redis Sentinel から現在の master 情報を取得し接続、
障害により接続に失敗した場合に 「Redis Sentinel から master 情報を取得し接続」を繰り返し
master が切り替わったタイミングで正常に接続ができるようになるという感じ

2. 引数は出来るだけ少なく

perform_async 等の引数は出来るだけ少なくしましょうという話

例として Twitter でファボられた時に push を送る処理を実装
(実際には1ユーザに複数のtokenが紐づく可能性がある)

# ダメな例
# token: device_token or registration_id
# message: push 内容
def perform(token, message)
   FavoritePush.send(token, message)
end

# 良い例
# id: favorite情報を保存している record の id
def perform(id)
  favorite = Favorite.find(id)
  message = FavoritePush.message(favorite)
  FavoritePush.send(favorite.tweet.user.token, message)
end
何が良いのか?

Redis 
 record ( token  push )
console id
SQL primary key  index 使
DB Slave  Redis primary key  index 使ry
13()

 

3. 判定処理等を worker 側に持たせる

今回は2の例に push を送信するかどうかを判定する処理を追加する

# ダメな例
def perform(token, message)
  FavoritePush.send(token, message)
end
# キューを積む際に
# PushWorker.perform_async(favorite.tweet.user.token, FavoritePush.message(favorite)) if favorite.tweet.user.send_push?


# 良い例
def perform(id)
  favorite = Favorite.find(id)
  return unless favorite.tweet.user.send_push?
  message = FavoritePush.message(favorite)
  FavoritePush.send(favorite.tweet.user.token, message)
end
何が良いのか?

今回の場合はシンプルな判定だが、もっと複雑な判定を行う場面は多い
そうなった場合に特に有効

・キューを積む際に判定するより同期処理側の処理速度は速くなる
・今回の場合は message を生成する処理も worker 側で行うので同期処理側の処理速度は速くなる
・console 等からキューを積む場合に判定内容を気にしなくて良くなる

※ Redis に積むキューの数自体は増えてしまうので Redis の使用量は増えてしまいます

4. retry 機能に注意する


Sidekiq  retry 
push 
def perform(id)
  favorite = Favorite.find(id)
  return unless favorite.tweet.user.send_push?
  message = FavoritePush.message(favorite)
  FavoritePush.send(favorite.tweet.user.token, message)
  favorite.update(send_push: true)
end

retry  favorite.update 
retry 
 favorite.update  push 

 update  worker  push 

 retry  off 

5. record が削除されている場合も考慮する

Sidekiq だけの話ではないですが、
非同期処理なので処理が実行される際には既に対象の record が削除されている可能性も考えなければなりません。
4.の retry の話を念頭に置いて ActiveRecord::RecordNotFound 等の例外を上手く処理する必要があります。

6. batch 


3. worker 
batch  record 
 worker 

) batch 
# ダメな例
User.find_each do |user|
  # 複雑な処理

  UserWorker.perform_async(user.id, result)
end
class UserWorker
  def perform(id, result)
    user = User.find(id)
    user.update(result: result)
  end
end

# いい例
User.select(:id).find_each do |user|
  UserWorker.perform_async(user.id)
end

class UserWorker
  def perform(id)
    user = User.find(id)
    # 複雑な処理

    user.update(result: result)
  end
end
良い点

・時間がかかる複雑な処理をマルチスレッドで行うことで処理速度が上がる

deploy

deployに関しては下記のページを参考にするのが良さそう
Deployment · mperham/sidekiq Wiki · GitHub

その他

Best Practices · mperham/sidekiq Wiki · GitHub
ここら辺も合わせて読むと良いかと

最後に




 Sidekiq 使
 Sidekiq  Delayed Job 使




沿

Sidekiq  Redis 

 Sidekiq