Runner in the High

技術のことをかくこころみ

JMOOCの「クラウドサービス・分散システム」の講座がいい感じ

内容的には自分が過去記事で紹介した「データ指向アプリケーションデザイン」の内容を、もう少しとっつきやすくかみ砕いたような雰囲気。大学での講義を収録したっぽいものなので、ある程度初めての人でも分かりやすいかもしれない。

lms.gacco.org

この講座の中で触れられているトピックは

  • 分散コミット(2層コミット, 3層コミット)
  • 定足数(クオラム, ビザンチン合意)
  • 論理クロック
  • データ複製の一貫性(強整合性, 因果整合性, 弱整合性, 結果整合性, ...)
  • IaC(Infrastructure as Code)

などが中心になっている。

定足数や論理クロックのあたりは、実際にアプリケーション開発をやっていて必ず登場する概念ではないが、クラウドプラットフォームから提供されるデータベース製品の仕様だったり、オーケストレータの挙動を理解するためにはある程度知っている方がいいのかなと思う。

一貫性回りの話は、DynamoDBだったりSpannerだったりいわゆるNoSQL周りを使うならばもはや必須の知識と言える。自分の場合は、社会人になってからプロダクト開発の都合上GoogleのDatastoreを触るようになった関係で先輩から整合性に関して細かく手ほどきを受けたけれど、こういう話を学生の頃から学べるのはちょっとうらやましい。

Goでfdkaacを用いてm4a/mp4のraw aacなデータをデコードする

github.com

fdk-aacとそのgoバインディングgo-fdkaacを使ってやってみたので、試行錯誤過程のメモ。

ffmpegみたいなツールを用いた変換方法はネット上にゴロゴロしているのに、スタンドアロンなライブラリを使ってデコードをするとなると情報がめちゃくちゃ限られてくる。

特にaacデコーダ実装はデータ部にADTSというフレーム情報を保持するヘッダがあることが前提の話ばかりで、m4a/mp4に含まれるraw aacというヘッダ情報のないaacのデコードに関する情報はかなり見つけるのが難しい。

m4aとraw aacの関係性

まず大前提としてmpeg4の構造を知る必要がある。以下の記事が詳しかった。

qiita.com

データ構造のうちmdatと呼ばれる部分にaacなデータが含まれているが、これがADTSヘッダを持たないraw aacと呼ばれるものになっている。なので、単純なデータ操作(例えばddコマンドなど)でmdatからデータを抜き出してaacファイルとして再生しようとしても、これでは単なる不正なデータとして扱われてしまう*1。ADTSヘッダがないということは、当然代わりの相当するデータがm4aのデータ構造の中に書き込まれているはずで、m4aで言うとstblがそれにあたる。

特にfdk-aacにおいてはフレームサイズというのが非常に重要で、結果を受け取るためにデコーダのメソッドへ参照渡しされるバイト配列のサイズがフレームサイズとイコールで扱われているため、アロケーションされているバイト配列のサイズがフレームサイズと異なっているとエラーがでてしまいデコーダを動かすことができない。フレームサイズが1024とか2048みたいな固定長のサイズならまだしも、実際にはストリーム中のフレームサイズは数バイト単位で変動するため「どのフレームが何バイトか」という情報がなければデコード処理は実行できないということになる。

ADTSであればヘッダ情報の一部としてフレーム長が含まれているのだが、m4aではraw aacなのでそうはいかない。ではどうすればいいのかというと、ADTSヘッダの代わりにstbl配下のstszに格納されているフレームサイズの情報を参照することになる。

デマルチプレクサ

まったく専門外なので正しいことを言えているかは微妙だが、どうやらメディアデータの変換周りではデマルチプレクサと呼ばれるものが基本的には必要らしい。たとえばlibavformatが最もオールインワンなものとして有名っぽい。libav以外にも、いろんな人があるフォーマットに特化したデマルチプレクサを作っていたりする。Node.jsで動くts/mp4/flv用のデマルチプレクサもあった。

