如果使用nacos-sdk-python(注意适用nacos版本),需要按照下面的链接修改源码的bug
https://github.com/nacos-group/nacos-sdk-python/issues/135
代码如下:
import nacos
import threading
import socket
import requests
import json
from flask import Flask
app = Flask(__name__)
class NacosReg:
def __init__(self):
self.SERVER_HOST = "127.0.0.1"
self.SERVER_PORT = 8848
self.SERVER_ADDRESSES = f"http://{self.SERVER_HOST}:{self.SERVER_PORT}"
self.SERVICE_NAME = "test_server"
self.GROUP_NAME = "DEFAULT_GROUP"
self.username = "nacos"
self.password = "nacos"
self.client = nacos.NacosClient(self.SERVER_ADDRESSES, username=self.username, password=self.password)
self.model_port = 8000
def get_host_ip(self):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect((self.SERVER_HOST, self.SERVER_PORT))
ip = s.getsockname()[0]
finally:
s.close()
return ip
def service_register_and_check(self):
self.service_register_()
self.set_http_check()
def service_register(self):
self.client.add_naming_instance(self.SERVICE_NAME,
self.get_host_ip(),
self.model_port, ephemeral=False, group_name=self.GROUP_NAME)
def service_register_(self):
params = {
"ip": self.get_host_ip(),
"port": self.model_port,
"serviceName": self.SERVICE_NAME,
"weight": 1.0,
"enable": True,
"healthy": True,
"clusterName": "None",
"ephemeral": False,
"groupName": self.GROUP_NAME,
'username': self.username,
'password': self.password
}
res = requests.post(f"{self.SERVER_ADDRESSES}/nacos/v1/ns/instance", params=params)
print(res.text)
def service_unregister(self):
self.client.remove_naming_instance(self.SERVICE_NAME,
self.get_host_ip(),
self.model_port, ephemeral=False, group_name=self.GROUP_NAME)
def set_http_check(self):
data = {
"serviceName": f"{self.GROUP_NAME}@@{self.SERVICE_NAME}",
"clusterName": "None",
"checkPort": 80,
"useInstancePort4Check": True,
"healthChecker": json.dumps({"type": "HTTP", "path": "/test"}),
"namespaceId": ""
}
res = requests.put(f"{self.SERVER_ADDRESSES}/nacos/v1/ns/cluster",
params={'username': self.username, 'password': self.password}, data=data)
print(res.text)
def service_list(self):
return self.client.list_naming_instance(self.SERVICE_NAME, healthy_only=True)
@app.route("/test", methods=['GET'])
def test():
return "ok"
if __name__ == '__main__':
threading.Timer(1, NacosReg().service_register_and_check).start()
app.run(port=8000)
服务器运行情况:
nacos服务器情况:
HTTP探活配置-- 单击上图集群配置:
参考资料:
GitHub - nacos-group/nacos-sdk-python: nacos python sdk
Open API 指南 | Nacos