generated from zhangwei/Train_Identify
			
		
			
	
	
		
			278 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
		
		
			
		
	
	
			278 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
|  | import traceback | |||
|  | import re | |||
|  | from pylogix import PLC | |||
|  | 
 | |||
|  | 
 | |||
|  | class PLC_Tool: | |||
|  |     def __init__(self, plcIp: str, plcSlot: int = 0, timeOut: float = 5): | |||
|  |         self.plcIp = plcIp | |||
|  |         self.plcSlot = plcSlot | |||
|  |         self.timeOut = timeOut | |||
|  |         self.comm = None | |||
|  |         self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut) | |||
|  | 
 | |||
|  |     def __del__(self): | |||
|  |         self.comm.Close() | |||
|  | 
 | |||
|  |     def __isIP(self, ip: str): | |||
|  |         """
 | |||
|  |         判断是否是IP地址 | |||
|  |         :param ip: | |||
|  |         :return: bool | |||
|  |         """
 | |||
|  |         pattern = r'^((?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]\d|\d)$' | |||
|  |         if re.match(pattern, ip): | |||
|  |             return True | |||
|  |         else: | |||
|  |             return False | |||
|  | 
 | |||
|  |     def connect(self): | |||
|  |         self.comm = PLC(self.plcIp, self.plcSlot, self.timeOut) | |||
|  | 
 | |||
|  |     def close(self): | |||
|  |         self.comm.Close() | |||
|  | 
 | |||
|  |     def batch_write_tag(self, tags: list): | |||
|  |         """
 | |||
|  |         Write tags to PLC . 批量写入标签 | |||
|  |         :param tags: [("AI.speed", 10.32),("AI.start", 1)] | |||
|  |         :return: [int:(0:失败,1:成功, 2:成功部分), str:异常信息, list: 修改的标签tuple] | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             if self.plcIp is None: | |||
|  |                 return 0, "请输入PLC的IP地址", None | |||
|  |             if not self.__isIP(self.plcIp): | |||
|  |                 return 0, "PLC的IP地址格式异常", None | |||
|  |             if tags is None: | |||
|  |                 return 0, "参数tags未写入需要修改的标签点位", None | |||
|  |             if not isinstance(tags, list): | |||
|  |                 return 0, "参数tags应该是一个[{key:value},{key2:value2}]的格式", None | |||
|  |             if len(tags) == 0: | |||
|  |                 return 0, "请输入标签tags:list", None | |||
|  |             for tag in tags: | |||
|  |                 if not isinstance(tag, tuple): | |||
|  |                     return 0, "输入的参数格式不对,请参照:[(\"AI.speed\", 10.32),(\"AI.start\", 1)]", None | |||
|  | 
 | |||
|  |             print("需要写入的数据:", tags) | |||
|  |             rets = self.comm.Write(tags) | |||
|  | 
 | |||
|  |             hasNone = False | |||
|  |             results = [] | |||
|  |             for ret in rets: | |||
|  |                 results.append( | |||
|  |                     {"tagName": ret.TagName, | |||
|  |                      "value": ret.Value, | |||
|  |                      "status": ret.Status | |||
|  |                      }) | |||
|  |                 if ret.Status != "Success": | |||
|  |                     hasNone = True | |||
|  |             if hasNone: | |||
|  |                 return 2, "存在写入失败的标签", results | |||
|  |             else: | |||
|  |                 return 1, "", results | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return 0, str(io_error), None | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.batch_write_tag(tags) | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return 0, str(e), None | |||
|  | 
 | |||
|  |     def write_tag(self, tag: str, value: [str, int, float]): | |||
|  |         """
 | |||
|  |         Write tags to PLC . 批量写入标签 | |||
|  |         :param tag: [("AI.speed", 10.32),("AI.start", 1)] | |||
|  |         :param value: type is [str|int|float] | |||
|  |         :return: [bool, str:异常信息, list: 修改的标签tuple] | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             if self.plcIp is None: | |||
|  |                 return False, "请输入PLC的IP地址" | |||
|  |             if not self.__isIP(self.plcIp): | |||
|  |                 return False, "PLC的IP地址格式异常" | |||
|  |             if len(tag) == 0: | |||
|  |                 return False, "请输入标签tag:str" | |||
|  |             # if len(value) == 0: | |||
|  |             #     return False, "请输入标签'%s'的新值value:str" % tag | |||
|  | 
 | |||
|  |             print("需要写入的数据 %s:%s" % (tag, str(value))) | |||
|  |             ret = self.comm.Write(tag, value) | |||
|  | 
 | |||
|  |             if ret.Status != "Success": | |||
|  |                 return False, "存在写入失败的标签" | |||
|  |             else: | |||
|  |                 return True, "" | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return False, str(io_error) | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.write_tag(tag, value) | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return False, str(e) | |||
|  | 
 | |||
