31-数据流:通过iam-authz-server设计,看数据流服务的设计

 IAM数据流服务iam-authz-server的设计和实现。

 

iam-authz-server的功能介绍

iam-authz-server目前的唯一功能,是通过提供 /v1/authz RESTful API接口完成资源授权。 /v1/authz 接口是通过github.com/ory/ladon来完成资源授权的。

因为iam-authz-server承载了数据流的请求,需要确保API接口具有较高的性能。为了保证API接口的性能,iam-authz-server在设计上使用了大量的缓存技术

github.com/ory/ladon包介绍

 

 

Ladon解决了这个问题:在特定的条件下,谁能够/不能够对哪些资源做哪些操作。为了解决这个问题,Ladon引入了授权策略。 Ladon可以用请求的上下文,去匹配设置的授权策略,最终判断出当前授权请求是否通过。下面是一个Ladon的授权策略样例:

{
  "description": "One policy to rule them all.",
  "subjects": ["users:<peter|ken>", "users:maria", "groups:admins"],
  "actions" : ["delete", "<create|update>"],
  "effect": "allow",
  "resources": [
    "resources:articles:<.*>",
    "resources:printer"
  ],
  "conditions": {
    "remoteIP": {
        "type": "CIDRCondition",
        "options": {
            "cidr": "192.168.0.1/16"
        }
    }
  }
}

策略(Policy)由若干元素构成, 。核心元素包括主题(Subject)、操作(Action)、效力(Effect)、资源(Resource)以及生效条件(Condition)。 对于没有特定约束条件的策略,Condition元素是可选项。一条策略包含下面6个元素:

  • 主题(Subject),主题名是唯一的,代表一个授权主题。例如,“ken” or “printer-service.mydomain.com”。
  • 操作(Action),描述允许或拒绝的操作。
  • 效力(Effect),描述策略产生的结果是“允许”还是“拒绝”,包括 allow(允许)和 deny(拒绝)。
  • 资源(Resource),描述授权的具体数据。
  • 生效条件(Condition),描述策略生效的约束条件。
  • 描述(Description),策略的描述。

有了授权策略,我们就可以传入请求上下文,由Ladon来决定请求是否能通过授权。下面是一个请求示例:

{
  "subject": "users:peter",
  "action" : "delete",
  "resource": "resources:articles:ladon-introduction",
  "context": {
    "remoteIP": "192.168.0.5"
  }
}

可以看到,在 remoteIP="192.168.0.5" 生效条件(Condition)下,针对主题(Subject) users:peter 对资源(Resource) resources:articles:ladon-introduction 的 delete 操作(Action),授权策略的效力(Effect)是 allow 的。所以Ladon会返回如下结果:

{
    "allowed": true
}

Ladon支持很多Condition,具体见下表:

至于如何使用这些Condition,你可以参考 Ladon Condition使用示例。此外,Ladon还支持自定义Condition。

另外,Ladon还支持授权审计,用来记录授权历史。我们可以通过在ladon.Ladon中附加一个ladon.AuditLogger来实现:

import "github.com/ory/ladon"
import manager "github.com/ory/ladon/manager/memory"

func main() {

    warden := ladon.Ladon{
        Manager: manager.NewMemoryManager(),
        AuditLogger: &ladon.AuditLoggerInfo{}
    }

    // ...
}

 AuditLogger是一个interface:

// AuditLogger tracks denied and granted authorizations.
type AuditLogger interface {
    LogRejectedAccessRequest(request *Request, pool Policies, deciders Policies)
    LogGrantedAccessRequest(request *Request, pool Policies, deciders Policies)
}

 我们可以实现一个AuditLogger,将授权日志保存到Redis或者MySQL中。

Ladon支持跟踪一些授权指标,比如 deny、allow、not match、error。你可以通过实现ladon.Metric接口,来对这些指标进行处理。ladon.Metric接口定义如下:

// Metric is used to expose metrics about authz
type Metric interface {
    // RequestDeniedBy is called when we get explicit deny by policy
    RequestDeniedBy(Request, Policy)
    // RequestAllowedBy is called when a matching policy has been found.
    RequestAllowedBy(Request, Policies)
    // RequestNoMatch is called when no policy has matched our request
    RequestNoMatch(Request)
    // RequestProcessingError is called when unexpected error occured
    RequestProcessingError(Request, Policy, error)
}

例如,你可以通过下面的示例,将这些指标暴露给prometheus:

type prometheusMetrics struct{}

