例題: キャッシュつきの HTTP GET
例として ●指定された URL から HTML を取ってきて、画面に出力する。今回は簡単のために title のみ抽出して出力する ●同じ URL に何度も HTTP アクセスすると迷惑なので、一度取得したデータはメモリにキャッシュしておく というアプリケーションを実装してみようと思う。 このとき後者のメモリ上のキャッシュというのは明らかに﹁状態﹂なので、プロセスの出番である。URL と title のペアを保持する HashDict を持ったプロセスを作っておいて保持させる。ファーストステップ: HTTPクライアントを用意
まずは指定された URL から HTML を取ってきて title を抽出するモジュールを書こう。これがないと始まらない。defmodule WebArchive.Fetcher do
def fetch(url) do
HTTPotion.start
HTTPotion.get(url)
end
def process_response(%{status_code: 200, body: body}) do
body
|> to_string
end
def extract_title(url) do
[{"title", [], [title]}] = fetch(url)
|> process_response
|> Floki.find "title"
title
end
end
HTTP クライアントには HTTPotion、HTML の parse に Floki を使った。
iex> WebArchive.Fetcher.extract_title("https://twitter.com/")
"Twitterへようこそ - ログインまたは新規登録"
このように、extract_title/1
で twitter.com の title 文字列が返ってくる。
泥沼に構わず突っ込む
さて、このコードで特徴的なのは正常系のことしか気にしてないところである。
def process_response(%{status_code: 200, body: body}) do
body
|> to_string
end
例えば HTTP GET 後の処理は process_response/1
に渡すが、パターンマッチでステータスコード 200 の時しかみてない。従って 200 以外が返ってくると、FunctionClauseError でクラッシュする。
iex(5)> WebArchive.Fetcher.extract_title("http://example.com/404")
** (FunctionClauseError) no function clause matching in WebArchive.Fetcher.process_response/1
url
のバリデーションをしてないので、例えば :cat
とか URL 文字列ではない値を渡すと HTTPotion が例外を吐く。それから、取得したデータが HTML じゃなかったら Floki が何かしら例外を起こすかもしれないし、HTML を解析した結果 title 要素がなかった場合も例外が出るかもしれない。
・・・などなど考えれば異常系はいろいろ可能性としてあるし悩みは尽きないが、いいや!そういうことは考えない。泥水に構わず突っ込め、というのはそういうことだ。
そう、クラッシュしたら、Supervisor に起こして貰うんだ。
状態を保持するサーバーを作って Supervisor で監視
さて、このクライアントにキャッシュの機能をつけることを考えよう。 先にも述べた通りキャッシュは﹁状態﹂なので、状態を保持したサーバープロセスを作ることにする。iex(5)> WebArchive.Server.extract_title("https://twitter.com/") # HTTP アクセス
"Twitterへようこそ - ログインまたは新規登録"
iex(6)> WebArchive.Server.extract_title("https://twitter.com/") # キャッシュから返る
"Twitterへようこそ - ログインまたは新規登録"
iex(12)> WebArchive.Server.extract_title(:cat) # クラッシュ!
10:18:04.900 [error] GenServer WebArchive.Server terminating
Last message: {:extract_title, :cat}
State: #PID<0.153.0>
...
iex(13)> WebArchive.Server.extract_title("https://twitter.com/") # HTTP GET してしまう
"Twitterへようこそ - ログインまたは新規登録"
Supervison Tree でプロセスのライフサイクルを分ける
こういうときは状態を扱う部分をサーバーから分離してまた別のプロセスにする。ここでは Stash と名付けよう。異なるプロセスに分離することでライフサイクルを分けるんである。そして二つのプロセスはメッセージパッシングでやりとりをする。 そして、Supervisor も別に用意して親子関係を作り、監視させる。![4f82f5998c4d538b44579ddcab5b000e2b1137d4.png 4f82f5998c4d538b44579ddcab5b000e2b1137d4.png](https://qiita-image-store.s3.amazonaws.com/0/1834/5c69a9c6-81c0-6495-4fd1-89d02cf5ce58.png)
実装
では実装をみていく。Stash (Agent)
Stash は GenServer で作ってもよいが、HashDict を保持するだけの役割でタスクは持たない。OTP の中でも状態を扱うだけに特化したフレームワークである Agent を使うとより簡単に書ける。defmodule WebArchive.Stash do
def start_link do
Agent.start_link(fn -> HashDict.new end)
end
def save_title(pid, url, title) do
Agent.update pid, fn(dict) -> Dict.put(dict, url, title) end
end
def get_title(pid, url) do
Agent.get pid, fn(dict) -> dict[url] end
end
end
Server (GenServer)
Server には、先に作った Fetcher とこの Stash を使ってキャッシュ機能つきのextract_title/1
を実装する。
こいつはキャッシュ保持用の HashDict を Stash に追いやったしステートレス〜と言いたいところだが、実は Stash プロセスにメッセージパッシングするためその pid を初期化時に受け取り保持し続ける必要があるのでやっぱり状態があるのだった。
というわけで、(1) 何かしらのタスク、ここでは extract
_title/1
と (2) 状態の保持、(1) (2) を両方請け負うので、この実装には順当に GenServer を使う。Stash の pid は GenServer の状態保持の仕組みを使って引き回す。
defmodule WebArchive.Server do
use GenServer
def start_link(stash_pid) do
GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
end
def extract_title(url) do
GenServer.call __MODULE__, {:extract_title, url}
end
def handle_call({:extract_title, url}, _from, stash_pid) do
{:reply, fetch_title(stash_pid, url), stash_pid}
end
defp fetch_title(stash_pid, url) do
case WebArchive.Stash.get_title(stash_pid, url) do
nil ->
title = WebArchive.Fetcher.extract_title(url)
:ok = WebArchive.Stash.save_title(stash_pid, url, title)
title
title -> title
end
end
end
fetch_title/2
の中で Fetcher と Stash を使って、Stash にあったらそれを返却なかったら HTTP GET、という処理をしている。もうちょい関数型っぽく書けそうなきもするが。
Supervisor
これで Stash と Server はできたので、Supervisor を用意する。実装は﹃Programming Elixir﹄に載ってるものを参考にした。 特に難しいことはやってないが、SubSupervisor が Stash の pid を知る必要がある都合上、Stash → SubSupervisor の順に起動しないといけない。そこで、普通はsupervise/3
の第一引数にモジュールを指定するところそこではやらず、明示的に start_child/3
を呼んでいる。
defmodule WebArchive.Supervisor do
use Supervisor
def start_link do
res = {:ok, sup} = Supervisor.start_link(__MODULE__, [])
{:ok, stash_pid} =
Supervisor.start_child(sup, worker(WebArchive.Stash, []))
Supervisor.start_child(sup, supervisor(WebArchive.SubSupervisor, [stash_pid]))
res
end
def init(_) do
supervise [], strategy: :one_for_one
end
end
サブの Supervisor の方は何の変哲もない。
defmodule WebArchive.SubSupervisor do
use Supervisor
def start_link(stash_pid) do
Supervisor.start_link(__MODULE__, stash_pid)
end
def init(stash_pid) do
supervise [ worker(WebArchive.Server, [stash_pid]) ], strategy: :one_for_one
end
end
Application
これで部品は揃ったので、アプリケーションのエントリポイントを書く。
defmodule WebArchive do
use Application
def start(_type, _args) do
WebArchive.Supervisor.start_link
end
end
親の Supervisor を起動するだけですな。
これで親 Supervisor → Stash → SubSupervisor → Server の順に起動する。完成。
iex(15)> WebArchive.Server.extract_title("https://twitter.com/")
"Twitterへようこそ - ログインまたは新規登録"
iex(16)> WebArchive.Server.extract_title(:cat)
** (exit) exited in: GenServer.call(WebArchive.Server, {:extract_title, :cat}, 5000)
** (EXIT) an exception was raised:
...
iex(17)> WebArchive.Server.extract_title("https://twitter.com/")
"Twitterへようこそ - ログインまたは新規登録"
クラッシュさせてもアプリケーションは終了せず、また、クラッシュ後の extract_title/1
もキャッシュから返ってくる。ほんとにそうなってるか知りたい場合は extract_title/1
で HTTP GET してくるところにデバッグメッセージを追加すれば良いだろう。
おまけ: Task の async/await で並行処理
せっかくキャッシュ付きで HTTP リクエストを飛ばす仕組みを作ったので、複数のサイトからまとめてタイトルを取ってくる API も作ってみよう。
iex> WebArchive.Server.extract_titles(["https://twitter.com/", "https://github.com/", "https://kaizenplatform.com/"])
["Twitterへようこそ - ログインまたは新規登録",
"GitHub · Build software better, together.",
"Kaizen Platform, Inc."]
Task
の async/await を使えば簡単に実装できる。
defmodule WebArchive.Server
use GenServer
...
def extract_titles(urls) when is_list(urls) do
urls
|> Enum.map(&(Task.async(fn -> extract_title(&1) end)))
|> Enum.map(&(Task.await/1))
end