Goにもlibavのバインディング実装(imkira/go-libav)があり、そこそこ頑張っているっぽかった。しかし、libavの機能をcgoでラップするという壮大な世界観がメンテしきなかったのかここ数年は放置されているのが現状で、getしてみると実際にはビルドが通らず使いモノにならなかった。もしもlibavを使ってデコーダを実装するなら、自前でcgoを使ってラッパーを書くほうがましかもしれない。aacデコーダも内蔵されているので、デコーディングライブラリに拘らないのならばfdk-aacを使う必要すらない。

いずれにしても今回のケースではm4a/mp4用のデマルチプレクサだけが使えればよく、そのためだけにlibavを使うのはライブラリ自体を使うために求められる知識が多すぎて疲弊してしまう。

mp4リーダー

内部的にやっていることはlibavと同じなのかもしれないが、もう少しmp4に特化したものがある。例えばサイバーエージェントからはabema/go-mp4がでているし、他にもalfg/mp4というやつもある。どちらのライブラリも、上のQiita記事で解説されているatomsの情報をmp4のバイナリデータからパースして、ある程度参照しやすいような形(構造体とか)に変換してくれる機能*2を持っている。

これとfdk-aacを組み合わせれば、raw aacのデコードに必要な情報(フレームサイズ)をstszから読み取ることができる。今回はalfg/mp4の方を使ってみた*3

stszからフレームサイズを取り出す部分の実装は次のようになる。

// stszセクションから取り出したraw aacのフレームサイズ情報を保持する構造体
type frameSizes struct {
    frameOffset uint
    size        uint
    buffer      []byte
}

// atom.Mp4Readerを用いてstszセクションのデータを読み取り、ヘッダをスキップしたデータ部をframeSizes構造体として抜き出す
func newFrameSizes(reader *atom.Mp4Reader) (*frameSizes, error) {
    const stszHeaderOffset = 12 + 8 // stszのヘッダ部分をskipする分のオフセットサイズ

    stsz := reader.Moov.Traks[0].Mdia.Minf.Stbl.Stsz
    stszBuffer := make([]byte, stsz.Size)st
    if _, err := io.NewSectionReader(
        stsz.Reader.Reader,
        stsz.Start+int64(stszHeaderOffset),
        stsz.Size-int64(stszHeaderOffset),
    ).Read(stszBuffer); err != nil {
        return nil, err
    }

    return &frameSizes{
        frameOffset: 0,
        buffer:      stszBuffer,
    }, nil
}

// stszのデータ部にはビッグエンディアンで4バイトごとのデータとして格納されている
// binary.BigEndian.Uint32で変換してint64に変換することで10進数データとしてフレームサイズが計算できる。
func (v *frameSizes) Next() *int64 {
    const uint32byteSize = 4

    if len(v.buffer) < int(v.frameOffset) {
        return nil
    }

    frameSize := int64(binary.BigEndian.Uint32(v.buffer[v.frameOffset : v.frameOffset+uint32byteSize]))
    v.frameOffset += uint32byteSize
    return &frameSize
}

Nextメソッドで次のフレームサイズが計算されて返される間、forループでデコード処理を継続する。

以下抜粋。

offset := int64(mdatOffset)

for {
    nextFrameSize := frameSizes.Next()
    if nextFrameSize == nil {
        break
    }

    // 計算されたフレームサイズのぶんだけmdatからデータを読み取る
    part := make([]byte, *nextFrameSize)
    readCount, err := io.NewSectionReader(v.Mdat.Reader.Reader, offset, *nextFrameSize).Read(part)
    if err == io.EOF {
        break
    } else if err != nil {
        panic(err)
    }

    // mdatから読み取ったデータに対してデコード処理を実行する
    if pcm, err = d.Decode(part[:readCount]); err != nil {
        panic(err)
    }

    offset += *nextFrameSize
    if len(pcm) == 0 {
        continue
    }

    pcmWriter.Write(pcm)
}

go-fdkaacのDecodeメソッドはフレームサイズが正しくないと0x4002(AAC_DEC_PARSE_ERROR)をエラーコードとして吐き出してくるが、具体的に何が原因なのかを教えてくれないので非常に難しい。このサンプルコードでは、音声と動画が入っているmp4ファイルのことは考慮していないため、もし音声以外のストリームが含まれていた場合フレームに含まれるデータはもっと複雑なものになる。

