# import logging import traceback import re from pylogix import PLC from .LogT import logging, Log logger = Log(stdout_log_level=logging.FATAL, fil_log_level=logging.FATAL).getLogger() class PLC_Tool: def __init__(self, plcIp: str, plcSlot: int = 0, timeOut: float = 1): self.plcIp = plcIp self.plcSlot = plcSlot self.timeOut = timeOut self.comm = None logger.info("准备连接PLC,IP:%s, slot: %d, timeout: %d", self.plcIp, self.plcSlot, self.timeOut) self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut) logger.info("连接PLC成功") def __del__(self): logger.info("准备断开PLC连接") self.comm.Close() logger.info("断开PLC连接完成") def __isIP(self, ip: str): """ 判断是否是IP地址 :param ip: :return: bool """ pattern = r'^((?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)$' if re.match(pattern, ip): return True else: return False def connect(self): logger.warning("准备重新连接PLC") self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut) logger.info("连接PLC成功") def close(self): logger.info("准备断开PLC连接") self.comm.Close() logger.info("断开PLC连接完成") def batch_write_tag(self, tags: list): """ Write tags to PLC . 批量写入标签 :param tags: [("AI.speed", 10.32),("AI.start", 1)] :return: [int:(0:失败,1:成功, 2:成功部分), str:异常信息, list: 修改的标签tuple] """ connect_count: int = 0 try: if self.plcIp is None: return 0, "请输入PLC的IP地址", None if not self.__isIP(self.plcIp): return 0, "PLC的IP地址格式异常", None if tags is None: return 0, "参数tags未写入需要修改的标签点位", None if not isinstance(tags, list): return 0, "参数tags应该是一个[{key:value},{key2:value2}]的格式", None if len(tags) == 0: return 0, "请输入标签tags:list", None for tag in tags: if not isinstance(tag, tuple): return 0, "输入的参数格式不对,请参照:[(\"AI.speed\", 10.32),(\"AI.start\", 1)]", None # print("准备写入PLC的数据:%s" % str(tags)) logger.info("准备写入PLC的数据:%s", str(tags)) rets = self.comm.Write(tags) hasNone = False results = [] for ret in rets: results.append( {"tagName": ret.TagName, "value": ret.Value, "status": ret.Status }) if ret.Status != "Success": hasNone = True if hasNone: logger.warning("写入PLC的数据中,存在部分失败:%s", str(results)) return 2, "存在写入失败的标签", results else: logger.info("写入PLC成功,写入的内容:%s", str(results)) return 1, "", results except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("写入PLC异常,计划写入的内容:%s 。 异常反馈:%s", str(tags), traceback.format_exc()) return 0, str(io_error), None connect_count += 1 self.connect() return self.batch_write_tag(tags) except Exception as e: print(traceback.format_exc()) logger.error("写入PLC异常,计划写入的内容:%s 。 异常反馈:%s", str(tags), traceback.format_exc()) return 0, str(e), None def write_tag(self, tag: str, value: [str, int, float]): """ Write tags to PLC . 批量写入标签 :param tag: "AI.speed"... :param value: type is [str|int|float] :return: [bool, str:异常信息, list: 修改的标签tuple] """ connect_count: int = 0 try: if self.plcIp is None: return False, "请输入PLC的IP地址" if not self.__isIP(self.plcIp): return False, "PLC的IP地址格式异常" if len(tag) == 0: return False, "请输入标签tag:str" # if len(value) == 0: # return False, "请输入标签'%s'的新值value:str" % tag # print("需要写入的数据 %s:%s" % (tag, str(value))) logger.info("准备写入PLC的数据:%s:%s", tag, str(value)) ret = self.comm.Write(tag, value) if ret.Status != "Success": logger.error("存在写入失败的标签, 计划写入的信息:%s:%s", tag, str(value)) return False, "存在写入失败的标签" else: logger.info("写入PLC成功,写入的数据:%s:%s", tag, str(value)) return True, "" except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("写入PLC异常,计划写入的内容:%s:%s 。 异常反馈:%s", tag, str(value), traceback.format_exc()) return False, str(io_error) connect_count += 1 self.connect() return self.write_tag(tag, value) except Exception as e: print(traceback.format_exc()) logger.error("写入PLC异常,计划写入的内容:%s:%s 。 异常反馈:%s", tag, str(value), traceback.format_exc()) return False, str(e) def write_array(self, tag: str, array: list): """ 写入数组 :param tag: 标签的名字,例如AI.speed[12] :param array:具体数值 :return: [bool, str:异常信息] """ connect_count: int = 0 try: if self.plcIp is None: return False, "请输入PLC的IP地址" if not self.__isIP(self.plcIp): return False, "PLC的IP地址格式异常" if len(tag) == 0: return False, "请输入标签tag:str" if len(array) == 0: return False, "请输入要写入的参数值" ret = self.comm.Write(tag, array) # results = [{"tagName": ret.TagName, # "value": ret.Value, # "status": ret.Status # }] if ret.Status != "Success": logger.error("数组信息写入失败,计划写入内容:%s:%s", tag, str(array)) return False, "数组信息写入失败" logger.info("数组信息写入成功,写入内容:%s:%s", tag, str(array)) return True, "" except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("数组信息写入失败,计划写入内容:%s:%s, 反馈的异常信息:%s", tag, str(array), traceback.format_exc()) return False, str(io_error) connect_count += 1 self.connect() return self.write_array(tag, array) except Exception as e: print(traceback.format_exc()) logger.error("数组信息写入失败,计划写入内容:%s:%s, 反馈的异常信息:%s", tag, str(array), traceback.format_exc()) return False, str(e) def get_tags(self, tags: [str, list]): """ 获取标签或者标签列表 :param tags: 目标标签列表 [str|list] :return: [bool, str, list] """ results = [] connect_count: int = 0 try: if self.plcIp is None: return False, "请输入PLC的IP地址", None if not self.__isIP(self.plcIp): return False, "PLC的IP地址格式异常", None if len(tags) == 0: return False, "请输入标签tags:list", None logger.info("读取PLC的点位信息,所读点位:%s", str(tags)) rets = self.comm.Read(tags) for ret in rets: results.append( {"tagName": ret.TagName, "value": ret.Value, "status": ret.Status }) if ret.Status != "Success": logger.error("读取PLC的点位失败,反馈结果:%s", str(results)) return False, "读取PLC点位失败", results logger.info("已读取到PLC的点位信息,读取结果:%s", str(results)) return True, "", results except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("读取PLC的点位失败,所读点位:%s, 反馈错误信息:%s", str(tags), traceback.format_exc()) return False, str(io_error), None connect_count += 1 print(traceback.format_exc()) self.connect() return self.get_tags(tags) except Exception as e: print(traceback.format_exc()) logger.error("读取PLC的点位失败,所读点位:%s, 反馈错误信息:%s", str(tags), traceback.format_exc()) return False, str(e), None def get_array(self, tag: str, count: int = 1): """ 读取数组 :param tag: 目标标签 :param count: 读取个数 :return: [bool, str, (dict, None)] """ connect_count: int = 0 try: if self.plcIp is None: return False, "请输入PLC的IP地址", None if not self.__isIP(self.plcIp): return False, "PLC的IP地址格式异常", None if len(tag) == 0: return False, "请输入标签tag:str", None if count <= 0: return False, "取数个数应大于0", None logger.info("读取PLC的数组信息,所读点位:%s[%d]", str(tag), count) ret = self.comm.Read(tag, count) results = {"tagName": ret.TagName, "value": ret.Value, "status": ret.Status } if ret.Status != "Success": logger.error("读取PLC的数组失败,所读点位:%s[%d], 反馈结果:%s", tag, count, str(results)) return False, "读取PLC的数组失败", results logger.info("已读取PLC的数组信息,读取结果:%s", str(results)) return True, "", results except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("读取PLC的数组失败,所读点位:%s[%d], 反馈错误信息:%s", tag, count, traceback.format_exc()) return False, str(io_error), None connect_count += 1 self.connect() return self.get_array(tag, count) except Exception as e: print(traceback.format_exc()) logger.error("读取PLC的数组失败,所读点位:%s[%d], 反馈错误信息:%s", tag, count, traceback.format_exc()) return False, str(e), None def get_tag_list(self): """ 获取标签列表 :return: """ connect_count: int = 0 results = [] try: logger.info("获取PLC所有标签") tags = self.comm.GetTagList() for t in tags.Value: results.append({t.TagName: t.DataType}) logger.info("已获取到PLC所有标签, 标签列表:%s", str(results)) return results except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("获取PLC所有标签失败,反馈错误信息:%s", traceback.format_exc()) return None connect_count += 1 self.connect() return self.get_tag_list() except Exception as e: print(traceback.format_exc()) logger.error("获取PLC所有标签失败,反馈错误信息:%s", traceback.format_exc()) return None def get_plc_time(self, raw=False): """ 获取PLC时间 :param raw: 如果设置 raw=True,则将返回原始微秒,否则返回 [datetime] 2021-04-20 15:41:22.964380 :return: """ connect_count: int = 0 try: logger.info("开始获取PLC时间") time = self.comm.GetPLCTime(raw) logger.info("获取PLC时间成功:%s", str(time)) return time except IOError as io_error: if connect_count > 0: print(traceback.format_exc()) logger.error("获取PLC时间失败,反馈错误信息:%s", traceback.format_exc()) return None connect_count += 1 self.connect() return self.get_plc_time() except Exception as e: print(traceback.format_exc()) logger.error("获取PLC时间失败,反馈错误信息:%s", traceback.format_exc()) return None # if __name__ == "__main__": # plc = PLC_Tool("", 0) # # success, msg = plc.write_tag("PMCD", 0)