consul 学习

consul 分 server、client 两种角色,其中 server 可以理解为 rpc server、它也是 raft quroum 中的成员,而 client 可以理解为 rpc client。

server、client 角色都可以接收应用客户端的请求,在 http/dns 的 handler 中请求都以 local/remote RPC 调用的方式发给 rpc server,如果该 rpc server 不是 raft 的 leader 节点,该请求会再次以 remote RPC 调用的方式转发给 leader 节点的 rpc server 进行最终的处理。

server、client 这两种角色都是 serf 集群中的节点。

http/dns 默认监听 127.0.0.1 地址,而 rpc server/raft/serf 也默认监听 127.0.0.1 地址,因此如果没有通过 -bind 选项显式指定监听 0.0.0.0 地址(即同时监听本机所有 ipv4 和 ipv6 地址[1]),则需要通过 -advertise 选项指定一个用于节点之外 rpc client/raft peer 进行连接的地址,各服务的默认监听端口如下图所示:

如果在同一个节点启动两个 consul 实例,则第二个实例需要显式指定所有的端口:

# 创建集群
consul agent -server -data-dir c0 -advertise 192.168.137.3 -bind 192.168.137.3 -bootstrap
# 启动
consul agent -server -data-dir c0 -advertise 192.168.137.3 -bind 192.168.137.3

# 启动第二个实例
consul agent -server -data-dir c1 -advertise 192.168.137.3 -bind 192.168.137.3 -server-port 8200 -serf-lan-port 8201 -serf-wan-port 8202 -dns-port 8203 -http-port 8204 -node c1

# 可以指定一个实例,添加另外一个实例,注意实例地址是 serf 的地址,当然一般是待添加的节点主动加入集群(即下面第一种方式)
consul join -http-addr=http://127.0.0.1:8204 192.168.137.3:8301
consul join -http-addr=http://127.0.0.1:8500 192.168.137.3:8201

# 查看状态
consul info -http-addr http://127.0.0.1:8204

# 查看 serf 成员
consul members -http-addr http://localhost:8404

# serf 成员 graceful leave
consul leave -http-addr http://localhost:8500

# serf 成员 force leave
consul force-leave -http-addr http://localhost:8204 c2

代码实现

command/agent/agent.go
func (c *cmd) init() {
  c.flags = flag.NewFlagSet("", flag.ContinueOnError)
  config.AddFlags(c.flags, &c.flagArgs)
  c.help = flags.Usage(help, c.flags)
}
func (c *cmd) run(args []string)
  config := c.readConfig()
    b, err := config.NewBuilder(c.flagArgs)
    cfg, err := b.BuildAndValidate()
    return &cfg
  lib.InitTelemetry(config.Telemetry)
  agent, err := agent.New(config)
  agent.Start()
  c.startupJoin(agent, config)
  agent.StartSync()

agent/agent.go
func (a *Agent) Start()
  c := a.config
  a.setupNodeID(c)
  // create the local state
  a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
  // create the state synchronization manager which performs
  // regular and on-demand state synchronizations (anti-entropy).
  a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)

  // create the config for the rpc server/client
  consulCfg, err := a.consulConfig()
  consulCfg.ServerUp = a.sync.SyncFull.Trigger
  // Setup either the client or the server.
  if c.ServerMode {
    server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens)
    a.delegate = server
  } else {
    client, err := consul.NewClientLogger(consulCfg, a.logger)
    a.delegate = client
  }
  a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger

  a.loadServices(c)
  a.loadProxies(c)
  a.loadChecks(c)
  a.loadMetadata(c)

  go a.reapServices()
  go a.handleEvents()
  go a.sendCoordinate()

  a.listenAndServeDNS()
  servers, err := a.listenHTTP()
  for _, srv := range servers {
    a.serveHTTP(srv)
  }
  a.listenAndServeGRPC()
  a.reloadWatches(a.config)

  go a.retryJoinLAN()
  go a.retryJoinWAN()

参考资料

[1] net: clarify Listen on 0.0.0.0 behavior

https://go-review.googlesource.com/c/go/+/45771/4/src/net/dial.go


最后修改于 2019-03-15