前言
在现代分布式系统架构中,服务的稳定性和可用性是至关重要的。随着微服务和云原生技术的发展,如何有效地进行流量控制、熔断降级以及系统保护成为了一个关键课题。Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制组件,它不仅能够帮助开发者防止服务过载,还能在系统不稳定时自动切断请求,防止故障扩散。本文将深入探讨如何使用 Sentinel Go 进行服务防护,包括流量控制、熔断降级、并发隔离控制、系统自适应保护、热点参数流控以及动态数据源使用。通过详细的参数解析、最佳实践和示例代码,帮助您全面理解和应用这些功能。
背景
环境:Go 语言开发环境。依赖库:https://github.com/alibaba/sentinel-golang应用场景:适用于需要高可用性的微服务架构,特别是在处理突发流量、服务依赖不稳定或资源有限的情况下。
1. 快速开始
1.1 引入依赖
首先,确保您的项目已经安装了 Go 语言环境。然后,通过 go mod 引入最新版本的 Sentinel Go。
go get github.com/alibaba/sentinel-golang@latest
1.2 初始化 Sentinel
启动时调用 InitDefault() 或 Init(configPath string) 来初始化运行环境。这一步骤是必要的,因为 Sentinel 需要设置一些基本配置,如日志路径等。
import (sentinel "github.com/alibaba/sentinel-golang/api")err := sentinel.InitDefault()if err != nil { panic(err)}
1.3 定义资源并埋点
使用 Entry(resource string, opts ...Option) 将业务逻辑封装起来,实现对资源的监控。这是 Sentinel 的核心机制之一,用于识别和控制资源的访问。
import (sentinel "github.com/alibaba/sentinel-golang/api")// Entry 方法用于埋点e, b := sentinel.Entry("your-resource-name", sentinel.WithTrafficType(base.Inbound))if b != nil {// 请求被流控,可以从 BlockError 中获取限流详情// block 后不需要进行 Exit()} else {// 请求可以通过,在此处编写您的业务逻辑// 务必保证业务逻辑结束后 Exite.Exit()}
1.4 规则配置
支持硬编码方式加载规则,例如通过 flow.LoadRules 函数来设置流控规则。这允许您根据业务需求动态调整流控策略。
import ("github.com/alibaba/sentinel-golang/core/flow")_, err = flow.LoadRules([]*flow.Rule{{Resource: "your-resource-name",Threshold: 10,TokenCalculateStrategy: flow.Direct,ControlBehavior: flow.Reject,},})if err != nil {panic(err)}
2. 流量控制
2.1 概述
流量控制( flow control ),其原理是监控资源( Resource )的统计指标,然后根据 token 计算策略来计算资源的可用 token(也就是阈值),然后根据流量控制策略对请求进行控制,避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。Sentinel 提供了多种流量控制策略,以满足不同场景下的需求。
2.2 策略
直接模式 (Direct):基于固定阈值进行流量控制。
匀速排队 (Throttling):以固定的间隔时间让请求通过,避免瞬时流量高峰。
预热模式 (WarmUp):在冷启动阶段逐步增加流量,平滑过渡到正常流量。
2.3 配置
通过 flow.Rule 结构体来配置具体的流控规则。每个字段都有其特定的作用,合理配置可以有效管理流量。
import ("github.com/alibaba/sentinel-golang/core/flow")type Rule struct {ID string `json:"id,omitempty"`Resource string `json:"resource"`TokenCalculateStrategy flow.TokenCalculateStrategy `json:"tokenCalculateStrategy"`ControlBehavior flow.ControlBehavior `json:"controlBehavior"`Threshold float64 `json:"threshold"`RelationStrategy flow.RelationStrategy `json:"relationStrategy"`RefResource string `json:"refResource"`MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"`WarmUpColdFactor uint32 `json:"warmUpColdFactor"`StatIntervalInMs uint32 `json:"statIntervalInMs"`LowMemUsageThreshold int64 `json:"lowMemUsageThreshold"`HighMemUsageThreshold int64 `json:"highMemUsageThreshold"`MemLowWaterMarkBytes int64 `json:"memLowWaterMarkBytes"`MemHighWaterMarkBytes int64 `json:"memHighWaterMarkBytes"`}
参数解析
ID:规则的唯一标识(可选)。Resource:资源名称。TokenCalculateStrategy:令牌计算策略,如 Direct。ControlBehavior:控制行为,如 Reject 或 Throttling。Threshold:在 StatIntervalInMs 期间内的阈值。如果 StatIntervalInMs 是 1000 毫秒(1 秒),那么 Threshold 表示每秒查询率(QPS)。RelationStrategy:关联策略,例如 Current 或 Associated。RefResource:引用资源,仅在 RelationStrategy 为 Associated 时有效。MaxQueueingTimeMs:仅在 ControlBehavior 为 Throttling 时生效。当 MaxQueueingTimeMs 为 0 时,表示仅控制请求间隔,超出阈值的请求将被直接拒绝。WarmUpPeriodSec:预热期时间(秒)。WarmUpColdFactor:预热冷启动因子。StatIntervalInMs:统计间隔(毫秒),可选设置。如果用户未设置 StatIntervalInMs,则使用资源的默认统计指标。如果用户指定的 StatIntervalInMs 无法重用全局资源统计,则 Sentinel 将为此规则生成独立的统计结构。LowMemUsageThreshold:低内存使用阈值。HighMemUsageThreshold:高内存使用阈值。MemLowWaterMarkBytes:内存低水位标记字节数。MemHighWaterMarkBytes:内存高水位标记字节数。参数限制
必须满足 LowMemUsageThreshold > HighMemUsageThreshold。
必须满足 MemHighWaterMarkBytes > MemLowWaterMarkBytes。
参数逻辑
如果当前内存使用量小于或等于 MemLowWaterMarkBytes,则 threshold 等于 LowMemUsageThreshold。
如果当前内存使用量大于或等于 MemHighWaterMarkBytes,则 threshold 等于 HighMemUsageThreshold。
如果当前内存使用量位于 MemLowWaterMarkBytes 和 MemHighWaterMarkBytes 之间,则 threshold 将介于 HighMemUsageThreshold 和 LowMemUsageThreshold 之间,根据内存使用情况动态调整。
2.4 示例
流量控制之直接模式
package mainimport ("fmt"sentinel "github.com/alibaba/sentinel-golang/api" // 导入 Sentinel Go API 包"github.com/alibaba/sentinel-golang/core/base" // 导入 Sentinel 基础包"github.com/alibaba/sentinel-golang/core/flow" // 导入 Sentinel 流控包"log" // 导入日志包"time")// 基于 Sentinel 的 QPS 限流func main() {// 必须初始化 Sentinel 框架,否则无法使用 Sentinel 的功能err := sentinel.InitDefault()if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err) // 初始化失败时,输出错误信息并终止程序}// 加载限流规则,定义资源 "test" 的限流策略// 配置限流策略为直接模式,即按照固定的阈值进行限流// 拒绝请求,即直接返回错误// 阈值为 10,即每秒最多允许 10 次请求_, err = flow.LoadRules([]*flow.Rule{{Resource: "test", // 资源名称,用于标识被保护的资源TokenCalculateStrategy: flow.Direct, // 计算策略:直接模式,即按照固定的阈值进行限流ControlBehavior: flow.Reject, // 控制行为:超过阈值直接拒绝请求Threshold: 10, // 请求次数阈值,每秒最多允许 10 次请求StatIntervalInMs: 1000, // 统计时间窗口,单位为毫秒,这里设置为 1 秒},})if err != nil {log.Fatalf("加载限流规则失败: %+v", err) // 加载规则失败时,输出错误信息并终止程序}// 模拟 12 次请求,用于测试限流效果for i := 0; i < 12; i++ {go func(i int) {// 尝试进入资源 "test",指定流量类型为入站// Entry 方法会根据限流规则决定是否允许请求通过e, b := sentinel.Entry("test", sentinel.WithTrafficType(base.Inbound))if b != nil {// 如果被限流,打印提示信息,并记录被限流的请求次数fmt.Printf("第 %02d 请求次 被限流\n", i+1)} else {// 如果检查通过,打印提示信息,并退出资源fmt.Printf("第 %02d 请求次 已通过\n", i+1)e.Exit() // 退出资源,释放占用的令牌}}(i)}time.Sleep(3 * time.Second)}
流量控制之匀速排队
package mainimport ("fmt""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/flow")func main() {// 初始化 Sentinelerr := sentinel.InitDefault()if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 加载限流规则_, err = flow.LoadRules([]*flow.Rule{{Resource: "test", // 资源名称,用于标识受保护的资源Threshold: 1000, // 限流阈值,表示每秒允许的最大请求数TokenCalculateStrategy: flow.WarmUp, // 流量控制策略,WarmUp 表示预热模式ControlBehavior: flow.Reject, // 控制行为,Reject 表示拒绝超出阈值的请求WarmUpPeriodSec: 5, // 预热时间,单位为秒,表示从初始阈值逐渐增加到最大阈值的时间},})if err != nil {log.Fatalf("加载限流规则失败: %v", err)}// 启动请求模拟for i := 0; i < 50; i++ {go func() {for {// 获取 SentinelEntrye, b := sentinel.Entry("test", sentinel.WithTrafficType(base.Inbound))if b != nil {// 模拟请求耗时time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)} else {// 模拟请求耗时time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)// 退出资源e.Exit()}}}()}// 每秒输出统计信息// 使用 time.Ticker 每秒输出一次统计信息,并重置统计信息。go func() {ticker := time.NewTicker(time.Second)defer ticker.Stop()for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("请求时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 完成数: %4v\n",time.Now().Format("15:04:05"), // 当前时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数node.GetSum(base.MetricEventBlock), // 被限流的请求数node.GetSum(base.MetricEventComplete), // 完成的请求数)}}()// 保持程序运行select {}}
流量控制之系统自适应
package mainimport ("fmt""github.com/alibaba/sentinel-golang/core/config""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/flow""github.com/alibaba/sentinel-golang/core/system_metric")func main() {// 创建默认配置对象conf := config.NewDefaultConfig()conf.Sentinel.Stat.System.CollectIntervalMs = 0 // 关闭系统指标收集的定时任务conf.Sentinel.Stat.System.CollectMemoryIntervalMs = 0 // 关闭内存使用量收集的定时任务// 初始化 Sentinelerr := sentinel.InitWithConfig(conf)if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 加载限流规则// 定义并加载基于内存自适应的限流规则。_, err = flow.LoadRules([]*flow.Rule{{Resource: "test", // 资源名称TokenCalculateStrategy: flow.MemoryAdaptive, // 使用内存自适应策略ControlBehavior: flow.Reject, // 当达到限流条件时,拒绝请求StatIntervalInMs: 1000, // 统计间隔,单位为毫秒LowMemUsageThreshold: 1000, // 低内存使用阈值,单位为 MBHighMemUsageThreshold: 100, // 高内存使用阈值,单位为 MBMemLowWaterMarkBytes: 1 * 1024 * 1024, // 低水位标记字节数,1 MBMemHighWaterMarkBytes: 3 * 1024 * 1024, // 高水位标记字节数,3 MB},})if err != nil {log.Fatalf("加载限流规则失败: %+v", err)}// 设置初始内存使用量// 初始内存使用量设置为 1 MB,模拟低内存使用情况。system_metric.SetSystemMemoryUsage(1024 * 1024)// 启动请求模拟// 启动 10 个 goroutine 发送请求,模拟并发请求。for i := 0; i < 10; i++ {go func() {for {// 尝试获取资源入口e, b := sentinel.Entry("test", sentinel.WithTrafficType(base.Inbound))if b != nil {// 请求被限流time.Sleep(time.Duration(rand.Uint64()%2) * time.Millisecond) // 模拟处理时间} else {// 请求通过time.Sleep(time.Duration(rand.Uint64()%2) * time.Millisecond) // 模拟处理时间e.Exit() // 退出资源入口}}}()}// 每秒输出统计信息// 使用 time.Ticker 每秒输出一次统计信息,并重置统计信息。go func() {ticker := time.NewTicker(time.Second)defer ticker.Stop()for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("请求时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 完成数: %4v, 内存使用量: %4v\n",time.Now().Format("15:04:05"), // 当前时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数node.GetSum(base.MetricEventBlock), // 被限流的请求数node.GetSum(base.MetricEventComplete), // 完成的请求数formatMemoryUsage(system_metric.CurrentMemoryUsage()), // 当前内存使用量)}}()// 动态调整内存使用量// 模拟不同内存使用量下的限流效果。go func() {time.Sleep(time.Second * 5)// 设置内存使用量 3 MB,模拟高内存使用情况。system_metric.SetSystemMemoryUsage(3 * 1024 * 1024)time.Sleep(time.Second * 5)// 设置内存使用量 1 MB,模拟低内存使用情况。system_metric.SetSystemMemoryUsage(1024 * 1024)}()// 保持程序运行// 使用 select {} 保持程序运行,防止主 goroutine 退出。select {}}// formatMemoryUsage 将字节数转换为更易读的格式(KB、MB、GB)func formatMemoryUsage(bytes int64) string {const unit = 1024if bytes < unit {return fmt.Sprintf("%dB", bytes)}div, exp := int64(unit), 0for n := bytes / unit; n >= unit; n /= unit {div *= unitexp++}return fmt.Sprintf("%.1f%cB", float64(bytes)/float64(div), "KMGTPE"[exp])}
3 熔断降级
3.1 概述
当检测到下游服务不稳定或错误率过高时自动切断请求,防止故障扩散。熔断降级是一种常见的容错机制,用于提高系统的可用性。
3.2 熔断器模型
Sentinel 的熔断降级基于熔断器模式实现,内部维护了一个状态机,具有三种状态:
Closed (闭合):初始状态,允许请求通过。Open (打开):当检测到异常情况(如错误率或响应时间超过阈值),切换至此状态,拒绝所有新请求。Half-Open (半开):经过一定时间后,尝试少量请求以验证服务是否恢复正常;如果成功则恢复到 Closed 状态,否则返回 Open 状态。3.3 熔断策略
Sentinel 支持三种主要的熔断策略:
慢调用比例 (SlowRequestRatio):根据响应时间超过设定的最大允许值的比例决定是否熔断。异常比例 (ErrorRatio):基于一段时间内异常请求的比例超过阈值时触发熔断。异常数量 (ErrorCount):直接统计异常请求数量,达到阈值即触发熔断。 每种策略都有对应的配置参数,例如 MaxAllowedRtMs(最大允许响应时间)、Threshold(阈值)、MinRequestAmount(最小请求数量)等,用于精细化控制熔断行为。3.4 配置
利用 circuitbreaker.Rule 定义熔断规则,并通过相关 API 加载规则。合理配置熔断规则可以有效避免服务雪崩。
type Rule struct {ID string `json:"id,omitempty"` // 规则的唯一标识(可选)Resource string `json:"resource"` // 目标资源定义,表示该规则适用的资源类型Strategy circuitbreaker.Strategy `json:"strategy"` // 策略类型,定义了在触发条件满足时应采取的行动RetryTimeoutMs uint32 `json:"retryTimeoutMs"` // 重试超时时间,单位为毫秒MinRequestAmount uint64 `json:"minRequestAmount"` // 最小请求量,用于统计计算的最小请求次数StatIntervalMs uint32 `json:"statIntervalMs"` // 统计间隔,单位为毫秒,表示统计周期MaxAllowedRtMs uint64 `json:"maxAllowedRtMs"` // 最大允许响应时间,单位为毫秒Threshold float64 `json:"threshold"` // 阈值,表示资源的最大允许值或触发条件}
参数解析:
ID:规则的唯一标识(可选)。Resource:资源名称。Strategy:熔断策略,如 SlowRequestRatio、ErrorRatio、ErrorCount。RetryTimeoutMs:熔断超时重试的时间(毫秒)。MinRequestAmount:最小请求数目。StatIntervalMs:统计周期(毫秒)。MaxAllowedRtMs:最大允许响应时间(毫秒),仅对 SlowRequestRatio 有效。Threshold:阈值,对于 SlowRequestRatio 和 ErrorRatio 是比例(0.0 到 1.0),对于 ErrorCount 是计数。3.5 示例
熔断之慢调用比例
package mainimport ("errors""fmt"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/circuitbreaker""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time")// stateChangeTestListener 实现了断路器状态变化的监听器接口。type stateChangeTestListener struct{}// OnTransformToClosed 在断路器状态从其他状态转换为关闭时触发。func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Closed")}// OnTransformToOpen 在断路器状态从其他状态转换为打开时触发。func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s, 快照: %+v\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Open", snapshot)}// OnTransformToHalfOpen 在断路器状态从其他状态转换为半开时触发。func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "HalfOpen")}func main() {// 初始化 Sentinelif err := sentinel.InitDefault(); err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 注册状态变化监听器circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})// 定义断路器规则rules := []*circuitbreaker.Rule{{Resource: "test", // 资源名称,用于标识受保护的资源。Strategy: circuitbreaker.SlowRequestRatio, // 断路器策略,定义了如何判断请求是否应该被熔断。在这个例子中,策略为基于慢请求比例进行熔断。RetryTimeoutMs: 5000, // 断路器打开后,等待多长时间后自动尝试恢复(半开状态)。单位为毫秒。MinRequestAmount: 15, // 在统计时间窗口内,最少需要多少个请求才能触发熔断规则。StatIntervalMs: 5000, // 统计时间窗口的长度,单位为毫秒。StatSlidingWindowBucketCount: 10, // 统计滑动窗口的桶数量。滑动窗口用于更精细地统计请求数据。MaxAllowedRtMs: 50, // 最大允许的响应时间,单位为毫秒。超过这个时间的请求被视为慢请求。Threshold: 0.6, // 触发熔断的阈值。对于 `circuitbreaker.SlowRequestRatio` 策略,这个值表示慢请求的比例。},}// 加载断路器规则if _, err := circuitbreaker.LoadRules(rules); err != nil {log.Fatalf("加载断路器规则失败: %+v", err)}// 模拟请求go func() {for {entry, blockError := sentinel.Entry("test", sentinel.WithTrafficType(base.Outbound))if blockError != nil {// 如果请求被阻塞,随机等待一段时间后重试time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)} else {// 随机模拟业务错误if rand.Uint64()%20 > 9 {sentinel.TraceError(entry, errors.New("biz error"))}// 模拟请求处理时间time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)entry.Exit()}}}()// 定期输出统计结果go func() {ticker := time.NewTicker(1 * time.Second)for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v,错误数: %4v, 完成数: %4v, 平均响应时间: %4v\n",time.Now().Format("15:04:05"), // 时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数(请求通过 Sentinel 规则检查)node.GetSum(base.MetricEventBlock), // 被限流的请求数(求被 Sentinel 规则阻止)node.GetSum(base.MetricEventError), // 错误数量(业务错误)node.GetSum(base.MetricEventComplete), // 完成的请求数(无论是否被阻止)node.AvgRT(), // 平均响应时间)}}()// 保持程序运行select {}}
熔断之慢异常比例
package mainimport ("errors""fmt"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/circuitbreaker""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time")// stateChangeTestListener 实现了断路器状态变化的监听器接口。type stateChangeTestListener struct{}// OnTransformToClosed 在断路器状态从其他状态转换为关闭时触发。func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Closed")}// OnTransformToOpen 在断路器状态从其他状态转换为打开时触发。func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s, 快照: %+v\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Open", snapshot)}// OnTransformToHalfOpen 在断路器状态从其他状态转换为半开时触发。func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "HalfOpen")}func main() {// 初始化 Sentinelif err := sentinel.InitDefault(); err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 注册状态变化监听器circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})// 定义断路器规则rules := []*circuitbreaker.Rule{{Resource: "test", // 资源名称,用于标识受保护的资源。Strategy: circuitbreaker.ErrorRatio, // 断路器策略,定义了如何判断请求是否应该被熔断。在这个例子中,策略为基于错误比例进行熔断。RetryTimeoutMs: 3000, // 重试等待时间,单位为毫秒。当请求被阻塞时,Sentinel 会等待一段时间再重试。MinRequestAmount: 10, // 统计最小请求数量,当请求数量小于该值时不会触发熔断。StatIntervalMs: 1000, // 统计窗口时间,单位为毫秒。StatSlidingWindowBucketCount: 5, // 滑动窗口桶的数量,用于计算统计数据。Threshold: 0.5, // 错误比例阈值,超过该值时,请求会被阻塞。},}// 加载断路器规则if _, err := circuitbreaker.LoadRules(rules); err != nil {log.Fatalf("加载断路器规则失败: %+v", err)}// 模拟请求go func() {for {entry, blockError := sentinel.Entry("test", sentinel.WithTrafficType(base.Outbound))if blockError != nil {// 如果请求被阻塞,随机等待一段时间后重试time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)} else {// 随机模拟业务错误if rand.Uint64()%20 > 14 {sentinel.TraceError(entry, errors.New("biz error"))}// 模拟请求处理时间time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)entry.Exit()}}}()// 定期输出统计结果go func() {ticker := time.NewTicker(1 * time.Second)for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v,错误数: %4v, 完成数: %4v\n",time.Now().Format("15:04:05"), // 时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数(请求通过 Sentinel 规则检查)node.GetSum(base.MetricEventBlock), // 被限流的请求数(求被 Sentinel 规则阻止)node.GetSum(base.MetricEventError), // 错误数量(业务错误)node.GetSum(base.MetricEventComplete), // 完成的请求数(无论是否被阻止))}}()// 保持程序运行select {}}
熔断之慢异常数量
package mainimport ("errors""fmt"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/circuitbreaker""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time")// stateChangeTestListener 实现了断路器状态变化的监听器接口。type stateChangeTestListener struct{}// OnTransformToClosed 在断路器状态从其他状态转换为关闭时触发。func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Closed")}// OnTransformToOpen 在断路器状态从其他状态转换为打开时触发。func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s, 快照: %+v\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "Open", snapshot)}// OnTransformToHalfOpen 在断路器状态从其他状态转换为半开时触发。func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {fmt.Printf("时间: %+v, 断路器策略: %+v, 旧状态: %-8s, 新状态: %-8s\n",time.Now().Format("15:04:05"), rule.Strategy, prev.String(), "HalfOpen")}func main() {// 初始化 Sentinelif err := sentinel.InitDefault(); err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 注册状态变化监听器circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})// 定义断路器规则rules := []*circuitbreaker.Rule{{Resource: "test", // 资源名称,用于标识受保护的资源。Strategy: circuitbreaker.ErrorCount, // 断路器策略,定义了如何判断请求是否应该被熔断。在这个例子中,策略为基于错误比例进行熔断。RetryTimeoutMs: 3000, // 重试等待时间,单位为毫秒。当请求被阻塞时,Sentinel 会等待一段时间再重试。MinRequestAmount: 10, // 统计最小请求数量,当请求数量小于该值时不会进行统计。StatIntervalMs: 5000, // 统计窗口时间,单位为毫秒。StatSlidingWindowBucketCount: 10, // 滑动窗口桶的数量,用于计算统计数据。Threshold: 40, // 错误比例阈值,超过该值时,请求会被阻塞。},}// 加载断路器规则if _, err := circuitbreaker.LoadRules(rules); err != nil {log.Fatalf("加载断路器规则失败: %+v", err)}// 定期输出统计结果go func() {ticker := time.NewTicker(1 * time.Second)for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v,错误数: %4v, 完成数: %4v\n",time.Now().Format("15:04:05"), // 时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数(请求通过 Sentinel 规则检查)node.GetSum(base.MetricEventBlock), // 被限流的请求数(求被 Sentinel 规则阻止)node.GetSum(base.MetricEventError), // 错误数量(业务错误)node.GetSum(base.MetricEventComplete), // 完成的请求数(无论是否被阻止))}}()// 模拟请求go func() {for {entry, blockError := sentinel.Entry("test", sentinel.WithTrafficType(base.Outbound))if blockError != nil {// 如果请求被阻塞,随机等待一段时间后重试time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond)} else {// 随机模拟业务错误if rand.Uint64()%20 >= 9 {sentinel.TraceError(entry, errors.New("biz error"))}// 模拟请求处理时间time.Sleep(time.Duration(rand.Uint64()%10+10) * time.Millisecond)entry.Exit()}}}()// 保持程序运行select {}}
4. 并发隔离控制
4.1 概念
基于信号量机制限制资源访问的最大并发数。并发隔离控制可以帮助您限制某个资源的并发访问数,防止资源耗尽。
4.2 结构
定义了 Rule 结构体,包含资源名称、度量类型及阈值。
type Rule struct {ID string `json:"id,omitempty"`Resource string `json:"resource"`MetricType isolation.MetricType `json:"metricType"`Threshold uint32 `json:"threshold"`}
参数解析:
ID:规则的唯一标识(可选)。Resource:资源名称。MetricType:度量类型,目前支持 Concurrency。Threshold:阈值,表示最大并发数。4.3 示例
package mainimport ("errors""fmt""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/isolation")func main() {// 初始化 Sentinelif err := sentinel.InitDefault(); err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 定义一个并发隔离规则// 限制名为 "test" 的资源的并发数不超过 10rule := &isolation.Rule{Resource: "test", // 资源名称MetricType: isolation.Concurrency, // 指标类型,这里使用并发数Threshold: 10, // 阈值,即允许的最大并发数}// 加载并发隔离规则到 Sentinelif _, err := isolation.LoadRules([]*isolation.Rule{rule}); err != nil {log.Fatalf("加载并发隔离规则失败: %+v", err)}// 定期输出统计结果go func() {ticker := time.NewTicker(1 * time.Second)for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 错误数: %4v, 完成数: %4v, 并发数: %4v\n",time.Now().Format("15:04:05"), // 时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数(请求通过 Sentinel 规则检查)node.GetSum(base.MetricEventBlock), // 被限流的请求数(求被 Sentinel 规则阻止)node.GetSum(base.MetricEventError), // 错误数量(业务错误)node.GetSum(base.MetricEventComplete), // 完成的请求数(无论是否被阻止)node.CurrentConcurrency(), // 当前并发数)}}()// 创建 12 个 goroutine,每个 goroutine 都会尝试进入 Sentinel 保护的资源 "test"for i := 0; i < 12; i++ {go func() {for {// 尝试进入 Sentinel 保护的资源 "test"// WithBatchCount(1) 表示这次访问计为 1 个请求量entry, blockErr := sentinel.Entry("test", sentinel.WithBatchCount(1))if blockErr != nil {// 随机模拟业务错误if rand.Uint64()%20 >= 19 {sentinel.TraceError(entry, errors.New("biz error"))}// 如果 b 不为 nil,表示没有成功进入,可能是因为超过了并发限制time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)continue} else {// 如果成功进入,执行资源访问逻辑time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)// 退出 Sentinel 保护的资源,释放计数entry.Exit()}}}()}// 保持程序运行select {}}
5. 系统自适应保护
5.1 概述
结合系统负载指标(如 CPU 使用率、Load)和应用入口流量情况,自适应调整流控策略。系统自适应保护可以在系统负载过高时自动降低流量,防止系统崩溃。
5.2 配置
通过 system.SystemRule 设置触发条件和应对策略。这使得 Sentinel 可以根据系统当前状态动态调整流控策略。
// Rule 描述了隔离策略(例如信号量隔离)。type Rule struct {ID string `json:"id,omitempty"` // ID 表示规则的唯一标识(可选)。Resource string `json:"resource"` // Resource 表示目标资源定义。MetricType system.MetricType `json:"metricType"` // MetricType 指示检查逻辑的度量类型。目前支持 Concurrency 用于并发限制。Threshold uint32 `json:"threshold"` // Threshold 表示阈值。}
参数解析:
MetricType:度量类型,如 Load。TriggerCount:触发阈值。Strategy:策略,如 BBR(Bottleneck Bandwidth and RTT)。5.3 示例
系统负载
package mainimport ("fmt""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/stat""github.com/alibaba/sentinel-golang/core/system_metric""log""math/rand""time"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/config""github.com/alibaba/sentinel-golang/core/system")// core/system/slot_test.gofunc main() {// 创建默认配置对象conf := config.NewDefaultConfig()// 初始化 Sentinelerr := sentinel.InitWithConfig(conf)if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 初始化系统规则_, err = system.LoadRules([]*system.Rule{{MetricType: system.Load, // 指标类型为 LoadTriggerCount: 0.5, // 触发阈值为 0.5Strategy: system.BBR, // 策略为 BBR},})if err != nil {log.Fatalf("初始化系统规则失败: %+v", err)}// 定期输出统计结果go func() {ticker := time.NewTicker(1 * time.Second)for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 错误数: %4v, 完成数: %4v, 系统负载: %v\n",time.Now().Format("15:04:05"), // 时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数(请求通过 Sentinel 规则检查)node.GetSum(base.MetricEventBlock), // 被限流的请求数(求被 Sentinel 规则阻止)node.GetSum(base.MetricEventError), // 错误数量(业务错误)node.GetSum(base.MetricEventComplete), // 完成的请求数(无论是否被阻止)system_metric.CurrentLoad(), // 当前并发数)}}()// 启动请求模拟// 每个人机器配置不同,参数自调for i := 0; i < 35000; i++ {go func() {for {// 尝试获取资源入口e, b := sentinel.Entry("test", sentinel.WithTrafficType(base.Inbound))if b != nil {// 请求被限流time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) // 模拟处理时间} else {// 请求通过time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) // 模拟处理时间e.Exit() // 退出资源入口}}}()}// 保持程序运行select {}}
6. 热点参数流控
6.1 概述
针对具有高流量特性的特定参数进行更精细的流量控制。热点参数流控可以帮助您识别和控制那些可能引发系统瓶颈的特定参数。
6.2 配置
指定参数名及其对应的流控规则。这使得您可以对特定参数进行更加精细化的流量控制。
type Rule struct {ID string `json:"id,omitempty"` // 规则的唯一标识(可选)Resource string `json:"resource"` // 目标资源定义,表示该规则适用的资源类型MetricType hotspot.MetricType `json:"metricType"` // 度量类型,用于确定何时触发策略ControlBehavior hotspot.ControlBehavior `json:"controlBehavior"` // 控制行为,定义了在触发条件满足时应采取的行动ParamIndex int `json:"paramIndex"` // 参数索引,表示在方法调用中用于限流的参数位置Threshold int64 `json:"threshold"` // 阈值,表示资源的最大允许值或触发条件MaxQueueingTimeMs int64 `json:"maxQueueingTimeMs"` // 最大排队时间,单位为毫秒,表示请求在队列中的最大等待时间BurstCount int64 `json:"burstCount"` // 突发流量计数,表示允许的突发流量数量DurationInSec int64 `json:"durationInSec"` // 统计窗口持续时间,单位为秒,表示统计周期ParamsMaxCapacity int64 `json:"paramsMaxCapacity"` // 参数最大容量,表示参数集合的最大容量SpecificItems map[interface{}]int64 `json:"specificItems"` // 特定项目,表示特定参数值及其对应的阈值}
参数解析:
ID:规则的唯一标识(可选)。Resource:资源名称。MetricType:度量类型,如 QPS 或 Concurrency。ControlBehavior:控制行为,如 Reject 或 Throttling。ParamIndex:热点参数的索引。Threshold:阈值。MaxQueueingTimeMs:最大排队等待时间(仅在 Throttling 模式下有效)。BurstCount:静默值(仅在 Reject 模式下有效)。DurationInSec:统计周期(秒)。ParamsMaxCapacity:统计结构的容量最大值。SpecificItems:特定参数的特殊阈值配置。6.3 示例
热点参数之并发数
package mainimport ("fmt"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/config""github.com/alibaba/sentinel-golang/core/hotspot""github.com/alibaba/sentinel-golang/core/stat""log""math/rand""time")func main() {// 创建默认配置对象conf := config.NewDefaultConfig()// 初始化 Sentinelerr := sentinel.InitWithConfig(conf)if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 初始化热点参数规则_, err = hotspot.LoadRules([]*hotspot.Rule{{Resource: "test", // 资源名称MetricType: hotspot.Concurrency, // 热点参数类型:并发ParamIndex: 0, // 热点参数在参数列表中的位置,从 0 开始ParamKey: "testKey", // 热点参数名称Threshold: 9, // 热点参数的阈值},})if err != nil {log.Fatalf("初始化热点参数规则失败: %+v", err)}// 模拟并发请求for i := 0; i < 10; i++ {go func() {for {// 尝试获取资源入口e, b := sentinel.Entry("test", // 设置参数sentinel.WithArgs(true, uint32(9), sentinel.WithAttachments(map[interface{}]interface{}{"testKey": rand.Uint64() % 10})))if b != nil {// 请求被限流time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) // 模拟处理时间} else {// 请求通过time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) // 模拟处理时间e.Exit() // 退出资源入口}}}()}// 每秒输出统计信息// 使用 time.Ticker 每秒输出一次统计信息,并重置统计信息。go func() {ticker := time.NewTicker(time.Second)defer ticker.Stop()for range ticker.C {// 获取当前的统计数据node := stat.GetResourceNode("test")// 打印统计数据fmt.Printf("请求时间: %v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 完成数: %4v, 并发数: %4v\n",time.Now().Format("15:04:05"), // 当前时间node.GetSum(base.MetricEventPass)+node.GetSum(base.MetricEventBlock), // 总请求数node.GetSum(base.MetricEventPass), // 通过的请求数node.GetSum(base.MetricEventBlock), // 被限流的请求数node.GetSum(base.MetricEventComplete), // 完成的请求数node.CurrentConcurrency(), // 当前并发数)}}()// 保持程序运行select {}}
热点参数之QPS数
package mainimport ("fmt""log""math/rand/v2""time""github.com/alibaba/sentinel-golang/core/base""github.com/alibaba/sentinel-golang/core/stat"sentinel "github.com/alibaba/sentinel-golang/api""github.com/alibaba/sentinel-golang/core/config""github.com/alibaba/sentinel-golang/core/hotspot")type fooStruct struct {n int64}func main() {// 创建默认配置对象conf := config.NewDefaultConfig()// 初始化 Sentinelerr := sentinel.InitWithConfig(conf)if err != nil {log.Fatalf("初始化 Sentinel 失败: %+v", err)}// 初始化热点参数规则_, err = hotspot.LoadRules([]*hotspot.Rule{{Resource: "test01", // 资源名称MetricType: hotspot.QPS, // 热点参数类型:QPSControlBehavior: hotspot.Reject, // 拒绝策略ParamKey: "testKey", // 热点参数名称Threshold: 100, // 热点参数的阈值BurstCount: 0, // 预热时间窗口的请求数量DurationInSec: 1, // 热点参数的预热时间窗口,单位为秒},{Resource: "test02", // 资源名称MetricType: hotspot.QPS, // 热点参数类型:QPSControlBehavior: hotspot.Reject, // 拒绝策略ParamIndex: 1, // 热点参数在参数列表中的位置,从 0 开始Threshold: 100, // 热点参数的阈值BurstCount: 0, // 预热时间窗口的请求数量DurationInSec: 1, // 热点参数的预热时间窗口,单位为秒},})if err != nil {log.Fatalf("初始化热点参数规则失败: %+v", err)}// 每秒输出统计信息// 使用 time.Ticker 每秒输出一次统计信息,并重置统计信息。go func() {ticker := time.NewTicker(time.Second)defer ticker.Stop()for range ticker.C {for _, resource := range []string{"test01", "test02"} {// 获取当前的统计数据node := stat.GetOrCreateResourceNode(resource, base.ResTypeCommon)// 打印统计数据fmt.Printf("请求时间: %v, 资源数: %4v, 请求数: %4v, 通过数: %4v, 限流数: %4v, 完成数: %4v, 并发数: %4v\n",time.Now().Format("15:04:05"), // 当前时间node.ResourceName(), // 资源名称node.GetQPS(base.MetricEventPass)+node.GetQPS(base.MetricEventBlock), // 总请求数node.GetQPS(base.MetricEventPass), // 通过的请求数node.GetQPS(base.MetricEventBlock), // 被限流的请求数node.GetQPS(base.MetricEventComplete), // 完成的请求数node.CurrentConcurrency(), // 当前并发数)}}}()// 模拟并发请求for i := 0; i < 10; i++ {go func() {for {// 尝试获取资源入口e, b := sentinel.Entry("test01", // 设置参数sentinel.WithArgs(true, uint32(9), sentinel.WithAttachments(map[interface{}]interface{}{"testKey": rand.Uint64() % 10})))if b != nil {// 请求被限流time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) // 模拟处理时间} else {// 请求通过time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) // 模拟处理时间e.Exit() // 退出资源入口}}}()}// 模拟并发请求for i := 0; i < 10; i++ {go func() {for {// 尝试获取资源入口e, b := sentinel.Entry("test02", sentinel.WithArgs(true, rand.Uint32()%30, "sentinel"))if b != nil {// 请求被限流time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) // 模拟处理时间} else {// 请求通过time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) // 模拟处理时间e.Exit() // 退出资源入口}}}()}// 阻塞主线程select {}}
7. 动态数据源使用
7.1 概述
支持从外部配置中心(如 etcd, Nacos, Consul 等)动态获取规则配置。动态数据源可以使您的配置更加灵活,无需重启服务即可更新规则。
7.2 优势
灵活性:动态调整规则,无需重启服务。可维护性:集中管理规则,减少硬编码带来的不便。实时性:快速响应配置变更,提高系统的响应速度
8. 通用配置
8.1 概述
提供了多个配置项,比如日志路径、统计窗口大小等。这些配置项可以帮助您更好地管理和监控 Sentinel 的运行状态。
8.2 优先级
环境变量 > YAML 文件配置 > 默认配置。
8.3 示例
典型的 YAML 配置文件模板。
version: v1sentinel: app: name: my-app type: 0 log: dir: ${user.home}/logs/csp usePid: false metric: maxFileCount: 8 singleFileMaxSize: 50MB flushIntervalSec: 1 stat: globalStatisticSampleCountTotal: 20 globalStatisticIntervalMsTotal: 10000 metricStatisticSampleCount: 2 metricStatisticIntervalMs: 1000 system: collectIntervalMs: 1000
参数解析:
version:配置版本。sentinel.app.name:项目名称。sentinel.app.type:项目类型。sentinel.log.dir:日志路径。sentinel.log.usePid:监控日志文件名是否带上进程 PID。sentinel.log.metric.maxFileCount:监控日志最大文件数目。sentinel.log.metric.singleFileMaxSize:监控日志单个文件大小上限。sentinel.log.metric.flushIntervalSec:监控日志聚合和刷盘的时间频率。sentinel.stat.globalStatisticSampleCountTotal:全局滑动窗口的统计格子数。sentinel.stat.globalStatisticIntervalMsTotal:全局滑动窗口的间隔时间(毫秒)。sentinel.stat.metricStatisticSampleCount:指标滑动窗口的统计格子数。sentinel.stat.metricStatisticIntervalMs:指标滑动窗口的间隔时间(毫秒)。sentinel.stat.system.collectIntervalMs:系统指标采集间隔时间(毫秒)。
总结
通过本文,我们详细介绍了如何使用 Sentinel Go 实现服务的流量控制、熔断降级、并发隔离控制、系统自适应保护、热点参数流控以及动态数据源使用。每个部分都包含了详细的参数解析、最佳实践和示例代码,帮助您更好地理解和应用这些功能。希望这些信息能帮助您构建更加健壮和稳定的分布式系统。如果您有任何问题或需要进一步的帮助,请随时查阅官方文档或社区资源。