mongoShake基于go实践应用
金闽金闽 -MongoShake——基于MongoDB的跨数据中心的数据复制平台-阿里云开发者社区
通过阿里云自研的MongoShake开源工具,您可以实现MongoDB数据库间的数据同步,该功能可用于数据分析、灾备和多活等业务场景。本文以云数据库MongoDB实例间的数据实时同步为例介绍配置流程。
1. 下载git clone https://github.com/alibaba/MongoShake
2. 介绍MongoShake是阿里云以Golang语言编写的通用平台型服务工具,它通过读取MongoDB的Oplog操作日志来复制MongoDB的数据以实现特定需求。
MongoShake还提供了日志数据的订阅和消费功能,可通过SDK、Kafka、MetaQ等方式的灵活对接,适用于日志订阅、数据中心同步、Cache异步淘汰等场景
3. 应用场景MongoDB集群间数据的异步复制,免去业务双写开销。MongoDB集群间数据的镜像备份(当前1.0开源版本支持受限)日志离线分析日志订阅数据路由。根据业务需求,结合日志订阅和过滤机制,可以获取关注的数据,达到数据路由的功能。Cache同步。日志分析的结果,知道哪些Cache可以被淘汰,哪些Cache可以进行预加载,反向推动Cache的更新。基于日志的集群监控4. 功能MongoShake从源库抓取oplog数据,然后发送到各个不同的tunnel通道。源库支持:ReplicaSet,Sharding,Mongod,目的库支持:Mongos,Mongod。现有通道类型有:
Direct:直接写入目的MongoDBRPC:通过net/rpc方式连接TCP:通过tcp方式连接File:通过文件方式对接Kafka:通过Kafka方式对接Mock:用于测试,不写入tunnel,抛弃所有数据消费者可以通过对接tunnel通道获取关注的数据,例如对接Direct通道直接写入目的MongoDB,或者对接RPC进行同步数据传输等。此外,用户还可以自己创建自己的API进行灵活接入。下面2张图给出了基本的架构和数据流。
首先前往https://github.com/alibaba/MongoShake下载mongoshake,如下两种方式都可以
go get github.com/alibaba/MongoShake
git clone https://github.com/alibaba/MongoShake
在项目目录中主要关注如下两个目录
github.com/mongoShake/cmd
├── collector
│ ├── collector.go
│ └── sanitize.go
└── receiver
└── receiver.go
其中collector就是启动mongoShake监听mongodb的入口,而receiver则是用于接收collector监听mongo中的oplogs日志变化的数据处理程序理论上需要根据语言的不同而需进行独立开发,因为mongoShake整体的实现采用的是go故此receiver本身也是基于go实现的。
如下为配置文件目录地址
github.com/mongoShake/conf
├── collector.conf
└── receiver.conf
其中与启动mongoShake实现监听的主要配置是collector.conf,如下是我选择整个配置中较为常用的几项配置信息
# 源MongoDB连接串信息,逗号分隔同一个副本集内的结点,分号分隔分片sharding实例,免密模式
mongo_urls = mongodb://192.168.145.10:27017
# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。
tunnel = rpc
# tunnel.address = mongodb://127.0.0.1:20080
tunnel.address = 127.0.0.1:1234
# raw是默认的类型,其采用聚合的模式进行写入和
# 读取,但是由于携带了一些控制信息,所以需要专门用receiver进行解析。
# json以json的格式写入kafka,便于用户直接读取。
# bson以bson二进制的格式写入kafka。
tunnel.message = raw
# 黑白名单过滤,目前不支持正则,白名单表示通过的namespace,黑名单表示过滤的namespace,
# 不能同时指定。分号分割不同namespace,每个namespace可以是db,也可以是db.collection。
filter.namespace.black =
filter.namespace.white =
# checkpoint的具体写入的MongoDB地址,如果不配置,对于副本集和分片集群都将写入源库(db=mongoshake)
# 2.4版本以后不需要配置为源端cs的地址。
checkpoint.storage.url = mongodb://127.0.0.1:27017
其他的配置信息比较多可以看官方文档也可以看配置注释(是中文的),第一次使用只需要关注如下三个配置信息即可。案例采用rpc为示例。
# mongo源数据地址
mongo_urls = mongodb://192.168.145.10:27017
# tunnel pipeline type. now we support rpc,file,kafka,mock,direct
# 通道模式。采集的数据发送方式
tunnel = rpc
# 数据发送的目的地
tunnel.address = 127.0.0.1:1234
6. 启动与使用6.1 启动启动方式你可以选择在下载的目录下执行 ./build.sh 将整个程序编译,需注意要安装go版本选择1.15以上。本次案例直接采用go启动程序,如果出现问题也好调试。
go run cmd/collector/* -conf=/www/go/src/github.com/MongoShake/conf/collector.conf
(如果是编译的则会生成相关系统的可执行脚本,只需执行 collector.* -conf 即可启动)
[root@localhost MongoShake]# ll ./bin/
总用量 109868
-rwxr-xr-x. 1 root root 20191856 3月 23 17:29 collector.darwin
-rwxr-xr-x. 1 root root 20468754 3月 23 17:29 collector.linux
-rwxr-xr-x. 1 root root 20406272 3月 23 17:29 collector.windows
-rwxr-xr-x. 1 root root 9534 3月 23 17:29 comparison.py
-rwxr-xr-x. 1 root root 13808 3月 23 17:29 hypervisor
-rwxr-xr-x. 1 root root 9400 3月 23 17:29 mongoshake-stat
-rwxr-xr-x. 1 root root 17005680 3月 23 17:29 receiver.darwin
-rwxr-xr-x. 1 root root 17232383 3月 23 17:29 receiver.linux
-rwxr-xr-x. 1 root root 17138688 3月 23 17:29 receiver.windows
-rwxr-xr-x. 1 root root 621 3月 23 17:29 start.sh
-rwxr-xr-x. 1 root root 373 3月 23 17:29 stop.sh
6.2 配置mongo对mongo的主要配置,如果是单节点则需要开启mongo的oplogs项
replSet=single
然后可以新建一个用户赋予复制的权限。如下是mongo的相关权限
Read:允许用户读取指定数据库
readWrite :允许用户读写指定数据库
dbAdmin:允许用户在指定数据库中执行管理函数,如索引创建、删除,查看统计或访问system.profile
userAdmin:允许用户向system.users集合写入,可以找指定数据库里创建、删除和管理用户
clusterAdmin:只在admin数据库中可用,赋予用户所有分片和复制集相关函数的管理权限。
readAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读权限
readWriteAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的读写权限
userAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的userAdmin权限
dbAdminAnyDatabase:只在admin数据库中可用,赋予用户所有数据库的dbAdmin权限。
root:只在admin数据库中可用。超级账号,超级权限
创建角色 -- 当然也可以选择不整(仅仅推荐是在测试学习的时候不整)
use admin
//创建超级管理员,操作齐他users
db.createUser(
{
user: "root",
pwd: "root",
roles: [ { role: "root", db: "admin" } ]
}
)
db.auth("root","root")
//创建其他用户
db.createUser(
{
user: "admin",
pwd: "0000",
roles: [
{ role: "readAnyDatabase", db: "admin" },
{role:"read",db:"local"}
]
}
)
db.auth("admin","0000")
6.3 使用实际上官方已经提供了相关较好的用例即为receiver我们可以直接使用。
分析 官方receiver的实现首先在入口 cmd/receiver/receiver.go 中,startup启动方法会通过tunnel工厂根据在 conf/receiver.conf配置文件中配置的tunnel方式创建相应的接收器。
// this is the main connector function
// MongoShake/cmd/receiver/receiver.go
func startup() {
factory := tunnel.ReaderFactory{Name: conf.Options.Tunnel}
reader := factory.Create(conf.Options.TunnelAddress)
if reader == nil {
return
}
/*
* create re-players, the number of re-players number is equal to the
* collector worker number to fulfill load balance. The tunnel that message
* sent to is determined in the collector side: `TMessage.Shard`.
*/
repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
for i := range repList {
repList[i] = replayer.NewExampleReplayer(i)
}
LOG.Info("receiver is starting...")
if err := reader.Link(repList); err != nil {
LOG.Critical("Replayer link to tunnel error %v", err)
return
}
}
// MongoShake/tunnel/tunnel.go
func (factory *ReaderFactory) Create(address string) Reader {
switch factory.Name {
case utils.VarTunnelKafka:
return &KafkaReader{address: address}
case utils.VarTunnelTcp:
return &TCPReader{listenAddress: address}
case utils.VarTunnelRpc:
return &RPCReader{address: address}
case utils.VarTunnelMock:
return &MockReader{}
case utils.VarTunnelFile:
return &FileReader{File: address}
case utils.VarTunnelDirect:
LOG.Critical("direct mode not supported in reader")
return nil
default:
LOG.Critical("Specific tunnel not found [%s]", factory.Name)
return nil
}
}
然后在startup中为接收器添加实际对mongo采集监听数据的处理器Replayer对象
/*
* create re-players, the number of re-players number is equal to the
* collector worker number to fulfill load balance. The tunnel that message
* sent to is determined in the collector side: `TMessage.Shard`.
*/
repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
for i := range repList {
repList[i] = replayer.NewExampleReplayer(i)
}
在MongoShake/receiver/replayer.go中ExampleReplayer对象的方法主要为如下
func (er *ExampleReplayer) Sync(message *tunnel.TMessage, completion func()) int64
func (er *ExampleReplayer) GetAcked() int64
func (er *ExampleReplayer) handler()
其中Sync是接收消息的入口负责数据的验证解析等相关工作,handler就是我们最终处理数据的核心代码
func (er *ExampleReplayer) handler() {
for msg := range er.pendingQueue {
count := uint64(len(msg.message.RawLogs))
if count == 0 {
// probe request
continue
}
// parse batched message
oplogs := make([]oplog.ParsedLog, len(msg.message.RawLogs))
for i, raw := range msg.message.RawLogs {
oplogs[i] = oplog.ParsedLog{}
if err := bson.Unmarshal(raw, &oplogs[i]); err != nil {
// impossible switch, need panic and exit
LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err)
return
}
LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs
// fmt.Println(oplogs[i])
}
if callback := msg.completion; callback != nil {
callback() // exec callback
}
// get the newest timestamp
n := len(oplogs)
lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
er.Ack = lastTs
LOG.Debug("handle ack[%v]", er.Ack)
// add logical code below
}
}
而我们的业务代码只需要增加到 // add logical code below 之后即可,数据对象是 []oplog.ParsedLog
使用可以利用go启动即可,你可以在 // add logical code below 增加关于 oplogs 结果的打印输出
go run cmd/receiver/* -conf=/www/go/src/github.com/MongoShake/conf/receiver.conf
最佳实现(只针对数据CURD操作)在基于上面代码的情况下,定义好处理的对象与接口以及路由
import "github.com/alibaba/MongoShake/v2/oplog"
type Handler interface {
Namespace() string
Insert(oplogs *oplog.ParsedLog) (err error)
Update(oplogs *oplog.ParsedLog) (err error)
Delete(oplogs *oplog.ParsedLog) (err error)
}
type Routes map[string]Handler
var routes = make(Routes)
func RegistryRouter(handler Handler) {
routes[handler.Namespace()] = handler
}
修改MongoShake/receiver/replayer.go中ExampleReplayer.handler方法
func (r *Replayer) handler() {
for msg := range r.pendingQueue {
count := uint64(len(msg.message.RawLogs))
if count == 0 {
continue
}
oplogs, err := r.parseMsg(msg)
if err != nil {
log.Error("parseMsg msg err ", zap.Error(err))
return
}
if callback := msg.completion; callback != nil {
callback()
}
// get the newest timestamp
n := len(oplogs)
lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
r.Ack = lastTs
// add logical code below
for _, oplog := range oplogs {
handler, ok := routes[oplog.Namespace]
if ok {
var err error
op := Op(oplog.Operation)
switch op {
case OpI:
err = handler.Insert(oplog)
case OpU:
err = handler.Update(oplog)
case OpD:
err = handler.Delete(oplog)
}
if err != nil {
// 同步失败处理
log.Errorf("oplog sync err mongo db = %s, option = %s, query = %s, err = %s", oplog.Namespace, op.String(), oplog.Query, err.Error())
}
} else {
log.Warn("oplog hander not found", zap.String("namespace", oplog.Namespace))
}
}
}
}
在业务端只需要实现Handler接口并进行注册即可,注意注册的Namespace是mongo中的 collection.db 的名称格式
示例代码https://gitee.com/dn-jinmin/mongoShake/tree/master/tunnelx
import (
"fmt"
"gitee.com/dn-jinmin/mongoShake/tunnelx"
)
func loadHandler() []tunnelx.Handler {
return []tunnelx.Handler{
test.NewUser(),
}
}
func Run() {
handlers := loadHandler()
for _, handler := range handlers {
tunnelx.RegistryRouter(handler)
}
tunnelx.Startup(&tunnelx.Configuration{
Tunnel: configs.MongoShake.Tunnel,
TunnelAddress: configs.MongoShake.TunnelAddress,
ReplayerNum: configs.MongoShake.Replayer,
})
select {}
}
type User struct {
}
func NewUser() tunnelx.Handler {
return &Manager{
}
}
func (m *User) Namespace() string {
return "test.user"
}
func (m *User) Insert(oplogs *oplog.ParsedLog) (err error) {
fmt.Println("参数:", oplogs.Object)
return nil
}
func (m *User) Update(oplogs *oplog.ParsedLog) (err error) {
fmt.Println("参数:", oplogs.Object)
fmt.Println("条件:", oplogs.query)
return nil
}
func (m *User) Delete(oplogs *oplog.ParsedLog) (err error) {
fmt.Println("条件:", oplogs.Object)
return nil
}