Runner in the High

技術のことをかくこころみ

redigoを使ったPubSubアダプタの実装

自分が開発しているgo-cleanarchitectureの中でドメインイベントのPublisher実装としてredigoを使ったPubSubアダプタの実装を用意したが、思ったよりもredigoを使ったRedisのPubSub周りに関連する実装があまりネットに転がっていなかった。

domainsパッケージなどgo-cleanarchitecture固有の実装も絡んでいるが、それでも参考になりそうなのでアダプタの実装コードを抜粋として残しておく。

https://github.com/IzumiSy/go-cleanarchitecture/blob/master/adapters/pubsub/redis.go

package pubsub

import (
    "encoding/json"
    "fmt"
    "go-cleanarchitecture/domains"
    "go-cleanarchitecture/domains/errors"

    "github.com/gomodule/redigo/redis"
)

// サブスクライバ実装のインターフェイス
type Subscriber = func(payload []byte) error

// PubSubアダプタ実装の構造体
type RedisAdapter struct {
    conn        redis.Conn
    psc         redis.PubSubConn
    subscribers map[string]Subscriber
    logger      domains.Logger
}

// RedisAdapterがdomains.EventPublisherの実装を満たしているかを型チェック
var _ domains.EventPublisher = RedisAdapter{}

func NewRedisAdapter(logger domains.Logger) (error, RedisAdapter) {
    // publish用のredisコネクション
    pubConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    // subscribe用のredisコネクション
    subConn, err := redis.Dial("tcp", "redis:6379")
    if err != nil {
        return err, RedisAdapter{}
    }

    return nil, RedisAdapter{
        conn:        pubConn,
        psc:         redis.PubSubConn{Conn: subConn},
        subscribers: map[string]Subscriber{},
        logger:      logger,
    }
}

// ドメインイベントをPublishする
func (adapter RedisAdapter) Publish(event domains.Event) errors.Domain {
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return errors.Postconditional(err)
    }

    _, err = adapter.conn.Do("PUBLISH", string(event.Name()), eventBytes)
    return errors.Postconditional(err)
}

// サブスクライバを登録する
func (adapter RedisAdapter) RegisterSubscriber(eventName domains.EventName, subscriber func(payload []byte) error) {
    adapter.subscribers[string(eventName)] = subscriber
}

// イベントのpublishを監視するgoroutineを起動する
func (adapter RedisAdapter) Listen() {
    var channels []string
    for c := range adapter.subscribers {
        channels = append(channels, c)
    }

    if err := adapter.psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
        adapter.logger.Error(fmt.Sprintf("Failed to start listening: %s", err.Error()))
        return
    }

    for {
        switch n := adapter.psc.Receive().(type) {
        case error:
            adapter.logger.Error(fmt.Sprintf("Error received: %s", n.Error()))
            return
        case redis.Message:
            // publishされたメッセージを補足した(n.Channelに対象のチャネル文字列が入っている)
            subscriber, ok := adapter.subscribers[n.Channel]
            if ok {
                // 対象のサブスクライバがあればデータを渡して実行する
                // 本当はここでエラーが出たらリトライを実行したほうが良い(Exponential Backoffとかで)
                subscriber(n.Data)
            }
        case redis.Subscription:
            switch n.Kind {
            case "subscribe":
                // Channelがsubscribeされた
                adapter.logger.Info(fmt.Sprintf("%s subscribed", n.Channel))
            case "unsubscribed":
                // Channelのsubscribeが解除された
                adapter.logger.Info(fmt.Sprintf("%s unsubscribed", n.Channel))
            }
        }
    }
}

個人的にハマったのはredigoでRedisのPubSubを利用するためにはPubslisher/Subscriberでそれぞれ個別のコネクションを用意しておかないといけないこと。

これが分からなくて数時間悩んだ。