ストリームのduplexを使って読み込みと書き込みのストリームをまとめる

この記事は前回の記事の補足な立ち位置である
前回の記事がこちら

あわせて読みたい
【Node.js】PDFが投稿された時にサムネイル画像を作成する チャットでpdf画像が投稿された時にサムネイル用の画像を作成するという処理を実装した ローカルで単純にpdfから画像を作成するコマンドを作成するだけならそんなに難し...

前回の記事の完成形コードとして、読み込みストリームと書き込みストリームを別々にしたまま終わっていた

あれでも問題はないのだが、やはりまとめてスッキリ書きたいというのとそもそもエラーハンドリングが十分でなかったのでその辺をちゃんと追記するというのがこの記事の目的である

目次

deplexの使用

まずは前回の完成形コードを見てみる

import { spawn } from 'child_process'

import { storage } from 'firebase-admin'
import { v4 as uuidv4 } from 'uuid'

export const createImgFromPdf: (name: string) => Promise<string> = async name => {
  const thumbnailFile = storage()
    .bucket()
    .file(name.replace(/(\.[\w\d]+)$/i, `_1000x1000.jpeg`))
  const firebaseStorageDownloadTokens = uuidv4()

  const pdf2jpeg = spawn(
    'gs -sstdout=%stderr -sDEVICE=jpeg -r300 -o - - | convert -resize 1000x1000 jpeg:- jpeg:-',
    { shell: true },
  )
  pdf2jpeg.stderr.on('data', data => console.log(data.toString()))

  return new Promise((resolve, reject) => {
    storage().bucket().file(name).createReadStream().pipe(pdf2jpeg.stdin)

    pdf2jpeg.stdout
      .pipe(thumbnailFile.createWriteStream())
      .once('error', reject)
      .once('finish', async () => {
        await thumbnailFile.setMetadata({
          metadata: { firebaseStorageDownloadTokens },
        })
        resolve(
          `https://firebasestorage.googleapis.com/v0/b/${
            thumbnailFile.bucket.name
          }/o/${encodeURIComponent(
            thumbnailFile.name,
          )}?alt=media&token=${firebaseStorageDownloadTokens}`,
        )
      })
  })
}

ハイライトした部分が標準入力と標準出力の記述が乖離してしまっている

記述は乖離しているが、pdf2jpegはspawnのプロセスなのでプロセスとしては繋がっている
しかし、このコードは問題があって、標準出力の方はエラーハンドリングの記述があるが、標準入力の方(19行目)にはエラーハンドリングがない
(後述するが、関数の返り値の観点からこの時点での標準出力の方のエラーハンドリングも不適切)

したがってこちらにも適切にエラーハンドリングする必要があるが、それは煩雑になってしまう

そこでこれらをまとめてみる。
その時に使うのがduplexという読み込み兼書き込みのストリームである

さて、Node.jsの組み込みモジュールのchild processの中にもduplexはあるのだが、それだと色々な処理を自前で書く必要があってつらい

なので今回はduplexer2というライブラリを使うことにした
これを使って該当部分を書き直した結果がこちら

import duplexer2 from 'duplexer2'

...

storage()
  .bucket()
  .file(name)
  .createReadStream()
  .pipe(duplexer2(pdf2jpeg.stdin, pdf2jpeg.stdout))
  .pipe(thumbnailFile.createWriteStream())
  .once('error', reject)

...

めっちゃスッキリ書けたしわりと簡単だった笑

リファクタリングとエラーハンドリング

リファクタリング

先程のままでもいいのだが、変換処理を行う部分を別の関数として切り出してみる

import { spawn } from 'child_process'

import duplexer2 from 'duplexer2'
import { storage } from 'firebase-admin'
import { v4 as uuidv4 } from 'uuid'

export const createImgFromPdf: (name: string) => Promise<string> = async name => {
  const resizedFile = storage()
    .bucket()
    .file(name.replace(/(\.[\w\d]+)$/i, `_1000x1000.jpeg`))
  const firebaseStorageDownloadTokens = uuidv4()

  return new Promise((resolve, reject) => {
    storage()
      .bucket()
      .file(name)
      .createReadStream()
      .pipe(pdf2jpeg())
      .pipe(resizedFile.createWriteStream())
      .once('error', reject)
      .once('finish', async () => {
        await resizedFile.setMetadata({
          metadata: { firebaseStorageDownloadTokens },
        })
        resolve(
          `https://firebasestorage.googleapis.com/v0/b/${
            resizedFile.bucket.name
          }/o/${encodeURIComponent(
            resizedFile.name,
          )}?alt=media&token=${firebaseStorageDownloadTokens}`,
        )
      })
  })
}

