読者です 読者をやめる 読者になる 読者になる

元フリーエンジニアライフ

Ruby on Rails とか MovableType とかAWSやってるフリーランスウェブエンジニアの記録でした。現在は法人成りしてIT社長。

Active Job meets Amazon SQS

この記事はRuby on Rails Advent Calendar 2014の11日目の記事です。

今日はなんとなくActive Jobについて知りたくなったので、少し試してみました。その記録です。

Active Job について

Rails 4.2 で導入される、なんだか早くから巷で名前がでてたやつです。

Active Jobの基礎 — Ruby on Rails Guides

2 Active Jobの目的

Active Jobの主要な目的は、Railsアプリを即席で作成した直後でも使用できる、自前のジョブ管理インフラを持つことです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。

というわけで、何度か案件で使ったことのある、Amazon SQSもActive Jobのバックエンドとして使えないか調べてみることにしました。

http://aws.amazon.com/jp/sqs/

Active Job meets Amazon SQS

f:id:uzuki05:20141211043124p:plain

ActiveJob::QueueAdapters によると、Active Jobは標準でも多くのキューイング・システムに対応しているみたいです。

しかし、残念ながら Amazon SQS は見当たりません。(もしかすると、どれかと組み合わせて使えるのかも知れませんが、今日時点で僕はそれを知りません)

QueueAdaptersのコードを覗いてみると、各アダプタはごく少ないコード量で実装されています。 なので、細かいことはさておき、とりあえずハックするだけなら簡単そうです。

https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters

なければ作る、コードを読みつついじってれば理解も深まるというものでしょう。

2015/10/14 追記

現在はshoryukenというgemがあるのでそちらを使えばよさそうです。

phstc/shoryuken · GitHub

ActiveJob-SQS-Example

というわけでかんたんなサンプルアプリを作りました。

https://github.com/takeyuweb/ActiveJob-SQS-Example

  • Rails 4.2.0 pre2
  • ControllerからActive Jobを使って60秒後のメール送信を予約
    • Amazon SQSにキューイング
    • フォームから送信された文字列を引数として渡す
  • rakeコマンドでSQSから取りだして逐次処理
    • 引数を受け取り処理を継続(メールを送信)
  • mailcatcherでメールを受信

動作

http://127.0.0.1:3000/

フォーム送信すると、エンキューして「OK」表示。

Job

このときの development.log を見ると、Active Job の Sqsアダプタ によって登録されたことがわかります。

Started POST "/hello/world?method=post" for 192.168.112.2 at 2014-12-10 18:28:20 +0000
Processing by HelloController#world as HTML
  Parameters: {"utf8"=>"✓", "authenticity_token"=>"qS0/WcjCDWR29bmmbIQGfJZnq3twFBXZ3GONUFSIR/bH70wFhLzkjWwliILEFT/XGZcsvOlVk30TC6LOozij7Q==", "message"=>"ActiveJob", "commit"=>"World!!", "method"=>"post"}
[ActiveJob] Enqueued HelloJob (Job ID: b6b93e27-fb7b-435c-bcd8-18a599e00019) to Sqs(development_default) at 2014-12-10 18:29:20 UTC with arguments: "ActiveJob"
Redirected to http://127.0.0.1:3000/
Completed 302 Found in 587ms (ActiveRecord: 0.0ms)

しばらくすると、登録したジョブがワーカーによって実行されたことが記録されました。