fdk-aacソースコード中にあるエラーコードのコメントはほとんど参考にならないので、エラーが出てしまうとC言語で書かれたfdk-aacの実装を読むほかなくなってしまう。が、もちろんソースコードを読んでわかることの方が少ない...

ASC(Audio Specific Config)の値について

raw aacのデコードでフレームサイズの次に重要なのがASC(Audio Specific Config)と呼ばれる16進数値の組み合わせだ。

ADTSヘッダを持つaacなデータであれば、デコードに必要なオーディオタイプ、サンプリング周波数、チャネル設定などがすべてADTSに保存されているが、raw aacの場合にはデコーダに対して明示的に対象のraw aacをどのようにデコードするのか伝えてやらねばならない。

今回のコードで言うと、以下のデコーダを初期化する箇所で与えている16進数のデータがそれにあたる。

// AAC LC/44100Hz/2channelsなASCの設定でfdk-aacのデコーダを初期化
// (Ref: https://wiki.multimedia.cx/index.php/MPEG-4_Audio#Audio_Specific_Config)
d := fdkaac.NewAacDecoder()
if err := d.InitRaw([]byte{0x12, 0x10}); err != nil {
    panic(err)
}
defer d.Close()

この16進数はビットフラグでデコードの設定を表しているもので、ASCのビットフラグの説明は以下のものしかネットでは見つからない。

5 bits: object type
if (object type == 31)
    6 bits + 32: object type
4 bits: frequency index
if (frequency index == 15)
    24 bits: frequency
4 bits: channel configuration
var bits: AOT Specific Config

もちろんこのビットフラグを自分で組み立ててもいいが、自分でASCのビットフラグを組み立てなくてもffprobeを使えばmp4/m4aに含まれるaacのASCを知ることができる。

$  ffprobe -show_data -show_streams -hide_banner default.m4a

... 省略 ...

[STREAM]
index=0
codec_name=aac
codec_long_name=AAC (Advanced Audio Coding)
profile=LC
codec_type=audio
codec_time_base=1/44100
codec_tag_string=mp4a
codec_tag=0x6134706d
sample_fmt=fltp
sample_rate=44100
channels=2
channel_layout=stereo
bits_per_sample=0
id=N/A
r_frame_rate=0/0
avg_frame_rate=0/0
time_base=1/44100
start_pts=0
start_time=0.000000
duration_ts=2015987
duration=45.713991
bit_rate=128424
max_bit_rate=128424
bits_per_raw_sample=N/A
nb_frames=1970
nb_read_frames=N/A
nb_read_packets=N/A
extradata=
00000000: 1210 56e5 00                             ..V..

DISPOSITION:default=1
DISPOSITION:dub=0
DISPOSITION:original=0
DISPOSITION:comment=0
DISPOSITION:lyrics=0
DISPOSITION:karaoke=0
DISPOSITION:forced=0
DISPOSITION:hearing_impaired=0
DISPOSITION:visual_impaired=0
DISPOSITION:clean_effects=0
DISPOSITION:attached_pic=0
DISPOSITION:timed_thumbnails=0
TAG:language=und
TAG:handler_name=SoundHandler
[/STREAM]

ffprobeから出力される結果のうち以下の箇所がASCになっている。

extradata=
00000000: 1210 56e5 00                             ..V..

このデータのうち 1210 という箇所を 0x12 0x10 にしてやればよい。先頭のふたつだけがASCになる。

ffprobeでこういうデータが分かるということは、おそらくmp4中のどこかのatomに格納されているのかもしれない。が、今回はそれは調べていない。調べました (2022/06/19 追記) izumisy.work

*1:どうやら頑張ればバイナリデータのパターンからフレームデータのチャンクを識別できたりするらしいのだが、余計に情報が少なく泥沼に入ってしまいそうな気がしたのでやっていない。ネットの情報を見ているとfaadというaac用のデコーダはraw aacをADTS付のaacにデコードできるらしい。おそらく、フレームデータを直接エスパーしているのだろう。

*2:こういうのもデマルチプレクサっていうのかな?

