Etcd入门
什么是Etcd
GitHub:https://github.com/etcd-io/etcd
Etcd数据结构与特性
键值对格式,类似文件层次结构。
Etcd如何保证数据一致性?
表面来看,Etcd支持事务操作
,能够保证数据一致性。
底层来看,Etcd使用Raft一致性算法
保证数据一致性。
官方可视化地址:http://play.etcd.io/play
可以深度了解,raft算法运行机制。
现在是一主两从两stop。
停止主节点。
此时主节点挂了,并没有选择新的主节点上线,因为还剩两个节点,一人一票,都没有胜出无法选择出新的Leader,这种现象也成为“
脑裂
”。
启动node2,发现node3成为了Leader,此时不会有平票的情况。
Etcd基本操作
增删改查。
写数据
读数据
前缀搜索
Etcd安装
安装:https://github.com/etcd-io/etcd/releases
有不同系统安装启动脚本。
安装完成会有三个脚本:
- etcd: etcd服务本身
- etcdctl:客户端,用户操作etcd,如读写数据
- etcdutl:备份恢复工具
执行etcd脚本,会启动etcd服务,服务默认占用2379和2380两个端口
2379:提供HTTP API服务,和etcdctl交互
2380:集群中节点通讯
Etcd可视化工具
etcdkeeper:https://github.com/evildecay/etcdkeeper
下载安装启动完毕,访问http://127.0.0.1:8080/etcdkeeper
Etcd Java客户端
jtecd:https://github.com/etcd-io/jetcd
1)引入依赖
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.7.7</version>
</dependency>
2)demo
public class EtcdRegistry {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// create client using endpoints
Client client = Client.builder().endpoints("http://localhost:2379")
.build();
KV kvClient = client.getKVClient();
ByteSequence key = ByteSequence.from("likelong".getBytes());
ByteSequence value = ByteSequence.from("666".getBytes());
// put the key-value
kvClient.put(key, value).get();
// get the CompletableFuture
CompletableFuture<GetResponse> getFuture = kvClient.get(key);
// get the value from CompletableFuture
GetResponse response = getFuture.get();
System.out.println("value = " + response);
// delete the key
kvClient.delete(key).get();
}
}
上述代码使用KVClient操作Etcd读写数据,除了KVClient客户端外,Etcd还提供了其他客户端。
3)常用客户端
绝大多数情况,前三个就够用。
Java Etcd数据结构
除了有基本的KV,还有版本、创建版本、修改版本等字段。Etcd中每个键都有一个与之关联的版本号,用于跟踪键的修改历史。当键值发生变化,版本号也会随之增加。
存储结构设计
存储结构设计几个要点:
- key如何设计?
- value如何设计?
- key什么时候过期?
结合Etcd数据存储结构特点(支持层级查询),以及一个服务会有多个服务提供者实例(负载均衡),可以设计为层级结构。
层级结构:将服务理解为文件夹、将服务对应的一个节点理解为文件夹下的文件,可以通过服务名称,用前缀查询的方式查询到某个服务的所有节点。
如下:键名规则可以为:业务前缀/服务名/服务节点地址
如果是Redis作为注册中心,可以设计为列表结构(Redis本身支持列表数据结构)。
列表结构:将所有服务节点以列表的形式整体作为value。
设置key过期超时时间,如30s,当服务宕机时,超时自动移除。
etcd选择层级结构。
开发实现
1. 注册中心开发
1)注册信息定义
ServiceMetaInfo
类,封装服务注册信息,包括服务名称、服务版本号、服务地址(域名和端口)、服务分组等。
/**
* 服务元信息(注册信息)
*/
public class ServiceMetaInfo {
/**
* 服务名称
*/
private String serviceName;
/**
* 服务版本号
*/
private String serviceVersion = "1.0";
/**
* 服务域名
*/
private String serviceHost;
/**
* 服务端口号
*/
private Integer servicePort;
/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";
}
添加方法,获取服务键名、获取服务注册节点键名以及获取服务访问地址。
/**
* 获取服务键名
*/
public String getServiceKey() {
// 后续可扩展服务分组
// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
return String.format("%s:%s", serviceName, serviceVersion);
}
/**
* 获取服务注册节点键名
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePor t);
}
/**
* 获取完整服务地址(服务调用会用到)
*/
public String getServiceAddress() {
if (!StrUtil.contains(serviceHost, "http")) {
return String.format("http://%s:%s", serviceHost, servicePort);
}
return String.format("%s:%s", serviceHost, servicePort);
}
2)注册中心配置
/**
* RPC 框架注册中心配置
*/
public class RegistryConfig {
/**
* 注册中心类别
*/
private String registry = "etcd";
/**
* 注册中心地址
*/
private String address = "http://localhost:2380";
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 超时时间(单位毫秒)
*/
private Long timeout = 10000L;
}
3)注册中心接口
定义注册中心接口,后续可以实现多种不同的注册中心。可以使用SPI机制,动态加载。
提供注册中心初始化、注册服务、注销服务、服务发现(获取服务节点列表)、服务销毁
等方法。
/**
* 注册中心接口
*/
public interface Registry {
/**
* 初始化
*
* @param registryConfig
*/
void init(RegistryConfig registryConfig);
/**
* 注册服务(服务端)
*
* @param serviceMetaInfo
*/
void register(ServiceMetaInfo serviceMetaInfo) throws Exception;
/**
* 注销服务(服务端)
*
* @param serviceMetaInfo
*/
void unRegister(ServiceMetaInfo serviceMetaInfo);
/**
* 服务发现(获取某服务的所有节点,消费端)
*
* @param serviceKey 服务键名
* @return
*/
List<ServiceMetaInfo> serviceDiscovery(String serviceKey);
/**
* 服务销毁
*/
void destroy();
}
4)Etcd注册中心实现
public class EtcdRegistry implements Registry {
private static final Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);
private Client client;
private KV kvClient;
/**
* 根节点
*/
private static final String ETCD_ROOT_PATH = "/rpc/";
@Override
public void init(RegistryConfig registryConfig) {
logger.info("etcd注册中心初始化...");
client = Client.builder().endpoints(registryConfig.getAddress()).connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();
kvClient = client.getKVClient();
}
/**
* 服务注册(默认30s自动剔除)
*
* @param serviceMetaInfo 服务元信息
*/
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建 Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();
// 创建一个 30 秒的租约
long leaseId = leaseClient.grant(30).get().getID();
// 设置要存储的键值对
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(key, value, putOption).get();
}
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(), StandardCharsets.UTF_8));
}
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";
try {
// 前缀查询
GetOption getOption = GetOption.builder().isPrefix(true).build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
getOption)
.get()
.getKvs();
// 解析服务信息
return keyValues.stream()
.map(keyValue -> {
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value, ServiceMetaInfo.class);
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException(String.format("serviceKey=%s, 获取服务列表失败", serviceKey), e);
}
}
public void destroy() {
logger.info("etcd注册中心下线...");
// 释放资源
if (kvClient != null) {
kvClient.close();
}
if (client != null) {
client.close();
}
}
}
2.SPI机制配置和扩展注册中心
使用SPI机制,读取配置文件,初始化对应的注册中心。
对应类属性位置:RpcConfig#registryConfig#registry
可以自己实现接口,自行扩展注册中心,也可以使用默认etcd注册中心。
SPI机制:添加对应SPI配置文件。
注册中心工厂
类似序列化器,创建注册中心工厂,可以通过配置文件配置注册中心类型,指定对应注册中心。
/**
* 注册中心工厂(用于获取注册中心对象)
*/
public class RegistryFactory {
static {
SpiLoader.load(Registry.class);
}
/**
* 默认注册中心
*/
private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();
/**
* 获取实例
*
* @param key 注册中心键值
* @return 注册中心实例
*/
public static Registry getInstance(String key) {
return SpiLoader.getInstance(Registry.class, key);
}
}
3.RPC调用
1)服务代理类
服务代理类,使用jdk动态代理(实现InvocationHandler类),用于生成代理对象实现远程调用
。
/**
* 服务代理(JDK 动态代理)
*/
public class ServiceProxy implements InvocationHandler {
/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 指定序列化器
final Serializer serializer = SerializerFactory.getInstance(rpcConfig.getSerializer());
// 构造请求
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setServiceName(serviceName);
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
try {
// 序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 从注册中心获取服务提供者请求地址
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstants.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if (CollUtil.isEmpty(serviceMetaInfoList)) {
throw new RuntimeException("暂无服务地址");
}
// 先默认取第一个,后续优化
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 发送请求
try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
2)服务代理工厂
/**
* 服务代理工厂(工厂模式,用于创建代理对象)
*/
public class ServiceProxyFactory {
/**
* 根据服务类获取代理对象
*
* @param serviceClass
* @param <T>
* @return
*/
public static <T> T getProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass},
new ServiceProxy());
}
}
基于服务代理类,生成代理对象,实现服务远程调用
3)本地注册中心
用于存放接口名与具体实现类映射,便于获取
/**
* 本地注册中心
*/
public class LocalRegistry {
/**
* 注册信息存储
*/
private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();
/**
* 注册服务
*
* @param serviceName 接口
* @param implClass 实现类
*/
public static void register(String serviceName, Class<?> implClass) {
map.put(serviceName, implClass);
}
/**
* 获取服务
*
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}
/**
* 删除服务
*
* @param serviceName
*/
public static void remove(String serviceName) {
map.remove(serviceName);
}
}
4)web服务器
Vertx官方文档:https://vertx.io/
使用Vertx实现web服务器
添加依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.5.1</version>
</dependency>
/**
* HTTP 服务器接口
*/
public interface HttpServer {
/**
* 启动服务器
*
* @param port
*/
void doStart(int port);
}
/**
* Vertx HTTP 服务器
*/
public class VertxHttpServer implements HttpServer {
private static final Logger logger = LoggerFactory.getLogger(VertxHttpServer.class);
/**
* 启动服务器
*
* @param port 端口
*/
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 创建 HTTP 服务器
io.vertx.core.http.HttpServer server = vertx.createHttpServer();
// 处理请求
server.requestHandler(new HttpServerHandler());
// 启动 HTTP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
logger.info("Server is now listening on port " + port);
} else {
logger.error("Failed to start server: " + result.cause());
}
});
}
}
具体业务处理逻辑
/**
* HTTP 请求处理器
*/
public class HttpServerHandler implements Handler<HttpServerRequest> {
private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
@Override
public void handle(HttpServerRequest request) {
// 指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
// 记录日志
logger.info("Received request: " + request.method() + " " + request.uri());
// 异步处理 HTTP 请求
request.bodyHandler(body -> {
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
try {
rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
} catch (Exception e) {
e.printStackTrace();
}
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
// 如果请求为 null,直接返回
if (rpcRequest == null) {
rpcResponse.setMessage("rpcRequest is null");
doResponse(request, rpcResponse, serializer);
return;
}
try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}
/**
* 响应
*
* @param request
* @param rpcResponse
* @param serializer
*/
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
HttpServerResponse httpServerResponse = request.response()
.putHeader("content-type", "application/json");
try {
// 序列化
byte[] serialized = serializer.serialize(rpcResponse);
httpServerResponse.end(Buffer.buffer(serialized));
} catch (IOException e) {
e.printStackTrace();
httpServerResponse.end(Buffer.buffer());
}
}
}
5)服务提供者服务注册
创建服务提供者module,引入starlink-rpc-core
模块
RPC相关配置application.properties
rpc.name=starlink
rpc.version=1.0
rpc.serverPort=8081
rpc.serializer=hessian
服务接口及实现类
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "hello, " + name;
}
}
服务注册
public class ServiceStarter {
public static void main(String[] args) {
RpcApplication.init();
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
// 注册服务
String serviceName = HelloService.class.getName();
LocalRegistry.register(serviceName, HelloServiceImpl.class);
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
serviceMetaInfo.setServicePort(rpcConfig.getServerPort());
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(rpcConfig.getVersion());
try {
registry.register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
// 启动Http服务
HttpServer httpServer = new VertxHttpServer();
httpServer.doStart(rpcConfig.getServerPort());
}
}
客户端查看,注册成功,类似文件夹结构。
6)服务调用者远程调用
依旧创建module,引入starlink-rpc-core
及服务提供者example-consumer依赖
先启动服务提供者,再远程调用。
public class TestService {
public static void main(String[] args) {
// 创建代理对象
HelloService helloService = ServiceProxyFactory.getProxy(HelloService.class);
System.out.println(helloService.hello("Jack"));
}
}
远程调用成功。
核心:
动态代理【ServiceProxy】 + 反射调用【HttpServerHandler】
基本可用,后续优化。