python基于ModBusTCP服务端的业务实现特定的client

python实现ModBusTCP协议的client是一件简单的事情,只要通过pymodbus、pyModbusTCP等模块都可以实现,本文采用pymodbus。但要基于ModBusTCP服务端的业务实现特定的client,那得看看服务端是否复杂。前面系列文章,我们学习了对服务端的简单交互,便是得力于服务端简单的业务流程,本文将实现有点复杂的业务流程。

一、业务描述

我们将使用python脚本来实现一个ModBusTCP client,他能够触发设备进行拍照,然后读取拍照情况。

1、ModBusTCP服务端交互流程

大概类似如下交互时序,其中PLC将用我们的脚本代替。

2、控制

根据上述业务,服务器需要有寄存器存储客户端写入的控制位,存储顺序如下。

3、状态

根据上述业务,服务器需要有寄存器存储状态位,让客户端来读取,存储顺序如下。

4、结果

根据上述业务,服务器需要有寄存器存储结果,让客户端来读取,存储顺序如下。

5、 地址空间

(1)控制

类型:HoldingRegisters、Coils

起始地址与寄存器数量:自定义

(2)状态

类型:HoldingRegisters、DiscreteInputs、InputRegisters

起始地址与寄存器数量:自定义

(3)结果

类型:HoldingRegisters、InputRegisters

起始地址与寄存器数量:自定义

二、程序整体设计

class myModBusTCPclient(object):

    def __init__(self, obj):
        self.obj = obj

    # 状态读取后转成二进制,分别对应TriggerReady等状态
    def custom_binary(self, num, length=16, ByteSwap=0):
        # 将整数转换为二进制字符串
        binary_str = bin(num)[2:]
        # 计算需要补充的零的个数
        zeros_to_add = length - len(binary_str)
        # 构造符合规则的二进制字符串
        result_str = '0' * zeros_to_add + binary_str
        # 翻转二进制,如01转为10,方便后续取值
        result_str = result_str[::-1]
        if ByteSwap==0:
            return result_str
        elif ByteSwap==1:  # 需要字节交换时
            return result_str[8:] + result_str[:8]
        else:
            raise ValueError("ByteSwap 的值错误!")

    # 控制写之前先将TriggerEnable等二进制控制位转成数值
    def custom_num(self, binary, length=16, ByteSwap=0):
        assert len(binary) == length, "输入的二进制长度不正确!"
        binary = binary[::-1]  # 翻转二进制,如01转为10,方便后续取值
        if ByteSwap==0:
            return int(binary, 2)
        elif ByteSwap==1:  # 需要字节交换时
            return int(binary[8:] + binary[:8], 2)
        else:
            raise ValueError("ByteSwap 的值错误!")

    def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)
            self.obj.write_registers(address, value, slave=slave)
        elif addrtype=="Coils":
            ...
        else:
            raise ValueError("ctrl_addrtype的值错误!")

    def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("status HoldingRegisters:", value_list)
            print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])
            return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]
        elif addrtype=="InputRegisters":
            ...
        elif addrtype=="DiscreteInputs":
            ...
        else:
            raise ValueError("status_addrtype的值错误!")

    def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0):
        if addrtype=="HoldingRegisters":
            ...
        elif addrtype=="InputRegisters":
            ...
        else:
            raise ValueError("plc_out_addrtype的值错误!")

if __name__ == "__main__":
    # Modbus TCP服务器的IP地址和端口号
    server_ip = "192.168.1.196"
    port = 502
    station = 1

    # 创建Modbus TCP客户端
    MDclient = ModbusTcpClient(server_ip, port)
    if MDclient.connect():
        myclient = myModBusTCPclient(MDclient)
        myclient.ctrl(ByteSwap=1, binary="1100000000000000")
        time.sleep(1)
        myclient.status(ByteSwap=1)

1、程序结构

上述代码定义了一个名为 myModBusTCPclient 的类,用于与 Modbus TCP 服务器进行通信。下面是对程序结构的分析:

构造函数 __init__

