使用golang的channel实现订阅-发布模式

原文地址:这里,省略了一些非重点片段。

先定义一个结构体Pubsub,客户端可以使用它来订阅、发布主题:

type Pubsub struct {
  mu   sync.RWMutex
  subs map[string][]chan string
}

最核心的是subs变量,这个map的value是由string类型的channel组成的slice,每个channel都是对topic的订阅(Roy注:map的key代表了topic)。

这个结构体并不会直接被外部直接使用,客户端通过某些方法和这个结构体交互,先从构造函数开始:

func NewPubsub() *Pubsub {
  ps := &Pubsub{}
  ps.subs = make(map[string][]chan string)
  return ps
}

客户端可以通过Subscribe方法来订阅新的topic,只需要提供topic的名称和一个channel即可:

func (ps *Pubsub) Subscribe(topic string, ch chan string) {
  ps.mu.Lock()
  defer ps.mu.Unlock()

  ps.subs[topic] = append(ps.subs[topic], ch)
}

感谢golang的语法特性我们可以把代码写的如此简洁,如果传入的topic不存在将返回一个空slice,或者将新传入的channel加入已经存在的slice中,不管哪种情形我们都可以获得期望的结果。(Roy注:因为golang中获取的key在map中不存在时,会返回value类型的默认值,这里就是空slice)

Publish方法用于向topic中写入数据:

func (ps *Pubsub) Publish(topic string, msg string) {
  ps.mu.RLock()
  defer ps.mu.RUnlock()

  for _, ch := range ps.subs[topic] {
    ch <- msg
  }
}

这里又一次利用了go的语法特性,如果topic不存在则循环不会执行。

有一个值得注意的地方就是锁,go中最著名的一句话就是"share memory by communicating”。但go也是一门实用至上的语言,当我们跨goroutine共享数据时,如果代码足够清晰使用锁也是可以接受的。在上述代码里使用lock + defer unlock足够简单清晰。

这里我们没实现取消订阅功能,这个留给读者自己练习。

上面的代码还有些小问题,比如channel并没有关闭。这样的代码不够优雅,没法通知订阅者没有新的数据会被发送了。 go语言中发送完数据关闭channel是十分重要的事情,因为这代表job完成并且资源可以被清理掉。

修改一下代码:

type Pubsub struct {
  mu     sync.RWMutex
  subs   map[string][]chan string
  closed bool
}

func (ps *Pubsub) Publish(topic string, msg string) {
  ps.mu.RLock()
  defer ps.mu.RUnlock()

  if ps.closed {
    return
  }

  for _, ch := range ps.subs[topic] {
    ch <- msg
  }
}

func (ps *Pubsub) Close() {
  ps.mu.Lock()
  defer ps.mu.Unlock()

  if !ps.closed {
    ps.closed = true
    for _, subs := range ps.subs {
      for _, ch := range subs {
        close(ch)
      }
    }
  }
}

我们在结构体里新增了close属性并且增加了Close方法,这里有个细节就是订阅者的channel并不是由Pubsub结构体定义的,而是调用Subscribe方法传递进去的,在这里关闭channel合适吗?这是一个好问题, 通常来说,发送端关闭channel是一个惯例,这样才能够通知到接收端没有更多数据了。此外,向一个关闭的channel发送数据将会导致panic,在接收端关闭channel是一个比较危险的事情,因为发送端无法得知channel什么时候被关闭了

这也引起了一个更重要的问题:这些订阅者channel应该在哪里创建?在Pubsub外部建立后传递,还是通过Pubsub来建立?

代码还有一个问题:

for _, ch := range ps.subs[topic] {
  ch <- msg
}

这里使用了无缓冲的channel,如果消息没及时被消费掉,ch <- msg将阻塞消息向其他的channel投递,这并不是我们期望的。使用有缓冲的channel会让程序更加健壮,并且快速的通知所有的订阅者。(除非订阅者有严重的问题,前面的消息也没处理。)

如果订阅者channel在Pubsub外面建立,那么缓冲大小则是由订阅者决定,Pubsub无需去考虑缓冲区大小设定为什么比较合适。但缺点也很明显,就是Pubsub的正确性依赖于订阅者,如果某个订阅者channel没有设定缓冲区则会堵塞其他订阅者。

这里我们修改一下Subcribe方法,在这里创建channel:

func (ps *Pubsub) Subscribe(topic string) <-chan string {
  ps.mu.Lock()
  defer ps.mu.Unlock()

  ch := make(chan string, 1)
  ps.subs[topic] = append(ps.subs[topic], ch)
  return ch
}

这里硬编码缓冲区为1,尽管这是一个不错的默认值,但也可以让客户端通过参数来定义缓冲区大小,或者在创建Pubsub实例时。