func (mtr *prometheusMetrics) RequestDeniedBy(r ladon.Request, p ladon.Policy) {}
func (mtr *prometheusMetrics) RequestAllowedBy(r ladon.Request, policies ladon.Policies) {}
func (mtr *prometheusMetrics) RequestNoMatch(r ladon.Request) {}
func (mtr *prometheusMetrics) RequestProcessingError(r ladon.Request, err error) {}

func main() {

    warden := ladon.Ladon{
        Manager: manager.NewMemoryManager(),
        Metric:  &prometheusMetrics{},
    }

    // ...
}

在使用Ladon的过程中,有两个地方需要你注意:

  • 所有检查都区分大小写,因为主题值可能是区分大小写的ID。
  • 如果ladon.Ladon无法将策略与请求匹配,会默认授权结果为拒绝,并返回错误。

iam-authz-server使用方法介绍

第一步,登陆iam-apiserver,创建授权策略和密钥。

这一步又分为3个小步骤。

  1. 登陆iam-apiserver系统,获取访问令牌:
$ token=`curl -s -XPOST -H'Content-Type: application/json' -d'{"username":"admin","password":"Admin@2021"}' http://127.0.0.1:8080/login | jq -r .token`
  1. 创建授权策略:
$ curl -s -XPOST -H"Content-Type: application/json" -H"Authorization: Bearer $token" -d'{"metadata":{"name":"authztest"},"policy":{"description":"One policy to rule them all.","subjects":["users:<peter|ken>","users:maria","groups:admins"],"actions":["delete","<create|update>"],"effect":"allow","resources":["resources:articles:<.*>","resources:printer"],"conditions":{"remoteIP":{"type":"CIDRCondition","options":{"cidr":"192.168.0.1/16"}}}}}' http://127.0.0.1:8080/v1/policies
  1. 创建密钥,并从请求结果中提取secretID 和 secretKey:
$ curl -s -XPOST -H"Content-Type: application/json" -H"Authorization: Bearer $token" -d'{"metadata":{"name":"authztest"},"expires":0,"description":"admin secret"}' http://127.0.0.1:8080/v1/secrets
{"metadata":{"id":23,"name":"authztest","createdAt":"2021-04-08T07:24:50.071671422+08:00","updatedAt":"2021-04-08T07:24:50.071671422+08:00"},"username":"admin","secretID":"ZuxvXNfG08BdEMqkTaP41L2DLArlE6Jpqoox","secretKey":"7Sfa5EfAPIwcTLGCfSvqLf0zZGCjF3l8","expires":0,"description":"admin secret"}

第二步,生成访问 iam-authz-server的 token。

iamctl 提供了 jwt sigin 子命令,可以根据 secretID 和 secretKey 签发 Token,方便使用。

$ iamctl jwt sign ZuxvXNfG08BdEMqkTaP41L2DLArlE6Jpqoox 7Sfa5EfAPIwcTLGCfSvqLf0zZGCjF3l8 # iamctl jwt sign $secretID $secretKey
eyJhbGciOiJIUzI1NiIsImtpZCI6Ilp1eHZYTmZHMDhCZEVNcWtUYVA0MUwyRExBcmxFNkpwcW9veCIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJpYW0uYXV0aHoubWFybW90ZWR1LmNvbSIsImV4cCI6MTYxNzg0NTE5NSwiaWF0IjoxNjE3ODM3OTk1LCJpc3MiOiJpYW1jdGwiLCJuYmYiOjE2MTc4Mzc5OTV9.za9yLM7lHVabPAlVQLCqXEaf8sTU6sodAsMXnmpXjMQ

你可以通过 iamctl jwt show <token> 来查看Token的内容:

$ iamctl jwt show eyJhbGciOiJIUzI1NiIsImtpZCI6Ilp1eHZYTmZHMDhCZEVNcWtUYVA0MUwyRExBcmxFNkpwcW9veCIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJpYW0uYXV0aHoubWFybW90ZWR1LmNvbSIsImV4cCI6MTYxNzg0NTE5NSwiaWF0IjoxNjE3ODM3OTk1LCJpc3MiOiJpYW1jdGwiLCJuYmYiOjE2MTc4Mzc5OTV9.za9yLM7lHVabPAlVQLCqXEaf8sTU6sodAsMXnmpXjMQ
Header:
{
    "alg": "HS256",
    "kid": "ZuxvXNfG08BdEMqkTaP41L2DLArlE6Jpqoox",
    "typ": "JWT"
}
Claims:
{
    "aud": "iam.authz.marmotedu.com",
    "exp": 1617845195,
    "iat": 1617837995,
    "iss": "iamctl",
    "nbf": 1617837995
}