*3:こちらはstszの読み取りに対応していなかったので今回はIzumiSy/mp4としてフォークしたものを使った

fdk-aacでM4A→WAV変換サンプル実装のビルド方法

izumisy.work

上記の記事のビルドをやっている前提

# libavformatは必須らしい
$ sudo apt install libavformat-dev

# fdk-aac配下でサンプルがあるブランチに切り替え
$ cd fdk-aac
$ git checkout decoder-example

# m4a-dec.cをビルド
# (https://github.com/mstorsjo/fdk-aac/issues/66#issuecomment-304263604から拾ったやつ)
$ gcc m4a-dec.c -o m4a-dec wavreader.c wavwriter.c \
    -I./libAACdec/include -I./libAACenc/include \
    -I./libSBRdec/include -I./libSBRenc/include \
    -I./libMpegTPDec/include -I./libMpegTPEnc/include \
    -I./libFDK/include  -I./libSYS/include ./.libs/libfdk-aac.a -I./libPCMutils/include \
    -lm -Wdeprecated-declarations -lavcodec -lavformat -lavutil

あとは m4a-dec というファイルが実行できればok

Ubuntu環境でのfdk-aacビルド方法

github.com

リポジトリにはREADMEどころか何も書かれていないのでメモしておく

# ビルドに必要なパッケージをインストール
$ sudo apt install autoconf libtool

# リポジトリをクローン
$ git clone git@github.com:mstorsjo/fdk-aac.git

# ビルド
$ cd fdk-aac
$ bash autogen.sh
$ ./configure --prefix=`pwd`/objs
$ make 
$ make install

Golangのあの動的にDIするやつ

名称が分からないが、実際の例で言うとcloudspannerecosystem/yoにあるこういうやつ。

// YOLog provides the log func used by generated queries.
var YOLog = func(context.Context, string, ...interface{}) {}

これだけ見ると何のために定義されているか分からない。ではなんなのかと言うと、これは後から動的に実装を差し込むために用意されているインターフェイス的なやつらしい。

親しいコードを用意すると以下のよう感じ。

package main

import (
    "fmt"
)

// ロガーのインターフェイス
var Log = func(string) {}

func main() {
    // LogAの実装に差し替え
    Log = func(msg string) {
        fmt.Printf("LogA: %s\n", msg)
    }
    Log("Hello World") // LogA: Hello World
 
    // LogBの実装に差し替え
    Log = func(msg string) {
        fmt.Printf("LogB: %s\n", msg)
    }
    Log("Hello World") // LogB: Hello World
}

Log自体には定義の段階では実装は空だが、それが呼び出されるタイミング(あるいはどこかの初期化のタイミング)で実装を差し込むことで好きな振る舞いを与えられる。普段からImmutableな雰囲気のコードばっかり書いているとこういうmutableな手法はなかなか思いつかないが、こういうのがGoっぽいのかもしれない。

今回の例は変数の型が関数だからいいが、これが構造体のポインタとかになると複雑なアプリケーションではいつ実装が差し込まれるのか分からずSEGVしてしまい危険。絶妙なタイミングで発生するランタイムエラーなバグを埋め込む可能性がある。なので、当然といえば当然だが、意図的に定義と同時に空の実装を与えたり未初期化のときに呼び出されたらエラーログを吐くなどの実装をしておいたほうが無難だろうなとは思う。

ビジネス・アーキテクチャとシステム・アーキテクチャ

これにめちゃくちゃ共感した。

よくあるアーキテクチャ設計では、ドメイン層と応用層を分離することで応用層におけるコントロール不能な変化をドメイン層に及ばないようにさせたりする。しかし、現実のプロダクト開発ではDBやフレームワークなどの応用層で起きる変化よりもドメイン層の変化のほうが圧倒的に多い。システムにおけるすべての中心にあり、あらゆるところから依存されているドメインが頻繁に変化するとなると非常にツライ。

