自分が開発しているgo-cleanarchitectureの中でドメインイベントのPublisher実装としてredigoを使ったPubSubアダプタの実装を用意したが、思ったよりもredigoを使ったRedisのPubSub周りに関連する実装があまりネットに転がっていなかった。
domainsパッケージなどgo-cleanarchitecture固有の実装も絡んでいるが、それでも参考になりそうなのでアダプタの実装コードを抜粋として残しておく。
https://github.com/IzumiSy/go-cleanarchitecture/blob/master/adapters/pubsub/redis.go
package pubsub import ( "encoding/json" "fmt" "go-cleanarchitecture/domains" "go-cleanarchitecture/domains/errors" "github.com/gomodule/redigo/redis" ) // サブスクライバ実装のインターフェイス type Subscriber = func(payload []byte) error // PubSubアダプタ実装の構造体 type RedisAdapter struct { conn redis.Conn psc redis.PubSubConn subscribers map[string]Subscriber logger domains.Logger } // RedisAdapterがdomains.EventPublisherの実装を満たしているかを型チェック var _ domains.EventPublisher = RedisAdapter{} func NewRedisAdapter(logger domains.Logger) (error, RedisAdapter) { // publish用のredisコネクション pubConn, err := redis.Dial("tcp", "redis:6379") if err != nil { return err, RedisAdapter{} } // subscribe用のredisコネクション subConn, err := redis.Dial("tcp", "redis:6379") if err != nil { return err, RedisAdapter{} } return nil, RedisAdapter{ conn: pubConn, psc: redis.PubSubConn{Conn: subConn}, subscribers: map[string]Subscriber{}, logger: logger, } } // ドメインイベントをPublishする func (adapter RedisAdapter) Publish(event domains.Event) errors.Domain { eventBytes, err := json.Marshal(event) if err != nil { return errors.Postconditional(err) } _, err = adapter.conn.Do("PUBLISH", string(event.Name()), eventBytes) return errors.Postconditional(err) } // サブスクライバを登録する func (adapter RedisAdapter) RegisterSubscriber(eventName domains.EventName, subscriber func(payload []byte) error) { adapter.subscribers[string(eventName)] = subscriber } // イベントのpublishを監視するgoroutineを起動する func (adapter RedisAdapter) Listen() { var channels []string for c := range adapter.subscribers { channels = append(channels, c) } if err := adapter.psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil { adapter.logger.Error(fmt.Sprintf("Failed to start listening: %s", err.Error())) return } for { switch n := adapter.psc.Receive().(type) { case error: adapter.logger.Error(fmt.Sprintf("Error received: %s", n.Error())) return case redis.Message: // publishされたメッセージを補足した(n.Channelに対象のチャネル文字列が入っている) subscriber, ok := adapter.subscribers[n.Channel] if ok { // 対象のサブスクライバがあればデータを渡して実行する // 本当はここでエラーが出たらリトライを実行したほうが良い(Exponential Backoffとかで) subscriber(n.Data) } case redis.Subscription: switch n.Kind { case "subscribe": // Channelがsubscribeされた adapter.logger.Info(fmt.Sprintf("%s subscribed", n.Channel)) case "unsubscribed": // Channelのsubscribeが解除された adapter.logger.Info(fmt.Sprintf("%s unsubscribed", n.Channel)) } } } }
個人的にハマったのはredigoでRedisのPubSubを利用するためにはPubslisher/Subscriberでそれぞれ個別のコネクションを用意しておかないといけないこと。
これが分からなくて数時間悩んだ。