Runner in the High

技術のことをかくこころみ

redigoを使ったPubSubアダプタの実装

自分が開発しているgo-cleanarchitectureの中でドメインイベントのPublisher実装としてredigoを使ったPubSubアダプタの実装を用意したが、思ったよりもredigoを使ったRedisのPubSub周りに関連する実装があまりネットに転がっていなかった。

domainsパッケージなどgo-cleanarchitecture固有の実装も絡んでいるが、それでも参考になりそうなのでアダプタの実装コードを抜粋として残しておく。

https://github.com/IzumiSy/go-cleanarchitecture/blob/master/adapters/pubsub/redis.go

package pubsub

import (
    "encoding/json"
    "fmt"
    "go-cleanarchitecture/domains"
    "go-cleanarchitecture/domains/errors"

    "github.com/gomodule/redigo/redis"
)

// サブスクライバ実装のインターフェイス
type Subscriber = func(payload []byte) error

// PubSubアダプタ実装の構造体
type RedisAdapter struct {
    conn        redis.Conn
    psc         redis.PubSubConn
    subscribers map[string]Subscriber
    logger      domains.Logger
}

// RedisAdapterがdomains.EventPublisherの実装を満たしているかを型チェック
var _ domains.EventPublisher = RedisAdapter{}

func NewRedisAdapter(logger domains.Logger) (error, RedisAdapter) {
    // publish用のredisコネクション
    pubConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    // subscribe用のredisコネクション
    subConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    return nil, RedisAdapter{
        conn:        pubConn,
        psc:         redis.PubSubConn{Conn: subConn},
        subscribers: map[string]Subscriber{},
        logger:      logger,
    }
}

// ドメインイベントをPublishする
func (adapter RedisAdapter) Publish(event domains.Event) errors.Domain {
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return errors.Postconditional(err)
    }

    _, err = adapter.conn.Do("PUBLISH", string(event.Name()), eventBytes)
    return errors.Postconditional(err)
}

// サブスクライバを登録する
func (adapter RedisAdapter) RegisterSubscriber(eventName domains.EventName, subscriber func(payload []byte) error) {
    adapter.subscribers[string(eventName)] = subscriber
}

// イベントのpublishを監視するgoroutineを起動する
func (adapter RedisAdapter) Listen() {
    var channels []string
    for c := range adapter.subscribers {
        channels = append(channels, c)
    }

    if err := adapter.psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
        adapter.logger.Error(fmt.Sprintf("Failed to start listening: %s", err.Error()))
        return
    }

    for {
        switch n := adapter.psc.Receive().(type) {
        case error:
            adapter.logger.Error(fmt.Sprintf("Error received: %s", n.Error()))
            return
        case redis.Message:
            // publishされたメッセージを補足した(n.Channelに対象のチャネル文字列が入っている)
            subscriber, ok := adapter.subscribers[n.Channel]
            if ok {
                // 対象のサブスクライバがあればデータを渡して実行する
                // 本当はここでエラーが出たらリトライを実行したほうが良い(Exponential Backoffとかで)
                subscriber(n.Data)
            }
        case redis.Subscription:
            switch n.Kind {
            case "subscribe":
                // Channelがsubscribeされた
                adapter.logger.Info(fmt.Sprintf("%s subscribed", n.Channel))
            case "unsubscribed":
                // Channelのsubscribeが解除された
                adapter.logger.Info(fmt.Sprintf("%s unsubscribed", n.Channel))
            }
        }
    }
}

個人的にハマったのはredigoでRedisのPubSubを利用するためにはPubslisher/Subscriberでそれぞれ個別のコネクションを用意しておかないといけないこと。

これが分からなくて数時間悩んだ。

VimでGitの操作環境を整える

自分はvim-fugitiveとvim-flogを使っている

Plug 'tpope/vim-fugitive'
Plug 'rbong/vim-flog'

fugitiveだけでもgit logは見れるが --decorate--graph などのコマンドを使っても色がつかないのでコミットログを見ることに関しては若干微妙。