目前版本的Pubsub可以创建、关闭通道,责任清晰,订阅者获取到一个channel监听直到这个channel被关闭即可。

这个版本有一个小小的不便之处就是没法像上个版本那样,通过传递一个channel参数实现订阅多个topic。换句话说,使用同一个channel订阅多个topic是会带来问题的,比如会尝试关闭同一个channel而导致panic,这样的话不得不在Close方法中添加一些特殊的处理来避免这种情形出现。(比如维护一个已经关闭的channel集合)

通常来说,我建议避免这种情形出现,保持一个channel订阅一个topic,在客户端想要通过一个range来处理多个主题的情况,可以使用扇入(fan-in)来替代。

当我们讨论危险的ch <- msg可能会导致所有客户端阻塞时,你也许会好奇为什么不使用goroutine来解决问题,比如:

func (ps *Pubsub) Publish(topic string, msg string) {
  ps.mu.RLock()
  defer ps.mu.RUnlock()

  if ps.closed {
    return
  }

  for _, ch := range ps.subs[topic] {
    go func(ch chan string) {
      ch <- msg
    }(ch)
  }
}

这样的话,无论传入的channel缓冲区设定为多少,也不会阻塞其他客户端了。但这可能会引起性能问题,即便goroutine的创建和销毁很快,你真的想每条消息都创建一个goroutine吗?这个问题要取决于具体的应用场景,如果你对这个拿不准,盘他。(benchmark it)

而且性能问题并不是上面代码的最大潜在问题,最大的问题在于这种方案将数据发送到channel的地方和channel被关闭的地方分开了(Roy注:指新开的goroutine不受Pubsub管控),这总是让我感到不安。考虑这种场景:一个特别慢的客户端正在阻塞中,而此时Pubsub被关闭并且尝试关闭对应的channel——尝试关闭一个正在进行操作的channel是十分糟糕的,这引起了资源竞争(race condition),最容易产生bug。而在原始版本里我们通过Publish的锁来避免来这种情况。

这篇文章主要目的是为了展示在一些简单而又功能齐全的场景下如何做选择,go语言的channel强大但并不是魔法,一些关于职责和顺序的问题依然会出现,从多个角度考虑问题是十分有意义的。我个人倾向Subscribe来创建channel并返回,恕我直言,这个方式在概念上最清晰,因为channel所属最集中。Pubsub创建他们、向他们发送消息,关闭他们。对于客户端来说,订阅的channel生命周期很清晰:获得一个由Subscribe创建的channel并且从其中获得数据,直到channel关闭。调用Pubsub.Close来关闭所有channel来清理环境,如果需要可配置的缓冲channel,也非常容易添加。

完整代码

// PubSub implementation taking a channel as argument in Subscibe, w/o Close.
//
// Eli Bendersky [https://eli.thegreenplace.net]
// This code is in the public domain.
package main

import (
  "fmt"
 "sync"
 "time"
)

type Pubsub struct {
 mu   sync.RWMutex
 subs map[string][]chan string
}

func NewPubsub() *Pubsub {
 ps := &Pubsub{}
 ps.subs = make(map[string][]chan string)
 return ps
}

func (ps *Pubsub) Subscribe(topic string, ch chan string) {
 ps.mu.Lock()
 defer ps.mu.Unlock()

 ps.subs[topic] = append(ps.subs[topic], ch)
}

func (ps *Pubsub) Publish(topic string, msg string) {
 ps.mu.RLock()
 defer ps.mu.RUnlock()

 for _, ch := range ps.subs[topic] {
  ch <- msg
 }
}

func main() {
 ps := NewPubsub()
 makesub := func(topic string) chan string {
  ch := make(chan string, 1)
  ps.Subscribe(topic, ch)
  return ch
 }
 ch1 := makesub("tech")
 ch2 := makesub("travel")
 ch3 := makesub("travel")

 listener := func(name string, ch chan string) {
  for i := range ch {
   fmt.Printf("[%s] got %s\n", name, i)
  }
 }

 go listener("1", ch1)
 go listener("2", ch2)
 go listener("3", ch3)

 pub := func(topic string, msg string) {
  fmt.Printf("Publishing @%s: %s\n", topic, msg)
  ps.Publish(topic, msg)
  time.Sleep(1 * time.Millisecond)
 }

 time.Sleep(50 * time.Millisecond)
 pub("tech", "tablets")
 pub("health", "vitamins")
 pub("tech", "robots")
 pub("travel", "beaches")
 pub("travel", "hiking")
 pub("tech", "drones")

 time.Sleep(50 * time.Millisecond)
}