consul 学习
consul 分 server、client 两种角色,其中 server 可以理解为 rpc server、它也是 raft quroum 中的成员,而 client 可以理解为 rpc client。
server、client 角色都可以接收应用客户端的请求,在 http/dns 的 handler 中请求都以 local/remote RPC 调用的方式发给 rpc server,如果该 rpc server 不是 raft 的 leader 节点,该请求会再次以 remote RPC 调用的方式转发给 leader 节点的 rpc server 进行最终的处理。
server、client 这两种角色都是 serf 集群中的节点。
http/dns 默认监听 127.0.0.1 地址,而 rpc server/raft/serf 也默认监听 127.0.0.1 地址,因此如果没有通过 -bind 选项显式指定监听 0.0.0.0 地址(即同时监听本机所有 ipv4 和 ipv6 地址[1]),则需要通过 -advertise 选项指定一个用于节点之外 rpc client/raft peer 进行连接的地址,各服务的默认监听端口如下图所示:
如果在同一个节点启动两个 consul 实例,则第二个实例需要显式指定所有的端口:
# 创建集群
consul agent -server -data-dir c0 -advertise 192.168.137.3 -bind 192.168.137.3 -bootstrap
# 启动
consul agent -server -data-dir c0 -advertise 192.168.137.3 -bind 192.168.137.3
# 启动第二个实例
consul agent -server -data-dir c1 -advertise 192.168.137.3 -bind 192.168.137.3 -server-port 8200 -serf-lan-port 8201 -serf-wan-port 8202 -dns-port 8203 -http-port 8204 -node c1
# 可以指定一个实例,添加另外一个实例,注意实例地址是 serf 的地址,当然一般是待添加的节点主动加入集群(即下面第一种方式)
consul join -http-addr=http://127.0.0.1:8204 192.168.137.3:8301
consul join -http-addr=http://127.0.0.1:8500 192.168.137.3:8201
# 查看状态
consul info -http-addr http://127.0.0.1:8204
# 查看 serf 成员
consul members -http-addr http://localhost:8404
# serf 成员 graceful leave
consul leave -http-addr http://localhost:8500
# serf 成员 force leave
consul force-leave -http-addr http://localhost:8204 c2
代码实现
command/agent/agent.go
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
config.AddFlags(c.flags, &c.flagArgs)
c.help = flags.Usage(help, c.flags)
}
func (c *cmd) run(args []string)
config := c.readConfig()
b, err := config.NewBuilder(c.flagArgs)
cfg, err := b.BuildAndValidate()
return &cfg
lib.InitTelemetry(config.Telemetry)
agent, err := agent.New(config)
agent.Start()
c.startupJoin(agent, config)
agent.StartSync()
agent/agent.go
func (a *Agent) Start()
c := a.config
a.setupNodeID(c)
// create the local state
a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
// create the state synchronization manager which performs
// regular and on-demand state synchronizations (anti-entropy).
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
// create the config for the rpc server/client
consulCfg, err := a.consulConfig()
consulCfg.ServerUp = a.sync.SyncFull.Trigger
// Setup either the client or the server.
if c.ServerMode {
server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
a.delegate = server
} else {
client, err := consul.NewClientLogger(consulCfg, a.logger)
a.delegate = client
}
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
a.loadServices(c)
a.loadProxies(c)
a.loadChecks(c)
a.loadMetadata(c)
go a.reapServices()
go a.handleEvents()
go a.sendCoordinate()
a.listenAndServeDNS()
servers, err := a.listenHTTP()
for _, srv := range servers {
a.serveHTTP(srv)
}
a.listenAndServeGRPC()
a.reloadWatches(a.config)
go a.retryJoinLAN()
go a.retryJoinWAN()
参考资料
[1] net: clarify Listen on 0.0.0.0 behavior
https://go-review.googlesource.com/c/go/+/45771/4/src/net/dial.go
最后修改于 2019-03-15