VMiddleware/service/interfaceToWeb.py

191 lines
8.4 KiB
Python
Raw Normal View History

import json
import requests
class interfaceToWeb:
def __init__(self):
self.url: str = ""
self.username: str = ""
self.password: str = ""
self.authorization: str = ""
self.token: str = ""
def loginWeb(self, url: str, username: str, password: str, authorization: str):
"""
登录web
:param url: 请求地址
:param username: 用户名
:param password: 密码
:param authorization: 授权
:return:
"""
self.url: str = url
self.username: str = username
self.password: str = password
self.authorization: str = authorization
headers = {'Authorization': "Basic %s" % str(authorization)}
data = {"username": str(username),
"password": str(password),
"tenantId": "000000",
"grant_type": "password"}
try:
res = requests.post(url=url, headers=headers, data=data)
if res.status_code == requests.codes.ok:
jsonData = json.loads(res.text)
if jsonData.get('access_token', None) is not None and jsonData.get('token_type', None) is not None:
self.token = "%s %s" % (jsonData.get('token_type'), jsonData.get('access_token'))
return "", jsonData.get('token_type'), jsonData.get('access_token')
else:
return "【登录web】请求成功但返回信息中不含。请求信息%s token :%s" % (str(data), str(res.text)), None, None
else:
return "【登录web】请求登录失败状态码%d,请求信息:%s 返回信息:%s" % (
res.status_code, str(data), str(res.text)), None, None
except Exception as e:
return "【登录web】访问登录失败疑似网络问题", None, None
def upload_pinch_coal(self, url: str, load_time: str, carriage_order: int,
pinch_coal: float,
track_name: str):
"""
上传掐煤量
:param url: 请求地址
:param load_time: 装车时间
:param carriage_order: 车厢节号
:param pinch_coal: 掐煤量
:param track_name: 股道名
:return: 请求结果:str
"""
# url = "http://localhost:20004/train-manual/train-manual-detail/api-save"
headers = {'Content-Type': 'application/json', 'blade-auth': self.token}
data = {"loadTime": str(load_time),
"carriageOrder": carriage_order,
"carriageWeightHouse": pinch_coal,
# "houseStatus": 4,
"poundNo": track_name}
try:
res = requests.post(url=url, headers=headers, json=data)
if res.status_code == requests.codes.ok:
jsonData = json.loads(res.text)
if not jsonData['success']:
if res.status_code == 401:
self.loginWeb(self.url, self.username, self.password, self.authorization)
if self.upload_pinch_coal(url, load_time, carriage_order, pinch_coal, track_name) is "":
return ""
return "【上传掐煤量】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data))
else:
return ""
else:
return "【上传掐煤量】请求失败,状态码:%d,请求信息:%s 返回信息:%s" % (
res.status_code, str(data), str(res.text))
except Exception as e:
return "【上传掐煤量】访问接口失败!疑似网络问题!"
def upload_gross_weight(self, url: str, load_time: str, carriage_order: int,
gross_weight: float,
track_name: str):
"""
上传装车毛重
:param url: 请求地址
:param load_time: 装车时间
:param carriage_order: 车厢节号
:param gross_weight: 掐煤量
:param track_name: 股道名
:return: 请求结果:str
"""
# url = "http://localhost:20004/train-manual/train-manual-detail/api-save"
headers = {'Content-Type': 'application/json', 'blade-auth': self.token}
data = {"loadTime": str(load_time),
"carriageOrder": carriage_order,
"grossWeightHouse": gross_weight,
"houseStatus": 4,
"poundNo": track_name}
try:
res = requests.post(url=url, headers=headers, json=data)
if res.status_code == requests.codes.ok:
jsonData = json.loads(res.text)
if not jsonData['success']:
if res.status_code == 401:
self.loginWeb(self.url, self.username, self.password, self.authorization)
if self.upload_gross_weight(url, load_time, carriage_order, gross_weight, track_name) is "":
return ""
return "【上传装车毛重】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data))
else:
return ""
else:
return "【上传装车毛重】请求失败,状态码:%d,请求信息:%s 返回信息:%s" % (
res.status_code, str(data), str(res.text))
except Exception as e:
return "【上传装车毛重】访问接口失败!疑似网络问题!"
def update_load_type(self, url: str, load_time: str, carriage_order: int,
house_status: int,
track_name: str):
"""
修改装车状态
:param url: 请求地址
:param load_time: 装车时间
:param carriage_order: 车厢节号
:param house_status: 装车状态
:param track_name: 股道名
:return: 请求结果:str
"""
headers = {'Content-Type': 'application/json', 'blade-auth': self.token}
data = {"loadTime": str(load_time),
"carriageOrder": carriage_order,
"houseStatus": house_status,
"poundNo": track_name}
try:
res = requests.post(url=url, headers=headers, json=data)
if res.status_code == requests.codes.ok:
jsonData = json.loads(res.text)
if not jsonData['success']:
if res.status_code == 401:
self.loginWeb(self.url, self.username, self.password, self.authorization)
if self.update_load_type(url, load_time, carriage_order, house_status, track_name) is "":
return ""
return "【修改装车状态】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data))
else:
return ""
else:
return "【修改装车状态】请求失败,状态码:%d,请求信息:%s 返回信息:%s" % (
res.status_code, str(data), str(res.text))
except Exception as e:
return "【修改装车状态】访问接口失败!疑似网络问题!"
def upload_device_type(self, url: str, data):
"""
:param url: 请求地址
:param data: 设备状态集合
:return: 请求结果:str
"""
headers = {'Content-Type': 'application/json', 'blade-auth': self.token}
try:
res = requests.post(url=url, headers=headers, json=data)
if res.status_code == requests.codes.ok:
jsonData = json.loads(res.text)
if not jsonData['success']:
if res.status_code == 401:
self.loginWeb(self.url, self.username, self.password, self.authorization)
if self.upload_device_type(url, data) is "":
return ""
return "【上传设备状态】联通请求,但返回失败:%s,请求信息:%s" % (jsonData['msg'], str(data))
else:
return ""
else:
return "【上传设备状态】请求失败,状态码:%d,请求信息:%s 返回信息:%s" % (
res.status_code, str(data), str(res.text))
except Exception as e:
return "【上传设备状态】访问接口失败!疑似网络问题!"