ログを見るという単一用途に関してはvim-flogがあったほうがいい。Vimに統合されたtigのような使い心地で非常に便利だと思う。

キーバインディング

基本的には <leader>g で関連操作ができるように統一している。

nnoremap <silent><leader>gs :Git<CR>:20wincmd_<CR>
nnoremap <silent><leader>gd :vert Git diff --staged<CR>
nnoremap <silent><leader>gl :vert Flogsplit<CR>
nnoremap <silent><leader>gg :Git commit<CR>
nnoremap <silent><leader>gw :Git commit -m "wip"<CR>
nnoremap <silent><leader>ga :Git add . --verbose<CR>
nnoremap <silent><leader>gp :call GitPushCurrentBranch()<CR>

" Pushes commits to the branch whose name is the same as we are currently checking out on
function! GitPushCurrentBranch()
  let branch = trim(system('git branch --show-current'))
  echo "Git pushing to the remote branch... (" . branch . ")"
  execute ":Git push origin " . branch
endfunction

GitPushCurrentBranch はまあまあ便利で、いまチェックアウトしているブランチを同名のリモートブランチに対してpushできる。

<leader>gs につけている :20wincmd_<CR> はウィンドウの高さを制限するコマンド。これがないとgit statusしたときに画面の半分くらいでsplitされてしまって邪魔なので、20くらいにしている。

メルカリShopsの開発組織に関する記事を読んで

engineering.mercari.com

メルカリShopsの開発組織に関する記事が興味深かった。ソフトウェアエンジニアがフロントエンド/バックエンド関係なく開発をするというのは、たしかに開発組織の理想形だと思う。

2020年にオークランドで開催されたDeveloperWeek 2020に参加したとき、IBMのCTOによるトークの中でも開発組織に関して似たような話が出ていたのを思い出した。IBMにはスクアッドと呼ばれるチームの単位が存在し、チームリード、プロダクトオーナー、フルスタックエンジニア2名、SRE2名、そしてチーム横断で仕事をするアーキテクトとデザイナというチーム体制で開発が行われる。ソフトウェアエンジニアはサーバーサイドとフロントエンドという括りでは業務を分割しない。このようなrole-agnosticなカルチャーをガレージ・メソッドIBMでは名付けているらしい。

事実、チームをマネジメントする立場にいるとスキルセットの偏りで意図せず開発リソースがだぶつくことはあるあるバナシだ。従業員の稼働量とそれに基づく人件費をカジュアルに調整できるならまだしも、日本ではなかなかそうはいかないというケースが多いのではないか。スキルセット的にリソースがだぶついてしまうと一時的なリファクタリングや改善系のアイテムをやったりするくらいでしか調整できない。

しかし、フルスタックエンジニアの割合が多ければリソースのダブつきは最小限にできるし、横道にそれることなくリソースの100%を本質的な価値提供に集中させられる。もちろん理想論ではあるが。

フロントエンドとしての/サーバーサイドとしての"フルスタック"さ

最近思うがなんとなくフルスタックと一口に言っても、ソフトウェアエンジニアにおける"フルスタック"にはサーバーサイドとしてのフルスタックと、フロントエンドとしてのフルスタックがあるような気がしている。この言い方が妥当かどうかは分からない。

サーバーサイドとしてのフルスタックとは、たとえばRailsやLaravelだったりGoのRevelなどのテンプレートエンジンを用いてフロントエンドまで一気通貫で開発ができるものがそれにあたる。

一方で、フロントエンドとしてのフルスタックとはNext.js/Nuxt.jsなどの登場に端を発し結果的にBlitz.jsRedwoodJSなど誕生に繋がったものを指している。両者はフルスタックの思想が根本的にサーバーサイド由来のものとは異なり、フロントエンドの実装を主としてサーバーサイド実装を抽象化することで、実質的にフロントエンド開発のみでサーバー側実装も含めWebアプリケーションを完成させようとしている。