とはいえ、ビジネス・アーキテクチャが定まっていない領域のアプリケーションほど短いスパンで頻繁なドメインの変化を求められることはむしろ必然で、ドメインを柔軟に変化させ差別化することで競合優位性を確保せねばならない。そのようなケースで設計方針などの開発都合によりドメインを変化させきれない製品のほうこそが市場競争から脱落していく。ドメインの変化に耐えることを第一にして設計せず、ただ単に教科書的な永続化装置との切り離しなどを目的視して重厚に設計されたアプリケーション基盤は、一見教科書的には正しく見えるだけに、目に見えない形で生産効率の足を引っ張ってくる。

では「コアとなるドメインがコロコロ変わるならそもそも丁寧にモデリングをすることすら意味がないのか」というとそんなことはなく、前提としてコアが変わることを念頭に置いてシステムを設計*1をすればいい。しかし見通しの立たない未来の変化に対して過度に汎化しすぎると、今度はドメインの表現力を失ってしまう。こうなると、唯一の対処法はビジネス・ドメインの変化の速さに追従するレベルで頻繁にドメインの分析とコードへの反映を地道にやっていくしかない。

これを実現するためには早い段階で頻繁にリファクタリングをしていく必要があるし、頻繁にリファクタリングをするためにはデリバリのスループットを止めないために効率のいい自動テストも必要になる。しかし、実際にはリファクタリングばかりをやっているわけにはいかないし、必ずしもテストが最初から潤沢に用意されていないこともある。そして、そういう環境が変わらないケースもある。極論、プロダクトに関わる人々が "ドメイン層の変化"という抜本的な影響の及ぶ開発コストに対してどのような態度をとるか? がシステム・アーキテクチャの在り方へ大きなインパクトを与える原因なのではないかと思う。

リファクタリングと言えば 「なぜ機能追加しないものに手をつける工数(予算)が必要なんだ云々」*2 という話は往々にして起きがちで、システム・アーキテクチャの形をいびつにさせる大きな原因は技術力ではなくむしろ社会学的/経済学的なボトルネックに集約されるだろう。大抵、こういう現象はプロダクトに対して市場規模・ビジネスモデルの都合上金回りが悪かったり、顧客とのパワーバランスからくる短期的な価値提供(機能開発)を強いられたりする場合に起きる。

個人的な体感では、toBなビジネスにおいては"対象とする顧客の業界属性""導入ニーズに対するパワーバランスのコントロール"がシステム・アーキテクチャの形を決める大きな鍵のような気がしている。もしも、あらゆる顧客の要望をすべて受け入れるイエスマンになってしまえば受注こそ楽かもしれないが、あっという間にカオスなAll-in-oneソフトウェアに様変わりし、継ぎはぎのメンテナンスと機能追加で工数が無限に溶けるのは目に見えている。そんなプロダクトの作り方をしている組織がドメインの分析をしたり継続的なリファクタリングをしているとは、到底思えない。

*1:もちろん一から書き直した方が手っ取り早いこともある

*2:色々と凝り固まった組織でもいろんな人たちを納得させてエンジニアリングに対する考え方を変えられるエンジニアが個人的には最強だと思う。特にある程度の規模の組織においては。

redigoを使ったPubSubアダプタの実装

自分が開発しているgo-cleanarchitectureの中でドメインイベントのPublisher実装としてredigoを使ったPubSubアダプタの実装を用意したが、思ったよりもredigoを使ったRedisのPubSub周りに関連する実装があまりネットに転がっていなかった。

domainsパッケージなどgo-cleanarchitecture固有の実装も絡んでいるが、それでも参考になりそうなのでアダプタの実装コードを抜粋として残しておく。

https://github.com/IzumiSy/go-cleanarchitecture/blob/master/adapters/pubsub/redis.go

package pubsub

import (
    "encoding/json"
    "fmt"
    "go-cleanarchitecture/domains"
    "go-cleanarchitecture/domains/errors"

    "github.com/gomodule/redigo/redis"
)

// サブスクライバ実装のインターフェイス
type Subscriber = func(payload []byte) error

// PubSubアダプタ実装の構造体
type RedisAdapter struct {
    conn        redis.Conn
    psc         redis.PubSubConn
    subscribers map[string]Subscriber
    logger      domains.Logger
}

// RedisAdapterがdomains.EventPublisherの実装を満たしているかを型チェック
var _ domains.EventPublisher = RedisAdapter{}

