写在前面
阅读本文需要最起码了解envoy
相关的概念
本文只是一个类似于demo
的测试,只为了学习istio
,更好的理解istio
中的控制面和数据面(pilot -> proxy)是如何交互的,下图的蓝色虚线
先说go-control-plane
是什么,它是一个用于实现xDS API
的golang
库,xDS API
是Envoy
用于动态配置的协议。我们实现了go-control-plane
就可以做到
- 动态配置管理:允许控制面动态更新数据面代理的配置
- 支持多种 xDS 资源
- 缓存和版本管理:提供快照缓存机制,管理配置的版本和更新
使用go-control-plane
我们可以自定义一个控制面来管理Envoy
,从而实现动态的服务发现、负载均衡和路由等等…
xds
则是一系列服务发现的总称
- lds 监听器服务发现
- rds 路由服务发现
- cds 集群服务发现
- eds 端点服务发现
- ads 聚合服务发现
等等等等,还有一些其他的服务发现,本文不涉及就没有说到,如果不理解这些概念,建议先去官网了解一下 https://www.envoyproxy.io
实现 go-control-plane
功能描述
上文是envoy
代理的架构,程序中的逻辑我使用倒叙的方式描述,主要的步骤如下
- 创建
endpoint
地址是www.envoyproxy.io
端口是443
- 创建
cluster
叫做xds_demo_cluster
它的端点就是上面创建的 - 创建路由在
filter_chins
下的http_connection_manager
中名称叫做xds_demo_route
,没有任何的路由规则,路由的cluster
名称(请求转发的目的地)叫做xds_demo_cluster
- 最后创建
listener
名称是listener_xds_demo
监听的地址是0.0.0.0:12000
整体流程就是当我们访问localhost:12000
的时候会将我们的请求转发到www.envoyproxy.io
。
如果运行过默认的envoy
的用户可能就会发现我程序中下发的配置就是默认运行envoy
时的配置,只不过默认运行envoy
是静态配置文件
的方式就是所有的配置都写在envoy.yaml
中,而本文是动态的方式。
envoy
有多种运行方式本文不做赘述
功能实现
package main
import (
"context"
"log"
"net"
"time"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
http_connection_manager "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
)
func main() {
ctx := context.Background()
lis, err := net.Listen("tcp", ":18000")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
sc := cache.NewSnapshotCache(true, cache.IDHash{}, nil)
srv := server.NewServer(ctx, sc, nil)
// new grpc server
gs := grpc.NewServer()
clusterservice.RegisterClusterDiscoveryServiceServer(gs, srv)
endpointservice.RegisterEndpointDiscoveryServiceServer(gs, srv)
listenerservice.RegisterListenerDiscoveryServiceServer(gs, srv)
routeservice.RegisterRouteDiscoveryServiceServer(gs, srv)
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, srv)
err = setSnapshot(ctx, "xds-node-id", sc)
if err != nil {
log.Fatalf("set snapshot error: %v", err)
} else {
log.Println("set snapshot success")
}
log.Println("Starting control plane server...")
if err := gs.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
func setSnapshot(ctx context.Context, nodeId string, sc cache.SnapshotCache) error {
clusterName := "xds_demo_cluster"
manager := buildHttpManager(clusterName)
fcs := buildFilterChain(manager)
serviceListener := buildListener("0.0.0.0", 12000, fcs)
serviceEndpoint := buildEndpoint()
serviceCluster := buildCluster(clusterName, serviceEndpoint)
rs := map[resource.Type][]types.Resource{
resource.ClusterType: {serviceCluster},
resource.EndpointType: {serviceEndpoint},
resource.ListenerType: {serviceListener},
resource.RouteType: {manager},
}
snapshot, err := cache.NewSnapshot("1", rs)
if err != nil {
log.Fatalf("new snapshot error: %v", err)
}
return sc.SetSnapshot(ctx, nodeId, snapshot)
}
func buildFilterChain(manager *http_connection_manager.HttpConnectionManager) []*listener.FilterChain {
managerPB, err := anypb.New(manager)
if err != nil {
log.Fatalf("Failed to marshal HttpConnectionManager: %v", err)
}
fcs := make([]*listener.FilterChain, 0, 0)
fcs = append(fcs, &listener.FilterChain{
Filters: []*listener.Filter{
{
Name: "envoy.filters.network.http_connection_manager",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: managerPB,
},
},
},
})
return fcs
}
func buildHttpManager(clusterName string) *http_connection_manager.HttpConnectionManager {
xdsRoute := &route.RouteConfiguration{
Name: "xds_demo_route",
VirtualHosts: []*route.VirtualHost{
{
Name: "xds_demo_service",
Domains: []string{"*"},
Routes: []*route.Route{
{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
HostRewriteSpecifier: &route.RouteAction_HostRewriteLiteral{
HostRewriteLiteral: "www.envoyproxy.io",
},
// 集群要去下文一致
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: clusterName,
},
},
},
},
},
},
},
}
routerConfig, _ := anypb.New(&routerv3.Router{})
// http 链接管理器
manager := &http_connection_manager.HttpConnectionManager{
StatPrefix: "ingress_http",
RouteSpecifier: &http_connection_manager.HttpConnectionManager_RouteConfig{
RouteConfig: xdsRoute,
},
HttpFilters: []*http_connection_manager.HttpFilter{
{
Name: "envoy.filters.http.router",
ConfigType: &http_connection_manager.HttpFilter_TypedConfig{
TypedConfig: routerConfig,
},
},
},
SchemeHeaderTransformation: &corev3.SchemeHeaderTransformation{
Transformation: &corev3.SchemeHeaderTransformation_SchemeToOverwrite{
SchemeToOverwrite: "https",
},
},
}
return manager
}
func buildEndpoint() *endpoint.LbEndpoint {
epTarget := &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &corev3.Address{
Address: &corev3.Address_SocketAddress{
SocketAddress: &corev3.SocketAddress{
Address: "www.envoyproxy.io",
PortSpecifier: &corev3.SocketAddress_PortValue{
PortValue: 443,
},
},
},
},
},
},
}
return epTarget
}
func buildCluster(clusterName string, ep *endpoint.LbEndpoint) *cluster.Cluster {
serviceCluster := &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(time.Second * 3),
ClusterDiscoveryType: &cluster.Cluster_Type{
Type: cluster.Cluster_STRICT_DNS,
},
DnsLookupFamily: cluster.Cluster_V4_ONLY,
LbPolicy: cluster.Cluster_ROUND_ROBIN,
LoadAssignment: &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{
{
LbEndpoints: []*endpoint.LbEndpoint{ep},
},
},
},
TransportSocket: &corev3.TransportSocket{
Name: "envoy.transport_sockets.tls",
ConfigType: nil,
},
}
us := &tlsv3.UpstreamTlsContext{
Sni: "www.envoyproxy.io",
}
tlsConfig, _ := anypb.New(us)
serviceCluster.TransportSocket.ConfigType = &corev3.TransportSocket_TypedConfig{
TypedConfig: tlsConfig,
}
return serviceCluster
}
func buildListener(ip string, port uint32, fcs []*listener.FilterChain) *listener.Listener {
return &listener.Listener{
Name: "listener_xds_demo",
Address: &corev3.Address{
Address: &corev3.Address_SocketAddress{
SocketAddress: &corev3.SocketAddress{
Address: ip,
PortSpecifier: &corev3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
// 过滤器链
FilterChains: fcs,
}
}
cache.NewSnapshotCache
返回一个SnapshotCache
是go-control-plane
中的一个核心组件,用于管理和存储Envoy
所需的xDS
资源的快照,并且向Envoy
实例推送更新server.NewServer
创建xDS
服务实例sc.SetSnapshot
用于将生成的配置快照设置到指定的节点上,是动态配置和更新Envoy
代理的行为,入参有一个id
是下文envoy
引导配置中的id
截止到现在我们就可以启动这个服务,我们要记住当前服务监听的地址,因为在envoy.yaml
中会需要使用到的
为 envoy 设置引导配置
引导配置(bootstrap configuration),引导配置文件通常指定控制面地址(如xDS
服务器地址)、监听器、集群、管理接口等基本信息。
node:
id: xds-node-id
cluster: xds-node-cluster
dynamic_resources:
ads_config:
api_type: GRPC
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
cds_config:
ads: {}
lds_config:
ads: {}
static_resources:
clusters:
- name: xds_cluster
connect_timeout: 1s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 18000
admin:
access_log_path: /dev/null
address:
socket_address:
address: 0.0.0.0
port_value: 9901
解释
- static_resources
- 定义静态集群
xds_cluster
,用于与xDS
服务器通信。这里的xDS
服务器运行在127.0.0.1:18000
就是我们上面服务监听的地址
- 定义静态集群
- dynamic_resources
- ads_config: 配置聚合发现服务(ADS)来动态获取配置,使用
gRPC
服务与xds_cluster
进行通信 cds_config
和lds_config
分别表示使用ADS
来获取配置
- ads_config: 配置聚合发现服务(ADS)来动态获取配置,使用
- admin
- 定义管理接口,监听
0.0.0.0:9901
,用于查看Envoy
的状态和配置
- 定义管理接口,监听
- node
- id 节点的唯一标识符,用于在
xDS
服务器中区分不同的Envoy
实例 - cluster 节点所属的集群名称。
- id 节点的唯一标识符,用于在
然后启动envoy
,从输出的日志中我们可以看到通过控制面下发的配置,数据面已经加载成功了。
[2024-06-27 07:47:53.524][1][info][main] [source/server/server.cc:977] starting main dispatch loop
[2024-06-27 07:47:53.526][1][info][upstream] [source/common/upstream/cds_api_helper.cc:32] cds: add 1 cluster(s), remove 1 cluster(s)
[2024-06-27 07:47:53.527][1][info][upstream] [source/common/upstream/cds_api_helper.cc:71] cds: added/updated 1 cluster(s), skipped 0 unmodified cluster(s)
[2024-06-27 07:47:53.544][1][info][upstream] [source/common/upstream/cluster_manager_impl.cc:240] cm init: all clusters initialized
[2024-06-27 07:47:53.544][1][info][main] [source/server/server.cc:957] all clusters initialized. initializing init manager
[2024-06-27 07:47:53.546][1][info][upstream] [source/common/listener_manager/lds_api.cc:106] lds: add/update listener 'listener_xds_demo'
[2024-06-27 07:47:53.546][1][info][config] [source/common/listener_manager/listener_manager_impl.cc:930] all dependencies initialized. starting workers
下一步我们可以访问一下监听的地址,可以看到成功转发到了envoy
的官方网站。
~ $ curl http://localhost:12000 | grep title
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
19 15571 19 3098 0 0 5286 0 0:00:02 --:--:-- 0:00:02 5286 <title>Envoy proxy - home</title>
100 15571 100 15571 0 0 21685 0 --:--:-- --:--:-- --:--:-- 21686
我们还可以通过 ~ $ curl http://localhost:9901/config_dump
来查看envoy
的实时配置
写在最后
动态配置的方式是在内存中加载配置,不会更新到静态的文件中。
更高级、复杂的用法可以参考istio
;具体来说pilot
watch
集群中的服务、端点、配置等资源的变化。当检测到这些资源的变化时,pilot
会生成新的配置,并通过xDS API
将更新推送到相应的Envoy
实例,从而实现动态配置和管理服务网格中的流量控制和路由规则。这样可以确保 Envoy
始终具有最新的服务发现信息和路由配置。
源码目录 https://github.com/istio/istio/tree/master/pilot