# pyhOn/pyhon/auth.py
#
# 158 lines, 6.4 KiB, Python
# (web view: Raw / Normal View / History)

2023-02-12 17:41:38 -07:00
import json
import logging
import re
import secrets
import urllib
from urllib import parse
import aiohttp as aiohttp
2023-03-16 18:56:04 -06:00
from yarl import URL
2023-02-12 17:41:38 -07:00
2023-02-12 19:36:09 -07:00
from pyhon import const
2023-02-12 17:41:38 -07:00
# Module-scoped logger: records carry this module's name and can be filtered
# or silenced per module, while still propagating to the root logger's handlers.
_LOGGER = logging.getLogger(__name__)
class HonAuth:
    """Run the hOn login flow and keep the tokens it yields.

    ``authorize`` is the entry point. It walks through the login pages served
    under ``const.AUTH_API``, extracts the access, refresh and id tokens from
    the final page, and then exchanges the id token for a cognito token at
    ``const.API_URL``. Every token is exposed through a read-only property
    and is an empty string until it has been obtained.
    """

    # Query arguments appended to the target of the second login redirect.
    _LOGIN_SUFFIX = "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
    # Captures the value of an ``href`` attribute (single or double quoted).
    _HREF_PATTERN = "href\\s*=\\s*[\"'](.*?)[\"']"

    def __init__(self) -> None:
        self._access_token = ""
        self._refresh_token = ""
        self._cognito_token = ""
        self._id_token = ""

    @property
    def cognito_token(self) -> str:
        """Token read from ``cognitoUser.Token`` of the API login response."""
        return self._cognito_token

    @property
    def id_token(self) -> str:
        """Value of ``id_token`` found in the final login page."""
        return self._id_token

    @property
    def access_token(self) -> str:
        """Value of ``access_token`` found in the final login page."""
        return self._access_token

    @property
    def refresh_token(self) -> str:
        """Value of ``refresh_token`` found in the final login page."""
        return self._refresh_token

    @staticmethod
    def _generate_nonce() -> str:
        """Return 32 random hex digits grouped 8-4-4-4-12 with dashes."""
        raw = secrets.token_hex(16)
        return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:]}"

    @staticmethod
    def _encode_form(data) -> str:
        """Serialise ``data`` as a form body with JSON-encoded values.

        Each value is dumped to JSON and then percent-encoded, so characters
        such as ``&``, ``=``, ``+`` or ``%`` inside a value (for example in a
        password) cannot break the ``key=value&key=value`` structure.
        """
        return "&".join(
            f"{key}={parse.quote(json.dumps(value))}" for key, value in data.items()
        )

    @classmethod
    def _first_href(cls, text) -> str:
        """Return the first ``href`` attribute value in ``text`` or ``""``."""
        matches = re.findall(cls._HREF_PATTERN, text)
        return matches[0] if matches else ""

    def _parse_token_data(self, text) -> None:
        """Store every token found in ``text``; absent tokens stay unchanged.

        A token is only recognised when its value is followed by ``&``.
        """
        if access_token := re.findall("access_token=(.*?)&", text):
            self._access_token = access_token[0]
        if refresh_token := re.findall("refresh_token=(.*?)&", text):
            self._refresh_token = refresh_token[0]
        if id_token := re.findall("id_token=(.*?)&", text):
            self._id_token = id_token[0]

    async def _load_login(self, session):
        """Follow the redirects to the login page and read its context.

        :param session: HTTP client session used for all requests
            (an ``aiohttp.ClientSession`` when called from ``authorize``).
        :return: ``(fw_uid, loaded, login_url)`` on success, ``False`` when
            any step of the chain does not deliver the expected data.
        """
        query = {
            "response_type": "token+id_token",
            "client_id": const.CLIENT_ID,
            "redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"),
            "display": "touch",
            "scope": "api openid refresh_token web",
            "nonce": self._generate_nonce(),
        }
        query_string = "&".join(f"{key}={value}" for key, value in query.items())
        authorize_url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{query_string}"

        async with session.get(authorize_url) as resp:
            login_urls = re.findall("url = '(.+?)'", await resp.text())
        if not login_urls:
            return False

        async with session.get(login_urls[0], allow_redirects=False) as redirect1:
            url = redirect1.headers.get("Location")
        if not url:
            return False

        async with session.get(url, allow_redirects=False) as redirect2:
            location = redirect2.headers.get("Location")
        # Check the header before appending the suffix: concatenating first
        # would raise TypeError on a missing header and could never be falsy.
        if not location:
            return False
        url = location + self._LOGIN_SUFFIX

        # encoded=True: pass the redirect target through without re-encoding.
        async with session.get(URL(url, encoded=True)) as login_screen:
            context = re.findall(
                '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
            )
        if not context:
            return False

        fw_uid, loaded_str = context[0]
        try:
            loaded = json.loads(loaded_str)
        except json.JSONDecodeError:
            _LOGGER.error("Unable to parse login context: %s", loaded_str)
            return False
        # Strip AUTH_API minus its last path segment from the login URL.
        auth_base = "/".join(const.AUTH_API.split("/")[:-1])
        return fw_uid, loaded, login_urls[0].replace(auth_base, "")

    async def _login(self, session, email, password, fw_uid, loaded, login_url):
        """Submit the credentials and return the follow-up URL.

        :param session: HTTP client session used for the request.
        :param email: account user name.
        :param password: account password.
        :param fw_uid: ``fwuid`` value read from the login page.
        :param loaded: ``loaded`` mapping read from the login page.
        :param login_url: login page URL as returned by ``_load_login``.
        :return: the URL found in the response, or ``""`` on any failure
            (the failure is logged).
        """
        start_url = parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0]
        data = {
            "message": {
                "actions": [
                    {
                        "id": "79;a",
                        "descriptor": "apex://LightningLoginCustomController/ACTION$login",
                        "callingDescriptor": "markup://c:loginForm",
                        "params": {
                            "username": email,
                            "password": password,
                            "startUrl": start_url,
                        },
                    }
                ]
            },
            "aura.context": {
                "mode": "PROD",
                "fwuid": fw_uid,
                "app": "siteforce:loginApp2",
                "loaded": loaded,
                "dn": [],
                "globals": {},
                "uad": False,
            },
            "aura.pageURI": login_url,
            "aura.token": None,
        }
        params = {"r": 3, "other.LightningLoginCustom.login": 1}
        async with session.post(
            const.AUTH_API + "/s/sfsites/aura",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data=self._encode_form(data),
            params=params,
        ) as response:
            if response.status == 200:
                try:
                    payload = await response.json()
                    return payload["events"][0]["attributes"]["values"]["url"]
                except (
                    json.JSONDecodeError,
                    aiohttp.ContentTypeError,
                    KeyError,
                    IndexError,
                    TypeError,
                ):
                    # Not JSON, or JSON without the expected URL: fall through
                    # to the common error report below.
                    pass
            _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text())
            return ""

    async def _get_token(self, session, url):
        """Follow the post-login pages and store the tokens of the last one.

        :param session: HTTP client session used for the requests.
        :param url: URL returned by ``_login``.
        :return: ``False`` when a request does not answer with status 200 or
            a page lacks the expected link, ``True`` otherwise.
        """
        async with session.get(url) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to get token: %s", resp.status)
                return False
            next_url = self._first_href(await resp.text())
        if not next_url:
            _LOGGER.error("Unable to get token: no link in response")
            return False

        async with session.get(next_url) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to get token: %s", resp.status)
                return False
            path = self._first_href(await resp.text())
        if not path:
            _LOGGER.error("Unable to get token: no link in response")
            return False

        # The second link is appended to AUTH_API minus its last path segment.
        token_url = "/".join(const.AUTH_API.split("/")[:-1]) + path
        async with session.get(token_url) as resp:
            if resp.status != 200:
                _LOGGER.error("Unable to connect to the login service: %s", resp.status)
                return False
            text = await resp.text()

        self._parse_token_data(text)
        return True

    async def authorize(self, email, password, mobile_id):
        """Perform the complete login and store all tokens on the instance.

        :param email: account user name.
        :param password: account password.
        :param mobile_id: value sent as ``mobileId`` in the API login request.
        :return: ``True`` when the cognito token was obtained, else ``False``.
        """
        headers = {"user-agent": const.USER_AGENT}
        async with aiohttp.ClientSession(headers=headers) as session:
            login_site = await self._load_login(session)
            if not login_site:
                return False
            fw_uid, loaded, login_url = login_site

            url = await self._login(session, email, password, fw_uid, loaded, login_url)
            if not url:
                return False
            if not await self._get_token(session, url):
                return False

            post_headers = {"Content-Type": "application/json", "id-token": self._id_token}
            data = {
                "appVersion": const.APP_VERSION,
                "mobileId": mobile_id,
                "osVersion": const.OS_VERSION,
                "os": const.OS,
                "deviceModel": const.DEVICE_MODEL,
            }
            async with session.post(
                f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
            ) as resp:
                try:
                    json_data = await resp.json()
                except (json.JSONDecodeError, aiohttp.ContentTypeError):
                    _LOGGER.error("No JSON Data after POST: %s", await resp.text())
                    return False
                try:
                    self._cognito_token = json_data["cognitoUser"]["Token"]
                except (KeyError, TypeError):
                    _LOGGER.error("No cognito token in login response: %s", resp.status)
                    return False
        return True