【Python】FANUC机器人OPC UA通信并记录数据

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
    • 总结

引言

 OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

机器人仿真

 FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
在这里插入图片描述

环境准备

  • Python 3.5+
  • opcua库:用于实现OPC UA通信
  • logging库:用于记录日志

安装opcua库:

pip install opcua

代码实现

1. 导入库

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. 设置参数

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. 日志配置

def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
    
LOGGER = getLogger(FAUNC_LOG)

4. OPC UA通信

  • 连接到服务器
def connect_to_server(url):
    client = Client(url)
    client.connect()
    return client
  • 获取根节点和对象节点
def get_root_node(client: Client):
    return client.get_root_node()
def get_objects_node(client: Client):
    return client.get_objects_node()
  • 遍历所有子节点并返回变量节点的路径和数值
def get_variables(node: Node, path=""):
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        try:
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ','.join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace('\n', '\\n').replace(',', ' ')
                variables[new_path] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables

5. 备份旧CSV文件

def backup_csv_file(filename):
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"文件已移动到 {new_filename}")
        except Exception as e:
            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})

6. 主函数

if __name__ == "__main__":
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)
        backup_csv_file(CSV_FILENAME)
        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
            num = 0
            while True:
                variables = get_variables(objects_node)
                if num == 1:
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()
                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("数据记录:" + str(num))
                time.sleep(1)
    except KeyboardInterrupt:
        print("程序被用户中断")
    finally:
        client.disconnect()

记录数据预览:
在这里插入图片描述

总结

 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


完整代码:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

# OPC UA服务器的URL
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV文件名
CSV_FILENAME = 'fanuc_opcua_data.csv'
# 日志文件
FAUNC_LOG = 'fanuc.log'
# 文件夹 
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
    
LOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):
    """创建客户端实例并连接到服务端"""
    client = Client(url)
    client.connect()
    return client

def get_root_node(client: Client):
    """获取服务器命名空间中的根节点"""
    return client.get_root_node()

def get_objects_node(client: Client):
    """获取服务器的对象节点"""
    return client.get_objects_node()

def get_variables(node: Node, path=""):
    """遍历所有子节点并返回变量节点的路径和数值"""
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        try:
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ','.join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace('\n', '\\n').replace(',', ' ')
                variables[new_path] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables

def backup_csv_file(filename):
    """如果CSV文件已存在则备份"""
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"文件已移动到 {new_filename}")
        except Exception as e:
            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")
        

if __name__ == "__main__":
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)

        backup_csv_file(CSV_FILENAME)

        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
            num = 0

            while True:
                variables = get_variables(objects_node)

                if num == 1:
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()

                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("数据记录:" + str(num))
                time.sleep(1)

    except KeyboardInterrupt:
        print("程序被用户中断")
    finally:
        client.disconnect()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/532139.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue - 2( 10000 字 Vue 入门级教程)

一&#xff1a;初识 Vue 1.1 绑定样式 1.1.1 绑定 class 样式 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><title>绑定样式</title><style>......</style><script type"text/javascript"…

AOF文件重写

1.2.3.AOF文件重写 因为是记录命令&#xff0c;AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作&#xff0c;但只有最后一次写操作才有意义。通过执行bgrewriteaof命令&#xff0c;可以让AOF文件执行重写功能&#xff0c;用最少的命令达到相同效果。 如图&am…

互联网产品经理必备知识详解

1. 前言 本文档全面探讨了产品经理在产品管理过程中的关键环节,包括市场调研、产品定义及设计、项目管理、产品宣介、产品市场以及产品生命周期。通过深入剖析这些方面,本文旨在帮助产品经理系统地理解和掌握产品管理的核心要素,从而提升产品开发的效率和成功率。在市场调研…

分布式锁-redission

5、分布式锁-redission 5.1 分布式锁-redission功能介绍 基于setnx实现的分布式锁存在下面的问题&#xff1a; 重入问题&#xff1a;重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中&#xff0c;可重入锁的意义在于防止死锁&#xff0c;比如HashTable这样的代码…

多维 HighCharts

1&#xff1a;showHighChart.html <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"UTF-8"><!-- js脚本都是官方的,后两个是highchart脚本 --><script type"text/javascript" src"jquery1.7.1.mi…

Unity 九宫格

1. 把图片拖拽进资源文件夹 2.选中图片&#xff0c;然后设置图片 3.设置九宫格 4.使用图片&#xff0c;在界面上创建2个相同的Image,然后使用图片&#xff0c;修改Image Type 为Sliced

书生·浦语大模型第二期实战营第二课笔记和基础作业