接收一个参数 obj,表示 Modbus TCP 客户端对象。将这个对象存储在实例变量 self.obj 中。

custom_binary 方法:

将给定的整数 num 转换为指定长度 length 的二进制字符串。可选参数 ByteSwap 用于指定是否进行字节交换。如果 ByteSwap 为 1,则进行字节交换,否则不进行。返回构造好的二进制字符串。

custom_num 方法:

接收一个二进制字符串 binary,根据给定的长度 length 和是否进行字节交换 ByteSwap 将其转换为整数。返回转换得到的整数。

ctrl 方法:

根据给定的地址类型 addrtype(默认是 "HoldingRegisters")、是否进行字节交换 ByteSwap、二进制字符串 binary、Modbus 地址 address 和从站号 slave,向 Modbus 服务器写入数据。如果地址类型是 "HoldingRegisters",则使用 write_registers 方法写入寄存器。

status 方法:

根据给定的地址类型 addrtype、是否进行字节交换 ByteSwap、寄存器地址 reg_addr、寄存器数量 reg_nb 和从站号 slave,从 Modbus 服务器读取数据。如果地址类型是 "HoldingRegisters",则使用 read_holding_registers 方法读取寄存器。

plc_out 方法:

根据给定的地址类型 addrtype 和是否进行字节交换 ByteSwap,执行一些 Modbus 操作。具体操作需要根据地址类型的不同进行扩展。

if __name__ == "__main__": 部分:

在脚本独立运行时进行的操作。创建了一个 Modbus TCP 客户端对象 MDclient。通过 myModBusTCPclient 类创建了一个自定义的客户端对象 myclient。调用了 ctrl 方法,向 Modbus 服务器写入数据。等待了一秒钟。调用了 status 方法,从 Modbus 服务器读取数据。

2、不同地址空间的请求

在ModbusTCP中,对于不同的寄存器,请求方式是不一样的,因此需要根据服务端的设置相应更改。

为了满足业务,需要对原有的pymodbus进行封装改造。

为了满足大端序和小端序,需要加入字节交换的操作。

三、程序实现

import json
import time
import socket
from pymodbus.client import ModbusTcpClient

