Istio Pilot Source Code Study (II): ServiceController Service Discovery

This walkthrough is based on the Istio 1.18.0 source code.

4. Service discovery: ServiceController

ServiceController is the core module for service discovery. Its main job is to watch the underlying platform's service registry, convert the platform's service model into the Istio service model, and cache it; it also triggers the registered event-handling callbacks for the affected services whenever they change.

1) Core interfaces of ServiceController

ServiceController can serve multiple service registries at the same time because it holds a controller per registry; these are combined through the aggregation abstraction (aggregate.Controller), whose relevant definitions are:

// pilot/pkg/serviceregistry/aggregate/controller.go
// Controller aggregates data from all underlying registries and watches it for changes
type Controller struct {
	// meshConfig
	meshHolder mesh.Holder

	// The lock is used to protect the registries and controller's running status.
	storeLock sync.RWMutex
	// the set of registries
	registries []*registryEntry
	// indicates whether the controller has run.
	// if true, all the registries added later should be run manually.
	running bool

	// the collection of controller callbacks; when a registry is added, the controller registers these callbacks with it
	handlers model.ControllerHandlers
	// callbacks keyed by cluster
	handlersByCluster map[cluster.ID]*model.ControllerHandlers
	model.NetworkGatewaysHandler
}

type registryEntry struct {
	serviceregistry.Instance
	// stop if not nil is the per-registry stop chan. If null, the server stop chan should be used to Run the registry.
	stop <-chan struct{}
}
// pilot/pkg/serviceregistry/instance.go
// Instance is the interface for a service registry
type Instance interface {
	// the controller interface
	model.Controller
	// the service discovery interface
	model.ServiceDiscovery

	// Provider backing this service registry (i.e. Kubernetes etc.)
	Provider() provider.ID

	// Cluster for which the service registry applies. Only needed for multicluster systems.
	Cluster() cluster.ID
}
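
To make the fan-out concrete, below is a minimal, self-contained sketch of the aggregation pattern (an illustration, not the actual Istio source — the Registry interface and fakeRegistry type are simplified stand-ins for serviceregistry.Instance): queries are dispatched to every registered registry and the results are merged:

// Simplified sketch of the fan-out in aggregate.Controller (not actual Istio source)
package main

import (
	"fmt"
	"sync"
)

// Registry is a simplified stand-in for serviceregistry.Instance.
type Registry interface {
	Services() []string
}

type Aggregate struct {
	mu         sync.RWMutex
	registries []Registry
}

// AddRegistry appends a registry, mirroring how the aggregate controller
// collects per-registry controllers.
func (a *Aggregate) AddRegistry(r Registry) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.registries = append(a.registries, r)
}

// Services queries every registry and merges the results.
func (a *Aggregate) Services() []string {
	a.mu.RLock()
	defer a.mu.RUnlock()
	var out []string
	seen := map[string]bool{}
	for _, r := range a.registries {
		for _, s := range r.Services() {
			if !seen[s] { // de-duplicate services reported by more than one registry
				seen[s] = true
				out = append(out, s)
			}
		}
	}
	return out
}

type fakeRegistry []string

func (f fakeRegistry) Services() []string { return f }

func main() {
	a := &Aggregate{}
	a.AddRegistry(fakeRegistry{"a.ns.svc.cluster.local", "b.ns.svc.cluster.local"})
	a.AddRegistry(fakeRegistry{"b.ns.svc.cluster.local", "c.external.com"})
	fmt.Println(a.Services()) // [a.ns.svc.cluster.local b.ns.svc.cluster.local c.external.com]
}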

The registry interface Instance embeds Istio's generic controller interface Controller and service discovery interface ServiceDiscovery, which are defined as follows:

// pilot/pkg/model/controller.go
// Controller is the interface for registering event-handling callbacks
// A registry controller receives resource update events and invokes the corresponding callbacks
type Controller interface {
	// Note: AppendXXXHandler is used to register high level handlers.
	// For per cluster handlers, they should be registered by the `AppendXXXHandlerForCluster` interface.

	// AppendServiceHandler notifies about changes to the service catalog.
	// registers a callback for service events
	AppendServiceHandler(f ServiceHandler)

	// AppendWorkloadHandler notifies about changes to workloads. This differs from InstanceHandler,
	// which deals with service instances (the result of a merge of Service and Workload)
	// registers a callback for workload instance events, mainly so that Kubernetes Services and Istio ServiceEntries can cross-select each other's instances
	AppendWorkloadHandler(f func(*WorkloadInstance, Event))

	// Run until a signal is received
	// runs the controller
	Run(stop <-chan struct{})

	// HasSynced returns true after initial cache synchronization is complete
	// reports whether the controller's cache has completed its initial sync
	HasSynced() bool
}
// pilot/pkg/model/service.go
// ServiceDiscovery provides query capabilities over the service model
type ServiceDiscovery interface {
	NetworkGatewaysWatcher

	// Services list declarations of all services in the system
	// lists all services in the mesh
	Services() []*Service

	// GetService retrieves a service by host name if it exists
	// looks up a service by hostname
	GetService(hostname host.Name) *Service

	// InstancesByPort retrieves instances for a service on the given ports with labels that match
	// any of the supplied labels. All instances match an empty tag list.
	//
	// For example, consider an example of catalog.mystore.com:
	// Instances(catalog.myservice.com, 80) ->
	//      --> IstioEndpoint(172.16.0.1:8888), Service(catalog.myservice.com), Labels(foo=bar)
	//      --> IstioEndpoint(172.16.0.2:8888), Service(catalog.myservice.com), Labels(foo=bar)
	//      --> IstioEndpoint(172.16.0.3:8888), Service(catalog.myservice.com), Labels(kitty=cat)
	//      --> IstioEndpoint(172.16.0.4:8888), Service(catalog.myservice.com), Labels(kitty=cat)
	//
	// Calling Instances with specific labels returns a trimmed list.
	// e.g., Instances(catalog.myservice.com, 80, foo=bar) ->
	//      --> IstioEndpoint(172.16.0.1:8888), Service(catalog.myservice.com), Labels(foo=bar)
	//      --> IstioEndpoint(172.16.0.2:8888), Service(catalog.myservice.com), Labels(foo=bar)
	//
	// Similar concepts apply for calling this function with a specific
	// port, hostname and labels.
	//
	// Introduced in Istio 0.8. It is only called with 1 port.
	// CDS (clusters.go) calls it for building 'dnslb' type clusters.
	// EDS calls it for building the endpoints result.
	// Consult istio-dev before using this for anything else (except debugging/tools)
	// returns the instances of a service on the given port
	InstancesByPort(svc *Service, servicePort int) []*ServiceInstance

	// GetProxyServiceInstances returns the service instances that co-located with a given Proxy
	//
	// Co-located generally means running in the same network namespace and security context.
	//
	// A Proxy operating as a Sidecar will return a non-empty slice.  A stand-alone Proxy
	// will return an empty slice.
	//
	// There are two reasons why this returns multiple ServiceInstances instead of one:
	// - A ServiceInstance has a single IstioEndpoint which has a single Port.  But a Service
	//   may have many ports.  So a workload implementing such a Service would need
	//   multiple ServiceInstances, one for each port.
	// - A single workload may implement multiple logical Services.
	//
	// In the second case, multiple services may be implemented by the same physical port number,
	// though with a different ServicePort and IstioEndpoint for each.  If any of these overlapping
	// services are not HTTP or H2-based, behavior is undefined, since the listener may not be able to
	// determine the intended destination of a connection without a Host header on the request.
	// returns the service instances co-located with the given sidecar proxy
	GetProxyServiceInstances(*Proxy) []*ServiceInstance
	// returns the labels of the proxy's workload
	GetProxyWorkloadLabels(*Proxy) labels.Instance

	// MCSServices returns information about the services that have been exported/imported via the
	// Kubernetes Multi-Cluster Services (MCS) ServiceExport API. Only applies to services in
	// Kubernetes clusters.
	MCSServices() []MCSServiceInfo
	AmbientIndexes
}
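
The callback pattern behind AppendServiceHandler can be illustrated with the following self-contained sketch (simplified types, not the actual Istio source): handlers are appended to a slice, and when the registry observes a change each registered handler is notified in turn — essentially what model.ControllerHandlers.NotifyServiceHandlers does in the event-handling code later in this article:

// Simplified sketch of the handler registration/notification pattern (not actual Istio source)
package main

import "fmt"

type Event int

const (
	EventAdd Event = iota
	EventUpdate
	EventDelete
)

type Service struct{ Hostname string }

// ServiceHandler mirrors the shape of model.ServiceHandler: (previous, current, event).
type ServiceHandler func(prev, curr *Service, event Event)

type handlers struct {
	serviceHandlers []ServiceHandler
}

// AppendServiceHandler simply records the callback.
func (h *handlers) AppendServiceHandler(f ServiceHandler) {
	h.serviceHandlers = append(h.serviceHandlers, f)
}

// NotifyServiceHandlers invokes every registered callback in order.
func (h *handlers) NotifyServiceHandlers(prev, curr *Service, event Event) {
	for _, f := range h.serviceHandlers {
		f(prev, curr, event)
	}
}

func main() {
	h := &handlers{}
	h.AppendServiceHandler(func(prev, curr *Service, event Event) {
		fmt.Printf("service %s changed (event %d)\n", curr.Hostname, event)
	})
	h.NotifyServiceHandlers(nil, &Service{Hostname: "catalog.ns.svc.cluster.local"}, EventAdd)
}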

2) Initialization of ServiceController

The initialization flow of the Kubernetes ServiceController is shown below:

(Figure: Kubernetes ServiceController initialization flow)

The core method is NewController() in pilot/pkg/serviceregistry/kube/controller/controller.go:

// pilot/pkg/serviceregistry/kube/controller/controller.go
func NewController(kubeClient kubelib.Client, options Options) *Controller {
	// instantiate the controller for the Kubernetes registry
	c := &Controller{
		opts:                       options,
		client:                     kubeClient,
		queue:                      queue.NewQueueWithID(1*time.Second, string(options.ClusterID)),
		servicesMap:                make(map[host.Name]*model.Service),
		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
		nodeInfoMap:                make(map[string]kubernetesNode),
		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
		workloadInstancesIndex:     workloadinstances.NewIndex(),
		initialSyncTimedout:        atomic.NewBool(false),
		networkManager:             initNetworkManager(options),
		configCluster:              options.ConfigCluster,
	}

	c.namespaces = kclient.New[*v1.Namespace](kubeClient)
	if c.opts.SystemNamespace != "" {
		registerHandlers[*v1.Namespace](
			c,
			c.namespaces,
			"Namespaces",
			func(old *v1.Namespace, cur *v1.Namespace, event model.Event) error {
				if cur.Name == c.opts.SystemNamespace {
					return c.onSystemNamespaceEvent(old, cur, event)
				}
				return nil
			},
			nil,
		)
	}

	if c.opts.DiscoveryNamespacesFilter == nil {
		c.opts.DiscoveryNamespacesFilter = namespace.NewDiscoveryNamespacesFilter(c.namespaces, options.MeshWatcher.Mesh().DiscoverySelectors)
	}

	c.initDiscoveryHandlers(options.MeshWatcher, c.opts.DiscoveryNamespacesFilter)

	c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: c.opts.DiscoveryNamespacesFilter.Filter})

	// register the event-handling callback for Services
	registerHandlers[*v1.Service](c, c.services, "Services", c.onServiceEvent, nil)

	switch options.EndpointMode {
	case EndpointSliceOnly:
		c.endpoints = newEndpointSliceController(c)
	default: // nolint: gocritic
		log.Errorf("unknown endpoints mode: %v", options.EndpointMode)
		fallthrough
	case EndpointsOnly:
		// instantiate endpointsController and register the event-handling callback for Endpoints
		c.endpoints = newEndpointsController(c)
	}

	// This is for getting the node IPs of a selected set of nodes
	c.nodes = kclient.NewFiltered[*v1.Node](kubeClient, kclient.Filter{ObjectTransform: kubelib.StripNodeUnusedFields})
	// register the event-handling callback for Nodes
	registerHandlers[*v1.Node](c, c.nodes, "Nodes", c.onNodeEvent, nil)

	c.podsClient = kclient.NewFiltered[*v1.Pod](kubeClient, kclient.Filter{
		ObjectFilter:    c.opts.DiscoveryNamespacesFilter.Filter,
		ObjectTransform: kubelib.StripPodUnusedFields,
	})
	c.pods = newPodCache(c, c.podsClient, func(key types.NamespacedName) {
		c.queue.Push(func() error {
			return c.endpoints.sync(key.Name, key.Namespace, model.EventAdd, true)
		})
	})
	// register the event-handling callback for Pods
	registerHandlers[*v1.Pod](c, c.podsClient, "Pods", c.pods.onEvent, c.pods.labelFilter)

	if features.EnableMCSServiceDiscovery || features.EnableMCSHost {
		c.crdWatcher = crdwatcher.NewController(kubeClient)
	}
	if features.EnableAmbientControllers {
		c.configController = options.ConfigController
		c.ambientIndex = c.setupIndex()
	}
	c.exports = newServiceExportCache(c)
	c.imports = newServiceImportCache(c)

	c.meshWatcher = options.MeshWatcher
	if c.opts.MeshNetworksWatcher != nil {
		c.opts.MeshNetworksWatcher.AddNetworksHandler(func() {
			c.reloadMeshNetworks()
			c.onNetworkChange()
		})
		c.reloadMeshNetworks()
	}

	return c
}

NewController() instantiates the Kubernetes registry controller, which is defined as follows:

// pilot/pkg/serviceregistry/kube/controller/controller.go
type Controller struct {
	opts Options

	client kubelib.Client

	// the controller's task queue
	queue queue.Instance

	namespaces kclient.Client[*v1.Namespace]
	services   kclient.Client[*v1.Service]

	// abstract interface for the Kubernetes endpoints controller; supports both Endpoints and EndpointSlice
	endpoints kubeEndpointsController

	// Used to watch node accessible from remote cluster.
	// In multi-cluster(shared control plane multi-networks) scenario, ingress gateway service can be of nodePort type.
	// With this, we can populate mesh's gateway address with the node ips.
	nodes kclient.Client[*v1.Node]

	crdWatcher *crdwatcher.Controller
	// handles ServiceExport resources for multi-cluster services
	exports serviceExportCache
	// handles ServiceImport resources for multi-cluster services
	imports serviceImportCache
	// wraps kclient.Client[*v1.Pod]
	pods *PodCache

	crdHandlers []func(name string)
	// event handlers for services and workload instances
	handlers                   model.ControllerHandlers
	namespaceDiscoveryHandlers []func(ns string, event model.Event)

	// This is only used for test
	stop chan struct{}

	sync.RWMutex
	// servicesMap stores hostname ==> service, it is used to reduce convertService calls.
	// cache of the Istio service model
	servicesMap map[host.Name]*model.Service
	// nodeSelectorsForServices stores hostname => label selectors that can be used to
	// refine the set of node port IPs for a service.
	nodeSelectorsForServices map[host.Name]labels.Instance
	// map of node name and its address+labels - this is the only thing we need from nodes
	// for vm to k8s or cross cluster. When node port services select specific nodes by labels,
	// we run through the label selectors here to pick only ones that we need.
	// Only nodes with ExternalIP addresses are included in this map !
	// cache of nodes
	nodeInfoMap map[string]kubernetesNode
	// externalNameSvcInstanceMap stores hostname ==> instance, is used to store instances for ExternalName k8s services
	// cache of instances for ExternalName services
	externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
	// index over workload instances from workload entries
	// index of workload instances
	workloadInstancesIndex workloadinstances.Index

	networkManager

	// initialSyncTimedout is set to true after performing an initial processing timed out.
	initialSyncTimedout *atomic.Bool
	meshWatcher         mesh.Watcher

	podsClient kclient.Client[*v1.Pod]

	ambientIndex     *AmbientIndex
	configController model.ConfigStoreController
	configCluster    bool
}

The services, nodes, and podsClient fields of Controller are all of type Client[T controllers.Object], which wraps the client for operating on the corresponding resource and is defined as follows:

// pkg/kube/kclient/interfaces.go
// Client wraps a Kubernetes client providing cached read access and direct write access.
type Client[T controllers.Object] interface {
	Reader[T]
	Writer[T]
	Informer[T]
}

(Figure: initialization of the Kubernetes controller's key fields)

The core of the Kubernetes controller is to watch update events for the relevant Kubernetes resources (Service, Endpoint, EndpointSlice, Pod, Node) and invoke the corresponding event-handling callbacks. It also converts Kubernetes resource objects into Istio resource objects and provides a degree of caching, mainly for Istio Services and WorkloadInstances.
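
As one concrete piece of that conversion, the Istio service hostname is derived from the Kubernetes Service's name and namespace plus the cluster domain suffix. A minimal sketch of just this naming rule (the real conversion in kube.ConvertService also handles ports, labels, VIPs, and more):

// Simplified sketch of the Kubernetes-Service-to-Istio hostname rule
package main

import "fmt"

// serviceHostname builds "<name>.<namespace>.svc.<domainSuffix>".
func serviceHostname(name, namespace, domainSuffix string) string {
	return fmt.Sprintf("%s.%s.svc.%s", name, namespace, domainSuffix)
}

func main() {
	fmt.Println(serviceHostname("catalog", "mystore", "cluster.local"))
	// Output: catalog.mystore.svc.cluster.local
}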

3) How ServiceController works

ServiceController creates an Informer for each of the four resource kinds to watch for Kubernetes resource updates and registers an EventHandler with each of them.

NewController() calls registerHandlers() to register the EventHandlers for the four resource kinds; registerHandlers() is implemented as follows:

// pilot/pkg/serviceregistry/kube/controller/controller.go
func registerHandlers[T controllers.ComparableObject](c *Controller,
	informer kclient.Informer[T], otype string,
	handler func(T, T, model.Event) error, filter FilterOutFunc[T],
) {
	// wrap the handler that was passed in
	wrappedHandler := func(prev, curr T, event model.Event) error {
		curr = informer.Get(curr.GetName(), curr.GetNamespace())
		if controllers.IsNil(curr) {
			// this can happen when an immediate delete after update
			// the delete event can be handled later
			return nil
		}
		return handler(prev, curr, event)
	}
	informer.AddEventHandler(
		controllers.EventHandler[T]{
			AddFunc: func(obj T) {
				incrementEvent(otype, "add")
				// create a resource-processing task and push it onto the task queue
				c.queue.Push(func() error {
					return wrappedHandler(ptr.Empty[T](), obj, model.EventAdd)
				})
			},
			UpdateFunc: func(old, cur T) {
				if filter != nil {
					if filter(old, cur) {
						incrementEvent(otype, "updatesame")
						return
					}
				}

				incrementEvent(otype, "update")
				c.queue.Push(func() error {
					return wrappedHandler(old, cur, model.EventUpdate)
				})
			},
			DeleteFunc: func(obj T) {
				incrementEvent(otype, "delete")
				c.queue.Push(func() error {
					return handler(ptr.Empty[T](), obj, model.EventDelete)
				})
			},
		})
}

When an update to a Service, Endpoint, Pod, or Node resource is observed, the EventHandler creates a resource-processing task and pushes it onto the task queue. A worker goroutine then blocks receiving task objects from the queue and finally invokes each task's handler to finish processing the resource event.
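
A minimal sketch of this producer/consumer pattern is shown below (an illustration only — Istio's queue.Instance additionally retries failed tasks after a delay):

// Simplified sketch of the task-queue pattern used by the controller (not Istio's queue.Instance)
package main

import "fmt"

type Task func() error

type Queue struct {
	tasks chan Task
}

func NewQueue() *Queue { return &Queue{tasks: make(chan Task, 100)} }

// Push enqueues a task; the informer's event handlers call this.
func (q *Queue) Push(t Task) { q.tasks <- t }

// Run blocks, receiving and executing tasks until stop is closed,
// so all event processing is serialized on one goroutine.
func (q *Queue) Run(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case t := <-q.tasks:
			if err := t(); err != nil {
				fmt.Println("task failed:", err) // the real queue would re-enqueue for retry
			}
		}
	}
}

func main() {
	q := NewQueue()
	stop := make(chan struct{})
	q.Push(func() error { fmt.Println("handle Service add event"); return nil })
	q.Push(func() error { close(stop); return nil }) // shut down after the demo tasks
	q.Run(stop)
}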

1) Service event handling

// pilot/pkg/serviceregistry/kube/controller/controller.go
func (c *Controller) onServiceEvent(_, curr *v1.Service, event model.Event) error {
	log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)

	// Create the standard (cluster.local) service.
	// convert the Kubernetes Service into an Istio service
	svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster())
	switch event {
	case model.EventDelete:
		// delete the service
		c.deleteService(svcConv)
	default:
		// create or update the service
		c.addOrUpdateService(curr, svcConv, event, false)
	}

	return nil
}

func (c *Controller) addOrUpdateService(curr *v1.Service, currConv *model.Service, event model.Event, updateEDSCache bool) {
	needsFullPush := false
	// First, process nodePort gateway service, whose externalIPs specified
	// and loadbalancer gateway service
	if !currConv.Attributes.ClusterExternalAddresses.IsEmpty() {
		needsFullPush = c.extractGatewaysFromService(currConv)
	} else if isNodePortGatewayService(curr) {
		// We need to know which services are using node selectors because during node events,
		// we have to update all the node port services accordingly.
		nodeSelector := getNodeSelectorsForService(curr)
		c.Lock()
		// only add when it is nodePort gateway service
		c.nodeSelectorsForServices[currConv.Hostname] = nodeSelector
		c.Unlock()
		needsFullPush = c.updateServiceNodePortAddresses(currConv)
	}

	var prevConv *model.Service
	// instance conversion is only required when service is added/updated.
	instances := kube.ExternalNameServiceInstances(curr, currConv)
	c.Lock()
	prevConv = c.servicesMap[currConv.Hostname]
	c.servicesMap[currConv.Hostname] = currConv
	if len(instances) > 0 {
		c.externalNameSvcInstanceMap[currConv.Hostname] = instances
	}
	c.Unlock()

	// This full push needed to update ALL ends endpoints, even though we do a full push on service add/update
	// as that full push is only triggered for the specific service.
	if needsFullPush {
		// networks are different, we need to update all eds endpoints
		c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true, Reason: []model.TriggerReason{model.NetworksTrigger}})
	}

	shard := model.ShardKeyFromRegistry(c)
	ns := currConv.Attributes.Namespace
	// We also need to update when the Service changes. For Kubernetes, a service change will result in Endpoint updates,
	// but workload entries will also need to be updated.
	// TODO(nmittler): Build different sets of endpoints for cluster.local and clusterset.local.
	if updateEDSCache || features.EnableK8SServiceSelectWorkloadEntries {
		endpoints := c.buildEndpointsForService(currConv, updateEDSCache)
		if len(endpoints) > 0 {
			c.opts.XDSUpdater.EDSCacheUpdate(shard, string(currConv.Hostname), ns, endpoints)
		}
	}

	// update the service cache
	c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event)
	// trigger the service event handlers
	c.handlers.NotifyServiceHandlers(prevConv, currConv, event)
}

The Service event handler updates the service cache according to the event type and then invokes the serviceHandlers callbacks. The serviceHandlers are registered through ServiceController's AppendServiceHandler(); the registration code is as follows:

// pilot/pkg/bootstrap/server.go
func (s *Server) initRegistryEventHandlers() {
	log.Info("initializing registry event handlers")
	// Flush cached discovery responses whenever services configuration change.
	serviceHandler := func(prev, curr *model.Service, event model.Event) {
		needsPush := true
		if event == model.EventUpdate {
			needsPush = serviceUpdateNeedsPush(prev, curr)
		}
		if needsPush {
			// trigger a full xDS push
			pushReq := &model.PushRequest{
				Full:           true,
				ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}),
				Reason:         []model.TriggerReason{model.ServiceUpdate},
			}
			s.XDSServer.ConfigUpdate(pushReq)
		}
	}
	// register the service event handler
	s.ServiceController().AppendServiceHandler(serviceHandler)
  ...

2) Endpoint event handling

The Endpoint event handler is registered when NewController() calls newEndpointsController() to create the endpointsController:

// pilot/pkg/serviceregistry/kube/controller/endpoints.go
func newEndpointsController(c *Controller) *endpointsController {
	endpoints := kclient.NewFiltered[*v1.Endpoints](c.client, kclient.Filter{ObjectFilter: c.opts.GetFilter()})
	out := &endpointsController{
		endpoints: endpoints,
		c:         c,
	}
	// register the event-handling callback for Endpoints
	registerHandlers[*v1.Endpoints](c, endpoints, "Endpoints", out.onEvent, endpointsEqual)
	return out
}

func (e *endpointsController) onEvent(_, ep *v1.Endpoints, event model.Event) error {
	return processEndpointEvent(e.c, e, ep.Name, ep.Namespace, event, ep)
}

The Endpoint event-handling function is processEndpointEvent(), implemented as follows:

// pilot/pkg/serviceregistry/kube/controller/endpointcontroller.go
func processEndpointEvent(c *Controller, epc kubeEndpointsController, name string, namespace string, event model.Event, ep any) error {
	// Update internal endpoint cache no matter what kind of service, even headless service.
	// As for gateways, the cluster discovery type is `EDS` for headless service.
	// update EDS
	updateEDS(c, epc, ep, event)
	if svc := c.services.Get(name, namespace); svc != nil {
		// if the service is headless service, trigger a full push if EnableHeadlessService is true,
		// otherwise push endpoint updates - needed for NDS output.
		// for a headless service, trigger a full xDS push (when EnableHeadlessService is true)
		if svc.Spec.ClusterIP == v1.ClusterIPNone {
			for _, modelSvc := range c.servicesForNamespacedName(config.NamespacedName(svc)) {
				c.opts.XDSUpdater.ConfigUpdate(&model.PushRequest{
					Full: features.EnableHeadlessService,
					// TODO: extend and set service instance type, so no need to re-init push context
					ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: modelSvc.Hostname.String(), Namespace: svc.Namespace}),

					Reason: []model.TriggerReason{model.HeadlessEndpointUpdate},
				})
				return nil
			}
		}
	}

	return nil
}

func updateEDS(c *Controller, epc kubeEndpointsController, ep any, event model.Event) {
	namespacedName := epc.getServiceNamespacedName(ep)
	log.Debugf("Handle EDS endpoint %s %s in namespace %s", namespacedName.Name, event, namespacedName.Namespace)
	var forgottenEndpointsByHost map[host.Name][]*model.IstioEndpoint
	if event == model.EventDelete {
		forgottenEndpointsByHost = epc.forgetEndpoint(ep)
	}

	shard := model.ShardKeyFromRegistry(c)

	for _, hostName := range c.hostNamesForNamespacedName(namespacedName) {
		var endpoints []*model.IstioEndpoint
		if forgottenEndpointsByHost != nil {
			endpoints = forgottenEndpointsByHost[hostName]
		} else {
			// convert the endpoints into Istio endpoints
			endpoints = epc.buildIstioEndpoints(ep, hostName)
		}

		if features.EnableK8SServiceSelectWorkloadEntries {
			svc := c.GetService(hostName)
			if svc != nil {
				fep := c.collectWorkloadInstanceEndpoints(svc)
				endpoints = append(endpoints, fep...)
			} else {
				log.Debugf("Handle EDS endpoint: skip collecting workload entry endpoints, service %s/%s has not been populated",
					namespacedName.Namespace, namespacedName.Name)
			}
		}

		// call EDSUpdate
		c.opts.XDSUpdater.EDSUpdate(shard, string(hostName), namespacedName.Namespace, endpoints)
	}
}

Finally, XDSUpdater.EDSUpdate() is called to update the EDS cache and trigger the xDS update:

// pilot/pkg/xds/eds.go
func (s *DiscoveryServer) EDSUpdate(shard model.ShardKey, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint,
) {
	inboundEDSUpdates.Increment()
	// Update the endpoint shards
	// update the EDS cache
	pushType := s.edsCacheUpdate(shard, serviceName, namespace, istioEndpoints)
	// trigger an xDS push
	if pushType == IncrementalPush || pushType == FullPush {
		// Trigger a push
		s.ConfigUpdate(&model.PushRequest{
			Full:           pushType == FullPush,
			ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: serviceName, Namespace: namespace}),
			Reason:         []model.TriggerReason{model.EndpointUpdate},
		})
	}
}

func (s *DiscoveryServer) edsCacheUpdate(shard model.ShardKey, hostname string, namespace string,
	istioEndpoints []*model.IstioEndpoint,
) PushType {
	if len(istioEndpoints) == 0 {
		// Should delete the service EndpointShards when endpoints become zero to prevent memory leak,
		// but we should not delete the keys from EndpointIndex map - that will trigger
		// unnecessary full push which can become a real problem if a pod is in crashloop and thus endpoints
		// flip flopping between 1 and 0.
		// delete the service's endpoint shards when they drop to zero, but keep the key in the EndpointIndex map;
		// otherwise a pod flapping between crash loop and ready would trigger unnecessary, frequent full xDS pushes
		s.Env.EndpointIndex.DeleteServiceShard(shard, hostname, namespace, true)
		log.Infof("Incremental push, service %s at shard %v has no endpoints", hostname, shard)
		return IncrementalPush
	}

	pushType := IncrementalPush
	// Find endpoint shard for this service, if it is available - otherwise create a new one.
	// find the endpoint shard for this service, creating a new one if it does not exist
	ep, created := s.Env.EndpointIndex.GetOrCreateEndpointShard(hostname, namespace)
	// If we create a new endpoint shard, that means we have not seen the service earlier. We should do a full push.
	// if a new endpoint shard was created, a full xDS push is needed
	if created {
		log.Infof("Full push, new service %s/%s", namespace, hostname)
		pushType = FullPush
	}

	ep.Lock()
	defer ep.Unlock()
	newIstioEndpoints := istioEndpoints
	// support sending unhealthy endpoints
	if features.SendUnhealthyEndpoints.Load() {
		oldIstioEndpoints := ep.Shards[shard]
		newIstioEndpoints = make([]*model.IstioEndpoint, 0, len(istioEndpoints))

		// Check if new Endpoints are ready to be pushed. This check
		// will ensure that if a new pod comes with a non ready endpoint,
		// we do not unnecessarily push that config to Envoy.
		// Please note that address is not a unique key. So this may not accurately
		// identify based on health status and push too many times - which is ok since its an optimization.
		emap := make(map[string]*model.IstioEndpoint, len(oldIstioEndpoints))
		nmap := make(map[string]*model.IstioEndpoint, len(newIstioEndpoints))
		// Add new endpoints only if they are ever ready once to shards
		// so that full push does not send them from shards.
		for _, oie := range oldIstioEndpoints {
			emap[oie.Address] = oie
		}
		for _, nie := range istioEndpoints {
			nmap[nie.Address] = nie
		}
		needPush := false
		for _, nie := range istioEndpoints {
			if oie, exists := emap[nie.Address]; exists {
				// If endpoint exists already, we should push if it's health status changes.
				// if the endpoint already exists, push only when its health status has changed
				if oie.HealthStatus != nie.HealthStatus {
					needPush = true
				}
				newIstioEndpoints = append(newIstioEndpoints, nie)
			} else if nie.HealthStatus == model.Healthy {
				// If the endpoint does not exist in shards that means it is a
				// new endpoint. Only send if it is healthy to avoid pushing endpoints
				// that are not ready to start with.
				// if the endpoint did not exist before, push only when it is healthy
				needPush = true
				newIstioEndpoints = append(newIstioEndpoints, nie)
			}
		}
		// Next, check for endpoints that were in old but no longer exist. If there are any, there is a
		// removal so we need to push an update.
		// if an endpoint existed before but has now been removed, a push is also needed
		for _, oie := range oldIstioEndpoints {
			if _, f := nmap[oie.Address]; !f {
				needPush = true
			}
		}

		if pushType != FullPush && !needPush {
			log.Debugf("No push, either old endpoint health status did not change or new endpoint came with unhealthy status, %v", hostname)
			pushType = NoPush
		}

	}

	ep.Shards[shard] = newIstioEndpoints

	// Check if ServiceAccounts have changed. We should do a full push if they have changed.
	// check for ServiceAccount changes
	saUpdated := s.UpdateServiceAccount(ep, hostname)

	// For existing endpoints, we need to do full push if service accounts change.
	if saUpdated && pushType != FullPush {
		// Avoid extra logging if already a full push
		log.Infof("Full push, service accounts changed, %v", hostname)
		pushType = FullPush
	}

	// Clear the cache here. While it would likely be cleared later when we trigger a push, a race
	// condition is introduced where an XDS response may be generated before the update, but not
	// completed until after a response after the update. Essentially, we transition from v0 -> v1 ->
	// v0 -> invalidate -> v1. Reverting a change we pushed violates our contract of monotonically
	// moving forward in version. In practice, this is pretty rare and self corrects nearly
	// immediately. However, clearing the cache here has almost no impact on cache performance as we
	// would clear it shortly after anyways.
	// clear the xDS cache
	s.Cache.Clear(sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: hostname, Namespace: namespace}))

	return pushType
}

The Endpoint event handler updates the service-related caches according to Endpoint changes and decides whether a given Endpoint update needs to trigger a full xDS push. In a service mesh, Endpoints are usually what changes most often and most rapidly, so incremental EDS updates greatly reduce the system's resource overhead (CPU, memory, bandwidth) and improve the stability of the mesh.

References:

1. 《Istio权威指南 下》
2. 深入Istio源码:Pilot服务发现