来源&#xff1a; 作业要求:Homework - Demo 文档教程:轻松玩转书生浦语大模型趣味 Demo B站教程:轻松玩转书生浦语大模型趣味 Demo 1. 笔记 2.基础作业 2.1 作业要求 2.2 算力平台 2.3 新建demo目录&#xff0c;以及新建目录下的文件&#xff0c;下载模型参数 2.4 Intern…

怎样系统地学习自动化测试?

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 平时的测试工作其实细分一下&#xff0c;大概有三个领域…

【面试题】redis在工作中的使用场景有哪些?

前言&#xff1a;在实际工作中&#xff0c;Redis作为一种高性能的内存数据库和缓存系统&#xff0c;可以应用于多种场景&#xff0c;同时在面试过程中也经常被问到类似的问题&#xff0c;我们经常会被问的一脸懵逼&#xff0c;那今天我们就来总结一下redis的一些使用场景。 数据…

Linux--进程间的通信-匿名管道

进程间的通信 进程间通信&#xff08;IPC&#xff0c;Interprocess Communication&#xff09;是指在不同进程之间传输数据和交换信息的一种机制。它允许多个进程在同一操作系统中同时运行&#xff0c;并实现彼此之间的协作。 进程间通信方式&#xff1a; 管道&#xff08;Pi…

应该如何进行POC测试?—【DBA从入门到实践】第三期

在数据库选型过程中&#xff0c;为确保能够灵活应对数据规模的不断扩大和处理需求的日益复杂化&#xff0c;企业和技术人员会借助POC测试来评估不同数据库系统的性能。在测试过程中&#xff0c;性能、并发处理能力、存储成本以及高可用性等核心要素通常会成为大家关注的焦点&am…

20240326-1-KNN面试题

KNN面试题 1.简述一下KNN算法的原理 KNN算法利用训练数据集对特征向量空间进行划分。KNN算法的核心思想是在一个含未知样本的空间&#xff0c;可以根据样本最近的k个样本的数据类型来确定未知样本的数据类型。 该算法涉及的3个主要因素是&#xff1a;k值选择&#xff0c;距离度…

dwd_traffic_page_view_inc装载数据很慢

dwd_traffic_page_view_inc dwd_traffic_page_view_inc 装载数据很慢 可能原因 1.加载数据的小文件太多&#xff0c;saprk 加载这些小文件时&#xff0c;需要消耗资源。

ITK 重采样 resample

Itk 重新采样有二多种情况&#xff0c;这里说二种情况 1. 输入参数 &#xff0c;和输出相关数据&#xff0c;输出范围&#xff0c;spacing &#xff1b; typedef itk::Image< float, 3 > itkFloatImageType;typedef itk::ResampleImageFilter < itkFloatImageType, i…

【Gem5】获取构建教程

gem5-tutorial-hpca-2023 1 介绍 1.1 Gem5是什么1.2 Gem5可以用来做什么1.3 获取并构建gem5 gem5-tutorial-hpca-2023 打开网址&#xff1a; github 创建教程代码空空间 “Code” -> “Codespaces” -> “Create Codespace on master” GitHub Codespaces 是一个由…

网络安全加密算法---对称加密

三位同学一组完成数据的对称加密传输。 三位同学分别扮演图中 A、B 和 KDC 三个角色&#xff0c;说明 KA、KB&#xff0c;KAB 和发送的数据Data 的内容。 给出图中 2 和 3 中的数据&#xff0c;以及 Data 加密后的密文。可以完成多轮角色互换的通信 过程。其中一轮过程要求 K…

jni 开发 调用dll 函数的流程

jni 调用dll方法以及dll内调用java方法的流程 编写java类 public class abc{static{System.loadLibrary("abc.dll");}public String getResponse(String ReqStr) {return "ok";}public native void InitDiagObj();public native void CarryabcEntry(Stri…

Vulnhub:DEVCONTAINER: 1

目录 信息收集 arp nmap nikto whatweb WEB 信息收集 dirmap 文件上传 提权 系统信息收集 横向提权 信息泄露 get root 信息收集 arp ┌──(root㉿ru)-[~/kali/vulnhub] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:50:56:2f:dd…

一例简单的文件夹病毒的分析

概述 这是一个典型的文件夹病毒&#xff0c;使用xp时代的文件夹图标&#xff0c;通过可移动存储介质传播&#xff0c;会向http://fionades.com/ABIUS/setup.exe下载恶意载荷执行。 其病毒母体只是一个加载器&#xff0c;会在内存是解密加载一个反射型的dll&#xff0c;主要的…

python使用uiautomator2操作雷电模拟器9找图

接上篇文章python使用uiautomator2操作雷电模拟器9并遇到解决adb 连接emulator-5554 unauthorized问题-CSDN博客 搭建好uiautomator2后&#xff0c;主要就是使用了。 本文就利用uiautomator2的截屏、模拟点击和aircv的找图功能&#xff0c;实现对指定寻找的图片的位置的点击。…