我们生成的Token包含了下面这些信息。

Header

  • alg:生成签名的算法。
  • kid:密钥ID。
  • typ:Token的类型,这里是JWT。

Claims

  • aud:JWT Token的接受者。
  • exp:JWT Token的过期时间(UNIX时间格式)。
  • iat:JWT Token的签发时间(UNIX时间格式)。
  • iss:签发者,因为我们是用 iamctl 工具签发的,所以这里的签发者是 iamctl。
  • nbf:JWT Token的生效时间(UNIX时间格式),默认是签发时间。

第三步,调用/v1/authz接口完成资源授权请求。

请求方法如下:

$ curl -s -XPOST -H'Content-Type: application/json' -H'Authorization: Bearer eyJhbGciOiJIUzI1NiIsImtpZCI6Ilp1eHZYTmZHMDhCZEVNcWtUYVA0MUwyRExBcmxFNkpwcW9veCIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJpYW0uYXV0aHoubWFybW90ZWR1LmNvbSIsImV4cCI6MTYxNzg0NTE5NSwiaWF0IjoxNjE3ODM3OTk1LCJpc3MiOiJpYW1jdGwiLCJuYmYiOjE2MTc4Mzc5OTV9.za9yLM7lHVabPAlVQLCqXEaf8sTU6sodAsMXnmpXjMQ' -d'{"subject":"users:maria","action":"delete","resource":"resources:articles:ladon-introduction","context":{"remoteIP":"192.168.0.5"}}' http://127.0.0.1:9090/v1/authz
{"allowed":true}

如果授权通过,会返回:{"allowed":true} 。 如果授权失败,则返回:

{"allowed":false,"denied":true,"reason":"Request was denied by default"}

iam-authz-server的代码实现

 

iam-authz-server的配置处理

iam-authz-server服务的main函数位于authzserver.go文件中,  iam-authz-server的服务框架设计跟iam-apiserver的服务框架设计保持一致,也是有3种配置:Options配置、组件配置和HTTP服务配置。

Options配置见options.go文件:

type Options struct {
    RPCServer               string
    ClientCA                string
    GenericServerRunOptions *genericoptions.ServerRunOptions
    InsecureServing         *genericoptions.InsecureServingOptions
    SecureServing           *genericoptions.SecureServingOptions
    RedisOptions            *genericoptions.RedisOptions
    FeatureOptions          *genericoptions.FeatureOptions
    Log                     *log.Options
    AnalyticsOptions        *analytics.AnalyticsOptions
}

 

iam-apiserver和iam-authz-server共用了GenericServerRunOptions、InsecureServing、SecureServing、FeatureOptions、RedisOptions、Log这些配置。  这也是命令行参数分组带来的好处:批量共享。

iam-authz-server启动流程设计

接下来,我们来详细看下iam-authz-server的启动流程。

 ,和iam-apiserver相比,iam-authz-server只提供了REST API服务。启动流程如下图所示:

iam-authz-server 的 RESTful API请求处理流程

iam-authz-server的请求处理流程也是清晰、规范的,具体流程如下图所示:

首先,我们通过API调用(<HTTP Method> + <HTTP Request Path>)请求iam-authz-server提供的RESTful API接口 POST /v1/authz 。

接着,Gin Web框架接收到HTTP请求之后,会通过认证中间件完成请求的认证,iam-authz-server采用了Bearer认证方式。

然后,请求会被我们加载的一系列中间件所处理,例如跨域、RequestID、Dump等中间件。

最后,根据<HTTP Method> + <HTTP Request Path>进行路由匹配。

比如,我们请求的RESTful API是POST /v1/authz,Gin Web框架会根据 HTTP Method 和 HTTP Request Path,查找注册的Controllers,最终匹配到 authzController.Authorize Controller。在 Authorize Controller中,会先解析请求参数,接着校验请求参数、调用业务层的方法进行资源授权,最后处理业务层的返回结果,返回最终的 HTTP 请求结果。

iam-authz-server的代码架构

iam-authz-server的代码设计和iam-apiserver一样,遵循简洁架构设计。

iam-authz-server的代码架构也分为4层,分别是模型层(Models)、控制层(Controller)、业务层 (Service)和仓库层(Repository)。 

