1.服务发现
1.1 介绍
- 服务发现是指用注册中心来记录服务信息,以便其他服务快速查找已注册服务
- 服务发现分类:
- 客户端服务发现
- 服务端服务发现
1.2 客户端服务发现
客户端服务发现(Client-side Service Discovery)是一种微服务架构中的模式,用于让客户端应用动态地发现并调用其他服务的实例,而无需通过一个中介(例如负载均衡器或服务网关)。它通常用于分布式系统中,通过客户端直接决定并选择与哪个服务实例通信,从而实现服务发现和负载均衡。
1.3 服务端服务发现
服务端服务发现(Server-side Service Discovery)是另一种服务发现模式,与客户端服务发现相对。在这种模式中,服务的实例发现和负载均衡由服务端组件处理,客户端只需将请求发送给一个固定的入口点(如负载均衡器或 API 网关),由这个入口点负责将请求路由到合适的服务实例。
2.zookeeper
2.1 zookeeper介绍
Apache ZooKeeper 是一个用于分布式系统中的协调服务。它提供了一套高效、可靠的分布式协调工具,用于实现服务注册、配置管理、同步、领导者选举等功能。Zookeeper 的设计初衷是简化分布式应用中的协调任务,从而使应用开发更容易。
- 是一个分布式数据库(程序协调服务),Hadoop子项目
- 树状方式维护节点方数据的增、删、改、查
- 监听通知机制:通过监听可以获取相应消息事件(内容,子节点)
2.2 zookeeper安装
安装zookeeper
- 参考官方文档安装
- http://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
- 下载时需要注意下载的是编译过的二进制文件,不是源码
- 不然会爆错:找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
- 解压缩
- 编辑 conf/zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
- 运行
bin/zkServer.sh start
2.3 zookeeper核心功能
- 持久节点
- 一直存在服务器上
- 临时节点
- 会话失效,节点自动清理
- 顺序节点
- 节点创建,自动分配序列号
2.3.1 增删改查API
package main
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"time"
)
var (
host = []string{"127.0.0.1:2181"}
)
func main() {
//连接客户端
conn, _, err := zk.Connect(host, 5*time.Second)
if err != nil {
panic(err)
}
//增
if _, err := conn.Create("/test_tree2", []byte("tree_content"),
0, zk.WorldACL(zk.PermAll)); err != nil {
fmt.Println("create err", err)
}
//查
nodeValue, dStat, err := conn.Get("/test_tree2")
if err != nil {
fmt.Println("get err", err)
return
}
fmt.Println("nodeValue", string(nodeValue))
//改,需要先查询得到版本号
if _, err := conn.Set("/test_tree2", []byte("new_content"),
dStat.Version); err != nil {
fmt.Println("update err", err)
}
//删除,也,需要先查询得到版本号
_, dStat, _ = conn.Get("/test_tree2")
if err := conn.Delete("/test_tree2", dStat.Version); err != nil {
fmt.Println("Delete err", err)
//return
}
//验证存在
hasNode, _, err := conn.Exists("/test_tree2")
if err != nil {
fmt.Println("Exists err", err)
//return
}
fmt.Println("node Exist", hasNode)
//增加
if _, err := conn.Create("/test_tree2", []byte("tree_content"),
0, zk.WorldACL(zk.PermAll)); err != nil {
fmt.Println("create err", err)
}
//设置子节点,如果上游节点不存在则会报错
if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"),
0, zk.WorldACL(zk.PermAll)); err != nil {
fmt.Println("create err", err)
}
//获取子节点列表
childNodes, _, err := conn.Children("/test_tree2")
if err != nil {
fmt.Println("Children err", err)
}
fmt.Println("childNodes", childNodes)
}
2.3.2 监听子节点变化
package main
import (
"fmt"
"github.com/e421083458/gateway_demo/proxy/zookeeper"
"log"
"os"
"os/signal"
"syscall"
)
var addr = "127.0.0.1:2002"
func main() {
//获取zk节点列表
zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
zkManager.GetConnect()
defer zkManager.Close()
// 注册一个节点
err := zkManager.RegistServerPath("/real_server", "127.0.0.1")
err = zkManager.RegistServerPath("/real_server/test", "127.0.0.1:8823")
err = zkManager.RegistServerPath("/real_server/test2", "127.0.0.1:8823")
if err != nil {
return
}
// 获取节点列表
zlist, err := zkManager.GetServerListByPath("/real_server")
fmt.Println("server node:")
fmt.Println(zlist)
if err != nil {
log.Println(err)
}
//动态监听节点变化
chanList, chanErr := zkManager.WatchServerListByPath("/real_server")
go func() {
for {
select {
case changeErr := <-chanErr:
fmt.Println("changeErr")
fmt.Println(changeErr)
case changedList := <-chanList:
fmt.Println("watch node changed")
fmt.Println(changedList)
}
}
}()
time.Sleep(time.Second * 5)
zkManager.RegistServerPath("/real_server/test3", "127.0.0.2:8888")
//关闭信号监听
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
2.3.3 监听节点内容变化
package main
import (
"fmt"
"github.com/e421083458/gateway_demo/proxy/zookeeper"
"log"
"os"
"os/signal"
"syscall"
"time"
)
var addr = "127.0.0.1:2002"
func main() {
//获取zk节点列表
zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
zkManager.GetConnect()
defer zkManager.Close()
// 注册一个节点
err := zkManager.RegistServerPath("/rs_server_conf", "192.168.1.101")
if err != nil {
fmt.Printf("2001:%v \n", err)
return
}
// 获取节点列表
zlist, err := zkManager.GetServerListByPath("/rs_server_conf")
fmt.Println("server node:")
fmt.Println(zlist)
if err != nil {
log.Println(err)
}
获取节点内容
zc, _, err := zkManager.GetPathData("/rs_server_conf")
if err != nil {
log.Println(err)
}
fmt.Println("get node data:")
fmt.Println(string(zc))
//动态监听节点内容
dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")
go func() {
for {
select {
case changeErr := <-dataErrChan:
fmt.Println("changeErr")
fmt.Println(changeErr)
case changedData := <-dataChan:
fmt.Println("WatchGetData changed")
fmt.Println(string(changedData))
}
}
}()
// 尝试修改内容
time.Sleep(5 * time.Second)
_, z, err := zkManager.GetPathData("/rs_server_conf")
if err != nil {
return
}
err = zkManager.SetPathData("/rs_server_conf", []byte(addr), z.Version)
if err != nil {
fmt.Sprintf("2002:%v \n", err)
return
}
//关闭信号监听
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
}
3.网关实现服务发现原理
3.1 网关实现客户端服务发现
3.2 网关实现服务端服务发现
- 将服务注册到zookeeper中
- 网关通过监听zookeeper中的事件来感知变化
4.网关拓展服务发现
- 下游机器启动时创建临时节点:节点名与内容为服务地址
- 以观察者模式构建负载均衡配置LoadBalanceConf
- 负载均衡配置LoadBalanceConf与负载均衡器整合
4.1 观察者模式
观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,使得多个观察者对象可以同时监听某一个主题对象。当这个主题对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新。观察者模式常用于实现事件处理系统,如用户界面事件、订阅/发布系统等。
- 关键概念
- 主题(Subject):也称为发布者(Publisher),它维护一组观察者对象,并提供注册和移除观察者的方法。当主题的状态发生变化时,会通知所有观察者。
- 观察者(Observer):也称为订阅者(Subscriber),它定义了一个更新接口,用于接收来自主题的通知。每个观察者在接收到通知后,可以执行特定的操作。
- 通知(Notification):指的是主题状态变化时向观察者发送的信号或消息。
4.2 以观察者模式构建负载均衡配置
package load_balance
import (
"fmt"
"github.com/e421083458/gateway_demo/proxy/zookeeper"
)
// 配置主题
type LoadBalanceConf interface {
Attach(o Observer)
GetConf() []string
WatchConf()
UpdateConf(conf []string)
}
type LoadBalanceZkConf struct {
observers []Observer
path string
zkHosts []string
confIpWeight map[string]string
activeList []string
format string
}
func (s *LoadBalanceZkConf) Attach(o Observer) {
s.observers = append(s.observers, o)
}
func (s *LoadBalanceZkConf) NotifyAllObservers() {
for _, obs := range s.observers {
obs.Update()
}
}
func (s *LoadBalanceZkConf) GetConf() []string {
confList := []string{}
for _, ip := range s.activeList {
weight, ok := s.confIpWeight[ip]
if !ok {
weight = "50" //默认weight
}
confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
}
return confList
}
//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) WatchConf() {
zkManager := zookeeper.NewZkManager(s.zkHosts)
zkManager.GetConnect()
fmt.Println("watchConf")
chanList, chanErr := zkManager.WatchServerListByPath(s.path)
go func() {
defer zkManager.Close()
for {
select {
case changeErr := <-chanErr:
fmt.Println("changeErr", changeErr)
case changedList := <-chanList:
fmt.Println("watch node changed")
s.UpdateConf(changedList)
}
}
}()
}
//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) UpdateConf(conf []string) {
s.activeList = conf
for _, obs := range s.observers {
obs.Update()
}
}
func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
zkManager := zookeeper.NewZkManager(zkHosts)
zkManager.GetConnect()
defer zkManager.Close()
zlist, err := zkManager.GetServerListByPath(path)
if err != nil {
return nil, err
}
mConf := &LoadBalanceZkConf{format: format, activeList: zlist, confIpWeight: conf, zkHosts: zkHosts, path: path}
mConf.WatchConf()
return mConf, nil
}
type Observer interface {
Update()
}
type LoadBalanceObserver struct {
ModuleConf *LoadBalanceZkConf
}
func (l *LoadBalanceObserver) Update() {
fmt.Println("Update get conf:", l.ModuleConf.GetConf())
}
func NewLoadBalanceObserver(conf *LoadBalanceZkConf) *LoadBalanceObserver {
return &LoadBalanceObserver{
ModuleConf: conf,
}
}
4.3 负载均衡配置LoadBalanceConf与负载均衡器整合
package main
import (
"github.com/e421083458/gateway_demo/proxy/load_balance"
"github.com/e421083458/gateway_demo/proxy/middleware"
proxy2 "github.com/e421083458/gateway_demo/proxy/proxy"
"log"
"net/http"
)
var (
addr = "127.0.0.1:2002"
)
func main() {
mConf, err := load_balance.NewLoadBalanceZkConf("http://%s/base",
"/real_server",
[]string{"127.0.0.1:2181"},
map[string]string{"127.0.0.1:2003": "20"})
if err != nil {
panic(err)
}
rb := load_balance.LoadBanlanceFactorWithConf(load_balance.LbWeightRoundRobin, mConf)
proxy := proxy2.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, rb)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
}
4.4 客户端服务发现实现
网关主动通过心跳检测区检测客户端的服务
- 下游机器启动时无需进行任何操作
- 以观察者模式构建负载均衡配置LoadBalanceConf
- 负载均衡配置固定时间频率监测下游节点健康状况
package load_balance
import (
"fmt"
"net"
"reflect"
"sort"
"time"
)
const (
//default check setting
DefaultCheckMethod = 0
DefaultCheckTimeout = 2
DefaultCheckMaxErrNum = 2
DefaultCheckInterval = 5
)
type LoadBalanceCheckConf struct {
observers []Observer
confIpWeight map[string]string
activeList []string
format string
}
func (s *LoadBalanceCheckConf) Attach(o Observer) {
s.observers = append(s.observers, o)
}
func (s *LoadBalanceCheckConf) NotifyAllObservers() {
for _, obs := range s.observers {
obs.Update()
}
}
func (s *LoadBalanceCheckConf) GetConf() []string {
confList := []string{}
for _, ip := range s.activeList {
weight, ok := s.confIpWeight[ip]
if !ok {
weight = "50" //默认weight
}
confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
}
return confList
}
//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) WatchConf() {
fmt.Println("watchConf")
go func() {
confIpErrNum := map[string]int{}
for {
changedList := []string{}
for item, _ := range s.confIpWeight {
conn, err := net.DialTimeout("tcp", item, time.Duration(DefaultCheckTimeout)*time.Second)
//todo http statuscode
if err == nil {
conn.Close()
if _, ok := confIpErrNum[item]; ok {
confIpErrNum[item] = 0
}
}
if err != nil {
if _, ok := confIpErrNum[item]; ok {
confIpErrNum[item] += 1
} else {
confIpErrNum[item] = 1
}
}
if confIpErrNum[item] < DefaultCheckMaxErrNum {
changedList = append(changedList, item)
}
}
sort.Strings(changedList)
sort.Strings(s.activeList)
if !reflect.DeepEqual(changedList, s.activeList) {
s.UpdateConf(changedList)
}
time.Sleep(time.Duration(DefaultCheckInterval) * time.Second)
}
}()
}
//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) UpdateConf(conf []string) {
fmt.Println("UpdateConf", conf)
s.activeList = conf
for _, obs := range s.observers {
obs.Update()
}
}
func NewLoadBalanceCheckConf(format string, conf map[string]string) (*LoadBalanceCheckConf, error) {
aList := []string{}
//默认初始化
for item, _ := range conf {
aList = append(aList, item)
}
mConf := &LoadBalanceCheckConf{format: format, activeList: aList, confIpWeight: conf}
mConf.WatchConf()
return mConf, nil
}