元フリーエンジニアライフ

Ruby on Rails とか MovableType とかAWSやってるフリーランスウェブエンジニアの記録でした。現在は法人成りしてIT社長。

Active Job meets Amazon SQS

この記事はRuby on Rails Advent Calendar 2014の11日目の記事です。

今日はなんとなくActive Jobについて知りたくなったので、少し試してみました。その記録です。

Active Job について

Rails 4.2 で導入される、なんだか早くから巷で名前がでてたやつです。

Active Jobの基礎 — Ruby on Rails Guides

2 Active Jobの目的

Active Jobの主要な目的は、Railsアプリを即席で作成した直後でも使用できる、自前のジョブ管理インフラを持つことです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。

というわけで、何度か案件で使ったことのある、Amazon SQSもActive Jobのバックエンドとして使えないか調べてみることにしました。

http://aws.amazon.com/jp/sqs/

Active Job meets Amazon SQS

f:id:uzuki05:20141211043124p:plain

ActiveJob::QueueAdapters によると、Active Jobは標準でも多くのキューイング・システムに対応しているみたいです。

しかし、残念ながら Amazon SQS は見当たりません。(もしかすると、どれかと組み合わせて使えるのかも知れませんが、今日時点で僕はそれを知りません)

QueueAdaptersのコードを覗いてみると、各アダプタはごく少ないコード量で実装されています。 なので、細かいことはさておき、とりあえずハックするだけなら簡単そうです。

https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters

なければ作る、コードを読みつついじってれば理解も深まるというものでしょう。

2015/10/14 追記

現在はshoryukenというgemがあるのでそちらを使えばよさそうです。

phstc/shoryuken · GitHub

ActiveJob-SQS-Example

というわけでかんたんなサンプルアプリを作りました。

https://github.com/takeyuweb/ActiveJob-SQS-Example

  • Rails 4.2.0 pre2
  • ControllerからActive Jobを使って60秒後のメール送信を予約
    • Amazon SQSにキューイング
    • フォームから送信された文字列を引数として渡す
  • rakeコマンドでSQSから取りだして逐次処理
    • 引数を受け取り処理を継続(メールを送信)
  • mailcatcherでメールを受信

動作

http://127.0.0.1:3000/

フォーム送信すると、エンキューして「OK」表示。

Job

このときの development.log を見ると、Active Job の Sqsアダプタ によって登録されたことがわかります。

Started POST "/hello/world?method=post" for 192.168.112.2 at 2014-12-10 18:28:20 +0000
Processing by HelloController#world as HTML
  Parameters: {"utf8"=>"✓", "authenticity_token"=>"qS0/WcjCDWR29bmmbIQGfJZnq3twFBXZ3GONUFSIR/bH70wFhLzkjWwliILEFT/XGZcsvOlVk30TC6LOozij7Q==", "message"=>"ActiveJob", "commit"=>"World!!", "method"=>"post"}
[ActiveJob] Enqueued HelloJob (Job ID: b6b93e27-fb7b-435c-bcd8-18a599e00019) to Sqs(development_default) at 2014-12-10 18:29:20 UTC with arguments: "ActiveJob"
Redirected to http://127.0.0.1:3000/
Completed 302 Found in 587ms (ActiveRecord: 0.0ms)

しばらくすると、登録したジョブがワーカーによって実行されたことが記録されました。

[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performing HelloJob from Sqs(development_default) with arguments: "ActiveJob"
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]   Rendered example_mailer/hello.text.erb within layouts/mailer (1.8ms)
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]
ExampleMailer#hello: processed outbound mail in 189.8ms
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]
Sent mail to test@takeyu-web.com (175.9ms)
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Date: Wed, 10 Dec 2014 18:29:21 +0000
From: from@example.com
To: test@takeyu-web.com
Message-ID: <54889101793f5_27123fe683d1b3d0973e4@localhost.localdomain.mail>
Subject: Hello
Mime-Version: 1.0
Content-Type: text/plain;
 charset=UTF-8
