引言
OPC UA(开放平台通信统一架构)是一种跨平台的、开放的数据交换标准,广泛用于工业自动化领域。在工业4.0的大背景下,OPC UA服务器在网络中的部署日益增多,如何快速有效地发现这些服务器成为了一个实际需求。本文将介绍如何使用Python编写一个简单的OPC UA服务器扫描工具。
准备工作
在开始编写代码之前,需要确保您的Python环境中安装了opcua
库。可以通过以下命令进行安装:
pip install opcua
代码实现
我们的脚本将从扫描本地网络中的一系列IP地址开始,尝试连接到默认的OPC UA端口(4840),并记录下能够成功连接的服务器地址和端口。
导入库
首先,我们需要导入必要的库。
import sys
import time
from opcua import Client
import threading
import logging
日志设置
为了记录错误信息,我们设置了日志记录功能,将错误信息保存到文件中。
logger = logging.getLogger()
fh = logging.FileHandler("error.log", encoding="utf-8", mode="a")
formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.setLevel(logging.ERROR)
进度条函数
为了提供友好的用户界面,我们实现了一个打印进度条的函数。
def print_progress_bar(completed, total, length=50):
progress = int(length * completed / total)
bar = '扫描进度 [' + '=' * progress + '-' * (length - progress) + ']'
percent = round(100.0 * completed / total, 1)
sys.stdout.write(f'\r{bar} {percent}%')
sys.stdout.flush()
扫描服务器线程函数
扫描服务器的操作将在线程中执行,这样可以同时扫描多个地址,提高效率。
def scan_server(ip_address, default_port, servers, i, length):
try:
print_progress_bar(i, length)
client = Client(f"opc.tcp://{ip_address}:{default_port}")
client.connect()
# 如果成功连接,则将服务器地址和端口添加到列表中
servers.append((ip_address, default_port))
client.disconnect()
except Exception as e:
# 如果连接失败,则忽略该地址
logging.error("%s 异常信息: %s", str(ip_address), str(e))
主扫描函数
主函数scan_opc_servers
负责创建和管理工作线程,并收集扫描结果。
def scan_opc_servers():
# 扫描本地网络上的所有 OPC UA 服务器
servers = []
# 默认的 OPC UA 服务器端口为 4840
default_port = 4840
# 遍历本地网络上的可能的 IP 地址范围
ip_addresses = [f"192.168.1.{i}" for i in range(1, 256)] + [f"192.168.168.{i}" for i in range(1, 256)] # 您可能需要更改此处的 IP 地址范围
# 创建一个线程池
thread_pool = []
max_threads = 75
for i, ip_address in enumerate(ip_addresses):
time.sleep(0.05)
# 如果当前线程数量达到最大值,则等待一个线程完成
if len(thread_pool) >= max_threads:
thread_pool[0].join()
thread_pool.pop(0)
# 创建一个新的线程来扫描服务器
thread = threading.Thread(target=scan_server, args=(ip_address, default_port, servers, i, len(ip_addresses)),)
thread.start()
thread_pool.append(thread)
# 等待所有线程完成
for thread in thread_pool:
thread.join()
return servers
主程序入口
最后,我们定义了程序的入口点,调用扫描函数,并打印出找到的服务器列表。
if __name__ == "__main__":
opc_servers = scan_opc_servers()
print_progress_bar(1, 1)
print("\nFound OPC UA Servers:")
for server in opc_servers:
print(f"Address: {server[0]}, Port: {server[1]}")
结果
日志:
总结
通过以上代码,我们实现了一个简单的OPC UA服务器扫描工具。它可以在本地网络中快速扫描并发现OPC UA服务器,帮助工程师和开发人员更高效地进行网络管理和系统维护。
完整代码:
import sys
import time
from opcua import Client
import threading
import logging
logger = logging.getLogger()
fh = logging.FileHandler("error.log", encoding="utf-8", mode="a")
formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.setLevel(logging.ERROR)
def print_progress_bar(completed, total, length=50):
progress = int(length * completed / total)
bar = '扫描进度 [' + '=' * progress + '-' * (length - progress) + ']'
percent = round(100.0 * completed / total, 1)
sys.stdout.write(f'\r{bar} {percent}%')
sys.stdout.flush()
# 定义一个线程工作函数
def scan_server(ip_address, default_port, servers, i, length):
try:
print_progress_bar(i, length)
client = Client(f"opc.tcp://{ip_address}:{default_port}")
client.connect()
# 如果成功连接,则将服务器地址和端口添加到列表中
servers.append((ip_address, default_port))
client.disconnect()
except Exception as e:
# 如果连接失败,则忽略该地址
logging.error("%s 异常信息: %s", str(ip_address), str(e))
def scan_opc_servers():
# 扫描本地网络上的所有 OPC UA 服务器
servers = []
# 默认的 OPC UA 服务器端口为 4840
default_port = 4840
# 遍历本地网络上的可能的 IP 地址范围
ip_addresses = [f"192.168.1.{i}" for i in range(1, 256)] + [f"192.168.168.{i}" for i in range(1, 256)] # 您可能需要更改此处的 IP 地址范围
# 创建一个线程池
thread_pool = []
max_threads = 75
for i, ip_address in enumerate(ip_addresses):
time.sleep(0.05)
# 如果当前线程数量达到最大值,则等待一个线程完成
if len(thread_pool) >= max_threads:
thread_pool[0].join()
thread_pool.pop(0)
# 创建一个新的线程来扫描服务器
thread = threading.Thread(target=scan_server, args=(ip_address, default_port, servers, i, len(ip_addresses)),)
thread.start()
thread_pool.append(thread)
# 等待所有线程完成
for thread in thread_pool:
thread.join()
return servers
if __name__ == "__main__":
opc_servers = scan_opc_servers()
print_progress_bar(1, 1)
print("\nFound OPC UA Servers:")
for server in opc_servers:
print(f"Address: {server[0]}, Port: {server[1]}")