# Web-viewer header captured together with the source (not part of the program):
# VMiddleware/utils/ConnectPLC.py
#
# 278 lines
# 9.3 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import traceback
import re
from pylogix import PLC
class PLC_Tool:
    """Convenience wrapper around a pylogix ``PLC`` driver object.

    Every public read/write method validates its arguments, performs the
    driver call, and reports the outcome as a tuple (or ``None`` for the
    getters without a status flag) instead of raising.  If the driver call
    raises ``IOError`` the connection is rebuilt once and the call is retried
    exactly one time; a second failure is reported to the caller.

    Attributes:
        plcIp: IPv4 address of the PLC, as a dotted-quad string.
        plcSlot: Processor slot passed to the driver.
        timeOut: Timeout passed to the driver.
        comm: The underlying driver object (``None`` until it is created).
    """

    def __init__(self, plcIp: str, plcSlot: int = 0, timeOut: float = 5):
        """Store the connection settings and create the driver object."""
        self.plcIp = plcIp
        self.plcSlot = plcSlot
        self.timeOut = timeOut
        # Assigned before the driver is built so that close()/__del__ stay
        # safe even when PLC(...) itself raises.
        self.comm = None
        self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut)

    def __del__(self):
        """Release the driver when the wrapper is garbage collected."""
        self.close()

    def __isIP(self, ip: str):
        """Return True when *ip* is a dotted-quad IPv4 address, else False.

        ``re.fullmatch`` is used so that a trailing newline (which ``$``
        alone tolerates) is rejected as well.
        """
        pattern = r'^((?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)$'
        return re.fullmatch(pattern, ip) is not None

    def _ip_error(self):
        """Return the error message for an unusable ``plcIp``, or None if it is fine."""
        if self.plcIp is None:
            return "请输入PLC的IP地址"
        if not self.__isIP(self.plcIp):
            return "PLC的IP地址格式异常"
        return None

    @staticmethod
    def _response_to_dict(response):
        """Convert a driver response object into the dict shape returned to callers."""
        return {
            "tagName": response.TagName,
            "value": response.Value,
            "status": response.Status,
        }

    @staticmethod
    def _as_list(responses):
        """Wrap a single driver response in a list; pass lists/tuples through."""
        if isinstance(responses, (list, tuple)):
            return responses
        return [responses]

    def _call_with_reconnect(self, operation):
        """Run ``operation()``; on ``IOError`` reconnect once and run it again.

        *operation* must look up ``self.comm`` when it is called (not when it
        is created) because ``connect()`` replaces that attribute.  An
        ``IOError`` raised by the second attempt propagates to the caller.
        """
        try:
            return operation()
        except IOError:
            self.connect()
            return operation()

    def connect(self):
        """Discard the current driver object (if any) and create a new one."""
        previous = getattr(self, "comm", None)
        if previous is not None:
            try:
                previous.Close()
            except Exception:
                # Best effort: the old link is being thrown away anyway, so a
                # failure to close it must not prevent the reconnect.
                print(traceback.format_exc())
        self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut)

    def close(self):
        """Close the driver connection if a driver object exists."""
        comm = getattr(self, "comm", None)
        if comm is not None:
            comm.Close()

    def batch_write_tag(self, tags: list):
        """Write several tags in one call.

        :param tags: list of ``(tag_name, value)`` tuples, e.g.
            ``[("AI.speed", 10.32), ("AI.start", 1)]``
        :return: ``(code, message, results)`` where *code* is 0 on failure,
            1 when every tag reported ``"Success"`` and 2 when at least one
            tag did not; *results* is a list of
            ``{"tagName", "value", "status"}`` dicts, or ``None`` on failure.
        """
        try:
            ip_error = self._ip_error()
            if ip_error is not None:
                return 0, ip_error, None
            if tags is None:
                return 0, "参数tags未写入需要修改的标签点位", None
            if not isinstance(tags, list):
                return 0, "参数tags应该是一个[{key:value},{key2:value2}]的格式", None
            if len(tags) == 0:
                return 0, "请输入标签tags:list", None
            if not all(isinstance(tag, tuple) for tag in tags):
                return 0, "输入的参数格式不对,请参照:[(\"AI.speed\", 10.32),(\"AI.start\", 1)]", None

            print("需要写入的数据:", tags)
            responses = self._call_with_reconnect(lambda: self.comm.Write(tags))
            results = [self._response_to_dict(r) for r in self._as_list(responses)]
            if any(item["status"] != "Success" for item in results):
                return 2, "存在写入失败的标签", results
            return 1, "", results
        except Exception as e:
            print(traceback.format_exc())
            return 0, str(e), None

    def write_tag(self, tag: str, value: "str | int | float"):
        """Write a single value to a single tag.

        :param tag: tag name, e.g. ``"AI.speed"``
        :param value: new value for the tag
        :return: ``(ok, message)``; *message* is empty on success.
        """
        try:
            ip_error = self._ip_error()
            if ip_error is not None:
                return False, ip_error
            if not tag:
                return False, "请输入标签tag:str"

            print("需要写入的数据 %s:%s" % (tag, str(value)))
            response = self._call_with_reconnect(lambda: self.comm.Write(tag, value))
            if response.Status != "Success":
                return False, "存在写入失败的标签"
            return True, ""
        except Exception as e:
            print(traceback.format_exc())
            return False, str(e)

    def write_array(self, tag: str, array: list):
        """Write a list of values to an array tag.

        :param tag: tag name, e.g. ``"AI.speed[12]"``
        :param array: values to write
        :return: ``(ok, message)``; *message* is empty on success.
        """
        try:
            ip_error = self._ip_error()
            if ip_error is not None:
                return False, ip_error
            if not tag:
                return False, "请输入标签tag:str"
            # len() rather than truthiness so array-like inputs keep working.
            if array is None or len(array) == 0:
                return False, "请输入要写入的参数值"

            response = self._call_with_reconnect(lambda: self.comm.Write(tag, array))
            if response.Status != "Success":
                return False, "数组信息写入失败"
            return True, ""
        except Exception as e:
            print(traceback.format_exc())
            return False, str(e)

    def get_tags(self, tags: "str | list"):
        """Read one tag or a list of tags.

        :param tags: a tag name or a list of tag names
        :return: ``(ok, message, results)``; *results* is always a list of
            ``{"tagName", "value", "status"}`` dicts (one entry for a single
            tag name), or ``None`` on failure.
        """
        try:
            ip_error = self._ip_error()
            if ip_error is not None:
                return False, ip_error, None
            if not tags:
                return False, "请输入标签tags:list", None

            responses = self._call_with_reconnect(lambda: self.comm.Read(tags))
            # A single tag name yields a single response object rather than a
            # list, so normalise before building the result list.
            results = [self._response_to_dict(r) for r in self._as_list(responses)]
            return True, "", results
        except Exception as e:
            print(traceback.format_exc())
            return False, str(e), None

    def get_array(self, tag: str, count: int = 1):
        """Read *count* elements starting at an array tag.

        :param tag: tag name to read from
        :param count: number of elements to read; must be greater than 0
        :return: ``(ok, message, result)``; *result* is a
            ``{"tagName", "value", "status"}`` dict, or ``None`` on failure.
        """
        try:
            ip_error = self._ip_error()
            if ip_error is not None:
                return False, ip_error, None
            if not tag:
                return False, "请输入标签tags:list", None
            if count <= 0:
                return False, "取数个数应大于0", None

            response = self._call_with_reconnect(lambda: self.comm.Read(tag, count))
            return True, "", self._response_to_dict(response)
        except Exception as e:
            print(traceback.format_exc())
            return False, str(e), None

    def get_tag_list(self):
        """Fetch the controller's tag list.

        :return: list of ``{tag_name: data_type}`` dicts, or ``None`` when the
            list could not be retrieved.
        """
        try:
            response = self._call_with_reconnect(lambda: self.comm.GetTagList())
            return [{entry.TagName: entry.DataType} for entry in response.Value]
        except Exception as e:
            print(traceback.format_exc())
            return None

    def get_plc_time(self, raw=False):
        """Read the controller clock.

        :param raw: forwarded to the driver's ``GetPLCTime``; per the original
            note, ``raw=True`` asks for raw microseconds instead of a datetime
        :return: whatever the driver returns, or ``None`` on failure.
        """
        try:
            # The same ``raw`` flag is used for the retry as for the first try.
            return self._call_with_reconnect(lambda: self.comm.GetPLCTime(raw))
        except Exception as e:
            print(traceback.format_exc())
            return None
# if __name__ == "__main__":
# plc = PLC_Tool("", 0)
#
# success, msg = plc.write_tag("PMCD", 0)