Content-Transfer-Encoding: 7bit

Hello, ActiveJob World!!

[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performed HelloJob from Sqs(development_default) in 391.23ms

タイムスタンプを見ると、確かに1分後に送信されていますね。

メールの方も、このように確かに届いていました。

MailCatcher

以下、コードの説明です。

アダプタの自作について

実装する

実際に仕事で使うなら、詳細な仕様を抑えたり、エラーハンドリングなどについて把握する必要があると思います。

とりあえず今回はジョブを登録&実行してみたいだけなので…

ジョブ登録部分

  • enqueue(job)
  • enqueue_at(job, timestamp)

がそれぞれ実行時刻の指定あるなしに対応するようなので、バックエンドへの登録処理を実装すればよいようです。

コード

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/sqs_adapter.rb

require 'aws-sdk-core'

# message = params[:message]
# immediately = params[:immediately].present? ? true : false
# if immediately
#   HelloJob.perform_later(message) # enqueue(job)
# else
#   HelloJob.set(wait: 1.minute).perform_later(message) # enqueue(job, timestamp)
# end

class SqsAdapter
  class << self
    def enqueue(job)
      sqs.send_message(
          queue_url: get_queue_url(job),
          message_body: MultiJson.dump(job.serialize)
      )
    end

    def enqueue_at(job, timestamp)
      delay = timestamp.to_i - Time.current.to_i
      sqs.send_message(
          queue_url: get_queue_url(job),
          message_body: MultiJson.dump(job.serialize),
          delay_seconds: delay,
      )
    end

    def sqs
      Aws::SQS::Client.new
    end

    def get_queue_url(job)
      sqs.create_queue(queue_name: job.queue_name)[:queue_url]
    end
  end

  class JobWrapper
    class << self
      def perform(job_data)
        ActiveJob::Base.execute job_data
      end
    end
  end
end

ジョブ実行部分

バックエンドから受け取ったメッセージを ActiveJob::Base に渡すと、メッセージから対応するジョブクラスのインスタンスを生成し、#performを実行します。

ActiveJob::Base.execute(job_data) # HelloJob.perform(message)
コード

SQSからメッセージを取りだし、SqsAdapter::JobWrapper.perform へ渡すrakeタスクのサンプルです。

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/tasks/sqs.rake

# ジョブ実行
# 実際にはエラーハンドリングなどが必要と思います。

namespace :sqs do
  namespace :worker do
    desc 'Starts a new sqs worker'
    task :start, ['queue_name', 'max_number_of_messages'] => :environment do |task, args|
      sqs = Aws::SQS::Client.new
      queue_name = [
          ActiveJob::Base.queue_name_prefix,
          args.queue_name.presence || ActiveJob::Base.default_queue_name,
      ].compact.join('_')
      queue_url = sqs.get_queue_url(queue_name: queue_name)[:queue_url]

      begin
        num = (args.max_number_of_messages.presence || 10).to_i
        while true do
          resp = sqs.receive_message(queue_url: queue_url,
                                     max_number_of_messages: num)
          resp[:messages].each do |message|
            job_data = MultiJson.load(message[:body])
            SqsAdapter::JobWrapper.perform(job_data)
            sqs.delete_message(queue_url: queue_url,
                               receipt_handle: message[:receipt_handle])
          end
          sleep 1
        end
      rescue SignalException
        # C-c
      end
    end
  end
end

アダプタを使うための指定

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/config/initializers/active_job.rb

ActiveJob::Base.queue_adapter = SqsAdapter

ActiveJob::Base.queue_adapterにアダプタクラスをそのまま渡しちゃってよいようです。

まとめ

  • 標準に含まれないバックエンドでも Active Job に対応させることはできる
  • 仕事で使うにはまだわからないこと
    • バックエンドへのジョブ登録・更新時に障害が発生するなどした際どうなる?
    • 完了せず再実行のときどうすれば? など…

以上、今日のざっとやってみた、でした。