かつてSails.jsやLoopback.jsなどのサーバーサイドJSフレームワークが出たあたりでよく見た「JSがあればフロントもサーバーも開発できる!」みたいな宣伝文句が若干近いような気もするが、今思えば仮にサーバーサイドがJSで書けるとしても純朴なSPAとAPIサーバという形である限り、サーバーとの通信プロトコルの検討や、エンドポイント命名、認証基盤実装など、アプリケーションが完成するまでの道のりと考慮事項はRailsのようなフルスタックフレームワークで開発するよりも明らかに高く付く。SPAが顧客要望として本当に必要なら仕方ないが、やる場合にはこれらを背負っていくことになる。

サーバサイドなフルスタックフレームワークでは難の多かったフロントエンドの自由度を獲得する手段としてAPIサーバ+SPAの構成が生まれたものの、サーバーからフロントまでを一気通貫しないことで生まれる追加の開発工数、開発者間でのコミュニケーションコストやドキュメンテーションコストは逃れられない課題になってしまった。

Blitz.jsやRedwoodJSが実現しようとしているサーバーサイド実装の抽象化は「フロントエンド開発の自由度も維持したいがフルスタックの良さも捨てたくない」というこれまでには無い目的追及の過程で生まれた前述の課題に対するひとつの解なのではないか。どんな組織であっても企業体である限り「できるだけ少ない工数で開発したい」というのは共通の目的であるし、それを叶えるのが"フルスタック"であることに違いない。

これまではフルスタックといえばサーバーサイドが中心なイメージがあったが、クラウドネイティブなインフラの成熟やフロントエンド・フレームワークのサーバーサイド領域への進出、高性能なコンピューターリソースの普及によって、これまでにない形でパワーバランスに変化が起きているような印象がある... とはいえ、仮にフレームワークで抽象化されたとしてもサーバーサイドという領域が消えるわけではないので、見方を変えればフロントエンドエンジニアであっても結局はこれまでのフルスタックエンジニアと同じ素養を求められているような気がしなくてもない。

database/sqlとdatabase/sql/driverの関係性に学ぶインターフェイス設計

golangのdatabase/sqlには設計に関するドキュメントが用意されており、これが興味深い。

ドキュメント自体は非常に短い。以下のリンクでサクッと読める。

golang.org

中でも個人的に印象的なのは以下の説明で、ここからgolangにおけるdatabase/sqlの設計思想が見て取れる。

* Provide a generic database API for a variety of SQL or SQL-like
  databases.  There currently exist Go libraries for SQLite, MySQL,
  and Postgres, but all with a very different feel, and often
  a non-Go-like feel.

...

* Separate out the basic implementation of a database driver
  (implementing the sql/driver interfaces) vs the implementation
  of all the user-level types and convenience methods.
  In a nutshell:

  User Code ---> sql package (concrete types) ---> sql/driver (interfaces)
  Database Driver -> sql (to register) + sql/driver (implement interfaces)

ざっくり言うと、golangにおけるdatabase/sqlはユーザーアプリケーションに対して一般的なSQLの操作インターフェイスのみを提供するということを示している。

翻って、database/sqlを利用するアプリケーションはそのインターフェイスを介して操作する対象のSQLデータベースがなんであるかを意識しなくてもよい。アプリケーションはdatabase/sqlインターフェイスにのみ依存している状態であり、まさにクリーンアーキテクチャなどでいうところの抽象への依存(依存性逆転)そのものだ。

database/sql/driverの存在

ではdatabase/sqlSQLiteMySQLを利用するとき、実装がdatabase/sqlに依存しているかというとそんなことはない。実装(ドライバ)はdatabase/sqlとは別にdatabase/sql/driverというインターフェイスに依存している。上の抜粋でいうところの sql/driver (interfaces) がそれにあたる。

便宜的にこれをユーザ層、I/F、ドライバ層と分類してみると、以下のような依存関係になる。

f:id:IzumiSy:20210806113753p:plain
依存関係図

この依存関係図から、golangのdatabase/sqlパッケージは"アプリケーションでSQLを利用したいユーザー"に向けたインターフェイスを提供し、一方でsql/driverは"ドライバの実装者"に向けたインターフェイスを提供しているということが読み取れる。