|  |     def write_array(self, tag: str, array: list): | |||
|  |         """
 | |||
|  |         写入数组 | |||
|  |         :param tag: 标签的名字,例如AI.speed[12] | |||
|  |         :param array:具体数值 | |||
|  |         :return: [bool, str:异常信息] | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             if self.plcIp is None: | |||
|  |                 return False, "请输入PLC的IP地址" | |||
|  |             if not self.__isIP(self.plcIp): | |||
|  |                 return False, "PLC的IP地址格式异常" | |||
|  |             if len(tag) == 0: | |||
|  |                 return False, "请输入标签tag:str" | |||
|  |             if len(array) == 0: | |||
|  |                 return False, "请输入要写入的参数值" | |||
|  | 
 | |||
|  |             ret = self.comm.Write(tag, array) | |||
|  | 
 | |||
|  |             # results = [{"tagName": ret.TagName, | |||
|  |             #             "value": ret.Value, | |||
|  |             #             "status": ret.Status | |||
|  |             #             }] | |||
|  |             if ret.Status != "Success": | |||
|  |                 return False, "数组信息写入失败" | |||
|  | 
 | |||
|  |             return True, "" | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return False, str(io_error) | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.write_array(tag, array) | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return False, str(e) | |||
|  | 
 | |||
|  |     def get_tags(self, tags: [str, list]): | |||
|  |         """
 | |||
|  |         获取标签或者标签列表 | |||
|  |         :param tags: 目标标签列表 [str|list] | |||
|  |         :return: [bool, str, list] | |||
|  |         """
 | |||
|  |         results = [] | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             if self.plcIp is None: | |||
|  |                 return False, "请输入PLC的IP地址", None | |||
|  |             if not self.__isIP(self.plcIp): | |||
|  |                 return False, "PLC的IP地址格式异常", None | |||
|  |             if len(tags) == 0: | |||
|  |                 return False, "请输入标签tags:list", None | |||
|  | 
 | |||
|  |             rets = self.comm.Read(tags) | |||
|  |             for ret in rets: | |||
|  |                 results.append( | |||
|  |                     {"tagName": ret.TagName, | |||
|  |                      "value": ret.Value, | |||
|  |                      "status": ret.Status | |||
|  |                      }) | |||
|  |             return True, "", results | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return False, str(io_error), None | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.get_tags(tags) | |||
|  |             print(traceback.format_exc()) | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return False, str(e), None | |||
|  | 
 | |||
|  |     def get_array(self, tag: str, count: int = 1): | |||
|  |         """
 | |||
|  |         读取数组 | |||
|  |         :param tag: 目标标签 | |||
|  |         :param count: 读取个数 | |||
|  |         :return: [bool, str, (dict, None)] | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             if self.plcIp is None: | |||
|  |                 return False, "请输入PLC的IP地址", None | |||
|  |             if not self.__isIP(self.plcIp): | |||
|  |                 return False, "PLC的IP地址格式异常", None | |||
|  |             if len(tag) == 0: | |||
|  |                 return False, "请输入标签tags:list", None | |||
|  |             if count <= 0: | |||
|  |                 return False, "取数个数应大于0", None | |||
|  | 
 | |||
|  |             ret = self.comm.Read(tag, count) | |||
|  |             results = {"tagName": ret.TagName, | |||
|  |                        "value": ret.Value, | |||
|  |                        "status": ret.Status | |||
|  |                        } | |||
|  |             return True, "", results | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return False, str(io_error), None | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.get_array(tag, count) | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return False, str(e), None | |||
|  | 
 | |||
|  |     def get_tag_list(self): | |||
|  |         """
 | |||
|  |         获取标签列表 | |||
|  |         :return: | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         results = [] | |||
|  |         try: | |||
|  |             tags = self.comm.GetTagList() | |||
|  |             for t in tags.Value: | |||
|  |                 results.append({t.TagName: t.DataType}) | |||
|  |             return results | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return None | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.get_tag_list() | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return None | |||
|  | 
 | |||
|  |     def get_plc_time(self, raw=False): | |||
|  |         """
 | |||
|  |         获取PLC时间 | |||
|  |         :param raw: 如果设置 raw=True,则将返回原始微秒,否则返回 [datetime] 2021-04-20 15:41:22.964380 | |||
|  |         :return: | |||
|  |         """
 | |||
|  |         connect_count: int = 0 | |||
|  |         try: | |||
|  |             time = self.comm.GetPLCTime(raw) | |||
|  |             return time | |||
|  |         except IOError as io_error: | |||
|  |             if connect_count > 0: | |||
|  |                 print(traceback.format_exc()) | |||
|  |                 return None | |||
|  |             connect_count += 1 | |||
|  |             self.connect() | |||
|  |             self.get_plc_time() | |||
|  |         except Exception as e: | |||
|  |             print(traceback.format_exc()) | |||
|  |             return None | |||
|  | 
 | |||
|  | # if __name__ == "__main__": | |||
|  | #     plc = PLC_Tool("", 0) | |||
|  | # | |||
|  | #     success, msg = plc.write_tag("PMCD", 0) |