iam-authz-server 和 iam-apiserver 的代码架构有这三点不同:

  • iam-authz-server客户端不支持前端和命令行。
  • iam-authz-server仓库层对接的是iam-apiserver微服务,而非数据库。
  • iam-authz-server业务层的代码存放在目录authorization中。

iam-authz-server关键代码分析

 

资源授权

先来看下,iam-authz-server是如何实现资源授权的。

我们可以调用iam-authz-server的 /v1/authz API接口,实现资源的访问授权。 /v1/authz 对应的controller方法是Authorize:

func (a *AuthzController) Authorize(c *gin.Context) {
	var r ladon.Request
	if err := c.ShouldBind(&r); err != nil {
		core.WriteResponse(c, errors.WithCode(code.ErrBind, err.Error()), nil)

		return
	}

	auth := authorization.NewAuthorizer(authorizer.NewAuthorization(a.store))
	if r.Context == nil {
		r.Context = ladon.Context{}
	}

	r.Context["username"] = c.GetString("username")
	rsp := auth.Authorize(&r)

	core.WriteResponse(c, nil, rsp)
}

该函数使用 github.com/ory/ladon 包进行资源访问授权,授权流程如下图所示:

具体分为以下几个步骤:

第一步,在Authorize方法中调用 c.ShouldBind(&r) ,将API请求参数解析到 ladon.Request 类型的结构体变量中。

第二步,调用authorization.NewAuthorizer函数,该函数会创建并返回包含Manager和AuditLogger字段的Authorizer类型的变量。

Manager包含一些函数,比如 Create、Update和FindRequestCandidates等,用来对授权策略进行增删改查。AuditLogger包含 LogRejectedAccessRequest 和 LogGrantedAccessRequest 函数,分别用来记录被拒绝的授权请求和被允许的授权请求,将其作为审计数据使用。

第三步,调用auth.Authorize函数,对请求进行访问授权。auth.Authorize函数内容如下:

func (a *Authorizer) Authorize(request *ladon.Request) *authzv1.Response {
	log.Debug("authorize request", log.Any("request", request))

	if err := a.warden.IsAllowed(request); err != nil {
		return &authzv1.Response{
			Denied: true,
			Reason: err.Error(),
		}
	}

	return &authzv1.Response{
		Allowed: true,
	}
}

该函数会调用 a.warden.IsAllowed(request) 完成资源访问授权。IsAllowed函数会调用 FindRequestCandidates(r) 查询所有的策略列表, 在Authorize函数中,我们将username存入ladon Request的context中:

r.Context["username"] = c.GetHeader("username")

在FindRequestCandidates函数中,我们可以从Request中取出username,并根据username查询缓存中的policy列表,FindRequestCandidates实现如下:

func (m *PolicyManager) FindRequestCandidates(r *ladon.Request) (ladon.Policies, error) {
		username := ""
	
		if user, ok := r.Context["username"].(string); ok {
			username = user
		}
	
		policies, err := m.client.List(username)
		if err != nil {
			return nil, errors.Wrap(err, "list policies failed")
		}
	
		ret := make([]ladon.Policy, 0, len(policies))
		for _, policy := range policies {
			ret = append(ret, policy)
		}
	
		return ret, nil
	}

IsAllowed函数代码如下:

func (l *Ladon) IsAllowed(r *Request) (err error) {
    policies, err := l.Manager.FindRequestCandidates(r)
    if err != nil {
        go l.metric().RequestProcessingError(*r, nil, err)
        return err
    }

    return l.DoPoliciesAllow(r, policies)
}

 

缓存设计

iam-authz-server主要用来做资源访问授权,属于数据流的组件,对接口访问性能有比较高的要求,所以该组件采用了缓存的机制。如下图所示:

iam-authz-server组件通过缓存密钥和授权策略信息到内存中,加快密钥和授权策略的查询速度。通过缓存授权记录到内存中,提高了授权数据的写入速度,从而大大降低了授权请求接口的延时。

上面的缓存机制用到了Redis key-value存储,所以在iam-authz-server初始化阶段,需要先建立Redis连接(位于initialize函数中):

go storage.ConnectToRedis(ctx, s.buildStorageConfig())

 

先来看下密钥和策略缓存。

iam-authz-server通过load包来完成密钥和策略的缓存。

在iam-authz-server进程启动时,会创建并启动一个Load服务(位于initialize函数中):

load.NewLoader(ctx, cacheIns).Start() 

先来看创建Load服务。创建Load服务时,传入了cacheIns参数,cacheIns是一个实现了Loader接口的实例:

type Loader interface {
    Reload() error
}

然后看启动Load服务。通过Load实例的 Start 方法来启动Load服务:

func (l *Load) Start() {
    go startPubSubLoop()
    go l.reloadQueueLoop()
    go l.reloadLoop()

    l.DoReload()
}

Start函数先启动了3个协程,再调用 l.DoReload() 完成一次密钥和策略的同步:

func (l *Load) DoReload() {
    l.lock.Lock()
    defer l.lock.Unlock()

    if err := l.loader.Reload(); err != nil {
        log.Errorf("faild to refresh target storage: %s", err.Error())
    }

    log.Debug("refresh target storage succ")
}

 

我们再来看下,startPubSubLoop、reloadQueueLoop、reloadLoop 这3个Go协程分别完成了什么功能。

  1. startPubSubLoop协程

startPubSubLoop函数通过StartPubSubHandler函数,订阅Redis的 iam.cluster.notifications channel,并注册一个回调函数:

func(v interface{}) {
    handleRedisEvent(v, nil, nil)
}

handleRedisEvent函数中,会将消息解析为Notification类型的消息,并判断Command的值。如果是NoticePolicyChanged或NoticeSecretChanged,就会向 reloadQueue channel中写入一个回调函数。 reloadQueue 主要用来告诉程序,需要完成一次密钥和策略的同步。

  1. reloadQueueLoop协程

reloadQueueLoop函数会监听 reloadQueue ,当发现有新的消息(这里是回调函数)写入时,会实时将消息缓存到 requeue 切片中,代码如下:

func (l *Load) reloadQueueLoop(cb ...func()) {
		for {
			select {
			case <-l.ctx.Done():
				return
			case fn := <-reloadQueue:
				requeueLock.Lock()
				requeue = append(requeue, fn)
				requeueLock.Unlock()
				log.Info("Reload queued")
				if len(cb) != 0 {
					cb[0]()
				}
			}
		}
	}
  1. reloadLoop协程

通过reloadLoop函数启动一个timer定时器,每隔1秒会检查 requeue 切片是否为空,如果不为空,则调用 l.DoReload 方法,从iam-apiserver中拉取密钥和策略,并缓存在内存中。

密钥和策略的缓存模型如下图所示:

密钥和策略缓存的具体流程如下:

接收上游消息(这里是从Redis中接收),将消息缓存到切片或者带缓冲的channel中,并启动一个消费协程去消费这些消息。这里的消费协程是reloadLoop,reloadLoop会每隔1s判断 requeue 切片是否长度为0,如果不为0,则执行 l.DoReload() 缓存密钥和策略。

讲完了密钥和策略缓存,看下授权日志缓存。

在启动iam-authz-server时,还会启动一个Analytics服务,代码如下(位于internal/authzserver/authzserver.go文件中):

    if s.analyticsOptions.Enable {    
        analyticsStore := storage.RedisCluster{KeyPrefix: RedisKeyPrefix}    
        analyticsIns := analytics.NewAnalytics(s.analyticsOptions, &analyticsStore)    
        analyticsIns.Start()    
        s.gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error {    
            analyticsIns.Stop()    
    
            return nil    
        }))    
    }

NewAnalytics函数会根据配置,创建一个Analytics实例:

func NewAnalytics(options *AnalyticsOptions, store storage.AnalyticsHandler) *Analytics {
		ps := options.PoolSize
		recordsBufferSize := options.RecordsBufferSize
		workerBufferSize := recordsBufferSize / uint64(ps)
		log.Debug("Analytics pool worker buffer size", log.Uint64("workerBufferSize", workerBufferSize))
	
		recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)
	
		return &Analytics{
			store:                      store,
			poolSize:                   ps,
			recordsChan:                recordsChan,
			workerBufferSize:           workerBufferSize,
			recordsBufferFlushInterval: options.FlushInterval,
		}
	} 

上面的代码创建了一个带缓冲的 recordsChan :

recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)

recordsChan 存放的数据类型为AnalyticsRecord,缓冲区的大小为 recordsBufferSize (通过 --analytics.records-buffer-size 选项指定)。可以通过RecordHit函数,向recordsChan 中写入 AnalyticsRecord 类型的数据:

func (r *Analytics) RecordHit(record *AnalyticsRecord) error {                                                         
    // check if we should stop sending records 1st                                                                     
    if atomic.LoadUint32(&r.shouldStop) > 0 {                                                                          
        return nil                                                                                                     
    }                                                                                                                  
                                                                                                                       
    // just send record to channel consumed by pool of workers                                                         
    // leave all data crunching and Redis I/O work for pool workers                                                    
    r.recordsChan <- record                                                                                            
                                                                                                                       
    return nil                                                                                                         
}   