[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performing HelloJob from Sqs(development_default) with arguments: "ActiveJob"
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]   Rendered example_mailer/hello.text.erb within layouts/mailer (1.8ms)
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]
ExampleMailer#hello: processed outbound mail in 189.8ms
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019]
Sent mail to test@takeyu-web.com (175.9ms)
[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Date: Wed, 10 Dec 2014 18:29:21 +0000
From: from@example.com
To: test@takeyu-web.com
Message-ID: <54889101793f5_27123fe683d1b3d0973e4@localhost.localdomain.mail>
Subject: Hello
Mime-Version: 1.0
Content-Type: text/plain;
 charset=UTF-8
Content-Transfer-Encoding: 7bit

Hello, ActiveJob World!!

[ActiveJob] [HelloJob] [b6b93e27-fb7b-435c-bcd8-18a599e00019] Performed HelloJob from Sqs(development_default) in 391.23ms

タイムスタンプを見ると、確かに1分後に送信されていますね。

メールの方も、このように確かに届いていました。

MailCatcher

以下、コードの説明です。

アダプタの自作について

実装する

実際に仕事で使うなら、詳細な仕様を抑えたり、エラーハンドリングなどについて把握する必要があると思います。

とりあえず今回はジョブを登録&実行してみたいだけなので…

ジョブ登録部分

  • enqueue(job)
  • enqueue_at(job, timestamp)

がそれぞれ実行時刻の指定あるなしに対応するようなので、バックエンドへの登録処理を実装すればよいようです。

コード

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/sqs_adapter.rb

require 'aws-sdk-core'

# message = params[:message]
# immediately = params[:immediately].present? ? true : false
# if immediately
#   HelloJob.perform_later(message) # enqueue(job)
# else
#   HelloJob.set(wait: 1.minute).perform_later(message) # enqueue(job, timestamp)
# end

class SqsAdapter
  class << self
    def enqueue(job)
      sqs.send_message(
          queue_url: get_queue_url(job),
          message_body: MultiJson.dump(job.serialize)
      )
    end

    def enqueue_at(job, timestamp)
      delay = timestamp.to_i - Time.current.to_i
      sqs.send_message(
          queue_url: get_queue_url(job),
          message_body: MultiJson.dump(job.serialize),
          delay_seconds: delay,
      )
    end

    def sqs
      Aws::SQS::Client.new
    end

    def get_queue_url(job)
      sqs.create_queue(queue_name: job.queue_name)[:queue_url]
    end
  end

  class JobWrapper
    class << self
      def perform(job_data)
        ActiveJob::Base.execute job_data
      end
    end
  end
end

ジョブ実行部分

バックエンドから受け取ったメッセージを ActiveJob::Base に渡すと、メッセージから対応するジョブクラスのインスタンスを生成し、#performを実行します。

ActiveJob::Base.execute(job_data) # HelloJob.perform(message)
コード

SQSからメッセージを取りだし、SqsAdapter::JobWrapper.perform へ渡すrakeタスクのサンプルです。

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/lib/tasks/sqs.rake

# ジョブ実行
# 実際にはエラーハンドリングなどが必要と思います。

namespace :sqs do
  namespace :worker do
    desc 'Starts a new sqs worker'
    task :start, ['queue_name', 'max_number_of_messages'] => :environment do |task, args|
      sqs = Aws::SQS::Client.new
      queue_name = [
          ActiveJob::Base.queue_name_prefix,
          args.queue_name.presence || ActiveJob::Base.default_queue_name,
      ].compact.join('_')
      queue_url = sqs.get_queue_url(queue_name: queue_name)[:queue_url]

      begin
        num = (args.max_number_of_messages.presence || 10).to_i
        while true do
          resp = sqs.receive_message(queue_url: queue_url,
                                     max_number_of_messages: num)
          resp[:messages].each do |message|
            job_data = MultiJson.load(message[:body])
            SqsAdapter::JobWrapper.perform(job_data)
            sqs.delete_message(queue_url: queue_url,
                               receipt_handle: message[:receipt_handle])
          end
          sleep 1
        end
      rescue SignalException
        # C-c
      end
    end
  end
end

アダプタを使うための指定

https://github.com/takeyuweb/ActiveJob-SQS-Example/blob/master/config/initializers/active_job.rb

ActiveJob::Base.queue_adapter = SqsAdapter

ActiveJob::Base.queue_adapterにアダプタクラスをそのまま渡しちゃってよいようです。

まとめ

  • 標準に含まれないバックエンドでも Active Job に対応させることはできる
  • 仕事で使うにはまだわからないこと
    • バックエンドへのジョブ登録・更新時に障害が発生するなどした際どうなる?
    • 完了せず再実行のときどうすれば? など…

以上、今日のざっとやってみた、でした。