利用者と実装者でI/Fを分離することのメリットは、インターフェイスを小さくかつ関心の対象を限定したものにできるというところにある。database/sqlにおいてはコネクションプーリングやセッション管理などの概念はインターフェイスに登場せず関心の対象外であり、一方でsql/driverにおいてはSessionResetterやConnectorの形で概念が登場し実装を用意することが要求される。このようにインターフェイスが分離されていることで、利用者は「database/sqlではコネクションプールやセッション管理はしなくてもいいんだな」とインターフェイスから理解でき、思考のコストが減る。

ジェネリックSQLインターフェイスのみを使う限りは依存のほとんどはdatabase/sqlのみに限定できる。もしRDBMS製品に固有な機能を使いたい場合には例外的にドライバから提供されているインターフェイスを直接呼び出せばいい。その場合にはドライバの実装に対する依存が生まれてしまうが、いずれにしても必要に応じて外部装置への依存性を利用者の側から選択できるようになっている点がdatabase/sqlの設計思想の優れたところなのかな、と思う。

参考情報

英語の記事だが、以下の記事も同じようなことを解説している。

eli.thegreenplace.net

makeのlengthとcapacityについて、それからbytes.Bufferのアロケーション処理速度について

izumisy.work

上記の記事をstackoverflowで質問したら回答がついた。

stackoverflow.com

結論から言うと自分はmakeの使い方を間違えていて、makeの第3引数にあたるcapacityを省略してスライスを作成するとそこには空データが埋まってしまうとのこと。

なので、もともとの make([]byte, ALLOC_SIZE) だと最初からALLOC_SIZE分のデータが詰まっている状態から始まるため、そこにappendされデータが増えると再び拡張のアロケーションが発生していた。正しくは make([]byte, 0, ALLOC_SIZE) として宣言するべきで、こうすることでアロケーションの回数は bytes.Buffer を使ったときと同じになる。

この辺の話は以下の記事も詳しい。

note.com

bytes.Bufferとmakeのアロケーション処理速度

bytes.Buffer も内部的にはmakeしているので、 理論上は bytes.Buffer よりもただ単純にmakeするだけのほうが速い。実際にそういうベンチマークの結果もある。

github.com

しかし、appendでデータを詰め始めたときの処理速度には違いが出る。

テストコードはこれ。

package app

import (
    "bytes"
    "testing"
)

const ALLOC_SIZE = 1024 * 1024 * 1024

func BenchmarkFunc1(b *testing.B) {
    for i := 0; i < b.N; i++ {
        v := make([]byte, 0, ALLOC_SIZE)
        fill(v, '1', 0, ALLOC_SIZE)
    }
}

func BenchmarkFunc2(b *testing.B) {
    for i := 0; i < b.N; i++ {
        v := new(bytes.Buffer)
        v.Grow(ALLOC_SIZE)
        fill(v.Bytes(), '2', 0, ALLOC_SIZE)
    }
}

func fill(slice []byte, val byte, start, end int) {
    for i := start; i < end; i++ {
        slice = append(slice, val)
    }
}

この結果は以下になる

at 13:32:12 ❯ go test -bench . -benchmem
goos: darwin
goarch: amd64
pkg: app
cpu: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
BenchmarkFunc1-8               1        1489858750 ns/op        1073743744 B/op        4 allocs/op
BenchmarkFunc2-8               2         930927369 ns/op        1073742880 B/op        3 allocs/op
PASS
ok      app     4.395s

1.5倍近く処理速度の差がある。なお、Growしない場合にはbytes.Bufferのほうが遅い。

makeとbytes.Bufferでは少なくともアロケーションの回数は同等だがアロケーションの速度はmakeのほうが確実に速い。一方、appendしてデータを詰める処理をし始めるとmakeよりもbytes.BufferしてGrowしたほうが処理速度的に優位になるということが分かった。