class myModBusTCPclient(object):

    def __init__(self, obj):
        self.obj = obj

    # 状态读取后转成二进制,分别对应TriggerReady等状态
    def custom_binary(self, num, length=16, ByteSwap=0):
        # 将整数转换为二进制字符串
        binary_str = bin(num)[2:]
        # 计算需要补充的零的个数
        zeros_to_add = length - len(binary_str)
        # 构造符合规则的二进制字符串
        result_str = '0' * zeros_to_add + binary_str
        # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值
        result_str = result_str[::-1]
        if ByteSwap==0:
            return result_str
        elif ByteSwap==1:  # 需要字节交换时
            return result_str[8:] + result_str[:8]
        else:
            raise ValueError("ByteSwap 的值错误!")

    # 控制写之前先将TriggerEnable等二进制控制位转成数值
    def custom_num(self, binary, length=16, ByteSwap=0):
        assert len(binary) == length, "输入的二进制长度不正确!"
        binary = binary[::-1]  # 根据业务逻辑需要,翻转二进制,如01转为10,方便后续取值
        if ByteSwap==0:
            return int(binary, 2)
        elif ByteSwap==1:  # 需要字节交换时
            return int(binary[8:] + binary[:8], 2)
        else:
            raise ValueError("ByteSwap 的值错误!")

    def result_ByteSwap(self, num, length=16, ByteSwap=0):
        # 将整数转换为二进制字符串
        binary_str = bin(num)[2:]
        # 计算需要补充的零的个数
        zeros_to_add = length - len(binary_str)
        # 构造符合规则的二进制字符串
        result_str = '0' * zeros_to_add + binary_str
        if ByteSwap == 1:
            return result_str
        elif ByteSwap == 0:  # 需要字节交换时
            return result_str[8:] + result_str[:8]
        else:
            raise ValueError("ByteSwap 的值错误!")

    def ctrl(self, addrtype="HoldingRegisters", ByteSwap=0, binary="0000000000000000", address=0, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.custom_num(binary[0:16], ByteSwap=ByteSwap)
            self.obj.write_registers(address, value, slave=slave)
        elif addrtype=="Coils":
            values = []
            for i in binary:
                if i == "0":
                    values.append(False)
                elif i == "1":
                    values.append(True)
            print(values)
            # 线圈不存在字节交换
            # value = self.custom_num(binary[0:16], ByteSwap=0)
            # print("------------------------------values :", [i for i in bin(value)[2:]])
            # self.obj.write_coils(address=address, values=[i for i in bin(value)[2:]], slave=slave)
            self.obj.write_coils(address=address, values=values, slave=slave)
        else:
            raise ValueError("ctrl_addrtype的值错误!")

    def status(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=2, reg_nb=2, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("status HoldingRegisters:", value_list)
            print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])
            return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]
        elif addrtype=="InputRegisters":
            value = self.obj.read_input_registers(address=reg_addr, count=reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("result InputRegisters:", value_list)
            print([self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list])
            return [self.custom_binary(i, ByteSwap=ByteSwap) for i in value_list]
        elif addrtype=="DiscreteInputs":
            # 离散寄存器不存在字节交换
            value = self.obj.read_discrete_inputs(address=reg_addr, count=reg_nb, slave=slave)
            print([value.bits[i] for i in range(reg_nb)])
            result = ""
            for i in [value.bits[i] for i in range(reg_nb)]:
                if i == False:
                    result += "0"
                elif i == True:
                    result += "1"
            # [value.bits[i] for i in range(reg_nb)]
            return [result[i:i+16] for i in range(0, len(result), 16)]
        else:
            raise ValueError("status_addrtype的值错误!")

    def plc_out(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):
        if addrtype=="HoldingRegisters":
            value = self.obj.read_holding_registers(reg_addr, reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("result HoldingRegisters:", value_list)
            print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])
            return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]
        elif addrtype=="InputRegisters":
            value = self.obj.read_input_registers(reg_addr, reg_nb, slave=slave)
            value_list = [value.registers[i] for i in range(reg_nb)]  # 整型列表
            print("result InputRegisters:", value_list)
            print([int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list])
            return [int(self.result_ByteSwap(i, ByteSwap=ByteSwap), 2) for i in value_list]
        else:
            raise ValueError("plc_out_addrtype的值错误!")

    def plc_out_only_result(self, addrtype="HoldingRegisters", ByteSwap=0, reg_addr=100, reg_nb=100, slave=1):
        result_list = self.plc_out(addrtype=addrtype, ByteSwap=ByteSwap, reg_addr=reg_addr, reg_nb=reg_nb, slave=slave)[5:]
        result_list_have = [i for i in result_list if i != 0]
        result_binary = ""
        result = ""
        for num in result_list_have:
            # 将整数转换为二进制字符串
            binary_str = bin(num)[2:]
            # 计算需要补充的零的个数
            zeros_to_add = 16 - len(binary_str)
            # 构造符合规则的二进制字符串
            result_str = '0' * zeros_to_add + binary_str
            result_binary += result_str
        for binary in [result_binary[i:i+8] for i in range(0, len(result_binary), 8)]:  # 以长度8进行分割
            if binary != "00000000":
                result += chr(int(binary, 2))
        return result

