import json
import os
import requests
import base64
import logging
from datetime import datetime
import jwt
def getLockStatus(token, api_uri, api_path, validationData):
responseDict = {}
try:
headers_dict = {"x-nl3-authorization-token": token, "Content-Type": "application/json"}
data_dict = {
"userIP": validationData["ip"],
"userDevice": validationData["device"],
"userLocation": validationData["location"],
"integrationType": "cognito",
"integrationData": json.loads(validationData["additionalData"])
}
response = requests.post("".join([api_uri,api_path]), headers=headers_dict, json=data_dict)
responseDict = response.json()
except Exception as e:
responseDict = { "message": str(e) }
return responseDict
def protectionCheck (userName, validationData):
claims = {
"iss": os.environ["APP_URI"],
"iat": (datetime.utcnow().timestamp() + (-1 * 60)),
"exp": (datetime.utcnow().timestamp() + (5 * 60)),
"aud": os.environ["API_URI"],
"sub": userName
}
### Ildeally the Signing Key would be stored and retrieved from a secrets manager
### and not an environmental variable
decodedDomainToken = base64.b64decode(os.environ["SIGNING_KEY"])
token = jwt.encode(
payload=claims,
key=decodedDomainToken
)
response = getLockStatus(token, os.environ["API_URI"], os.environ["API_PATH"], validationData)
if response.get("locked", False):
// Code for prohibiting login and returning generic error message
// Code for unlocked or unprotected accounts