iam-authz-server是通过调用 LogGrantedAccessRequest 和 LogRejectedAccessRequest 函数来记录授权日志的。在记录授权日志时,会将授权日志写入 recordsChan channel中。LogGrantedAccessRequest函数代码如下:

func (auth *Authorization) LogGrantedAccessRequest(r *ladon.Request, p ladon.Policies, d ladon.Policies) {
    conclusion := fmt.Sprintf("policies %s allow access", joinPoliciesNames(d))                               
    rstring, pstring, dstring := convertToString(r, p, d)                          
    record := analytics.AnalyticsRecord{                     
        TimeStamp:  time.Now().Unix(),                              
        Username:   r.Context["username"].(string),                 
        Effect:     ladon.AllowAccess,                       
        Conclusion: conclusion,                              
        Request:    rstring,       
        Policies:   pstring,                                                   
        Deciders:   dstring,                                                   
    }                           
                           
    record.SetExpiry(0)
    _ = analytics.GetAnalytics().RecordHit(&record)         
} 

上面的代码,会创建AnalyticsRecord类型的结构体变量,并调用RecordHit将变量的值写入 recordsChan channel中。将授权日志写入 recordsChan   channel中,而不是直接写入Redis中,这可以大大减少写入延时,减少接口的响应延时。

还有一个worker进程从recordsChan中读取数据,并在数据达到一定阈值之后,批量写入Redis中。在Start函数中,我们创建了一批worker,worker个数可以通过 --analytics.pool-size 来指定 。Start函数内容如下:

func (r *Analytics) Start() {
		analytics = r
		r.store.Connect()
	
		// start worker pool
		atomic.SwapUint32(&r.shouldStop, 0)
		for i := 0; i < r.poolSize; i++ {
			r.poolWg.Add(1)
			go r.recordWorker()
		}
	
		// stop analytics workers
		go r.Stop()
	}

上面的代码通过 go r.recordWorker() 创建了 由poolSize 指定个数的recordWorker(worker),recordWorker函数会从 recordsChan 中读取授权日志并存入recordsBuffer中,recordsBuffer的大小为workerBufferSize,workerBufferSize计算公式为:

ps := options.PoolSize
recordsBufferSize := options.RecordsBufferSize
workerBufferSize := recordsBufferSize / uint64(ps)

其中,options.PoolSize由命令行参数 --analytics.pool-size 指定,代表worker 的个数,默认 50;options.RecordsBufferSize由命令行参数 --analytics.records-buffer-size 指定,代表缓存的授权日志消息数。也就是说,我们把缓存的记录平均分配给所有的worker。

当recordsBuffer存满或者达到投递最大时间后,调用 r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer) 将记录批量发送给Redis,为了提高传输速率,这里将日志内容编码为msgpack格式后再传输。

上面的缓存方法可以抽象成一个缓存模型,满足实际开发中的大部分需要异步转存的场景,如下图所示:

Producer将数据投递到带缓冲的channel中,后端有多个worker消费channel中的数据,并进行批量投递。你可以设置批量投递的条件,一般至少包含最大投递日志数最大投递时间间隔这两个。

 

数据一致性

 数据一致性架构如下图所示:

密钥和策略同步流程如下:

  1. 通过iam-webconsole请求iam-apiserver创建(或更新、删除)密钥(或策略)。
  2. iam-apiserver收到“写”请求后,会向Redis iam.cluster.notifications channel发送PolicyChanged或SecretChanged消息。
  3. Loader收到消息后,会触发cache loader实例执行 Reload 方法,重新从iam-apiserver中同步密钥和策略信息。 

在cache实例的 Reload 方法中,我们其实是调用仓库层Secret和Policy的List方法来获取密钥和策略列表。仓库层又是通过执行gRPC请求,从iam-apiserver中获取密钥和策略列表。

cache的Reload方法,会将获取到的密钥和策略列表缓存在ristretto类型的Cache中,供业务层调用。业务层代码位于internal/authzserver/authorization目录下。

总结

这一讲中,我介绍了IAM数据流服务iam-authz-server的设计和实现。iam-authz-server提供了 /v1/authz RESTful API接口,供第三方用户完成资源授权功能,具体是使用Ladon包来完成资源授权的。Ladon包解决了“在特定的条件下,谁能够/不能够对哪些资源做哪些操作”的问题。

