import json
import os
import requests
import base64
import logging
from datetime import datetime
import jwt
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def getLockStatus(token, api_uri, api_path, validationData):
responseDict = {}
try:
headers_dict = {"x-nl3-authorization-token": token, "Content-Type": "application/json"}
responseIPInfo = {}
location = ""
if "." in validationData["ipAddress"] or ":" in validationData["ipAddress"]:
responseIPInfo = requests.get("https://ipinfo.io/" + validationData["ipAddress"] + "?token=[ipinfo_auth_token]").json()
if "city" in responseIPInfo:
location = responseIPInfo["city"] + ", " + responseIPInfo["region"]
data_dict = {
"userIP": validationData["ipAddress"],
"userDevice": "N/A",
"userLocation": location,
"integrationType": "okta-oidc",
"integrationData": responseIPInfo,
}
response = requests.post("".join([api_uri,api_path]), headers=headers_dict, json=data_dict)
responseDict = response.json()
except Exception as e:
responseDict = { "message": str(e) }
return responseDict
def lambda_handler(event, context):
### This is the API Key used to authenticate from the Okta Inline Hook to this API
### Ideally, this API Key would be stored and retrieved from a secrets manager
### and not from an environmental variable
if base64.b64decode(event["headers"]["Authorization"]).decode('utf-8') == os.environ["API_KEY"]:
body = json.loads(event["body"])
username = body["data"]["identity"]["claims"]["preferred_username"]
claims = {
"iss": os.environ["APP_URI"],
"iat": (datetime.utcnow().timestamp() + (-1 * 60)),
"exp": (datetime.utcnow().timestamp() + (5 * 60)),
"aud": os.environ["API_URI"],
"sub": username
}
### Ideally, the Signing Key would be stored and retrieved from a secrets manager
### and not from an environmental variable
decodedDomainToken = base64.b64decode(os.environ["SIGNING_KEY"])
token = jwt.encode(
payload=claims,
key=decodedDomainToken
)
response = getLockStatus(token, os.environ["API_URI"], os.environ["API_PATH"], body["data"]["context"]["request"])
retVal = {
"isBase64Encoded": False,
"statusCode": 200,
}
if response.get("locked", False):
retVal = {
"isBase64Encoded": False,
"statusCode": 200,
"headers": { "Content-Type": "application/json" },
"body": json.dumps({ "error": { "errorSummary": os.environ["LOCKED_MESSAGE"] } }),
}
else:
retVal = {
"isBase64Encoded": False,
"statusCode": 200,
"headers": { "Content-Type": "application/json" },
"body": json.dumps({ "error": { "errorSummary": "Okta Hook Forbidden" } }),
}
return retVal