目录
1.什么是OpenTelemetry
2.搭建jaeger
3.链路追踪
本地调用
远程调用
GRPC
proto
server端
client端
Gin-HTTP
调用流程
api1
api2
grpc
4.完整代码
1.什么是OpenTelemetry
参考:OpenTelemetry-1.介绍-CSDN博客
2.搭建jaeger
参考:Jaeger分布式链路追踪工具-CSDN博客
3.链路追踪
本地调用
package main
import (
"context"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
const (
service = "local"
environment = "dev"
id = 1
)
// tracerProvider returns an OpenTelemetry TracerProvider configured to use
// the Jaeger exporter that will send spans to the provided url. The returned
// TracerProvider will also use a Resource configured with all the information
// about the application.
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in a Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
)),
)
return tp, nil
}
func main() {
tp, err := tracerProvider("http://10.128.175.196:14268/api/traces")
if err != nil {
log.Fatal(err)
}
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Cleanly shutdown and flush telemetry when the application exits.
defer func(ctx context.Context) {
// Do not make the application hang when it is shutdown.
ctx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}(ctx)
tr := tp.Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
defer span.End()
bar(ctx)
}
func bar(ctx context.Context) {
// Use the global TracerProvider.
tr := otel.Tracer("component-bar")
_, span := tr.Start(ctx, "bar")
span.SetAttributes(attribute.Key("testset").String("value"))
defer span.End()
// Do bar...
}
参考:opentelemetry-go/example/jaeger/main.go at v1.16.0 · open-telemetry/opentelemetry-go · GitHub
远程调用
grpc
proto
syntax = "proto3"; // 指定proto版本
package hello_grpc; // 指定默认包名
// 指定golang包名
option go_package = "/hello_grpc";
//定义rpc服务
service HelloService {
// 定义函数
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// HelloRequest 请求内容
message HelloRequest {
string name = 1;
string message = 2;
}
// HelloResponse 响应内容
message HelloResponse{
string name = 1;
string message = 2;
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v3.9.0
// source: grpc_proto/hello.proto
package hello_grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// HelloRequest 请求内容
type HelloRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *HelloRequest) Reset() {
*x = HelloRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_grpc_proto_hello_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HelloRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HelloRequest) ProtoMessage() {}
func (x *HelloRequest) ProtoReflect() protoreflect.Message {
mi := &file_grpc_proto_hello_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HelloRequest.ProtoReflect.Descriptor instead.
func (*HelloRequest) Descriptor() ([]byte, []int) {
return file_grpc_proto_hello_proto_rawDescGZIP(), []int{0}
}
func (x *HelloRequest) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *HelloRequest) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// HelloResponse 响应内容
type HelloResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *HelloResponse) Reset() {
*x = HelloResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_grpc_proto_hello_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HelloResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HelloResponse) ProtoMessage() {}
func (x *HelloResponse) ProtoReflect() protoreflect.Message {
mi := &file_grpc_proto_hello_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HelloResponse.ProtoReflect.Descriptor instead.
func (*HelloResponse) Descriptor() ([]byte, []int) {
return file_grpc_proto_hello_proto_rawDescGZIP(), []int{1}
}
func (x *HelloResponse) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *HelloResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_grpc_proto_hello_proto protoreflect.FileDescriptor
var file_grpc_proto_hello_proto_rawDesc = []byte{
0x0a, 0x16, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x5f,
0x67, 0x72, 0x70, 0x63, 0x22, 0x3c, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x22, 0x3d, 0x0a, 0x0d, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x32, 0x51, 0x0a, 0x0c, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x12, 0x41, 0x0a, 0x08, 0x53, 0x61, 0x79, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x18, 0x2e,
0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x5f, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x5f,
0x67, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x42, 0x0d, 0x5a, 0x0b, 0x2f, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x5f, 0x67,
0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_grpc_proto_hello_proto_rawDescOnce sync.Once
file_grpc_proto_hello_proto_rawDescData = file_grpc_proto_hello_proto_rawDesc
)
func file_grpc_proto_hello_proto_rawDescGZIP() []byte {
file_grpc_proto_hello_proto_rawDescOnce.Do(func() {
file_grpc_proto_hello_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_proto_hello_proto_rawDescData)
})
return file_grpc_proto_hello_proto_rawDescData
}
var file_grpc_proto_hello_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_grpc_proto_hello_proto_goTypes = []interface{}{
(*HelloRequest)(nil), // 0: hello_grpc.HelloRequest
(*HelloResponse)(nil), // 1: hello_grpc.HelloResponse
}
var file_grpc_proto_hello_proto_depIdxs = []int32{
0, // 0: hello_grpc.HelloService.SayHello:input_type -> hello_grpc.HelloRequest
1, // 1: hello_grpc.HelloService.SayHello:output_type -> hello_grpc.HelloResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_grpc_proto_hello_proto_init() }
func file_grpc_proto_hello_proto_init() {
if File_grpc_proto_hello_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_grpc_proto_hello_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HelloRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_grpc_proto_hello_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HelloResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_grpc_proto_hello_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_grpc_proto_hello_proto_goTypes,
DependencyIndexes: file_grpc_proto_hello_proto_depIdxs,
MessageInfos: file_grpc_proto_hello_proto_msgTypes,
}.Build()
File_grpc_proto_hello_proto = out.File
file_grpc_proto_hello_proto_rawDesc = nil
file_grpc_proto_hello_proto_goTypes = nil
file_grpc_proto_hello_proto_depIdxs = nil
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// HelloServiceClient is the client API for HelloService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type HelloServiceClient interface {
// 定义函数
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
}
type helloServiceClient struct {
cc grpc.ClientConnInterface
}
func NewHelloServiceClient(cc grpc.ClientConnInterface) HelloServiceClient {
return &helloServiceClient{cc}
}
func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
out := new(HelloResponse)
err := c.cc.Invoke(ctx, "/hello_grpc.HelloService/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// HelloServiceServer is the server API for HelloService service.
type HelloServiceServer interface {
// 定义函数
SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
}
// UnimplementedHelloServiceServer can be embedded to have forward compatible implementations.
type UnimplementedHelloServiceServer struct {
}
func (*UnimplementedHelloServiceServer) SayHello(context.Context, *HelloRequest) (*HelloResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented")
}
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
s.RegisterService(&_HelloService_serviceDesc, srv)
}
func _HelloService_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HelloServiceServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hello_grpc.HelloService/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HelloServiceServer).SayHello(ctx, req.(*HelloRequest))
}
return interceptor(ctx, in, info, handler)
}
var _HelloService_serviceDesc = grpc.ServiceDesc{
ServiceName: "hello_grpc.HelloService",
HandlerType: (*HelloServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _HelloService_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "grpc_proto/hello.proto",
}
server端
package main
import (
"context"
"fmt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go_otel/2go_grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"log"
"net"
)
const (
service = "grpc_server"
environment = "dev"
id = 2
)
var tracer = otel.Tracer(service)
// HelloServer 得有一个结构体,需要实现这个服务的全部方法,叫什么名字不重要
type HelloServer struct {
}
func (HelloServer) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (*hello_grpc.HelloResponse, error) {
fmt.Println("入参:", request.Name, request.Message)
_, span := tracer.Start(ctx, "grpc")
span.SetAttributes(attribute.Key("request.Name").String(request.Name))
span.SetAttributes(attribute.Key("request.Message").String(request.Message))
defer span.End()
return &hello_grpc.HelloResponse{
Name: "server",
Message: "hello " + request.Name,
}, nil
}
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://10.128.175.196:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
tp, err := tracerProvider()
if err != nil {
log.Fatal(err)
}
// Cleanly shutdown and flush telemetry when the application exits.
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
// grpc
listen, err := net.Listen("tcp", ":8080")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
// 创建一个gRPC服务器实例。
s := grpc.NewServer(
//引入grpc-middleware定义的拦截器
grpc.ChainUnaryInterceptor(
//openTelemetry 链路追踪
otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(tp)),
),
)
server := HelloServer{}
// 将server结构体注册为gRPC服务。
hello_grpc.RegisterHelloServiceServer(s, &server)
fmt.Println("grpc server running :8080")
// 开始处理客户端请求。
err = s.Serve(listen)
}
client端
package main
import (
"context"
"fmt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.17.0"
"go_otel/2go_grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"os"
"time"
)
const (
service = "grpc_client"
environment = "dev"
id = 2
)
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://10.128.175.196:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
//创建traceProvider
tp, err := tracerProvider()
if err != nil {
fmt.Println("NewTracerProvider err:", err)
os.Exit(1)
}
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()),
//Unary拦截器
grpc.WithChainUnaryInterceptor(
//openTelemetry 链路追踪
otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(tp)),
),
//Stream拦截器
grpc.WithChainStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// Pre-processing logic
s := time.Now()
cs, err := streamer(ctx, desc, cc, method, opts...)
// Post-processing logic
log.Printf("method: %s, latency: %s\n", method, time.Now().Sub(s))
return cs, err
}),
)
defer func() {
println("关闭TracerProvider。所有注册的跨度处理器都会按照它们注册的顺序关闭,并释放所有持有的计算资源。")
if err := tp.Shutdown(context.Background()); err != nil {
panic(err)
}
}()
if err != nil {
log.Fatalf("connection failed,err:%s", err)
}
// 初始化客户端
client := hello_grpc.NewHelloServiceClient(conn)
result, err := client.SayHello(context.Background(), &hello_grpc.HelloRequest{
Name: "client",
Message: "hello",
})
fmt.Println(result, err)
}
gin-http
调用流程
api1 ----> api2 ----> grpc
api1
package main
import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
oteltrace "go.opentelemetry.io/otel/trace"
"log"
"net/http"
modahttp "github.com/webws/go-moda/transport/http"
)
const (
service = "api1"
environment = "dev"
id = 3
)
var tracer = otel.Tracer(service)
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://10.128.175.196:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
tp, err := tracerProvider()
if err != nil {
log.Fatal(err)
}
// Cleanly shutdown and flush telemetry when the application exits.
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
r := gin.New()
r.Use(otelgin.Middleware("gin-api1"))
loadRoutes(r)
fmt.Println("api1 start 8081")
r.Run(":8081")
}
func loadRoutes(r *gin.Engine) {
r.GET("/ping", pingFunc)
}
func pingFunc(c *gin.Context) {
tracerCtx, span := tracer.Start(c.Request.Context(), "pingFunc",
oteltrace.WithAttributes(attribute.String("key-pingFunc", "val-pingFunc")))
defer span.End()
callApi2(tracerCtx)
c.JSON(http.StatusOK, gin.H{
"message": "pong",
})
}
func callApi2(ctx context.Context) {
// tracer
fmt.Println("on callApi2")
tracerCtx, span := tracer.Start(ctx, "callApi2")
span.AddEvent("starting callApi2")
span.SetAttributes(attribute.Key("key_callApi2").String("value_callApi2"))
defer span.End()
// 打印TraceID SpanID
spanCtx := oteltrace.SpanContextFromContext(ctx)
fmt.Println(spanCtx.TraceID())
fmt.Println(spanCtx.SpanID())
// 调用api2
url := fmt.Sprintf("http://localhost:8082/ping")
defer span.End()
_, err := modahttp.CallAPI(tracerCtx, url, "GET", nil)
if err != nil {
fmt.Printf("call api2 error: %v", err)
span.SetStatus(codes.Error, err.Error())
}
}
api2
package main
import (
"context"
"fmt"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
oteltrace "go.opentelemetry.io/otel/trace"
"go_otel/3Gin-HTTP/grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"net/http"
)
const (
service = "api2"
environment = "dev"
id = 2
)
var tracer = otel.Tracer(service)
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://10.128.175.196:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
tp, err := tracerProvider()
if err != nil {
log.Fatal(err)
}
// Cleanly shutdown and flush telemetry when the application exits.
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
r := gin.New()
r.Use(otelgin.Middleware("gin-api2"))
loadRoutes(r)
fmt.Println("api2 start 8082")
r.Run(":8082")
}
func loadRoutes(r *gin.Engine) {
r.GET("/ping", pingFunc)
}
func pingFunc(c *gin.Context) {
fmt.Println("on pingFunc")
ctx, span := tracer.Start(c.Request.Context(), "pingFunc")
defer span.End()
callGrpc(ctx)
c.JSON(http.StatusOK, gin.H{
"message": "pong",
})
}
func callGrpc(ctx context.Context) {
fmt.Println("on callGrpc")
ctx, span := tracer.Start(ctx, "callGrpc")
defer span.End()
// 打印TraceID SpanID
spanCtx := oteltrace.SpanContextFromContext(ctx)
fmt.Println(spanCtx.TraceID())
fmt.Println(spanCtx.SpanID())
// 连接服务器
options := make([]grpc.DialOption, 0)
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
conn, err := grpc.DialContext(ctx, "localhost:8083", options...)
if err != nil {
panic(err)
}
defer conn.Close()
// 初始化客户端
client := hello_grpc.NewHelloServiceClient(conn)
result, err := client.SayHello(ctx, &hello_grpc.HelloRequest{
Name: "client",
Message: "hello",
})
fmt.Println(result, err)
}
grpc
package main
import (
"context"
"fmt"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
oteltrace "go.opentelemetry.io/otel/trace"
"go_otel/3Gin-HTTP/grpc/grpc_proto/hello_grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"log"
"net"
)
const (
service = "remote-grpc"
environment = "dev"
id = 3
)
var tracer = otel.Tracer(service)
// HelloServer 得有一个结构体,需要实现这个服务的全部方法,叫什么名字不重要
type HelloServer struct {
}
func (HelloServer) SayHello(ctx context.Context, request *hello_grpc.HelloRequest) (*hello_grpc.HelloResponse, error) {
fmt.Println("on grpc SayHello")
_, span := tracer.Start(ctx, "SayHello",
oteltrace.WithAttributes(attribute.String("key-pingFunc", "val-pingFunc")))
defer span.End()
spanCtx := oteltrace.SpanContextFromContext(ctx)
fmt.Println(spanCtx.SpanID())
fmt.Println(spanCtx.TraceID())
fmt.Println("入参:", request.Name, request.Message)
return &hello_grpc.HelloResponse{
Name: "server",
Message: "hello " + request.Name,
}, nil
}
func tracerProvider() (*tracesdk.TracerProvider, error) {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://10.128.175.196:14268/api/traces")))
if err != nil {
return nil, err
}
tp := tracesdk.NewTracerProvider(
// Always be sure to batch in production.
tracesdk.WithSampler(tracesdk.AlwaysSample()),
tracesdk.WithBatcher(exp),
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(service),
attribute.String("environment", environment),
attribute.Int64("ID", id),
),
),
)
// Register our TracerProvider as the global so any imported
// instrumentation in the future will default to using it.
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return tp, nil
}
func main() {
tp, err := tracerProvider()
if err != nil {
log.Fatal(err)
}
// Cleanly shutdown and flush telemetry when the application exits.
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}()
// grpc
fmt.Println("grpc start 8083")
listen, err := net.Listen("tcp", ":8083")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
// 创建一个gRPC服务器实例。
// 添加拦截器(grpc集成OpenTelemetry主要是调用(otelgrpc.UnaryServerInterceptor())
s := grpc.NewServer(grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()))
server := HelloServer{}
// 将server结构体注册为gRPC服务。
hello_grpc.RegisterHelloServiceServer(s, &server)
// 开始处理客户端请求。
err = s.Serve(listen)
}
4.完整代码
go_opentelemetry_study: Golang使用OpenTelemetry