自分が開発しているgo-cleanarchitectureの中でドメインイベントのPublisher実装としてredigoを使ったPubSubアダプタの実装を用意したが、思ったよりもredigoを使ったRedisのPubSub周りに関連する実装があまりネットに転がっていなかった。
domainsパッケージなどgo-cleanarchitecture固有の実装も絡んでいるが、それでも参考になりそうなのでアダプタの実装コードを抜粋として残しておく。
https://github.com/IzumiSy/go-cleanarchitecture/blob/master/adapters/pubsub/redis.go
package pubsub
import (
"encoding/json"
"fmt"
"go-cleanarchitecture/domains"
"go-cleanarchitecture/domains/errors"
"github.com/gomodule/redigo/redis"
)
type Subscriber = func(payload []byte) error
type RedisAdapter struct {
conn redis.Conn
psc redis.PubSubConn
subscribers map[string]Subscriber
logger domains.Logger
}
var _ domains.EventPublisher = RedisAdapter{}
func NewRedisAdapter(logger domains.Logger) (error, RedisAdapter) {
pubConn, err := redis.Dial("tcp", "redis:6379")
if err != nil {
return err, RedisAdapter{}
}
subConn, err := redis.Dial("tcp", "redis:6379")
if err != nil {
return err, RedisAdapter{}
}
return nil, RedisAdapter{
conn: pubConn,
psc: redis.PubSubConn{Conn: subConn},
subscribers: map[string]Subscriber{},
logger: logger,
}
}
func (adapter RedisAdapter) Publish(event domains.Event) errors.Domain {
eventBytes, err := json.Marshal(event)
if err != nil {
return errors.Postconditional(err)
}
_, err = adapter.conn.Do("PUBLISH", string(event.Name()), eventBytes)
return errors.Postconditional(err)
}
func (adapter RedisAdapter) RegisterSubscriber(eventName domains.EventName, subscriber func(payload []byte) error) {
adapter.subscribers[string(eventName)] = subscriber
}
func (adapter RedisAdapter) Listen() {
var channels []string
for c := range adapter.subscribers {
channels = append(channels, c)
}
if err := adapter.psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
adapter.logger.Error(fmt.Sprintf("Failed to start listening: %s", err.Error()))
return
}
for {
switch n := adapter.psc.Receive().(type) {
case error:
adapter.logger.Error(fmt.Sprintf("Error received: %s", n.Error()))
return
case redis.Message:
subscriber, ok := adapter.subscribers[n.Channel]
if ok {
subscriber(n.Data)
}
case redis.Subscription:
switch n.Kind {
case "subscribe":
adapter.logger.Info(fmt.Sprintf("%s subscribed", n.Channel))
case "unsubscribed":
adapter.logger.Info(fmt.Sprintf("%s unsubscribed", n.Channel))
}
}
}
}
個人的にハマったのはredigoでRedisのPubSubを利用するためにはPubslisher/Subscriberでそれぞれ個別のコネクションを用意しておかないといけないこと。
これが分からなくて数時間悩んだ。