RabbitMQ的Channel池
Issue
“channel id space exhausted”,部分消息无法进行正常地发布,这便是我们在生产环境中遇到的问题。虽然对connection进行了复用, 但在高并发时,channel id还是耗尽了。在当前的AMQP的releases版本中,打开channel的最大默认值是2047。基于这个情况,我们决定复用channel, 构造一个channel pool。
方案预研
开源库
Java的组件中,早就引入了channel pool的概念。所以刚开始的思路,肯定是去找开源的库。 houseofcat/turbocookedrabbit是golang中star较高的一个库, 但是看了源码之后,发现其并没有re-use channel, 跟我们的建设思路严重不符,遂抛弃之。对这个库有兴趣的朋友,可以看一下。
sql.driver的连接池
golang的"database/sql"是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还是需要不同的数据库来实现。 “database/sql"除了定义接口之外还有一个重要的功能:连接池,在其他网络通信时可以借鉴其实现。 我们RabbitMQ的channel pool就是参考database connection pool来实现的,可以说是超级简化版的pool。虽然说channel和connection是完全不一样的概念,但是不妨碍我们学习上层的封装。
channel pool的实现
流程图
getChannelFromPool
func (m *AmqpClient) getChannelFromPool() (*channelHost, error) {
m.mu.Lock()
numFree := len(m.freeChannel)
if numFree > 0 {
ch := m.freeChannel[0]
copy(m.freeChannel, m.freeChannel[1:])
m.freeChannel = m.freeChannel[:numFree-1]
m.mu.Unlock()
return ch, nil
}
if m.numOpenChannel >= m.maxChannel {
req := make(chan channelRequest)
reqKey := m.nextRequestsKeyLocked()
m.channelRequests[reqKey] = req
m.mu.Unlock()
ret, ok := <-req
if !ok {
return nil, errors.New("error request")
}
return ret.cHost, nil
}
m.numOpenChannel++
channel, err := m.conn.Channel()
if err != nil {
m.numOpenChannel--
m.mu.Unlock()
return nil, errors.New("create channel error")
}
return &channelHost{channel: channel, isUse: false}, nil
}
- 首先通过len(m.freeChannel)判断pool中是否有空闲的channel, 如果存在空闲的channel, 直接返回给client,用于发布消息
- numOpenChannel已经建立或者等待建立的channel数,如果numOpenChannel>maxChannel, 证明当前的存在的channel数已经到达设置的上限,client的请求被加入等待队列中,阻塞等待
- 创建一个channel,首先将numOpenChannel加1,然后再创建连接。如果创建失败,将numOpenChannel减1
returnChannel
func (m *AmqpClient) returnChannel(cHost *channelHost) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
c := len(m.channelRequests)
if c > 0 {
var req chan channelRequest
var reqKey uint64
for reqKey, req = range m.channelRequests {
break
}
delete(m.channelRequests, reqKey)
cHost.isUse = true
req <- channelRequest{
cHost: cHost,
}
}
if m.maxIdleChannel > len(m.freeChannel) {
m.freeChannel = append(m.freeChannel, cHost)
} else {
err := cHost.channel.Close()
if err != nil {
return
}
}
m.numOpenChannel--
}
- len(m.channelRequests)>0, 证明队列中有等待的请求,channel re-use应该优先考虑等待队列
- 注意,代码中使用map作为等待队列,range方法帮助我们随机选择一个等待的请求
- m.maxIdleChannel > len(m.freeChannel), 说明channel pool中连接池还有剩余空间,这时候可以将channel归还到pool
- 如果channel pool已经满了,直接close该channel