用 Go 写一个简单消息队列(一):定义消息和基础工具
与昊与昊 -概述
消息队列相信大家都不陌生,平时在一些需要解耦、高并发的场景下经常能看见它们的身影,Kafka、RabbitMQ 这些常用的消息队列现在甚至已经成为后端程序员的必需技能了。那么一个消息队列的基础功能有哪些,是如何实现的?现在我们就用 Go 语言来实现一个简单的单机版消息队列,借以了解消息队列的原理。
这个消息队列的实现参考了 Go 语言中最受欢迎的消息队列 nsq 的实现,取名就叫 smq (simple message queue),代码已经上传到 Github 上:https://github.com/yhao1206/SMQ。为了尽可能简洁明了,这个消息队列的功能都很基础。由于时间紧迫,加上本人也是 Go 语言的菜鸟,水平有限,有什么错误或是建议,欢迎一起讨论。
主要组件topic:一个 topic 就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建 topic。channel:channel 与消费者相关,每当一个发布者发送一条消息到一个 topic,消息会被复制到 topic 下面所有的 channel 上,消费者通过 channel 读取消息。同一个 channel 可以由多个消费者同时连接,以达到负载均衡的效果。message:s消息是数据流的抽象,消费者可以选择结束消息,表明它们已被正常处理,或者重新将它们排队待到后面再进行处理。smqd:smqd 是一个守护进程,负责接收,排队,投递消息给客户端。topic、channel、consumer 之间的关系可以参考下图:
话不多说,让我们直入主题,首先第一步就是定义消息。
我们定义的消息就是普通的字节数组,前 16 位是 uuid,用作消息的唯一标识,后面完成和重排消息时需要用到。后面就是消息本身的内容,还提供了几个导出的封装方法。
message.go
package message
type Message struct {
data []byte
}
func NewMessage(data []byte) *Message {
return &Message{
data: data,
}
}
func (m *Message) Uuid() []byte {
return m.data[:16]
}
func (m *Message) Body() []byte {
return m.data[16:]
}
func (m *Message) Data() []byte {
return m.data
}
工具库消息体需要 uuid 作为唯一标识,那么我们需要一个 uuid 生成器,我们直接使用一个工厂,不断地朝一个 chan 中写入uuid,代码如下:
uuid.go
package util
import (
"crypto/rand"
"fmt"
"io"
"log"
)
var UuidChan = make(chan []byte, 1000)
func UuidFactory() {
for {
UuidChan <- uuid()
}
}
func uuid() []byte {
b := make([]byte, 16)
_, err := io.ReadFull(rand.Reader, b)
if err != nil {
log.Fatal(err)
}
return b
}
func UuidToStr(b []byte) string {
return fmt.Sprintf("%x-%x-%x-%x-%x", b[:4], b[4:6], b[6:8], b[8:10], b[10:])
}
另外,由于 channel 和 topic 的实现中需要大量使用 Go 语言中的管道,甚至需要两个 goroutine 之间通过同一个管道来传递信息,多以我们事先定义好一个结构体方便后续 goroutine 之间通信:
chan_req.go
package util
type ChanReq struct {
Variable interface{}
RetChan chan interface{}
}
type ChanRet struct {
Err error
Variable interface{}
}
由于本文只是开篇,内容比较简单,只是一些“地基”,不过我们后续会在这块地基上添砖加瓦,完成一个可用的消息队列。
项目地址:https://github.com/yhao1206/SMQ
相关阅读:
php介绍
PHP即“超文本预处理器”,是一种通用开源脚本语言。PHP是在服务器端执行的脚本语言,与C语言类似,是常用的网站编程语言。PHP独特的语法混合了C、Java、Perl以及 PHP 自创的语法。利于学习,使用广泛,主要适用于Web开发领域。
Tags 标签
go消息队列php扩展阅读
隐藏apache版本信息
2018-09-30 10:56:15 []CentOS 6.5安装php5.6
2018-09-30 11:36:53 []PHP版ZIP压缩解压类库
2018-12-22 13:11:00 []CentOS7.2安装 PHP7.3.4 操作详细教程
2020-06-28 19:09:43 []PHP 设置脚本超时时间、PHP脚本内存限制设置
2020-06-28 19:09:43 []PHP 函数filesize获取文件大小错误,一直不变
2020-06-28 19:09:43 []Linux php: command not found
2020-02-05 01:30:13 []php 缓冲区 buffer 原理
2020-06-28 19:09:43 []PHP中三种设置脚本最大执行时间的方法
2020-06-28 19:17:34 []加个好友,技术交流