iam-authz-server的配置处理、启动流程和请求处理流程跟iam-apiserver保持一致。此外,iam-authz-server也实现了简洁架构。

iam-authz-server通过缓存密钥和策略信息、缓存授权日志来提高 /v1/authz 接口的性能。

在缓存密钥和策略信息时,为了和iam-apiserver中的密钥和策略信息保持一致,使用了Redis Pub/Sub机制。当iam-apiserver有密钥/策略变更时,会往指定的Redis channel Pub一条消息。iam-authz-server订阅相同的channel,在收到新消息时,会解析消息,并重新从iam-apiserver中获取密钥和策略信息,缓存在内存中。

iam-authz-server执行完资源授权之后,会将授权日志存放在一个带缓冲的channel中。后端有多个worker消费channel中的数据,并进行批量投递。可以设置批量投递的条件,例如最大投递日志数和最大投递时间间隔。

课后练习

  1. iam-authz-server和iam-apiserver共用了应用框架(包括一些配置项)和HTTP服务框架层的代码,请阅读iam-authz-server代码,看下IAM项目是如何实现代码复用的。
  2. iam-authz-server使用了ristretto来缓存密钥和策略信息,请调研下业界还有哪些优秀的缓存包可供使用,欢迎在留言区分享。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/516810.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ES6展开运算符

1.展开可迭代对象&#xff08;简单理解为数组和伪数组&#xff09;&#xff0c;如数组、 NodeList 、arguments。 可以通过展开运算符把一个伪数组转换为数组 const a [...document.body.children]; console.log(a); console.log(Array.isArray(a));2.实现数组的浅拷贝 cons…

51单片机入门之独立按键

目录 1.按键简介 2.独立按键控制LED亮灭 3.独立按键控制LED移位 1.按键简介 在生活中&#xff0c;我们常常会见到各种按键&#xff0c;我们的开发板上也有按键&#xff0c;就在左下角有四个按键&#xff0c;我们把它们叫做独立按键。 独立按键的原理比较简单&…

【三十三】【算法分析与设计】回溯(1),46. 全排列,78. 子集,没有树结构,但是依旧模拟树结构,回溯,利用全局变量+递归函数模拟树结构

46. 全排列 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1&#xff0c;2&#xff0c;3] 输出&#xff1a;[[1&#xff0c;2&#xff0c;3]&#xff0c;[1&#xff0c;3&a…

WPF中通过自定义Panel实现控件拖动

背景 看到趋时软件的公众号文章&#xff08;WPF自定义Panel&#xff1a;让拖拽变得更简单&#xff09;&#xff0c;发现可以不通过Drag的方法来实现ListBox控件的拖动&#xff0c;而是通过对控件的坐标相加减去实现控件的位移等判断&#xff0c;因此根据文章里面的代码,边理解边…

考题抄错会做也白搭--模版方法模式

1.1 选择题不会做&#xff0c;蒙呗&#xff01; "题目抄错了&#xff0c;那就不是考试题目了&#xff0c;而考试试卷最大的好处就是&#xff0c;大家都是一样的题目&#xff0c;特别是标准化的考试&#xff0c;比如全是选择或判断的题目&#xff0c;那就最大化地限制了答题…

整合Mybatis(Spring学习笔记十二)

一、导入相关的包 junit 包 Mybatis包 mysql数据库包 Spring相关的包 Aop相关的包 Mybatis-Spring包&#xff08;现在就来学这个&#xff09; 提示jdk版本不一致的朋友记得 jdk8只支持spring到5.x 所以如果导入的spring(spring-we…

Linux:进程终止和等待

一、进程终止 main函数的返回值也叫做进程的退出码&#xff0c;一般0表示成功&#xff0c;非零表示失败。我们也可以用不同的数字来表示不同失败的原因。 echo $?//打印最近一次进程执行的退出码 而作为程序猿&#xff0c;我们更需要知道的是错误码所代表的错误信息&#x…

【智能算法】磷虾群算法(KHA)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2012年&#xff0c;Gandomi等人受到自然界中磷虾生存行为启发&#xff0c;提出了磷虾群算法&#xff08;Krill Herd Algorithm, KHA&#xff09;。 2.算法原理 2.1算法思想 KHA受南极鳞虾群觅食行…

Java | Leetcode Java题解之第8题字符串转换整数atoi

