文章目录
- 前言
- 正文
- 实现架构
- 实现模型
- OAP 同步 Apollo
- ConfigWatcherRegister
- ConfigChangeWatcher
- Agent 侧
前言
本文代码 OAP 基于 v9.7,Java Agent 基于 v9.1,配置中心使用 apollo。
看本文需要配合代码“食用”。
正文
Skywalking 中就使用这种模型实现了 Agent 同步Apollo 配置,本文介绍下提供的功能以及代码实现,一起学习下。
Skywalking 支持 agent 动态更新配置,使 agent 可以依据业务需求进行自定义配置;更重要的是建立起这一个通信机制,那么 agent 的可管理性、扩展性都大大提升。
目前 Skywalking 提供了以下配置项
按照文档描述,主要为以下内容:
-
控制采样速率
-
忽略指定后缀的请求,注意必须是 first span 的 opretationName 匹配到
针对 web 服务,有些静态资源是放在服务端,那么可以过滤掉这些请求
-
忽略某些 path 的 trace
-
限定每个 segment 中的 span 最大数量
-
是否收集执行 sql 的参数
样例配置
configurations:
serviceA:
trace.sample_n_per_3_secs: 1000
trace.ignore_path: /a/b/c,/a1/b1/c1
serviceB:
trace.sample_n_per_3_secs: 1000
trace.ignore_path: /a/b/c,/a1/b1/c1
注意:这个是按照服务来进行逐项配置,如果不需要变动,不要添加对应 key,会使用默认值。
实现架构
-
OAP 同步 Apollo 配置
-
Agent 同步 OAP 配置。
每阶段的操作无关联,都是作为 Client 的一端发起的请求来同步数据。
实现模型
配置动态变更实际上是一个订阅发布模型,简单描述就是有发布者和订阅者两种角色,之间交互一般是:有一个注册接口,方便订阅者注册自身,以及发布者可以获取到订阅者列表;一个通知接口,方便发布者发送消息给订阅者。
例如需要订水,只要给订水公司留下自己的电话、地址及数量(发布者知道如何找到你),之后就有人送水上门(有水时进行派送)。
这种模型理解起来很简单,实现上难度也不大,且使用场景很广泛。
OAP 同步 Apollo
首先看下 OAP 是如何同步 apollo 数据。
ConfigWatcherRegister
这是一个抽象类,代表的是配置中心的角色,实现上有 apollo、nacos、zk 等方式。
先看下 notifySingleValue 方法:
protected void notifySingleValue(final ConfigChangeWatcher watcher, ConfigTable.ConfigItem configItem) {
String newItemValue = configItem.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
// 调用 watcher 的 notify 进行处理
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
} else {
// Don't need to notify, stay in null.
}
} else {
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// Don't need to notify, stay in the same config value.
}
}
}
该方法的逻辑是:读取 configItem 中的值,并且与 watcher 中的值进行比较,不相等之后判定是 DELETE、还是 UPDATE 操作,并封装成一个 ConfigChangeEvent 发送给 ConfigChangeWatcher,那么可以看出 ConfigChangeWatcher 是个订阅者的角色。
继续看下调用 notifySingleValue 方法的地方:
FetchingConfigWatcherRegister#singleConfigsSync
private final Register singleConfigChangeWatcherRegister = new Register();
public abstract Optional<ConfigTable> readConfig(Set<String> keys);
private void singleConfigsSync() {
// 1. 读取配置数据
Optional<ConfigTable> configTable = readConfig(singleConfigChangeWatcherRegister.keys());
// Config table would be null if no change detected from the implementation.
configTable.ifPresent(config -> {
config.getItems().forEach(item -> {
// 2. 遍历获取配置中的 itemName
String itemName = item.getName();
// 3. 依据 itemName 找到 WatcherHolder
WatcherHolder holder = singleConfigChangeWatcherRegister.get(itemName);
if (holder == null) {
return;
}
ConfigChangeWatcher watcher = holder.getWatcher();
// 从 WatcherHolder 得到 ConfigChangeWatcher,发送通知
notifySingleValue(watcher, item);
});
});
}
该方法执行的逻辑就是:
- 依据 singleConfigChangeWatcherRegister.keys() 作为参数读取配置信息
- 遍历配置信息,依据配置中的 name(即 itemName)找到 WatcherHolder,进而获取 ConfigChangeWatcher
- 调用 notifySingleValue。
readConfig 是个抽象方法,由具体的配置中心插件实现,本例中使用的 apollo,具体实现就是 ApolloConfigWatcherRegister。
读取到的内容类型 ConfigTable,并且可以知道是存储的 k-v 集合,那么 ConfigItem 就是每个配置项,itemName 就是 apollo 中配置的 key。
再看看调用 singleConfigsSync 的逻辑:
// FetchingConfigWatcherRegister.java
public void start() {
isStarted = true;
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::configSync, // 启动定时任务来执行
t -> log.error("Sync config center error.", t)
), 0, syncPeriod, TimeUnit.SECONDS);
}
void configSync() {
singleConfigsSync();
groupConfigsSync();
}
再回到 singleConfigsSync 中,读取配置时需要先获取到配置项的 key 的集合:singleConfigChangeWatcherRegister.keys()
先看下 singleConfigChangeWatcherRegister 的具体实现:FetchingConfigWatcherRegister$Register 内部就是一个 Map<String, WatcherHolder> 来存储。
static class Register {
private Map<String, WatcherHolder> register = new HashMap<>();
private boolean containsKey(String key) {
return register.containsKey(key);
}
private void put(String key, WatcherHolder holder) {
register.put(key, holder);
}
public WatcherHolder get(String name) {
return register.get(name);
}
public Set<String> keys() {
return register.keySet();
}
}
有读取就有存储,看看调用 put 逻辑:
// FetchingConfigWatcherRegister
synchronized public void registerConfigChangeWatcher(ConfigChangeWatcher watcher) {
WatcherHolder holder = new WatcherHolder(watcher);
if (singleConfigChangeWatcherRegister.containsKey(
holder.getKey()) || groupConfigChangeWatcherRegister.containsKey(holder.getKey())) {
}
switch (holder.getWatcher().getWatchType()) {
case SINGLE:
// put 调用
singleConfigChangeWatcherRegister.put(holder.getKey(), holder);
break;
case GROUP:
groupConfigChangeWatcherRegister.put(holder.getKey(), holder);
break;
default:
}
}
registerConfigChangeWatcher 方法,用于注册 ConfigChangeWatcher ,内部处理逻辑:先将 watcher 放入 watchHolder 中,再以 holder key 分开存储 holder(放入 FetchingConfigWatcherRegister$Register 中)。
WatcherHolder 是 ConfigWatcherRegister 一个内部类,代码如下,重点是 key 生成规则:String.join(".", watcher.getModule(), watcher.getProvider().name(), watcher.getItemName());
,每个 itemName 对应一个 watcher。
@Getter
protected static class WatcherHolder {
private ConfigChangeWatcher watcher;
private final String key;
public WatcherHolder(ConfigChangeWatcher watcher) {
this.watcher = watcher;
this.key = String.join(
".", watcher.getModule(), watcher.getProvider().name(),
watcher.getItemName()
);
}
}
总结:OAP 启动定时任务,同步 apollo 的配置数据,遍历每个配置项(configItem),找到对应的 ConfigChangerWater,将 watcher 中的值与 configItem 中的值进行比较,不相等之后继续判定是 DELETE、还是 UPDATE 操作,封装成一个 ConfigChangeEvent 发送给对应的 ConfigChangeWatcher。
ConfigChangeWatcher
抽象类,依据命名,表示的是关注配置变化的 watcher,是 OAP 中定义的用于对不同配置的具体实现;对于 Apollo 上的每个 Key 都有对应的 ConfigChangeWatcher。
具体的 ConfigChangeWatcher 获取到 ConfigChangeEvent,处理逻辑各有不同,本次具体看下 AgentConfigurationsWatcher。
private volatile String settingsString;
private volatile AgentConfigurationsTable agentConfigurationsTable;
public void notify(ConfigChangeEvent value) {
if (value.getEventType().equals(EventType.DELETE)) {
settingsString = null;
this.agentConfigurationsTable = new AgentConfigurationsTable();
} else {
settingsString = value.getNewValue();
AgentConfigurationsReader agentConfigurationsReader =
new AgentConfigurationsReader(new StringReader(value.getNewValue()));
this.agentConfigurationsTable = agentConfigurationsReader.readAgentConfigurationsTable();
}
}
方法逻辑为:config value 存储到了 agentConfigurationsTable。
apollo value 是什么样子呢?
configurations:
serviceA:
trace.sample_n_per_3_secs: 1000
trace.ignore_path: /a/b/c,/a1/b1/c1
serviceB:
trace.sample_n_per_3_secs: 1000
trace.ignore_path: /a/b/c,/a1/b1/c1
AgentConfigurationsTable 如下具体实现
public class AgentConfigurationsTable {
private Map<String, AgentConfigurations> agentConfigurationsCache;
public AgentConfigurationsTable() {
this.agentConfigurationsCache = new HashMap<>();
}
}
public class AgentConfigurations {
private String service;
private Map<String, String> configuration;
/**
* The uuid is calculated by the dynamic configuration of the service.
*/
private volatile String uuid;
public AgentConfigurations(final String service, final Map<String, String> configuration, final String uuid) {
this.service = service;
this.configuration = configuration;
this.uuid = uuid;
}
}
将 agentConfigurationsTable 转换成 json 展示更容里理解数据存储的结构:
{
"serviceB": {
"service": "serviceB",
"configuration": {
"trace.sample_n_per_3_secs": "1000",
"trace.ignore_path": "/a/b/c,/a1/b1/c1"
},
"uuid": "92670f1ccbdee60e14ffc0"
},
"serviceA": {
"service": "serviceA",
"configuration": {
"trace.sample_n_per_3_secs": "1000",
"trace.ignore_path": "/a/b/c,/a1/b1/c1"
},
"uuid": "92670f1ccbdee60e14ffc0"
}
}
查看读取 agentConfigurationsTable 值的逻辑:
// AgentConfigurationsWatcher#getAgentConfigurations
public AgentConfigurations getAgentConfigurations(String service) {
// 依据 service 获取数据
AgentConfigurations agentConfigurations = this.agentConfigurationsTable.getAgentConfigurationsCache().get(service);
if (null == agentConfigurations) {
return emptyAgentConfigurations;
} else {
return agentConfigurations;
}
}
继续查看调用 getAgentConfigurations 的代码,并且将 value 包装成 ConfigurationDiscoveryCommand 返回。
// ConfigurationDiscoveryServiceHandler#fetchConfigurations
public void fetchConfigurations(final ConfigurationSyncRequest request,
final StreamObserver<Commands> responseObserver) {
Commands.Builder commandsBuilder = Commands.newBuilder();
AgentConfigurations agentConfigurations = agentConfigurationsWatcher.getAgentConfigurations(
request.getService());
if (null != agentConfigurations) {
// 请求时会带有 uuid,会跟现有配置的 uuid 进行比对,如果不同,则获取最新值
if (disableMessageDigest || !Objects.equals(agentConfigurations.getUuid(), request.getUuid())) {
ConfigurationDiscoveryCommand configurationDiscoveryCommand =
newAgentDynamicConfigCommand(agentConfigurations);
commandsBuilder.addCommands(configurationDiscoveryCommand.serialize().build());
}
}
responseObserver.onNext(commandsBuilder.build());
responseObserver.onCompleted();
}
ConfigurationDiscoveryServiceHandler 属于 GRPCHandler,类似 SpringBoot 中 Controller,暴露接口,外部就可以获取数据。
ConfigurationDiscoveryCommand 这个方法中有个属性来标识 command 的具体类型,这个在 agent 端接收到 command 需要依据 command 类型找到真正的处理器。
public static final String NAME = "ConfigurationDiscoveryCommand";
总结:当 AgentConfigurationsWatcher 收到订阅的 ConfigChangeEvent 时,会将值存储至 AgentConfigurationsTable,之后通过 ConfigurationDiscoveryServiceHandler 暴露接口,以方便 agent 可以获取到相应服务的配置。
至此,OAP 与 Apollo 间的配置更新逻辑以及值的处理逻辑大致理清了。
接下来看看 agent 与 oap 间的交互。
Agent 侧
找到调用 ConfigurationDiscoveryServiceGrpc#fetchConfigurations 的代码,看到 ConfigurationDiscoveryService,查看具体调用逻辑:
// ConfigurationDiscoveryService
private void getAgentDynamicConfig() {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
try {
// 准备参数
ConfigurationSyncRequest.Builder builder = ConfigurationSyncRequest.newBuilder();
builder.setService(Config.Agent.SERVICE_NAME);
if (configurationDiscoveryServiceBlockingStub != null) {
final Commands commands = configurationDiscoveryServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).fetchConfigurations(builder.build()); // 方法调用
// 结果处理
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
}
}
}
而 getAgentDynamicConfig 是在 ConfigurationDiscoveryService#boot 执行时 init 了一个定时任务调用。
public void boot() throws Throwable {
getDynamicConfigurationFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ConfigurationDiscoveryService")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::getAgentDynamicConfig,
t -> LOGGER.error("Sync config from OAP error.", t)
),
Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
Config.Collector.GET_AGENT_DYNAMIC_CONFIG_INTERVAL,
TimeUnit.SECONDS
);
}
获取结果后的处理逻辑:CommandService 接收 Commands,先是放入到队列中,
private LinkedBlockingQueue<BaseCommand> commands = new LinkedBlockingQueue<>(64);
public void receiveCommand(Commands commands) {
for (Command command : commands.getCommandsList()) {
try {
BaseCommand baseCommand = CommandDeserializer.deserialize(command);
// 将结果放入队列中
boolean success = this.commands.offer(baseCommand);
if (!success && LOGGER.isWarnEnable()) {
}
} catch (UnsupportedCommandException e) {
}
}
}
新开线程来消费队列,commandExecutorService 处理 Commands,通过代码调用链看到,最后依据 command 的类型找到真正指令执行器。
// CommandService#run
public void run() {
final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);
while (isRunning) {
try {
// 消费队列
BaseCommand command = this.commands.take();
// 判断是否已经执行过了
if (isCommandExecuted(command)) {
continue;
}
// 分发 command
commandExecutorService.execute(command);
serialNumberCache.add(command.getSerialNumber());
} catch (CommandExecutionException e) {
}
}
}
// CommandExecutorService#execute
public void execute(final BaseCommand command) throws CommandExecutionException {
this.executorForCommand(command).execute(command);
}
// CommandExecutorService#executorForCommand
private CommandExecutor executorForCommand(final BaseCommand command) {
final CommandExecutor executor = this.commandExecutorMap.get(command.getCommand());
if (executor != null) {
return executor;
}
return NoopCommandExecutor.INSTANCE;
}
依据指令类型获取具体的指令执行器,这里为 ConfigurationDiscoveryService,发现又调用了 ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand 处理。
// ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand
public void handleConfigurationDiscoveryCommand(ConfigurationDiscoveryCommand configurationDiscoveryCommand) {
final String responseUuid = configurationDiscoveryCommand.getUuid();
List<KeyStringValuePair> config = readConfig(configurationDiscoveryCommand);
// 遍历配置项
config.forEach(property -> {
String propertyKey = property.getKey();
List<WatcherHolder> holderList = register.get(propertyKey);
for (WatcherHolder holder : holderList) {
if (holder != null) {
// 依据配置项找到对应的 AgentConfigChangeWatcher,封装成 ConfigChangeEvent
AgentConfigChangeWatcher watcher = holder.getWatcher();
String newPropertyValue = property.getValue();
if (StringUtil.isBlank(newPropertyValue)) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(
new AgentConfigChangeWatcher.ConfigChangeEvent(
null, AgentConfigChangeWatcher.EventType.DELETE
));
}
} else {
if (!newPropertyValue.equals(watcher.value())) {
watcher.notify(new AgentConfigChangeWatcher.ConfigChangeEvent(
newPropertyValue, AgentConfigChangeWatcher.EventType.MODIFY
));
}
}
}
}
});
this.uuid = responseUuid;
}
ConfigurationDiscoveryService#handleConfigurationDiscoveryCommand 进行处理,遍历配置项列表,依据 Key 找到对应的 AgentConfigChangeWatcher,进行 notify。
这个过程是不是很熟悉,跟 OAP 中处理逻辑不能说是完全一样,简直一模一样。
AgentConfigChangeWatcher 是个抽象类,查看其具体实现,关注其注册以及处理 value 的逻辑即可。
具体逻辑就不再展开细说了,需要自行了解下。
总之,agent 可以进行动态配置,能做的事情就多了,尤其是对 agent.config 中的配置大部分就可以实现动态管理了。