ここまでの処理をまとめたベンチマークのテストの結果を整理すると以下になる。

処理 確保 確保 + 書き込み
make 2976935 ns/op 66047937 ns/op
bytes.Buffer 0.3202 ns/op 85604838 ns/op
bytes.Buffer (Grow) 2950193 ns/op 44761869 ns/op

bytes.BufferはGrowしなければ確保の速度は最速だが、データを書き込み始めるととんでもなく遅い。メモリだけ確保しておいてデータは書き込まないということは無いはずなので、あまり意味のない速度な気がする。

少なくともこの中で一番無難なのはbytes.BufferでGrow使うことなのかな。

リテラル値でmakeしたらヒープに乗らなくてアロケーションの回数は少なくなるんじゃないか?という実験

goコンパイラの最適化によって、可変長配列であってもリテラル値でlength指定されていればコンパイルする時点で確保するデータサイズが決定し結果的にランタイム時にアロケーションを発生させなくなるのではないかという仮説。

テストコード

package app

import (
    "bytes"
    "testing"
)

const ALLOC_SIZE = 64 * 1024

func BenchmarkFunc1(b *testing.B) {
    for i := 0; i < b.N; i++ {
        v := make([]byte, ALLOC_SIZE)
        fill(v, '1', 0, ALLOC_SIZE)
    }
}

func BenchmarkFunc2(b *testing.B) {
    for i := 0; i < b.N; i++ {
        b := new(bytes.Buffer)
        b.Grow(ALLOC_SIZE)
        fill(b.Bytes(), '2', 0, ALLOC_SIZE)
    }
}

func fill(slice []byte, val byte, start, end int) {
    for i := start; i < end; i++ {
        slice = append(slice, val)
    }
}

これを実行した結果が以下。

at 19:05:47 ❯ go test -bench . -benchmem -gcflags=-m
# app [app.test]
./main_test.go:25:6: can inline fill
./main_test.go:10:6: can inline BenchmarkFunc1
./main_test.go:13:7: inlining call to fill
./main_test.go:20:9: inlining call to bytes.(*Buffer).Grow
./main_test.go:21:15: inlining call to bytes.(*Buffer).Bytes
./main_test.go:21:7: inlining call to fill
./main_test.go:10:21: b does not escape
./main_test.go:12:12: make([]byte, ALLOC_SIZE) escapes to heap
./main_test.go:20:9: BenchmarkFunc2 ignoring self-assignment in bytes.b.buf = bytes.b.buf[:bytes.m·3]
./main_test.go:17:21: b does not escape
./main_test.go:19:11: new(bytes.Buffer) does not escape
./main_test.go:25:11: slice does not escape
# app.test
/var/folders/45/vh6dxx396d590hxtz7_9_smmhqf0sq/T/go-build1328509211/b001/_testmain.go:35:6: can inline init.0
/var/folders/45/vh6dxx396d590hxtz7_9_smmhqf0sq/T/go-build1328509211/b001/_testmain.go:43:24: inlining call to testing.MainStart
/var/folders/45/vh6dxx396d590hxtz7_9_smmhqf0sq/T/go-build1328509211/b001/_testmain.go:43:42: testdeps.TestDeps{} escapes to heap
/var/folders/45/vh6dxx396d590hxtz7_9_smmhqf0sq/T/go-build1328509211/b001/_testmain.go:43:24: &testing.M{...} escapes to heap
goos: darwin
goarch: amd64
pkg: app
cpu: Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
BenchmarkFunc1-8            8565            118348 ns/op          393217 B/op          4 allocs/op
BenchmarkFunc2-8           23332             53043 ns/op           65536 B/op          1 allocs/op
PASS
ok      app     2.902s

うーむ make([]byte, ALLOC_SIZE) escapes to heap と言われているのでヒープに乗ってしまった。仮説は間違ってたっぽい。

結局比較に使っている new(bytes.Buffer) のほうがアロケーションの回数も使うメモリの量も小さいと出た。go力が低すぎて理由が分からない。

(答え合わせの続編↓) izumisy.work