1.hystrix-go类库
hystrix-go 是 Netflix 开源的 Hystrix 库在 Go 语言中的实现,用于处理服务中的故障和延迟问题。它通过提供熔断器(Circuit Breaker)、隔离、降级、限流、以及实时监控等机制,帮助开发者构建健壮的分布式系统。hystrix-go 旨在提高系统的容错能力和稳定性。
- 熔断器(Circuit Breaker):
- 监控操作的成功率和失败率,当失败率达到一定阈值时,熔断器会“跳闸”,短路后续的请求,从而防止系统被不稳定的服务压垮。
- 熔断器会定期尝试恢复正常的请求流,当检测到服务恢复正常时会“闭合”,恢复正常的请求处理。
- 隔离(Isolation):
- 提供基于并发数或线程池的隔离机制,防止单个操作耗尽系统资源,影响其他操作。
- 通过配置最大并发数限制,确保资源使用在可控范围内。
- 降级(Fallback):
- 在操作失败或超时时提供降级处理逻辑,确保服务在部分故障的情况下仍然能够提供部分功能。
- 允许定义备用逻辑或返回默认值,以避免服务中断。
- 限流(Rate Limiting):
- 控制请求速率,防止系统过载。
- 通过限制单位时间内的请求数,确保系统在高负载情况下仍能稳定运行。
- 实时监控:
- 提供实时监控指标,如请求成功率、失败率、超时次数等,帮助开发者了解系统的健康状态。
- 集成监控仪表盘工具(如 Hystrix Dashboard)可以实时查看各个熔断器的状态和指标。
2. 基本使用方式
package main
import (
"errors"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"log"
"net/http"
"testing"
"time"
)
func Test_main(t *testing.T) {
// 初始化流统计服务器
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(":8074", hystrixStreamHandler)
// 配置熔断器
hystrix.ConfigureCommand("aaa", hystrix.CommandConfig{
Timeout: 1000, // 单次请求 超时时间(ms)
MaxConcurrentRequests: 1, // 最大并发量,限流(基于token令牌,使用完会放回)
SleepWindow: 5000, // 熔断后多久去尝试服务是否可用(ms)
RequestVolumeThreshold: 1, // 熔断器在评估跳闸前,需要至少统计的请求数
ErrorPercentThreshold: 1, // 验证熔断的 错误百分比
})
// 使用
for i := 0; i < 10000; i++ {
//异步调用使用 hystrix.Go
err := hystrix.Do("aaa", func() error {
//test case 1 并发测试
if i == 0 {
return errors.New("service error")
}
//test case 2 超时测试
//time.Sleep(2 * time.Second)
log.Println("do services")
return nil
}, func(err error) error {
fmt.Println("短暂出现了错误,请让服务器休息一下")
return err
})
if err != nil {
log.Println("hystrix err:" + err.Error())
time.Sleep(1 * time.Second)
log.Println("sleep 1 second")
}
}
time.Sleep(100 * time.Second)
}
3. hystrix-go dashboard
3.1 docker安装
https://github.com/mlabouardy/hystrix-dashboard-docker/tree/master
这里我的电脑不支持docker,但是我们发现这是一个jar包,我们可以尝试直接使用java -jar来启动即可。(请保证你的java版本为8即可)
3.2 使用教程
启动后输入:http://localhost:8080/hystrix即可进入界面
输入要监控的地址:http://localhost:8074
启动上面给的test代码:
即可看到效果:
4. 核心源码
4.1 流量统计
// DefaultMetricCollector 结构体保存了关于接口状态的各种指标信息。
// 这个 MetricCollector 的实现是关于接口的信息的标准来源。
// 它用于所有内部 hystrix 操作,包括接口健康检查和发送到 hystrix 仪表板的指标。
//
// Metric Collectors 不需要 Mutex 来保护,因为它们在受锁定的上下文内被接口更新。
type DefaultMetricCollector struct {
mutex *sync.RWMutex // 用于控制并发访问的读写锁
numRequests *rolling.Number // 请求总数
errors *rolling.Number // 错误数
successes *rolling.Number // 成功执行次数
failures *rolling.Number // 失败执行次数
rejects *rolling.Number // 拒绝执行次数
shortCircuits *rolling.Number // 短路执行次数
timeouts *rolling.Number // 超时次数
contextCanceled *rolling.Number // 上下文取消次数
contextDeadlineExceeded *rolling.Number // 上下文截止时间超过次数
fallbackSuccesses *rolling.Number // 回退成功次数
fallbackFailures *rolling.Number // 回退失败次数
totalDuration *rolling.Timing // 总执行时间
runDuration *rolling.Timing // 实际执行时间
}
// Number 结构体跟踪一个有限数量时间桶的 numberBucket。
// 当前每个桶的时长为一秒钟,仅保留最近的 10 秒钟数据,新的数据会把旧的数据替换出去。
type Number struct {
Buckets map[int64]*numberBucket // 存储时间桶数据的映射,int64表示时间,numberBucket表示并发量
Mutex *sync.RWMutex // 用于并发访问控制的读写锁
}
type numberBucket struct {
Value float64
}
4.2 流量控制
基于可放回的token实现了流量控制
type executorPool struct {
Name string // 池子的名称
Metrics *poolMetrics // 池子的指标信息
Max int // 最大并发请求数量
Tickets chan *struct{} // 用于控制并发访问的通道,存放还在的token令牌
}
func newExecutorPool(name string) *executorPool {
p := &executorPool{} // 创建 executorPool 结构体实例
p.Name = name // 设置池子的名称
p.Metrics = newPoolMetrics(name) // 初始化池子的指标信息
p.Max = getSettings(name).MaxConcurrentRequests // 获取并设置最大并发请求数量
p.Tickets = make(chan *struct{}, p.Max) // 初始化 Tickets 通道,缓冲大小为最大并发数
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{} // 在 Tickets 中预先放入最大并发数个空结构体指针
}
return p // 返回初始化后的 executorPool 实例
}
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return // 如果票据为空,则直接返回
}
// 发送池子的更新指标信息到 Metrics 的 Updates 通道
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(), // 更新活跃任务数量
}
p.Tickets <- ticket // 将票据放回 Tickets 通道中
}
4.3 实时数据流
// 初始化流统计服务器
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(":8074", hystrixStreamHandler)
type StreamHandler struct {
requests map[*http.Request]chan []byte//存放请求和对应的通道
mu sync.RWMutex
done chan struct{}
}
func (sh *StreamHandler) loop() {
tick := time.Tick(1 * time.Second) // 创建一个每秒触发的定时器
for {
select {
case <-tick:
circuitBreakersMutex.RLock() // 获取断路器列表的读锁
for _, cb := range circuitBreakers { // 遍历所有的断路器
sh.publishMetrics(cb) // 发布断路器的指标信息
sh.publishThreadPools(cb.executorPool) // 发布断路器关联的线程池信息
}
circuitBreakersMutex.RUnlock() // 释放断路器列表的读锁
case <-sh.done:
return // 如果收到 sh.done 通道的信号,结束循环
}
}
}
func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// 确保 writer 支持刷新操作。
f, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// 注册请求,获取事件通道。
events := sh.register(req)
defer sh.unregister(req)
// 监听连接关闭通知。
notify := rw.(http.CloseNotifier).CloseNotify()
// 设置响应头部信息。
rw.Header().Add("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
// 循环处理事件和通知。
for {
select {
case <-notify:
// 客户端断开连接。
return
case event := <-events:
// 发送事件给客户端。
_, err := rw.Write(event)
if err != nil {
return
}
f.Flush() // 刷新响应,确保事件立即发送给客户端。
}
}
}
所以当我们访问127.0.0.1:8074接口时就会不停的接受到实时数据流
5. 总结
hystrix-go 是一个用于实现断路器模式的类库,它是 Netflix 的 Hystrix 的 Go 语言实现版本。它主要用于分布式系统中的容错和弹性设计,特别是在处理微服务架构中的服务间调用时非常有用。