ConduitとPersistentを使って高パフォーマンスなDB処理を目指す
Iteratee/Enumerator系の話題がHaskell界隈で騒がれて久しい (日本語の紹介記事)。これらはHaskellで細粒度のリソース制御ができるストリーム処理の方法で、例外処理もやりやすい、という触れ込みだった。曰く、ストリーム処理はリソースの消費量の点で(オンメモリに全てを置くよりも)優れているが、hGetContents::IO Handle->Stringのような遅延IOはファイルのクローズなどリソースの制御が難しい。それを改良したのがIteratee/Enumerator系のライブラリだそうだ。
Conduit
iteratee/enumeratorの改良を試みているのが Conduit (コンディット?)だ(日本語の紹介記事)。 最近でもずっと改良がつづいているようで(参考:tanakhさんの発表資料)、例えばSource/Conduit/Sinkの型がPipeに統一されたりと、概念的に整理されてきている段階なのかな、という印象を受ける。ConduitはWebフレームワークYesod (おそらく主流の一つ) で使われており、きっと今後も期待できるライブラリだ。
Persistent
Persistentは、Yesodアプリのバックエンド部分で使うライブラリだ。 オブジェクト指向言語におけるO/Rマッパー(…よりもシンプルで頑健かつ安全だと期待している…)のように、HaskellにおいてDBアクセスをシームレスに行うライブラリだ。
Persistentには selectSource という関数が用意されており、Conduitを用いたストリーム処理でDBの読み出しが書ける。
ごく簡単な例
ほぼPersistentの最初の例からの写しだけど、こんな感じに:
{-# LANGUAGE QuasiQuotes, TemplateHaskell, TypeFamilies, OverloadedStrings #-} {-# LANGUAGE GADTs, FlexibleContexts #-} import Database.Persist import Database.Persist.Sqlite import Database.Persist.TH import Control.Monad.IO.Class (liftIO) import System.IO (stdout) import Data.ByteString.Char8 (pack) import Data.Text.Internal (Text) import Data.Conduit import qualified Data.Conduit.Binary as CB share [mkPersist sqlSettings, mkMigrate "migrateAll"] [persist| Person name String age Int Maybe deriving Show |] type SqlSource v = Source (ResourceT (SqlPersist IO)) v conduit_test :: IO () conduit_test = withSqliteConn ":memory:" $ runSqlConn $ do runMigration migrateAll johnId <- insert $ Person "John Doe" $ Just 35 janeId <- insert $ Person "Jane Doe" Nothing runResourceT $ mapOutput (pack . (++"\n") . show) (selectSource [] [] :: SqlSource (Entity Person)) $$ CB.sinkHandle stdout return () main = conduit_test
[persist| … |]で囲まれた部分がDBのスキーマ記述にあたるDSLで、Haskellのデータ型に展開されると同時にsqliteのテーブルが作られる。 Template HaskellやQuasi-Quotation を使って実現されている。シンプルでDRYな感じに好感が持てる。
runResourceT から始まる行が Conduit部分だ。 $$ は Source と Sink を接続する演算子であり、接続された物体がResourceT型を持つモナドを、runResourceTで実行する.
Souceは、selectSource で Entity Person型のデータを無条件にselectしたものだ。
さらに、mapOutputで、Entity Person型のsourceからの出力を、ByteString型に変換している。
Sinkは、CB.sinkHandle という関数で、 ByteString型の入力を全てstdoutに流し込む。
いわゆるリストを使った遅延IOより少しだけ複雑だが、さらにモナド変換子を重ねるための抽象化のため、最初はとっつきにくい印象だ。しかし Source, Conduit, Sink の型がすべて Pipe i o m r 型 の型シノニムになっているため、じつのところ非常にすっきりしているように思う。
Pipe i -- 入力の型 o -- 出力の型 m -- 下にあるモナド (IOや ReaderT r IO など。上記では SqlPersist IOというモナド) r -- モナドの計算結果の型
Pipeはモナドになっているので、お馴染みのdo構文でsourceやsinkを合成できるのも良い感じだ。
例外処理の扱いなども興味深いがまたの機会に。
型エラーの怪
ところで、上の例を作ったとき、型エラーを解消するのにちょっと時間がかかってしまった。
pack . (++"\n") . show :: Show a => a -> ByteString
で、データベースから取り出したデータをshowしByteString型に変換しているが、最初に書いたときはここで、
(++"\n") . show
と、packを忘れてしまっていた。 もちろんこの型エラーはGHCによって報告された:
persisttest.hs:32:14: Couldn't match expected type `[Char]' with actual type `Data.ByteString.Internal.ByteString' (略)
しかし、この上の行には、何やら別の型エラー2つが出ていた。
persisttest.hs:23:16: No instance for (Control.Monad.Trans.Control.MonadBaseControl IO m0) arising from a use of `withSqliteConn' (略) persisttest.hs:27:15: No instance for (Control.Monad.Trans.Control.MonadBaseControl IO m1) arising from a use of `insert' (略)
通常、こういった型変数がらみの"No instance for ..."というエラーは,型注釈が足りていないために起こる。 (show . read) が良い例で、どの型クラスのインスタンスを選んでよいかわからないとコンパイラ様がお怒りになる。
しかし 上の ByteString と Char の型ミスマッチを解消すれば、この型エラーは消える。順番に withSqliteConn や insert に型注釈を加えていた私は釈然としない。
原因はたぶん、 型エラーのせいで 型が分からなくなった runResourceT の呼び出し部分に (fresh な) 型変数 m a を割り当てるためだ。
runSqlConn の引数もまた SqlPersist m a 型なので結局 型変数 mを具体化できず、インスタンスを特定できないのだ。
上の例では、型推論による型mの伝播方向が単方向である(Entity型が決まると、associated type ([persist|...|]の部分から生成される) によりモナドの型SqlPersist IOが決まり、それが全体に伝播する)。
このため、型を具体化している源流で型エラーが起こると、連鎖的に別の"No instance for"エラーが引き起こされるのだった。
結局、モナド変換子をあまり型変数で抽象化しまくるのも問題だということか。
しかしこの場合なら、ただ
runSqlConn :: SqlPersist IO a -> Connection -> IO a
としておくだけで余計な型エラーを抑制できる。場合によっては使えそうなテクニックかもしれない