VMiddleware/utils/ConnectPLC.py

327 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import logging
import traceback
import re
from pylogix import PLC
from .LogT import logging, Log
logger = Log(stdout_log_level=logging.FATAL,
fil_log_level=logging.FATAL).getLogger()
class PLC_Tool:
def __init__(self, plcIp: str, plcSlot: int = 0, timeOut: float = 1):
self.plcIp = plcIp
self.plcSlot = plcSlot
self.timeOut = timeOut
self.comm = None
logger.debug("准备连接PLCIP%s, slot: %d, timeout: %d", self.plcIp, self.plcSlot, self.timeOut)
self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut)
self.comm.ConnectionSize = 504
def __del__(self):
logger.debug("准备断开PLC连接")
self.comm.Close()
logger.debug("断开PLC连接完成")
def __isIP(self, ip: str):
"""
判断是否是IP地址
:param ip:
:return: bool
"""
pattern = r'^((?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)$'
if re.match(pattern, ip):
return True
else:
return False
def connect(self):
logger.warning("准备重新连接PLC")
self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut)
self.comm.ConnectionSize = 504
def close(self):
logger.debug("准备断开PLC连接")
self.comm.Close()
logger.debug("断开PLC连接完成")
def is_connected(self):
return self.comm.conn.SocketConnected
def batch_write_tag(self, tags: list):
"""
Write tags to PLC . 批量写入标签
:param tags: [("AI.speed", 10.32),("AI.start", 1)]
:return: [int:(0:失败1:成功, 2:成功部分), str:异常信息, list: 修改的标签tuple]
"""
connect_count: int = 0
try:
if self.plcIp is None:
return 0, "请输入PLC的IP地址", None
if not self.__isIP(self.plcIp):
return 0, "PLC的IP地址格式异常", None
if tags is None:
return 0, "参数tags未写入需要修改的标签点位", None
if not isinstance(tags, list):
return 0, "参数tags应该是一个[{key:value},{key2:value2}]的格式", None
if len(tags) == 0:
return 0, "请输入标签tags:list", None
for tag in tags:
if not isinstance(tag, tuple):
return 0, "输入的参数格式不对,请参照:[(\"AI.speed\", 10.32),(\"AI.start\", 1)]", None
# print("准备写入PLC的数据%s" % str(tags))
logger.debug("准备写入PLC的数据%s", str(tags))
rets = self.comm.Write(tags)
hasNone = False
results = []
for ret in rets:
results.append(
{"tagName": ret.TagName,
"value": ret.Value,
"status": ret.Status
})
if ret.Status != "Success":
hasNone = True
if hasNone:
logger.warning("写入PLC的数据中存在部分失败%s", str(results))
return 2, "存在写入失败的标签", results
else:
logger.debug("写入PLC成功,写入的内容:%s", str(results))
return 1, "", results
except IOError as io_error:
if connect_count > 0:
print(traceback.format_exc())
logger.error("写入PLC异常计划写入的内容%s 。 异常反馈:%s", str(tags), traceback.format_exc())
return 0, str(io_error), None
connect_count += 1
return self.batch_write_tag(tags)
except Exception as e:
print(traceback.format_exc())
logger.error("写入PLC异常计划写入的内容%s 。 异常反馈:%s", str(tags), traceback.format_exc())
return 0, str(e), None
def write_tag(self, tag: str, value: [str, int, float]):
"""
Write tags to PLC . 批量写入标签
:param tag: "AI.speed"...
:param value: type is [str|int|float]
:return: [bool, str:异常信息, list: 修改的标签tuple]
"""
connect_count: int = 0
try:
if self.plcIp is None:
return False, "请输入PLC的IP地址"
if not self.__isIP(self.plcIp):
return False, "PLC的IP地址格式异常"
if len(tag) == 0:
return False, "请输入标签tag:str"
# if len(value) == 0:
# return False, "请输入标签'%s'的新值value:str" % tag
# print("需要写入的数据 %s:%s" % (tag, str(value)))
logger.debug("准备写入PLC的数据%s:%s", tag, str(value))
ret = self.comm.Write(tag, value)
if ret.Status == "Success":
logger.debug("写入PLC成功写入的数据%s:%s", tag, str(value))
return True, ""
else:
logger.error("存在写入失败的标签, 请重试。计划写入的信息:%s:%s", tag, str(value))
return False, "存在写入失败的标签,请重试。"
except IOError as io_error:
if connect_count > 0:
# print(traceback.format_exc())
logger.error("写入PLC异常请重试。计划写入的内容%s:%s 。 异常反馈:%s", tag, str(value), traceback.format_exc())
return False, "写入PLC异常请重试。计划写入的内容%s:%s 。 异常反馈:%s" % (tag, str(value), traceback.format_exc())
connect_count += 1
return self.write_tag(tag, value)
except Exception as e:
# print(traceback.format_exc())
logger.error("写入PLC异常请重试。计划写入的内容%s:%s 。 异常反馈:%s", tag, str(value), traceback.format_exc())
return False, "写入PLC异常请重试。计划写入的内容%s:%s 。 异常反馈:%s" % (tag, str(value), traceback.format_exc())
def write_array(self, tag: str, array: list):
"""
写入数组
:param tag: 标签的名字例如AI.speed[12]
:param array:具体数值
:return: [bool, str:异常信息]
"""
connect_count: int = 0
try:
if self.plcIp is None:
return False, "请输入PLC的IP地址"
if not self.__isIP(self.plcIp):
return False, "PLC的IP地址格式异常"
if len(tag) == 0:
return False, "请输入标签tag:str"
if len(array) == 0:
return False, "请输入要写入的参数值"
ret = self.comm.Write(tag, array)
# results = [{"tagName": ret.TagName,
# "value": ret.Value,
# "status": ret.Status
# }]
if ret.Status == "Success":
pass
else:
logger.error("数组信息写入失败,计划写入内容:%s:%s", tag, str(array))
return False, "数组信息写入失败,计划写入内容:%s:%s" % (tag, str(array))
logger.debug("数组信息写入成功,写入内容:%s:%s", tag, str(array))
return True, ""
except IOError as io_error:
if connect_count > 0:
# print(traceback.format_exc())
logger.error("数组信息写入失败,请重试。计划写入内容:%s:%s 反馈的异常信息:%s", tag, str(array), traceback.format_exc())
return False, "数组信息写入失败,请重试。计划写入内容:%s:%s 反馈的异常信息:%s" % (tag, str(array), traceback.format_exc())
connect_count += 1
return self.write_array(tag, array)
except Exception as e:
# print(traceback.format_exc())
logger.error("数组信息写入失败,请重试。计划写入内容:%s:%s 反馈的异常信息:%s", tag, str(array),
traceback.format_exc())
return False, "数组信息写入失败,请重试。计划写入内容:%s:%s 反馈的异常信息:%s" % (tag, str(array),
traceback.format_exc())
def get_tags(self, tags: [str, list]):
"""
获取标签或者标签列表
:param tags: 目标标签列表 [str|list]
:return: [bool, str, list]
"""
results = []
connect_count: int = 0
try:
if self.plcIp is None:
return False, "请输入PLC的IP地址", None
if not self.__isIP(self.plcIp):
return False, "PLC的IP地址格式异常", None
if len(tags) == 0:
return False, "请输入标签tags:list", None
logger.debug("读取PLC的点位信息所读点位%s", str(tags))
rets = self.comm.Read(tags)
for ret in rets:
results.append(
{"tagName": ret.TagName,
"value": ret.Value,
"status": ret.Status
})
if ret.Status != "Success":
logger.error("读取PLC的点位失败反馈结果%s", str(results))
return False, "读取PLC点位失败", results
logger.debug("已读取到PLC的点位信息读取结果%s", str(results))
return True, "", results
except IOError as io_error:
if connect_count > 0:
print(traceback.format_exc())
logger.error("读取PLC的点位失败所读点位%s 反馈错误信息:%s", str(tags), traceback.format_exc())
return False, str(io_error), None
connect_count += 1
print(traceback.format_exc())
return self.get_tags(tags)
except Exception as e:
print(traceback.format_exc())
logger.error("读取PLC的点位失败所读点位%s 反馈错误信息:%s", str(tags), traceback.format_exc())
return False, str(e), None
def get_array(self, tag: str, count: int = 1):
"""
读取数组
:param tag: 目标标签
:param count: 读取个数
:return: [bool, str, (dict, None)]
"""
connect_count: int = 0
try:
if self.plcIp is None:
return False, "请输入PLC的IP地址", None
if not self.__isIP(self.plcIp):
return False, "PLC的IP地址格式异常", None
if len(tag) == 0:
return False, "请输入标签tag:str", None
if count <= 0:
return False, "取数个数应大于0", None
logger.debug("读取PLC的数组信息所读点位%s[%d]", str(tag), count)
ret = self.comm.Read(tag, count)
results = {"tagName": ret.TagName,
"value": ret.Value,
"status": ret.Status
}
if ret.Status != "Success":
logger.error("读取PLC的数组失败所读点位%s[%d] 反馈结果:%s", tag, count, str(results))
return False, "读取PLC的数组失败", results
logger.debug("已读取PLC的数组信息读取结果%s", str(results))
return True, "", results
except IOError as io_error:
if connect_count > 0:
print(traceback.format_exc())
logger.error("读取PLC的数组失败所读点位%s[%d] 反馈错误信息:%s", tag, count, traceback.format_exc())
return False, str(io_error), None
connect_count += 1
return self.get_array(tag, count)
except Exception as e:
print(traceback.format_exc())
logger.error("读取PLC的数组失败所读点位%s[%d] 反馈错误信息:%s", tag, count, traceback.format_exc())
return False, str(e), None
def get_tag_list(self):
"""
获取标签列表
:return:
"""
connect_count: int = 0
results = []
try:
logger.debug("获取PLC所有标签")
tags = self.comm.GetTagList()
for t in tags.Value:
results.append({t.TagName: t.DataType})
logger.debug("已获取到PLC所有标签, 标签列表:%s", str(results))
return results
except IOError as io_error:
if connect_count > 0:
print(traceback.format_exc())
logger.error("获取PLC所有标签失败反馈错误信息%s", traceback.format_exc())
return None
connect_count += 1
return self.get_tag_list()
except Exception as e:
print(traceback.format_exc())
logger.error("获取PLC所有标签失败反馈错误信息%s", traceback.format_exc())
return None
def get_plc_time(self, raw=False):
"""
获取PLC时间
:param raw: 如果设置 raw=True则将返回原始微秒否则返回 [datetime] 2021-04-20 15:41:22.964380
:return:
"""
connect_count: int = 0
try:
logger.debug("开始获取PLC时间")
time = self.comm.GetPLCTime(raw)
logger.debug("获取PLC时间成功%s", str(time))
return time
except IOError as io_error:
if connect_count > 0:
print(traceback.format_exc())
logger.error("获取PLC时间失败反馈错误信息%s", traceback.format_exc())
return None
connect_count += 1
return self.get_plc_time()
except Exception as e:
print(traceback.format_exc())
logger.error("获取PLC时间失败反馈错误信息%s", traceback.format_exc())
return None
# if __name__ == "__main__":
# plc = PLC_Tool("", 0)
#
# success, msg = plc.write_tag("PMCD", 0)