ScalaでAkka Streamを使ってでかいデータをいい感じに処理する

Akka Streamなんて、大規模データ処理でしか使うこと無いんだろうなーと思いきや結構使う機会があるので書いてみる。

例えば

私はSynologyのNASを使ってるんですけど、撮りためたデジカメの写真をGoogleフォトにアップロードしたいと思ってたんですね。自然言語で画像を検索できますし、万が一のバックアップの目的もある(リサイズされるとは言えど)。しかし、どうもSynologyのSyncというクラウドストレージとの同期アプリはGoogleフォトに対応していないらしく、しかもデジカメのデータって何十GBとあるので、何も考えずボケーっとアップロードしてるとプロバイダから怒られるんですよ(怒られました)。

だから仕方ないのでローカルでGoogleフォトが無劣化で保存できる程度に画像をシュリンクさせとこうと思ったわけです。するどアップロードする総量が減るからね。そんで、

  • 指定されたディレクトリからJPGを再帰的に取得し
  • 指定したサイズよりも大きければリサイズし
  • EXIFから撮影日時を取得して
  • ファイル名のケツに撮影日時を付与して(ファイル名が重ならないように)指定したディレクトリに保存

というちょっとしたプログラムをScalaで書いてました。

問題

Scalaって簡単にラムダ式をチェーンで組み合わせていい感じに処理できますよね。Better filesと組み合わせればこんな感じですよ。

def traverse(src: String, dest: String) = {
    val srcDir = File(src)
    val destDir = File(dest)

    srcDir
      .listRecursively
      .filter(_.extension.map(_.toLowerCase()).contains(".jpg"))
      .foreach(resizeAndCopy(_, destDir))
}

resizeAndCopyの実装は今回の話題とはあんまり関係ないので省略。

でもこれだとシングルスレッドでしか処理してくれないので、私のマシンがRyzen 1700で8コア16スレッドあっても1/16くらいのCPUリソースしか使ってくれません。これは悲しい。宝の持ち腐れ。なにより私はすべてのCPUコアをフルロードさせるのが三度の飯よりも好きな人間ですからね。だからこうする。

def traverse(src: String, dest: String) = {
    val srcDir = File(src)
    val destDir = File(dest)

    srcDir
      .listRecursively
      .filter(_.extension.map(_.toLowerCase()).contains(".jpg"))
      .toList
      .toPar
      .foreach(resizeAndCopy(_, destDir))
}

toParって書くだけで並列コレクションになるとかいい時代になったもんだ…。と思いきや、このコードは場合によってはOutOfMemoryで死にます。かつ、最初のコードもそうなのですが、最初の一個が出てくるまで時間がかかります。この二つの問題は、toListで一回リストを作っているからです。膨大な量のFileをリストで保持してるのが原因です。でもIterator(上掲コードのfilterの戻り値の型)はtoParを持ってないので、List以外でtoParできるもの何か無いのか…と適当にIDEでメソッドを探すと、おっ、toStreamに変換してすればtoPar使えるからこれでいけそうじゃん?と思ってやるでしょ?でもダメ。ちゃんとドキュメント読まないからこうなる。

リスト、キュー、ストリーム等、その他のコレクションは、要素を順番にアクセスしなければいけないという意味で本質的に逐次的だ。それらのコレクションは、似ている並列コレクションに要素をコピーすることで、並列版に変換される。

並列コレクションへの変換 | Scala Documentation

toParすれば何でもOK、みたいに思えちゃうんだけど、ここで使ってるのは並列「コレクション」だということを忘れてはならない。StreamやSeqがコレクションを経由せずそのままParにならないのは、順序保証するのが面倒だからでしょう。たぶん…。というか、Streamの用途を考えたらこれはリストやコレクションに出来ないからStreamになってるわけで、StreamをtoParするのは本質的に危険とも言えますね。じゃあなんでStreamがtoPar持ってるのかと言うと、そもそもtoParはParallelizableのメソッドで、SeqLikeがParallelizableをmixinしてて、Seq, LinerSeqを経由してStreamがtoParを使えることになってるっぽい。ParallelizeをStreamから切り離すならすべての末端のコレクションがぞれぞれParallelizableをmixinしなければならないわけで、これはちょっと面倒くさそうだな。しかもStreamでtoParしたいことだって絶対無いわけじゃないだろうし。まあいいや。

というわけで、でかいソースからデータをちょっとずつ読み出してすぐに並列処理を始めるというのも簡単そうで難しいんですよね。過去、こういうことは何度か経験しました。

Akka Stream

なんかこれだけのためにAkka Streamを使うのはちょっと大げさな感もあるのですが、Akka Streamは小規模なプログラムでもそこまで苦もなく書けるみたいなので書いてみました。

private[this] implicit val actorSystem = ActorSystem()
private[this] implicit val materializer = ActorMaterializer()
private[this] implicit val context = actorSystem.dispatcher

def traverse(src: String, dest: String) = {
  val srcDir = File(src)
  val destDir = File(dest)
  val parallelism = 16

  val source = Source{
    srcDir.listRecursively.filter(f => f.extension.map(_.toLowerCase()).contains(".jpg") ).toStream
  }

  val sink: Sink[File, Future[Done]] =
    Sink.foreachParallel(parallelism)(resizeAndCopy(_, destDir))
  val graph = source.runWith(sink)

  graph.onComplete{ _=>
    Await.result(actorSystem.terminate(), 10 seconds)
  }
}

IOもあるのでこれで16スレッド全部使ってくれるわけでもないですが、CPUは1000%以上(10スレッド飽和以上)使ってくれてました。

Source, Flow, Sinkというのが何なのかというのはちゃんと説明できる自信が無いので省略。

まとめ

並列コレクションを使えばRyzenの16スレッド分の計算資源も簡単に使い切ってくれる…というわけでもないです。Akka Streamは大規模データ処理というイメージが強いですが、意外とこういった素朴な用途でも使えるんじゃないかと思った。

あと、AkkaのActorは型が無くて微妙だったけどAkka Streamは型がちゃんとついてるのでいい感じ。以上。

自分の仕事に関連するところでは、SIer関連だと巨大なExcelを出力したいという「それ本当に必要なのか?」みたいな仕様がよくあり、普通にやるとPOIがOutOfMemoryで死んだりするのだけどこういうところでも使いたい。でもCSVならともかく、Excelはそもそもファイル構造的にStreamで吐くのは無理そうだな…。

次はこれをKotlinで実装してみます。