要实现配置自动实时刷新,需要改造之前的代码。
服务端改造
服务端增加一个版本号version,新增配置的时候为1,每次更新配置就加1。
@Override
public long insertConfigDO(ConfigDO configDO) {
insertLock.lock();
try {
long id = 1;
List<ConfigDO> configList = getAllConfig();
if (!configList.isEmpty()) {
id = configList.get(configList.size() - 1).getId() + 1;
}
configDO.setId(id);
configDO.setVersion(1);
Optional.of(configDO).filter(c -> c.getCreateTime() == null).ifPresent(c -> c.setCreateTime(LocalDateTime.now()));
String configPathStr = standalonePath + "/config";
Files.createDirectories(Paths.get(configPathStr));
Path path = Paths.get(configPathStr + "/" + id + ".conf");
Files.write(path, JSON.toJSONString(configDO).getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE_NEW);
return id;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
insertLock.unlock();
}
}
@Override
public void updateConfig(ConfigDO configDO) {
ConfigDO dbConfigDO = getConfig(configDO.getId());
Optional.ofNullable(dbConfigDO).map(c -> {
c.setName(configDO.getName());
c.setVersion(c.getVersion() + 1);
c.setUpdateTime(LocalDateTime.now());
c.setUpdateUid(configDO.getUpdateUid());
c.setConfigData(configDO.getConfigData());
return c;
}).ifPresent(this::updateConfigDO);
}
再增加一个接口判断verion是否发生变化
@GetMapping("/change/get")
public Result<List<ConfigVO>> getChangeConfig(@RequestBody Map<Long, Integer> configIdMap) {
if (configIdMap == null || configIdMap.isEmpty()) {
return Result.fail("配置参数错误");
}
Result<List<ConfigBO>> result = configService.getAllValidConfig();
if (result.failed()) {
return Result.resultToFail(result);
}
return Result.success(result.getData().stream()
.filter(c -> configIdMap.containsKey(c.getId()))
.filter(c -> c.getVersion() > configIdMap.get(c.getId()))
.map(this::configBO2ConfigVO).collect(Collectors.toList()));
}
客户端改造
客户端对获取到的配置,做了一下改造,把json转换成了property格式,即user.name=xxx。并且存储到了一个以配置ID为key,配置对象ConfigBO为value的map configMap里。具体的结构如下
@Data
public class ConfigBO {
/**
* 配置id
*/
private long id;
/**
* 配置版本号
*/
private int version;
/**
* 配置项列表
*/
private List<ConfigDataBO> configDataList;
}
@Data
public class ConfigDataBO {
/**
* 配置key
*/
private String key;
/**
* 配置值
*/
private String value;
/**
* 自动刷新的bean字段列表
*/
List<RefreshFieldBO> refreshFieldList;
public void addRefreshField(RefreshFieldBO refreshFieldBO) {
Optional.ofNullable(refreshFieldList).orElseGet(() -> refreshFieldList = new ArrayList<>()).add(refreshFieldBO);
}
}
@Data
@AllArgsConstructor
public class RefreshFieldBO {
/**
* 对象实例
*/
private Object bean;
/**
* 字段
*/
private Field field;
}
获取配置和之前一样,只不过调用的位置改成了ConfigCenterClient中,将配置转换成<配置key,配置值>的map提供给外部程序调用
public ConfigCenterClient(String url) {
this.url = url;
//将配置中心的配置转换成property格式,即user.name=xxx
List<ConfigVO> configList = getAllValidConfig();
this.configMap = Optional.ofNullable(configList).map(list -> list.stream().map(configVO -> {
Map<String, Object> result = new HashMap<>();
DataTransUtil.buildFlattenedMap(result, configVO.getConfigData(), "");
ConfigBO configBO = new ConfigBO();
configBO.setId(configVO.getId());
configBO.setVersion(configVO.getVersion());
configBO.setConfigDataList(result.entrySet().stream().map(e -> {
ConfigDataBO configDataBO = new ConfigDataBO();
configDataBO.setKey(e.getKey());
configDataBO.setValue(e.getValue().toString());
return configDataBO;
}).collect(Collectors.toList()));
return configBO;
}).collect(Collectors.toMap(ConfigBO::getId, Function.identity(), (k1, k2) -> k1))).orElseGet(HashMap::new);
}
public Map<String, String> getConfigProperty() {
return configMap.values().stream().map(ConfigBO::getConfigDataList).filter(Objects::nonNull)
.flatMap(List::stream).collect(Collectors.toMap(ConfigDataBO::getKey, ConfigDataBO::getValue, (k1, k2) -> k1));
}
使用方式
public class ClientTest {
private String userName;
private String userAge;
private List<Object> education;
public ClientTest() {
ConfigCenterClient configCenterClient = new ConfigCenterClient("http://localhost:8088");
Map<String, String> configProperty = configCenterClient.getConfigProperty();
this.userName = configProperty.get("user.name");
this.userAge = configProperty.get("user.age");
this.education = new ArrayList<>();
int i = 0;
while (configProperty.containsKey("user.education[" + i + "]")) {
education.add(configProperty.get("user.education[" + (i++) + "]"));
}
}
public String toString() {
return "姓名:" + userName + ",年龄:" + userAge + ",教育经历:" + education;
}
public static void main(String[] args) {
ClientTest clientTest = new ClientTest();
System.out.println(clientTest);
}
}
好了改造完毕,下面开始进入正题
短轮询
短轮询就是客户端不断的去请求/config/change/get接口判断配置是否发生了变化,如果发生了变化返回给客户端,客户端拿到新配置后通过反射修改对象的成员变量
首先将需要实时刷新的配置加入到自动刷新的bean字段列表中,然后启动一个定时任务1秒钟访问一次/config/change/get接口,如果有变化,更新本地配置map,并刷新对象中的配置成员变量
public void addRefreshField(String key, RefreshFieldBO refreshFieldBO) {
configMap.values().stream().map(ConfigBO::getConfigDataList).filter(Objects::nonNull)
.flatMap(List::stream).filter(configDataBO -> configDataBO.getKey().equals(key))
.findFirst().ifPresent(configDataBO -> configDataBO.addRefreshField(refreshFieldBO));
}
public void startShortPolling() {
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
try {
Thread.sleep(1000);
Map<Long, List<ConfigDataBO>> refreshConfigMap = new HashMap<>();
configMap.values().forEach(configBO -> {
Optional.ofNullable(configBO.getConfigDataList()).ifPresent(cdList -> cdList.stream()
.filter(cd -> cd.getRefreshFieldList() != null && !cd.getRefreshFieldList().isEmpty())
.forEach(refreshConfigMap.computeIfAbsent(configBO.getId(), k1 -> new ArrayList<>())::add));
});
if (refreshConfigMap.isEmpty()) {
return;
}
Map<String, Integer> configIdMap = refreshConfigMap.keySet().stream()
.collect(Collectors.toMap(String::valueOf, configId -> configMap.get(configId).getVersion()));
HttpRespBO httpRespBO = HttpUtil.httpPostJson(url + "/config/change/get", JSON.toJSONString(configIdMap));
List<ConfigVO> configList = httpResp2ConfigVOList(httpRespBO);
if (configList.isEmpty()) {
continue;
}
configList.forEach(configVO -> {
Map<String, Object> result = new HashMap<>();
DataTransUtil.buildFlattenedMap(result, configVO.getConfigData(), "");
ConfigBO configBO = this.configMap.get(configVO.getId());
configBO.setVersion(configVO.getVersion());
List<ConfigDataBO> configDataList = configBO.getConfigDataList();
Map<String, ConfigDataBO> configDataMap = configDataList.stream()
.collect(Collectors.toMap(ConfigDataBO::getKey, Function.identity()));
result.forEach((key, value) -> {
ConfigDataBO configDataBO = configDataMap.get(key);
if (configDataBO == null) {
configDataList.add(new ConfigDataBO(key, value.toString()));
} else {
configDataBO.setValue(value.toString());
List<RefreshFieldBO> refreshFieldList = configDataBO.getRefreshFieldList();
if (refreshFieldList == null) {
refreshFieldList = new ArrayList<>();
configDataBO.setRefreshFieldList(refreshFieldList);
}
refreshFieldList.forEach(refreshFieldBO -> {
try {
Field field = refreshFieldBO.getField();
field.setAccessible(true);
field.set(refreshFieldBO.getBean(), value.toString());
} catch (Exception e) {
log.error("startShortPolling set Field error", e);
}
});
}
});
});
} catch (Exception e) {
log.error("startShortPolling error", e);
}
}
});
thread.setName("startShortPolling");
thread.setDaemon(true);
thread.start();
}
public class ClientTest {
private String userName;
private String userAge;
private List<Object> education;
public ClientTest() throws NoSuchFieldException {
ConfigCenterClient configCenterClient = new ConfigCenterClient("http://localhost:8088");
Map<String, String> configProperty = configCenterClient.getConfigProperty();
this.userName = configProperty.get("user.name");
this.userAge = configProperty.get("user.age");
this.education = new ArrayList<>();
int i = 0;
while (configProperty.containsKey("user.education[" + i + "]")) {
education.add(configProperty.get("user.education[" + (i++) + "]"));
}
configCenterClient.addRefreshField("user.name", new RefreshFieldBO(this, ClientTest.class.getDeclaredField("userName")));
configCenterClient.startShortPolling();
}
public String toString() {
return "姓名:" + userName + ",年龄:" + userAge + ",教育经历:" + education;
}
public static void main(String[] args) throws NoSuchFieldException, InterruptedException {
ClientTest clientTest = new ClientTest();
while (!Thread.interrupted()) {
System.out.println(clientTest);
Thread.sleep(1000);
}
}
}
效果
修改配置