Active Job meets Amazon SQS
この記事はRuby on Rails Advent Calendar 2014の11日目の記事です。
今日はなんとなくActive Jobについて知りたくなったので、少し試してみました。その記録です。
Active Job について
Rails 4.2 で導入される、なんだか早くから巷で名前がでてたやつです。
Active Jobの基礎 — Ruby on Rails Guides
2 Active Jobの目的
Active Jobの主要な目的は、Railsアプリを即席で作成した直後でも使用できる、自前のジョブ管理インフラを持つことです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。
というわけで、何度か案件で使ったことのある、Amazon SQSもActive Jobのバックエンドとして使えないか調べてみることにしました。
Active Job meets Amazon SQS
ActiveJob::QueueAdapters によると、Active Jobは標準でも多くのキューイング・システムに対応しているみたいです。
しかし、残念ながら Amazon SQS は見当たりません。(もしかすると、どれかと組み合わせて使えるのかも知れませんが、今日時点で僕はそれを知りません)
QueueAdaptersのコードを覗いてみると、各アダプタはごく少ないコード量で実装されています。 なので、細かいことはさておき、とりあえずハックするだけなら簡単そうです。
https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters
なければ作る、コードを読みつついじってれば理解も深まるというものでしょう。
2015/10/14 追記
現在はshoryukenというgemがあるのでそちらを使えばよさそうです。
ActiveJob-SQS-Example
というわけでかんたんなサンプルアプリを作りました。
https://github.com/takeyuweb/ActiveJob-SQS-Example
- Rails 4.2.0 pre2
- ControllerからActive Jobを使って60秒後のメール送信を予約
- Amazon SQSにキューイング
- フォームから送信された文字列を引数として渡す
- rakeコマンドでSQSから取りだして逐次処理
- 引数を受け取り処理を継続(メールを送信)
- mailcatcherでメールを受信
動作
フォーム送信すると、エンキューして「OK」表示。
このときの development.log を見ると、Active Job の Sqsアダプタ によって登録されたことがわかります。
Started POST "/hello/world?method=post" for 192.168.112.2 at 2014-12-10 18:28:20 +0000 Processing by HelloController#world as HTML Parameters: {"utf8"=>"✓", "authenticity_token"=>"qS0/WcjCDWR29bmmbIQGfJZnq3twFBXZ3GONUFSIR/bH70wFhLzkjWwliILEFT/XGZcsvOlVk30TC6LOozij7Q==", "message"=>"ActiveJob", "commit"=>"World!!", "method"=>"post"} [ActiveJob] Enqueued HelloJob (Job ID: b6b93e27-fb7b-435c-bcd8-18a599e00019) to Sqs(development_default) at 2014-12-10 18:29:20 UTC with arguments: "ActiveJob" Redirected to http://127.0.0.1:3000/ Completed 302 Found in 587ms (ActiveRecord: 0.0ms)
しばらくすると、登録したジョブがワーカーによって実行されたことが記録されました。
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performing HelloJob from Sqs(development_default) with arguments: "ActiveJob" [ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Rendered example_mailer/hello.text.erb within layouts/mailer (1.8ms) [ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] ExampleMailer#hello: processed outbound mail in 189.8ms [ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Sent mail to test@takeyu-web.com (175.9ms) [ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Date: Wed, 10 Dec 2014 18:29:21 +0000 From: from@example.com To: test@takeyu-web.com Message-ID: <54889101793f5_27123fe683d1b3d0973e4@localhost.localdomain.mail> Subject: Hello Mime-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit Hello, ActiveJob World!! [ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performed HelloJob from Sqs(development_default) in 391.23ms
タイムスタンプを見ると、確かに1分後に送信されていますね。
メールの方も、このように確かに届いていました。
以下、コードの説明です。
アダプタの自作について
実装する
実際に仕事で使うなら、詳細な仕様を抑えたり、エラーハンドリングなどについて把握する必要があると思います。
とりあえず今回はジョブを登録&実行してみたいだけなので…
ジョブ登録部分
- enqueue(job)
- enqueue_at(job, timestamp)
がそれぞれ実行時刻の指定あるなしに対応するようなので、バックエンドへの登録処理を実装すればよいようです。
コード
https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/sqs_adapter.rb
require 'aws-sdk-core' # message = params[:message] # immediately = params[:immediately].present? ? true : false # if immediately # HelloJob.perform_later(message) # enqueue(job) # else # HelloJob.set(wait: 1.minute).perform_later(message) # enqueue(job, timestamp) # end class SqsAdapter class << self def enqueue(job) sqs.send_message( queue_url: get_queue_url(job), message_body: MultiJson.dump(job.serialize) ) end def enqueue_at(job, timestamp) delay = timestamp.to_i - Time.current.to_i sqs.send_message( queue_url: get_queue_url(job), message_body: MultiJson.dump(job.serialize), delay_seconds: delay, ) end def sqs Aws::SQS::Client.new end def get_queue_url(job) sqs.create_queue(queue_name: job.queue_name)[:queue_url] end end class JobWrapper class << self def perform(job_data) ActiveJob::Base.execute job_data end end end end
ジョブ実行部分
バックエンドから受け取ったメッセージを ActiveJob::Base に渡すと、メッセージから対応するジョブクラスのインスタンスを生成し、#perform
を実行します。
ActiveJob::Base.execute(job_data) # HelloJob.perform(message)
コード
SQSからメッセージを取りだし、SqsAdapter::JobWrapper.perform へ渡すrakeタスクのサンプルです。
https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/tasks/sqs.rake
# ジョブ実行 # 実際にはエラーハンドリングなどが必要と思います。 namespace :sqs do namespace :worker do desc 'Starts a new sqs worker' task :start, ['queue_name', 'max_number_of_messages'] => :environment do |task, args| sqs = Aws::SQS::Client.new queue_name = [ ActiveJob::Base.queue_name_prefix, args.queue_name.presence || ActiveJob::Base.default_queue_name, ].compact.join('_') queue_url = sqs.get_queue_url(queue_name: queue_name)[:queue_url] begin num = (args.max_number_of_messages.presence || 10).to_i while true do resp = sqs.receive_message(queue_url: queue_url, max_number_of_messages: num) resp[:messages].each do |message| job_data = MultiJson.load(message[:body]) SqsAdapter::JobWrapper.perform(job_data) sqs.delete_message(queue_url: queue_url, receipt_handle: message[:receipt_handle]) end sleep 1 end rescue SignalException # C-c end end end end
アダプタを使うための指定
https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/config/initializers/active_job.rb
ActiveJob::Base.queue_adapter = SqsAdapter
ActiveJob::Base.queue_adapter
にアダプタクラスをそのまま渡しちゃってよいようです。
まとめ
- 標準に含まれないバックエンドでも Active Job に対応させることはできる
- 仕事で使うにはまだわからないこと
- バックエンドへのジョブ登録・更新時に障害が発生するなどした際どうなる?
- 完了せず再実行のときどうすれば? など…
以上、今日のざっとやってみた、でした。