func NewRedisAdapter(logger domains.Logger) (error, RedisAdapter) {
    // publish用のredisコネクション
    pubConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    // subscribe用のredisコネクション
    subConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    return nil, RedisAdapter{
        conn:        pubConn,
        psc:         redis.PubSubConn{Conn: subConn},
        subscribers: map[string]Subscriber{},
        logger:      logger,
    }
}

// ドメインイベントをPublishする
func (adapter RedisAdapter) Publish(event domains.Event) errors.Domain {
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return errors.Postconditional(err)
    }

    _, err = adapter.conn.Do("PUBLISH", string(event.Name()), eventBytes)
    return errors.Postconditional(err)
}

// サブスクライバを登録する
func (adapter RedisAdapter) RegisterSubscriber(eventName domains.EventName, subscriber func(payload []byte) error) {
    adapter.subscribers[string(eventName)] = subscriber
}

// イベントのpublishを監視するgoroutineを起動する
func (adapter RedisAdapter) Listen() {
    var channels []string
    for c := range adapter.subscribers {
        channels = append(channels, c)
    }

    if err := adapter.psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
        adapter.logger.Error(fmt.Sprintf("Failed to start listening: %s", err.Error()))
        return
    }

    for {
        switch n := adapter.psc.Receive().(type) {
        case error:
            adapter.logger.Error(fmt.Sprintf("Error received: %s", n.Error()))
            return
        case redis.Message:
            // publishされたメッセージを補足した(n.Channelに対象のチャネル文字列が入っている)
            subscriber, ok := adapter.subscribers[n.Channel]
            if ok {
                // 対象のサブスクライバがあればデータを渡して実行する
                // 本当はここでエラーが出たらリトライを実行したほうが良い(Exponential Backoffとかで)
                subscriber(n.Data)
            }
        case redis.Subscription:
            switch n.Kind {
            case "subscribe":
                // Channelがsubscribeされた
                adapter.logger.Info(fmt.Sprintf("%s subscribed", n.Channel))
            case "unsubscribed":
                // Channelのsubscribeが解除された
                adapter.logger.Info(fmt.Sprintf("%s unsubscribed", n.Channel))
            }
        }
    }
}

個人的にハマったのはredigoでRedisのPubSubを利用するためにはPubslisher/Subscriberでそれぞれ個別のコネクションを用意しておかないといけないこと。

これが分からなくて数時間悩んだ。

VimでGitの操作環境を整える

自分はvim-fugitiveとvim-flogを使っている

Plug 'tpope/vim-fugitive'
Plug 'rbong/vim-flog'

fugitiveだけでもgit logは見れるが --decorate--graph などのコマンドを使っても色がつかないのでコミットログを見ることに関しては若干微妙。

ログを見るという単一用途に関してはvim-flogがあったほうがいい。Vimに統合されたtigのような使い心地で非常に便利だと思う。

キーバインディング

基本的には <leader>g で関連操作ができるように統一している。

nnoremap <silent><leader>gs :Git<CR>:20wincmd_<CR>
nnoremap <silent><leader>gd :vert Git diff --staged<CR>
nnoremap <silent><leader>gl :vert Flogsplit<CR>
nnoremap <silent><leader>gg :Git commit<CR>
nnoremap <silent><leader>gw :Git commit -m "wip"<CR>
nnoremap <silent><leader>ga :Git add . --verbose<CR>
nnoremap <silent><leader>gp :call GitPushCurrentBranch()<CR>

" Pushes commits to the branch whose name is the same as we are currently checking out on
function! GitPushCurrentBranch()
  let branch = trim(system('git branch --show-current'))
  echo "Git pushing to the remote branch... (" . branch . ")"
  execute ":Git push origin " . branch
endfunction

GitPushCurrentBranch はまあまあ便利で、いまチェックアウトしているブランチを同名のリモートブランチに対してpushできる。

<leader>gs につけている :20wincmd_<CR> はウィンドウの高さを制限するコマンド。これがないとgit statusしたときに画面の半分くらいでsplitされてしまって邪魔なので、20くらいにしている。