if __name__ == "__main__":

    # Modbus TCP服务器的IP地址和端口号
    server_ip = "192.168.1.196"
    port = 502
    station = 1

    # 创建Modbus TCP客户端
    MDclient = ModbusTcpClient(server_ip, port)
    if MDclient.connect():
        ByteSwap = 0
        ctrl_addrtype = "Coils"  # HoldingRegisters、Coils
        status_addrtype = "HoldingRegisters"  # HoldingRegisters、DiscreteInputs、InputRegisters
        result_addrtype = "InputRegisters"  # HoldingRegisters、InputRegisters

        myclient = myModBusTCPclient(MDclient)
        # 初始化控制位状态
        ctrl = "0000000000000000"
        myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary="0000000000000000")

        # 使能TriggerEnable并常置为1
        TriggerEnable = "1"
        ctrl = TriggerEnable + ctrl[1:]

        while True:
            print("ctrl: ", ctrl)
            # 写入控制
            myclient.ctrl(addrtype=ctrl_addrtype, ByteSwap=ByteSwap, binary=ctrl, address=0)
            ResultACK = "0"
            ctrl = ctrl[:3] + ResultACK + ctrl[4:]
            # 查询状态
            status = myclient.status(addrtype=status_addrtype, ByteSwap=ByteSwap, reg_addr=2, reg_nb=2)[0]
            TriggerReady = status[0]
            print("------TriggerReady", TriggerReady)
            TriggerACK = status[1]
            ResultOKorNG = status[11]
            # print("---------------------------------ResultOKorNG:", ResultOKorNG)
            if TriggerReady == "1":
                Trigger = "1"
                ctrl = ctrl[:1] + TriggerEnable + ctrl[2:]
                print("-----------------ctrl")
            if TriggerACK == "1":
                Trigger = "0"
                ctrl = ctrl[:1] + Trigger + ctrl[2:]
            if ResultOKorNG == "1":
                # print("---------------------------------------")
                ResultACK = "1"
                ctrl = ctrl[:3] + ResultACK + ctrl[4:]
            # myclient.plc_out(ByteSwap=1)
            result = myclient.plc_out_only_result(addrtype=result_addrtype, ByteSwap=ByteSwap)
            print(result)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/236765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

无需公网IP联机Minecraft,我的世界服务器本地搭建教程

目录 前言 1.Mcsmanager安装 2.创建Minecraft服务器 3.本地测试联机 4. 内网穿透 4.1 安装cpolar内网穿透 4.2 创建隧道映射内网端口 5.远程联机测试 6. 配置固定远程联机端口地址 6.1 保留一个固定TCP地址 6.2 配置固定TCP地址 7. 使用固定公网地址远程联机 8.总…

开关量防抖滤波器(梯形图和SCL源代码)

模拟量防抖超限报警功能块请查看下面文章链接: https://rxxw-control.blog.csdn.net/article/details/133969425https://rxxw-control.blog.csdn.net/article/details/133969425 1、开关量防抖滤波器 2、防抖滤波 3、梯形图代码

【conda】利用Conda创建虚拟环境,Pytorch各版本安装教程(Ubuntu)

TOC conda 系列: 1. conda指令教程 2. 利用Conda创建虚拟环境,安装Pytorch各版本教程(Ubuntu) 1. 利用Conda创建虚拟环境 nolonolo:~/sun/SplaTAM$ conda create -n splatam python3.10查看结果: (splatam) nolonolo:~/sun/SplaTAM$ cond…

手把手教你玩转ESP8266(原理+驱动)

在嵌入式开发中,无线通信的方式有很多,其中 WIFI 是绕不开的话题。说到 WIFI 通信,就不得不提 ESP8266了。 ESP8266 是一款高性能的 WIFI 串口模块,实现透明传输。只要有一定的串口知识,不需要知道 WIFI 原理就可以上…

【Qt开发流程】之UI风格、预览及QPalette使用

概述 一个优秀的应用程序不仅要有实用的功能,还要有一个漂亮美腻的外观,这样才能使应用程序更加友善、操作性良好,更加符合人体工程学。作为一个跨平台的UI开发框架,Qt提供了强大而且灵活的界面外观设计机制,能够帮助…

如何快速访问未知世界【献给我的一位尚未谋面的“故友”】

