VMiddleware/service/interfaceToWeb.py

191 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
class interfaceToWeb:
    """HTTP client for the web backend: logs in and uploads loading data.

    ``loginWeb`` obtains a token and stores it on the instance; every upload
    method sends that token in the ``blade-auth`` header.  All public methods
    report failures through their return value (an error message string) and
    do not raise.
    """

    # Passed to ``requests`` as ``timeout`` so that a stalled server cannot
    # block the caller forever.
    # NOTE(review): 10 seconds is a conservative guess - tune per deployment.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Login endpoint and credentials, remembered by loginWeb() so that a
        # rejected token can be renewed automatically by the upload methods.
        self.url: str = ""
        self.username: str = ""
        self.password: str = ""
        self.authorization: str = ""
        # "<token_type> <access_token>", sent as the ``blade-auth`` header.
        self.token: str = ""

    def loginWeb(self, url: str, username: str, password: str, authorization: str):
        """
        Log in to the web backend and cache the returned token.

        The arguments are stored on the instance before the request is sent
        (so they are kept even when the login fails) and are reused when an
        upload has to re-authenticate.

        :param url: login endpoint
        :param username: user name
        :param password: password
        :param authorization: value placed after ``Basic `` in the
            ``Authorization`` header
        :return: ``(error, token_type, access_token)``; ``error`` is ``""`` on
            success, otherwise it is a message and the other two are ``None``
        """
        self.url = url
        self.username = username
        self.password = password
        self.authorization = authorization
        headers = {'Authorization': "Basic %s" % str(authorization)}
        data = {"username": str(username),
                "password": str(password),
                "tenantId": "000000",
                "grant_type": "password"}
        # NOTE(review): the failure messages below embed ``data``, which holds
        # the plaintext password - consider masking it before these messages
        # reach a log.
        try:
            res = requests.post(url=url, headers=headers, data=data,
                                timeout=self.REQUEST_TIMEOUT)
            if res.status_code != requests.codes.ok:
                return "【登录web】请求登录失败状态码%d,请求信息:%s 返回信息:%s" % (
                    res.status_code, str(data), str(res.text)), None, None
            jsonData = json.loads(res.text)
            token_type = jsonData.get('token_type')
            access_token = jsonData.get('access_token')
            if access_token is None or token_type is None:
                return "【登录web】请求成功但返回信息中不含。请求信息%s token :%s" % (
                    str(data), str(res.text)), None, None
            self.token = "%s %s" % (token_type, access_token)
            return "", token_type, access_token
        except Exception:
            # Deliberately broad: callers rely on an error string, not on an
            # exception, for connection errors and malformed responses alike.
            return "【登录web】访问登录失败疑似网络问题", None, None

    def _post_json(self, url: str, data, label: str, allow_relogin: bool = True) -> str:
        """
        POST ``data`` as JSON with the cached token and interpret the reply.

        When the server rejects the token (HTTP 401, or a JSON body whose
        ``code`` is 401), log in again with the stored credentials and retry
        exactly once.  The original code tested for 401 inside the HTTP-200
        branch, so the retry could never run, and had it run it would have
        recursed without bound.

        :param url: endpoint to post to
        :param data: JSON-serialisable request body
        :param label: operation name placed inside the brackets of messages
        :param allow_relogin: ``False`` on the retry, to stop the recursion
        :return: ``""`` on success, otherwise an error message
        """
        headers = {'Content-Type': 'application/json', 'blade-auth': self.token}
        try:
            res = requests.post(url=url, headers=headers, json=data,
                                timeout=self.REQUEST_TIMEOUT)
            if res.status_code == requests.codes.ok:
                jsonData = json.loads(res.text)
                if jsonData['success']:
                    return ""
                # NOTE(review): assumes the backend mirrors the auth failure in
                # a body-level ``code`` field - confirm against the API.
                auth_rejected = jsonData.get('code') == 401
                failure = "【%s】联通请求,但返回失败:%s,请求信息:%s" % (
                    label, jsonData['msg'], str(data))
            else:
                auth_rejected = res.status_code == 401
                failure = "【%s】请求失败,状态码:%d,请求信息:%s 返回信息:%s" % (
                    label, res.status_code, str(data), str(res.text))
            if auth_rejected and allow_relogin:
                login_error = self.loginWeb(self.url, self.username,
                                            self.password, self.authorization)[0]
                # Retry only when the login itself succeeded.
                if login_error == "" and self._post_json(
                        url, data, label, allow_relogin=False) == "":
                    return ""
            # A failed retry still reports the first attempt's message.
            return failure
        except Exception:
            # Deliberately broad: connection errors, timeouts and malformed
            # bodies are all reported through the return value.
            return "【%s】访问接口失败!疑似网络问题!" % label

    def upload_pinch_coal(self, url: str, load_time: str, carriage_order: int,
                          pinch_coal: float,
                          track_name: str) -> str:
        """
        Upload the pinch-coal amount of one carriage.

        :param url: endpoint to post to
        :param load_time: loading time
        :param carriage_order: carriage sequence number
        :param pinch_coal: pinch-coal amount, sent as ``carriageWeightHouse``
        :param track_name: track name, sent as ``poundNo``
        :return: ``""`` on success, otherwise an error message
        """
        data = {"loadTime": str(load_time),
                "carriageOrder": carriage_order,
                "carriageWeightHouse": pinch_coal,
                "poundNo": track_name}
        return self._post_json(url, data, "上传掐煤量")

    def upload_gross_weight(self, url: str, load_time: str, carriage_order: int,
                            gross_weight: float,
                            track_name: str) -> str:
        """
        Upload the loaded gross weight of one carriage.

        :param url: endpoint to post to
        :param load_time: loading time
        :param carriage_order: carriage sequence number
        :param gross_weight: gross weight, sent as ``grossWeightHouse``
        :param track_name: track name, sent as ``poundNo``
        :return: ``""`` on success, otherwise an error message
        """
        data = {"loadTime": str(load_time),
                "carriageOrder": carriage_order,
                "grossWeightHouse": gross_weight,
                # Fixed status sent with every gross-weight upload; the meaning
                # of 4 is defined by the backend and is not visible here.
                "houseStatus": 4,
                "poundNo": track_name}
        return self._post_json(url, data, "上传装车毛重")

    def update_load_type(self, url: str, load_time: str, carriage_order: int,
                         house_status: int,
                         track_name: str) -> str:
        """
        Update the loading status of one carriage.

        :param url: endpoint to post to
        :param load_time: loading time
        :param carriage_order: carriage sequence number
        :param house_status: loading status, sent as ``houseStatus``
        :param track_name: track name, sent as ``poundNo``
        :return: ``""`` on success, otherwise an error message
        """
        data = {"loadTime": str(load_time),
                "carriageOrder": carriage_order,
                "houseStatus": house_status,
                "poundNo": track_name}
        return self._post_json(url, data, "修改装车状态")

    def upload_device_type(self, url: str, data) -> str:
        """
        Upload a collection of device statuses.

        :param url: endpoint to post to
        :param data: device-status collection, sent unchanged as the JSON body
        :return: ``""`` on success, otherwise an error message
        """
        return self._post_json(url, data, "上传设备状态")