function pdf2jpeg() {
  const {
    stdin,
    stdout,
    stderr,
  } = spawn(
    `gs -sstdout=%stderr -sDEVICE=jpeg -r300 -o - - | convert -resize 1000x1000 jpeg:- jpeg:-`,
    { shell: true },
  )
  stderr.on('data', data => console.log(data.toString()))

  return duplexer2(stdin, stdout)
}

単純に切り出しただけ
ついでにspawnの返り値で返ってくるプロセスから分割代入で標準入力、標準出力、標準エラー出力をそれぞれ取り出して使っている

そんで関数のreturnとしてduplexer2で標準入力と標準出力を使用したdeplexを返すようにした

エラーハンドリング

さて、現状標準エラー出力で出てきたものをconsole.logでログに残すようにしている

stderr.on('data', data => console.log(data.toString()))

これでもいいのだが、これだとnodeの一連のストリームでエラーをキャッチできない
pdf2jpegでエラーが起こってもそれをnodeのストリームの7行目の.once('error', ...)ではキャッチできない

storage()
  .bucket()
  .file(name)
  .createReadStream()
  .pipe(pdf2jpeg())
  .pipe(resizedFile.createWriteStream())
  .once('error', reject)
  .once('finish', async () => {
  ...

今回はこのcreateImgFromPdf関数の返り値としてPromise<string>を返すようになっており、そこでエラーが起こった時にエラーをキャッチしてconsole.errorでエラーログを出力するような実装にしている

したがって、ここではpdf2jpegの過程でエラーが起こった時にはそれをちゃんとキャッチしてエラーオブジェクトを返すように実装したい

それを行ったのが以下の実装

.pipe(pdf2jpeg().once('error', reject))

...

function pdf2jpeg(width: number, height: number) {
  const {
    stdin,
    stdout,
    stderr,
  } = spawn(
    `gs -sstdout=%stderr -sDEVICE=jpeg -r300 -dLastPage=1 -dTextAlphaBits=4 -dGraphicsAlphaBits=4 -o - - | convert -resize ${width}x${height} jpeg:- jpeg:-`,
    { shell: true },
  ).on('exit', code => {
    if (code !== 0) stdout.emit('error', new Error(String(stderr.read())))
  })

  return duplexer2(stdin, stdout)
}

まず、13-15行目
このプロセスが終了した時にcodeが0ではない場合、つまり正常終了しなかった場合にstdout.emit('error', ...)で標準出力でエラーを吐き出すようにしている

吐き出すエラーの中身は標準エラー出力の内容を読み取った文字列をnew Errorでエラーオブジェクトとしたもの

そして1行目でpdf2jpeg関数に対して.once('error')...でrejectしている
pdf2jpegの型はduplexなので.onceが使える

まとめると、spawnの過程で生じたエラーは標準出力でエラーオブジェクトを吐き出すようにしてあり、それを.once('error', ...)でキャッチしてrejectするようにしている

これによってspawnでエラーが発生した場合は、このcreateImgFromPdf関数の返り値はrejectしたエラーオブジェクトになるというわけである

これで実装もまとめられてスッキリした上に、エラーハンドリングも適切に行うことができた

on, onceのエラーハンドリングの違い

適切にエラーハンドリングが実装できたと思っていたのだが、実際にエラーを起こすとちゃんとハンドリングできていなかった。。。

実はerrorハンドリングとしてonceを使っていたがこれがよくなかった
結論、onを使うべき

.pipe(pdf2jpeg().once('error', reject)) // NG
.pipe(pdf2jpeg().on('error', reject)) // OK

公式ドキュメントのemitter.onceの部分を読むとこんなことが書いてある

Adds a one-time listener function for the event named eventName.

The next time eventName is triggered, this listener is removed and then invoked.

https://nodejs.org/api/events.html#events_emitter_once_eventname_listener

つまり一度.onceでイベントが発火すると、その時点でリスナーは削除される
そのため、一度のプロセス中にエラーが2回以上起こると一度目のエラーはハンドリングされるが、2回目以降のエラーはここではハンドリングできなくなる

これがonになると2回目以降のエラーも適切にハンドリングできるようになる

したがってエラーハンドリングに関わるところは.on('error', ...)にすべき

こうして無事エラーハンドリングも適切にできるようになった

まとめ

頻繁に使うわけではないかもしれないけど、この辺の実装を通してI/O、Promise、エラーハンドリングについて深堀りできたのでとても勉強になった

参考

GitHub
GitHub - deoxxa/duplexer2: Like duplexer (http://npm.im/duplexer) but using streams2 Like duplexer (http://npm.im/duplexer) but using streams2 - deoxxa/duplexer2
あわせて読みたい
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!
目次