1.下载Chrome浏览器 想要访问未知世界,强烈推荐Chrome,好用无广告,点击链接即可下载: 点我访问下载页面 2.安装插件 下一步,我们需要安装一个插件,这个插件能为Chrome浏览器插上腾飞的翅膀 [doge] (可以…

FTR223限时回归?经典三花再加金翅膀,CL500特别款亮相

FTR223可以说是非常经典的一款本田小攀爬车型了,之前我还有幸玩过一段时间,最近本田在泰国车展上展出了CL500的特别版,其中FTR223纪念版的版画让人眼前一亮,经典的白、红、蓝三色搭配让人眼前一亮。 CL500这台车在国内今年刚上市&…

18.Java程序设计-基于Springboot的电影院售票系统的设计与实现

摘要 电影产业在当今社会中占据着重要地位,电影院作为观影的主要场所,其售票系统的高效性和用户体验至关重要。本文基于Spring Boot框架设计并实现了一款电影院售票系统,旨在提高售票效率、优化用户体验,并解决传统售票方式存在的…

Panalog 日志审计系统 sprog_deletevent.php SQL 注入漏洞复现

0x01 产品简介 Panalog大数据日志审计系统定位于将大数据产品应用于高校、 公安、 政企、 医疗、 金融、 能源等行业之中,针对网络流量的信息进行日志留存,可对用户上网行为进行审计,逐渐形成大数据采集、 大数据分析、 大数据整合的工作模式…

Rocket MQ 架构介绍

文章目录 为什么选择Rocket MQ基本概念优点缺点架构图编程模型发送者发送消息固定步骤消费者消费消息固定步骤 为什么选择Rocket MQ Rocket MQ是阿帕奇顶级的开源项目,由阿里开发并开源。它的研发背景是Active MQ与Kafka不能很好的解决当时的业务场景。官网上是这么…

游戏中小地图的制作__unity基础开发教程

小地图的制作 Icon标识制作制作摄像机映射创建地图UI效果“不一样的效果” 在游戏中经常可以看到地图视角的存在,那么地图视角是如何让实现的呢? 这一期教大家制作一个简易的小地图。 💖点关注,不迷路。 老样子,我们还…

c语言为什么要引入变量

大家好,今天给大家介绍c语言为什么要引入变量,文章末尾附有分享大家一个资料包,差不多150多G。里面学习内容、面经、项目都比较新也比较全!可进群免费领取。 C语言引入变量的原因主要是为了存储数据并且方便后续的操作和计算。 变…

机器学习基础介绍

百度百科: 机器学习是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。 …

BluetoothDevice 序列化问题

文章目录 前言思考分析定位 前言 在做蓝牙设备通信时,遇到一个奇葩的问题,公司另一个部门开发的蓝牙组件库,把蓝牙设备BluetoothDevice进行了序列化,在连接时候又进行反序列化。但是当我去调试我的项目时,发现发序列化…

网贷教父判无期,千家万户哭成狗

作者|翻篇 新熔财经快评: 真是太气人了 e租宝崩盘后 比它更大的雷又来了 “网贷教父”周世平 非法吸收公众存款1090亿 被判了无期 48万多人的血汗钱啊 就这样血本无归了 要知道 当年周世平做p2p 就靠着全额垫付 这颗定心丸 大量的宝妈 上…

[陇剑杯 2021]日志分析

[陇剑杯 2021]日志分析 题目做法及思路解析(个人分享) 问一:单位某应用程序被攻击,请分析日志,进行作答: 网络存在源码泄漏,源码文件名是_____________。(请提交带有文件后缀的文件名&…

日志门面slf4j和各日志框架

简介 简单日志门面(Simple Logging Facade For Java) SLF4J主要是为了给Java日志访问提供一套标准、规范的API框架, 其主要意义在于提供接口,具体的实现可以交由其他日志框架,如log4j、logback、log4j2。 对于一般的Java项目而言&#xff…

JavaEE之多线程编程:1. 基础篇

文章目录 一、关于操作系统一、认识进程 process二、认识线程三、进程和线程的区别(重点!)四、Java的线程和操作系统线程的关系五、第一个多线程编程 一、关于操作系统 【操作系统】 驱动程序: 如:我们知道JDBC的驱动程…

点评项目——分布式锁

2023.12.10 集群模式下的并发安全问题及解决 随着现在分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。见下图&#xff1a…

算法Day27 身材管理(三维背包)

身材管理(三维背包) Description Input Output Sample 代码 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scanner new Scanner(System.in);int n scanner.nextInt(); // 输入n的值int money sca…