import json import requests class interfaceToWeb: def __init__(self): self.url: str = "" self.username: str = "" self.password: str = "" self.authorization: str = "" self.token: str = "" def loginWeb(self, url: str, username: str, password: str, authorization: str): """ 登录web :param url: 请求地址 :param username: 用户名 :param password: 密码 :param authorization: 授权 :return: """ self.url: str = url self.username: str = username self.password: str = password self.authorization: str = authorization headers = {'Authorization': "Basic %s" % str(authorization)} data = {"username": str(username), "password": str(password), "tenantId": "000000", "grant_type": "password"} try: res = requests.post(url=url, headers=headers, data=data, timeout=1) if res.status_code == requests.codes.ok: jsonData = json.loads(res.text) if jsonData.get('access_token', None) is not None and jsonData.get('token_type', None) is not None: self.token = "%s %s" % (jsonData.get('token_type'), jsonData.get('access_token')) return "", jsonData.get('token_type'), jsonData.get('access_token') else: return "【登录web】请求成功,但返回信息中不含。请求信息%s token :%s" % (str(data), str(res.text)), None, None else: return "【登录web】请求登录失败,状态码:%d,请求信息:%s, 返回信息:%s" % ( res.status_code, str(data), str(res.text)), None, None except Exception as e: return "【登录web】访问登录失败!疑似网络问题!", None, None def upload_pinch_coal(self, url: str, load_time: str, carriage_order: int, pinch_coal: float, track_name: str): """ 上传掐煤量 :param url: 请求地址 :param load_time: 装车时间 :param carriage_order: 车厢节号 :param pinch_coal: 掐煤量 :param track_name: 股道名 :return: 请求结果:str """ # url = "http://localhost:20004/train-manual/train-manual-detail/api-save" headers = {'Content-Type': 'application/json', 'blade-auth': self.token} data = {"loadTime": str(load_time), "carriageOrder": carriage_order, "carriageWeightHouse": pinch_coal, # "houseStatus": 4, "poundNo": track_name} try: res = requests.post(url=url, headers=headers, json=data, timeout=1) if res.status_code == requests.codes.ok: jsonData = json.loads(res.text) if not jsonData['success']: if res.status_code == 401: self.loginWeb(self.url, self.username, self.password, self.authorization) if self.upload_pinch_coal(url, load_time, carriage_order, pinch_coal, track_name) is "": return "" return "【上传掐煤量】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data)) else: return "" else: return "【上传掐煤量】请求失败,状态码:%d,请求信息:%s, 返回信息:%s" % ( res.status_code, str(data), str(res.text)) except Exception as e: return "【上传掐煤量】访问接口失败!疑似网络问题!" def upload_gross_weight(self, url: str, load_time: str, carriage_order: int, gross_weight: float, track_name: str): """ 上传装车毛重 :param url: 请求地址 :param load_time: 装车时间 :param carriage_order: 车厢节号 :param gross_weight: 掐煤量 :param track_name: 股道名 :return: 请求结果:str """ # url = "http://localhost:20004/train-manual/train-manual-detail/api-save" headers = {'Content-Type': 'application/json', 'blade-auth': self.token} data = {"loadTime": str(load_time), "carriageOrder": carriage_order, "grossWeightHouse": gross_weight, "houseStatus": 4, "poundNo": track_name} try: res = requests.post(url=url, headers=headers, json=data, timeout=1) if res.status_code == requests.codes.ok: jsonData = json.loads(res.text) if not jsonData['success']: if res.status_code == 401: self.loginWeb(self.url, self.username, self.password, self.authorization) if self.upload_gross_weight(url, load_time, carriage_order, gross_weight, track_name) is "": return "" return "【上传装车毛重】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data)) else: return "" else: return "【上传装车毛重】请求失败,状态码:%d,请求信息:%s, 返回信息:%s" % ( res.status_code, str(data), str(res.text)) except Exception as e: return "【上传装车毛重】访问接口失败!疑似网络问题!" def update_load_type(self, url: str, load_time: str, carriage_order: int, house_status: int, track_name: str): """ 修改装车状态 :param url: 请求地址 :param load_time: 装车时间 :param carriage_order: 车厢节号 :param house_status: 装车状态 :param track_name: 股道名 :return: 请求结果:str """ headers = {'Content-Type': 'application/json', 'blade-auth': self.token} data = {"loadTime": str(load_time), "carriageOrder": carriage_order, "houseStatus": house_status, "poundNo": track_name} try: res = requests.post(url=url, headers=headers, json=data, timeout=1) if res.status_code == requests.codes.ok: jsonData = json.loads(res.text) if not jsonData['success']: if res.status_code == 401: self.loginWeb(self.url, self.username, self.password, self.authorization) if self.update_load_type(url, load_time, carriage_order, house_status, track_name) is "": return "" return "【修改装车状态】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data)) else: return "" else: return "【修改装车状态】请求失败,状态码:%d,请求信息:%s, 返回信息:%s" % ( res.status_code, str(data), str(res.text)) except Exception as e: return "【修改装车状态】访问接口失败!疑似网络问题!" def upload_device_type(self, url: str, data): """ :param url: 请求地址 :param data: 设备状态集合 :return: 请求结果:str """ headers = {'Content-Type': 'application/json', 'blade-auth': self.token} try: res = requests.post(url=url, headers=headers, json=data, timeout=1) if res.status_code == requests.codes.ok: jsonData = json.loads(res.text) if not jsonData['success']: if res.status_code == 401: self.loginWeb(self.url, self.username, self.password, self.authorization) if self.upload_device_type(url, data) is "": return "" return "【上传设备状态】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data)) else: return "" else: return "【上传设备状态】请求失败,状态码:%d,请求信息:%s, 返回信息:%s" % ( res.status_code, str(data), str(res.text)) except Exception as e: return "【上传设备状态】访问接口失败!疑似网络问题!"