题目&#xff1a; 题解&#xff1a; class Solution {public int myAtoi(String str) {Automaton automaton new Automaton();int length str.length();for (int i 0; i < length; i) {automaton.get(str.charAt(i));}return (int) (automaton.sign * automaton.ans);} …

Matlab|储能辅助电力系统调峰的容量需求研究

目录 1 主要内容 目标函数 约束条件 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序参考文献《储能辅助电力系统调峰的容量需求研究》&#xff0c;主要是对火电、风电和储能等电力设备主体进行优化调度&#xff0c;在调峰能力达不到时采用弃负荷&#xff0c;程序以…

无人售货奶柜:开启便捷生活的新篇章

无人售货奶柜&#xff1a;开启便捷生活的新篇章 在这个快节奏的现代生活中&#xff0c;科技的革新不仅为我们带来了前所未有的便利&#xff0c;更在不经意间改变着我们的日常。其中&#xff0c;无人售货技术的出现&#xff0c;尤其是无人售货奶柜&#xff0c;已经成为我们生活…

012:vue3使用vue-i18n实现国际化

文章目录 1. 安装 vue-i18n2. 创建文件存储翻译的语言3. 注册i18n实例4. 在main.ts中引入vue-i18n实例5. 在组件模板中使用6. 在js中使用7. locale.value 实现国际化语言切换8. vue3 中ref里面的国际化值没生效问题 1. 安装 vue-i18n cnpm i --save vue-i18n2. 创建文件存储翻…

树状数组-数据结构

树状数组 t[x] 节点的父节点为 t[x lowbit(x)] 整棵树的深度为 log2n 1 1 . add(x,k) 给指定的节点x加上k — 动态的维护前缀和 需要从x开始&#xff0c;向上找到所有父节点&#xff0c;值都加上k 2. ask(x) 求取节点x之前的前缀和 求取单点之前的前缀和只需要累加即可 …

【算法】单单单单单调栈,接接接接接雨水

【算法】单单单单单调栈&#xff0c;接接接接接雨水 今天没有小故事。 参考以及题单来源&#xff1a; 代码随想录 (programmercarl.com) Part 1 啥是单调栈&#xff1f; 1.啥啥啥是单调栈&#xff1f; 栈的特性想必各位再熟悉不过了&#xff1a;先进后出。栈仅仅有一个出口&a…

算法 day29 回溯5

491 非递减子序列 给你一个整数数组 nums &#xff0c;找出并返回所有该数组中不同的递增子序列&#xff0c;递增子序列中 至少有两个元素 。你可以按 任意顺序 返回答案。 数组中可能含有重复元素&#xff0c;如出现两个整数相等&#xff0c;也可以视作递增序列的一种特殊情…

检测头篇 | 利用RT-DETR模型的检测头去替换YOLOv8中的检测头

前言:Hello大家好,我是小哥谈。RT-DETR号称是打败YOLO的检测模型,其作为一种基于Transformer的检测方法,相较于传统的基于卷积的检测方法,提供了更为全面和深入的特征理解,将RT-DETR检测头融入YOLOv8,我们可以结合YOLO的实时检测能力和RT-DETR的深度特征理解能力,打造出…

业务网关的设计与实践

在过去的两年里&#xff0c;主要在做业务网关的开发。今年春节后选择转岗去做更偏近业务的开发。公司的业务是金融相关&#xff0c;一直觉得金融相关的业务是有一定门槛并且是对职业生涯有帮助的&#xff0c;所以趁这个机会来深入了解这块业务。 仔细回想&#xff0c;在做业务…

【数据处理包Pandas】数据载入与预处理

目录 一、数据载入二、数据清洗&#xff08;一&#xff09;Pandas中缺失值的表示&#xff08;二&#xff09;与缺失值判断和处理相关的方法 三、连续特征离散化四、哑变量处理 准备工作 导入 NumPy 库和 Pandas 库。 import numpy as np import pandas as pd一、数据载入 对…

开机自启动

对win10,给一种开机自启动的设置方法: 1. winr 打开 2. 输入shell:startup打开 开始\程序\启动 3. 把想要自启动的应用的快捷方式放在这里即可 亲测有用

第十一届蓝桥杯物联网试题(省赛)

对于通信方面&#xff0c;还是终端A、B都保持接收状态&#xff0c;当要发送的数组不为空再发送数据&#xff0c;发送完后立即清除&#xff0c;接收数据的数组不为空则处理&#xff0c;处理完后立即清除&#xff0c;分工明确 继电器不亮一般可能是电压不够 将数据加空格再加\r…