Distributed Applications in Go: etcd
etcd is a reliable distributed key-value store open-sourced by CoreOS. Kubernetes uses etcd as its storage engine, and with the rise of cloud native, etcd has seen increasingly wide adoption.
Besides serving as a plain KV or configuration store, etcd can also be used in the following distributed scenarios:
- Service discovery
- Distributed locks
- Leader election
- Distributed queues
- Distributed system coordination
- Load balancing
This article implements the corresponding middleware in Go; all code is available at https://github.com/qingwave/gocorex
Service Discovery
In a distributed system, service discovery is the problem of finding the services you need to call. With only a few services you can simply hit their IPs, but as the business grows, maintaining those addresses becomes more and more complex, and if services scale in and out frequently you must be able to react to endpoint changes in real time. There are several common approaches:
- At the infrastructure level: LVS, DNS, Kubernetes Services, Istio, etc.
- With a microservice registry: Eureka in Spring Cloud, Dubbo, etc.
- With a distributed coordination system: etcd, ZooKeeper, Consul, etc.
A service discovery component provides:
- Service registration and deregistration
- Automatic deregistration when a service crashes or becomes unhealthy
- Notification of endpoint changes
Implementing service discovery with etcd:
- Write every endpoint under the same directory (the same prefix, e.g. /services/job/endpoint1, /services/job/endpoint2), attach a Lease with an expiration time, and keep refreshing the Lease; if the service goes down, the Lease expires and the corresponding endpoint is removed automatically (see the raw-client sketch after this list)
- Use the Watch API to observe endpoint changes
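Under the hood this is just a lease plus keep-alive. Here is a minimal sketch with the raw clientv3 API; the key, value and TTL are illustrative and not part of gocorex:

func registerEndpoint(ctx context.Context, cli *clientv3.Client) error {
	// grant a 5-second lease and attach it to the endpoint key
	lease, err := cli.Grant(ctx, 5)
	if err != nil {
		return err
	}
	if _, err := cli.Put(ctx, "/services/job/endpoint1", "10.0.0.1", clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	// KeepAlive refreshes the lease in the background; the channel must be drained
	ch, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}
	go func() {
		for range ch {
		}
	}()
	return nil
}

The implementation below delegates this bookkeeping to concurrency.NewSession, which manages the lease and its keep-alive for us.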
The main code is as follows:
func New(config EtcdDiscoveryConfig) (*EtcdDiscovery, error) {
	// create a session; the session keeps its lease alive automatically
	session, err := concurrency.NewSession(config.Client, concurrency.WithTTL(config.TTLSeconds))
	if err != nil {
		return nil, err
	}

	config.Prefix = strings.TrimSuffix(config.Prefix, "/") + "/"

	return &EtcdDiscovery{
		EtcdDiscoveryConfig: config,
		session:             session,
		myKey:               config.Prefix + config.Key,
		services:            make(map[string]string),
	}, nil
}
func (d *EtcdDiscovery) Register(ctx context.Context) error {
	lease := d.session.Lease()
	// register the service by writing its key with the session lease attached
	_, err := d.Client.Put(ctx, d.myKey, d.Val, clientv3.WithLease(lease))
	return err
}

func (d *EtcdDiscovery) UnRegister(ctx context.Context) error {
	// deregister the service by deleting its key
	_, err := d.Client.Delete(ctx, d.myKey)
	return err
}
// Watch observes endpoint changes
func (d *EtcdDiscovery) Watch(ctx context.Context) error {
	// the context is used to stop watching
	d.watchContext, d.watchCancel = context.WithCancel(ctx)

	// first list all existing endpoints
	resp, err := d.Client.Get(d.watchContext, d.Prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	services := make(map[string]string)
	for _, kv := range resp.Kvs {
		services[string(kv.Key)] = string(kv.Value)
	}
	d.setServices(services)

	// callback hook, user defined
	if d.Callbacks.OnStartedDiscovering != nil {
		d.Callbacks.OnStartedDiscovering(d.ListServices())
	}

	defer func() {
		if d.Callbacks.OnStoppedDiscovering != nil {
			d.Callbacks.OnStoppedDiscovering()
		}
	}()
	defer d.watchCancel()

	// watch the directory; WithPrefix includes changes to all child keys
	ch := d.Client.Watch(d.watchContext, d.Prefix, clientv3.WithPrefix())
	for {
		select {
		case <-d.watchContext.Done():
			return nil
		case wr, ok := <-ch:
			if !ok {
				return fmt.Errorf("watch closed")
			}
			if wr.Err() != nil {
				return wr.Err()
			}
			// apply the events to the local endpoint list
			for _, ev := range wr.Events {
				key, val := string(ev.Kv.Key), string(ev.Kv.Value)
				switch ev.Type {
				case mvccpb.PUT:
					d.addService(key, val)
				case mvccpb.DELETE:
					d.delService(key)
				}
				if d.Callbacks.OnServiceChanged != nil {
					event := DiscoveryEvent{Type: mvccpb.Event_EventType_name[int32(ev.Type)], Service: d.serviceFromKv(key, val)}
					d.Callbacks.OnServiceChanged(d.ListServices(), event)
				}
			}
		}
	}
}
The main implementation logic is:
- Create a Session; the Lease inside the Session is kept alive automatically
- On registration, create a child key under the directory with the Lease attached
- Watch the directory for changes and sync them to a local map
A quick test, using workers to simulate different endpoints:
func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to create etcd client: %v", err)
	}
	defer client.Close()

	worker := func(i int, run bool) {
		id := fmt.Sprintf("worker-%d", i)
		val := fmt.Sprintf("10.0.0.%d", i)
		sd, err := etcdiscovery.New(etcdiscovery.EtcdDiscoveryConfig{
			Client:     client,
			Prefix:     "/services",
			Key:        id,
			Val:        val,
			TTLSeconds: 2,
			Callbacks: etcdiscovery.DiscoveryCallbacks{
				OnStartedDiscovering: func(services []etcdiscovery.Service) {
					log.Printf("[%s], onstarted, services: %v", id, services)
				},
				OnStoppedDiscovering: func() {
					log.Printf("[%s], onstopped", id)
				},
				OnServiceChanged: func(services []etcdiscovery.Service, event etcdiscovery.DiscoveryEvent) {
					log.Printf("[%s], onchanged, services: %v, event: %v", id, services, event)
				},
			},
		})
		if err != nil {
			log.Fatalf("failed to create service etcdiscovery: %v", err)
		}
		defer sd.Close()

		if !run {
			if err := sd.UnRegister(context.Background()); err != nil {
				log.Fatalf("failed to unregister service [%s]: %v", id, err)
			}
			return
		}

		if err := sd.Register(context.Background()); err != nil {
			log.Fatalf("failed to register service [%s]: %v", id, err)
		}

		if err := sd.Watch(context.Background()); err != nil {
			log.Fatalf("failed to watch service: %v", err)
		}
	}

	wg := group.NewGroup()
	for i := 0; i < 3; i++ {
		id := i
		wg.Go(func() { worker(id, true) })
	}

	// register a new endpoint after the first three are running
	go func() {
		time.Sleep(2 * time.Second)
		worker(3, true)
	}()

	// unregister one endpoint
	go func() {
		time.Sleep(4 * time.Second)
		worker(2, false)
	}()

	wg.Wait()
}
The output shows that services register and deregister correctly, and that endpoint changes are picked up in real time:
2022/08/08 08:44:02 [worker-1], onstarted, services: [{/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-0 worker-0 10.0.0.0}]
2022/08/08 08:44:02 [worker-2], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}]
2022/08/08 08:44:02 [worker-0], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}]
2022/08/08 08:44:04 [worker-2], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-1], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-0], onchanged, services: [{/services/worker-3 worker-3 10.0.0.3} {/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2}], event: {PUT {/services/worker-3 worker-3 10.0.0.3}}
2022/08/08 08:44:04 [worker-3], onstarted, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-2 worker-2 10.0.0.2} {/services/worker-3 worker-3 10.0.0.3}]
2022/08/08 08:44:06 [worker-1], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-3], onchanged, services: [{/services/worker-3 worker-3 10.0.0.3} {/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-0], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
2022/08/08 08:44:06 [worker-2], onchanged, services: [{/services/worker-0 worker-0 10.0.0.0} {/services/worker-1 worker-1 10.0.0.1} {/services/worker-3 worker-3 10.0.0.3}], event: {DELETE {/services/worker-2 worker-2 }}
Distributed Lock
The official etcd package go.etcd.io/etcd/client/v3/concurrency already ships a distributed lock.
The principle is similar to the Redis-based distributed lock discussed before: if the key is created successfully the lock is acquired, and unlocking simply deletes the key.
Using the etcd lock:
// create a session that keeps its lease alive automatically
session, err := concurrency.NewSession(client, concurrency.WithTTL(2))
if err != nil {
	return err
}
defer session.Close()

mutex := concurrency.NewMutex(session, config.Prefix)
if err := mutex.Lock(ctx); err != nil {
	return err
}
defer mutex.Unlock(ctx)

do() // critical section
The core locking logic is as follows:
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()

	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}
tryAcquire runs the locking logic in a single transaction (a standalone sketch with the raw client follows this list):
- Check whether this session's key has been created yet, i.e. whether its CreateRevision is 0
- If it has not, Put the key with the session Lease attached
- If it has, fetch the current lock owner, i.e. the key that acquired the lock first, which avoids a thundering herd
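To make the transaction concrete, here is a minimal standalone sketch of the same compare-and-put pattern with the raw clientv3 API; the key name and lease are illustrative and not the library's internals:

func tryAcquireSketch(ctx context.Context, cli *clientv3.Client, leaseID clientv3.LeaseID) (bool, error) {
	key := "/my-lock/waiter-1" // illustrative waiter key

	// only create the key if it does not exist yet (CreateRevision == 0)
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, "", clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet(key)).
		Commit()
	if err != nil {
		return false, err
	}
	// Succeeded reports whether the If condition held, i.e. whether we created the key
	return resp.Succeeded, nil
}

The Lock method then builds on tryAcquire: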
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()
	// wait for the deletion of keys with a smaller revision than ours
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	// make sure the session is not expired, and the owner key still exists.
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
	if len(gresp.Kvs) == 0 { // is the session key lost?
		return ErrSessionExpired
	}
	m.hdr = gresp.Header

	return nil
}
The Lock method blocks until the lock is acquired or an error occurs (so callers typically bound it with a context, as sketched after this list):
- Call tryAcquire
- If the lock was just acquired, or this session already holds it (re-entrancy), return immediately
- Otherwise call waitDeletes to wait until all keys with a Revision smaller than ours are deleted; each waiter effectively queues behind its predecessors, which avoids a thundering herd
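Because Lock blocks, it is usually driven by a context with a timeout or cancellation. A minimal sketch, with an arbitrary timeout:

// try to acquire the lock, but give up after 5 seconds
lockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := mutex.Lock(lockCtx); err != nil {
	// context deadline exceeded or another error: the lock was not acquired
	return err
}
// use a fresh context for Unlock so it is not affected by the expired lockCtx
defer mutex.Unlock(context.Background())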
Leader Election
Stateful services usually run multiple replicas to improve availability (SLA) and reduce downtime; when the primary node goes down, a replica can take over quickly.
etcd can be used to build such a leader-election service, which is quite similar to the distributed lock:
- Once elected, the leader keeps reporting heartbeats
- The other candidates use the Watch API and compete for leadership when the leader's key disappears (similar to acquiring the lock)
The official etcd package go.etcd.io/etcd/client/v3/concurrency already supports leader election.
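It can be used directly; a minimal sketch of campaigning for leadership with the official library, where the key prefix and candidate value are illustrative:

func runAsLeader(ctx context.Context, client *clientv3.Client) error {
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		return err
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/my-election")

	// Campaign blocks until this candidate becomes the leader or ctx is cancelled
	if err := election.Campaign(ctx, "node-1"); err != nil {
		return err
	}

	// ... leader-only work goes here ...

	// Resign gives up leadership so another candidate can take over
	return election.Resign(ctx)
}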
The core election logic of Campaign is as follows:
func (e *Election) Campaign(ctx context.Context, val string) error {
	s := e.session
	client := e.session.Client()

	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
	txn = txn.Else(v3.OpGet(k))
	resp, err := txn.Commit()
	if err != nil {
		return err
	}
	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
	if !resp.Succeeded {
		kv := resp.Responses[0].GetResponseRange().Kvs[0]
		e.leaderRev = kv.CreateRevision
		if string(kv.Value) != val {
			if err = e.Proclaim(ctx, val); err != nil {
				e.Resign(ctx)
				return err
			}
		}
	}

	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
	if err != nil {
		// clean up in case of context cancel
		select {
		case <-ctx.Done():
			e.Resign(client.Ctx())
		default:
			e.leaderSession = nil
		}
		return err
	}
	e.hdr = resp.Header

	return nil
}
This is very similar to the etcd lock implementation:
- Open a transaction and check whether this candidate's key already exists
- If it does not exist, Put the key with the candidate's value
- If it does exist, reuse its CreateRevision; the key with the smallest Revision under the prefix belongs to the current leader
- Call waitDeletes to wait until every key with a Revision smaller than this candidate's has been deleted; at that point this candidate holds the smallest Revision and becomes the leader
A simple wrapper with callback support, modeled on Kubernetes' leader-election implementation:
func New(config LeaderElectionConfig) (*EctdLeaderElection, error) {
	session, err := concurrency.NewSession(config.Client, concurrency.WithTTL(config.LeaseSeconds))
	if err != nil {
		return nil, err
	}

	election := concurrency.NewElection(session, config.Prefix)

	return &EctdLeaderElection{
		LeaderElectionConfig: config,
		session:              session,
		election:             election,
	}, nil
}

// Run starts the election
func (le *EctdLeaderElection) Run(ctx context.Context) error {
	defer func() {
		le.Callbacks.OnStoppedLeading()
	}()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// observe leadership changes
	go le.observe(ctx)

	// campaign for leadership
	if err := le.election.Campaign(ctx, le.Identity); err != nil {
		return err
	}

	// elected: run OnStartedLeading, and step down when it returns
	le.Callbacks.OnStartedLeading(ctx)

	return nil
}

// observe watches key changes and invokes the callback
func (le *EctdLeaderElection) observe(ctx context.Context) {
	if le.Callbacks.OnNewLeader == nil {
		return
	}

	ch := le.election.Observe(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case resp, ok := <-ch:
			if !ok {
				return
			}
			if len(resp.Kvs) == 0 {
				continue
			}
			leader := string(resp.Kvs[0].Value)
			if leader != le.Identity {
				go le.Callbacks.OnNewLeader(leader)
			}
		}
	}
}

func (le *EctdLeaderElection) Close() error {
	return le.session.Close()
}
Testing the election service:
func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatalf("failed to create etcd client: %v", err)
	}
	defer client.Close()

	prefix := "/worker/election"
	worker := func(i int) {
		id := fmt.Sprintf("worker-%d", i)
		le, err := leaderelection.New(leaderelection.LeaderElectionConfig{
			Client:       client,
			LeaseSeconds: 15,
			Prefix:       prefix,
			Identity:     id,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					log.Printf("OnStarted[%s]: acquire new leader", id)
					time.Sleep(3 * time.Second)
					log.Printf("OnStarted[%s]: worker done", id)
				},
				OnStoppedLeading: func() {
					log.Printf("OnStopped[%s]: exit", id)
				},
				OnNewLeader: func(identity string) {
					log.Printf("OnNewLeader[%s]: new leader %s", id, identity)
				},
			},
		})
		if err != nil {
			log.Fatalf("failed to create leader election: %v", err)
		}
		defer le.Close()

		le.Run(context.Background())
	}

	wg := sync.WaitGroup{}
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}
The output:
2022/08/08 09:33:32 OnNewLeader[worker-2]: new leader worker-3
2022/08/08 09:33:32 OnNewLeader[worker-1]: new leader worker-3
2022/08/08 09:33:32 OnStarted[worker-3]: acquire new leader
2022/08/08 09:34:02 OnStarted[worker-3]: worker done
2022/08/08 09:34:02 OnStopped[worker-3]: exit
2022/08/08 09:34:02 OnStarted[worker-2]: acquire new leader
2022/08/08 09:34:02 OnNewLeader[worker-1]: new leader worker-2
2022/08/08 09:34:32 OnStarted[worker-2]: worker done
2022/08/08 09:34:32 OnStopped[worker-2]: exit
2022/08/08 09:34:32 OnStarted[worker-1]: acquire new leader
2022/08/08 09:35:02 OnStarted[worker-1]: worker done
2022/08/08 09:35:02 OnStopped[worker-1]: exit
Publish/Subscribe
With etcd's prefix queries and Watch, we can also build a simple publish/subscribe mechanism. The main logic is as follows:
// Publish simply Puts the message under the topic path, attaching a Lease so
// that stale messages are removed automatically
func (ps *EtcdPubSub) Publish(ctx context.Context, topic string, msg Msg) error {
	le, err := ps.Client.Lease.Grant(ctx, int64(ps.TTLSeconds))
	if err != nil {
		return err
	}

	_, err = ps.Client.Put(ctx, ps.Prefix+topic+"/"+msg.Name, msg.Val, clientv3.WithLease(le.ID))
	return err
}

// SubscribeFromRev watches the topic for Put events (Delete events are ignored)
// Revision 0 starts watching from the current point in time
// Revision 1 replays all events since the topic was created
func (ps *EtcdPubSub) SubscribeFromRev(ctx context.Context, topic string, rev int64) (<-chan Msg, error) {
	wch := ps.Client.Watch(ctx, ps.Prefix+topic, clientv3.WithPrefix(), clientv3.WithFilterDelete(), clientv3.WithRev(rev))

	msg := make(chan Msg)
	go func() {
		defer close(msg)
		for {
			wc, ok := <-wch
			if !ok {
				return
			}
			for _, ev := range wc.Events {
				if ev.Type != mvccpb.PUT {
					break
				}
				name := strings.TrimPrefix(string(ev.Kv.Key), ps.Prefix+topic+"/")
				msg <- Msg{Name: name, Val: string(ev.Kv.Value)}
			}
		}
	}()

	return msg, nil
}
To publish, a message is simply Put under the Topic path; to subscribe, Watch captures the messages, and the starting Revision controls the behaviour (see the sketch after this list):
- Revision 0 starts watching from the current point in time
- Revision 1 replays every event since the topic was created
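A quick usage sketch of the two methods above; the struct literal assumes Client, Prefix and TTLSeconds are the only fields that need to be set, which may differ from the actual constructor in gocorex:

// illustrative setup; the real gocorex constructor may differ
ps := &EtcdPubSub{Client: client, Prefix: "/pubsub/", TTLSeconds: 60}

// subscribe from the current revision (0 = start from now)
msgs, err := ps.SubscribeFromRev(ctx, "jobs", 0)
if err != nil {
	log.Fatalf("failed to subscribe: %v", err)
}

// publish a message under the topic
if err := ps.Publish(ctx, "jobs", Msg{Name: "job-1", Val: "hello"}); err != nil {
	log.Fatalf("failed to publish: %v", err)
}

// consume messages until the channel is closed
for m := range msgs {
	log.Printf("received message %s=%s", m.Name, m.Val)
}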
Summary
This article walked through etcd-based implementations of service discovery, distributed locks and leader election in Go; etcd can also be applied to publish/subscribe, load balancing and more.
All code from this article is available at https://github.com/qingwave/gocorex; feedback is welcome.
Explore more at https://qingwave.github.io