diff --git a/mfa/Common.py b/mfa/Common.py index 1504340..7d45373 100644 --- a/mfa/Common.py +++ b/mfa/Common.py @@ -1,19 +1,26 @@ from django.conf import settings from django.core.mail import EmailMessage + try: from django.urls import reverse except: from django.core.urlresolver import reverse -def send(to,subject,body): + +def send(to, subject, body): from_email_address = settings.EMAIL_HOST_USER - if '@' not in from_email_address: + if "@" not in from_email_address: from_email_address = settings.DEFAULT_FROM_EMAIL From = "%s <%s>" % (settings.EMAIL_FROM, from_email_address) - email = EmailMessage(subject,body,From,to) + email = EmailMessage(subject, body, From, to) email.content_subtype = "html" return email.send(False) + def get_redirect_url(): - return {"redirect_html": reverse(getattr(settings, 'MFA_REDIRECT_AFTER_REGISTRATION', 'mfa_home')), - "reg_success_msg":getattr(settings,"MFA_SUCCESS_REGISTRATION_MSG")} + return { + "redirect_html": reverse( + getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home") + ), + "reg_success_msg": getattr(settings, "MFA_SUCCESS_REGISTRATION_MSG"), + } diff --git a/mfa/Email.py b/mfa/Email.py index 010d6d5..d5535cd 100644 --- a/mfa/Email.py +++ b/mfa/Email.py @@ -1,66 +1,94 @@ from django.shortcuts import render from django.views.decorators.cache import never_cache from django.template.context_processors import csrf -import datetime,random +import datetime, random from random import randint from .models import * -#from django.template.context import RequestContext + +# from django.template.context import RequestContext from .views import login from .Common import send -def sendEmail(request,username,secret): + +def sendEmail(request, username, secret): """Send Email to the user after rendering `mfa_email_token_template`""" from django.contrib.auth import get_user_model + User = get_user_model() - key = getattr(User, 'USERNAME_FIELD', 'username') + key = getattr(User, "USERNAME_FIELD", "username") kwargs = {key: username} user = User.objects.get(**kwargs) - res=render(request,"mfa_email_token_template.html",{"request":request,"user":user,'otp':secret}) - return send([user.email],"OTP", res.content.decode()) + res = render( + request, + "mfa_email_token_template.html", + {"request": request, "user": user, "otp": secret}, + ) + return send([user.email], "OTP", res.content.decode()) + @never_cache def start(request): """Start adding email as a 2nd factor""" context = csrf(request) if request.method == "POST": - if request.session["email_secret"] == request.POST["otp"]: #if successful - uk=User_Keys() - uk.username=request.user.username - uk.key_type="Email" - uk.enabled=1 + if request.session["email_secret"] == request.POST["otp"]: # if successful + uk = User_Keys() + uk.username = request.user.username + uk.key_type = "Email" + uk.enabled = 1 uk.save() from django.http import HttpResponseRedirect + try: from django.core.urlresolvers import reverse except: from django.urls import reverse - return HttpResponseRedirect(reverse(getattr(settings,'MFA_REDIRECT_AFTER_REGISTRATION','mfa_home'))) + return HttpResponseRedirect( + reverse( + getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home") + ) + ) context["invalid"] = True else: - request.session["email_secret"] = str(randint(0,100000)) #generate a random integer + request.session["email_secret"] = str( + randint(0, 100000) + ) # generate a random integer if sendEmail(request, request.user.username, request.session["email_secret"]): context["sent"] = True - return render(request,"Email/Add.html", context) + return render(request, "Email/Add.html", context) + + @never_cache def auth(request): """Authenticating the user by email.""" - context=csrf(request) - if request.method=="POST": - if request.session["email_secret"]==request.POST["otp"].strip(): - uk = User_Keys.objects.get(username=request.session["base_username"], key_type="Email") - mfa = {"verified": True, "method": "Email","id":uk.id} + context = csrf(request) + if request.method == "POST": + if request.session["email_secret"] == request.POST["otp"].strip(): + uk = User_Keys.objects.get( + username=request.session["base_username"], key_type="Email" + ) + mfa = {"verified": True, "method": "Email", "id": uk.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp(datetime.datetime.now() + datetime.timedelta( - seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))) + mfa["next_check"] = datetime.datetime.timestamp( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) request.session["mfa"] = mfa from django.utils import timezone - uk.last_used=timezone.now() + + uk.last_used = timezone.now() uk.save() return login(request) - context["invalid"]=True + context["invalid"] = True else: request.session["email_secret"] = str(randint(0, 100000)) - if sendEmail(request, request.session["base_username"], request.session["email_secret"]): + if sendEmail( + request, request.session["base_username"], request.session["email_secret"] + ): context["sent"] = True - return render(request,"Email/Auth.html", context) + return render(request, "Email/Auth.html", context) diff --git a/mfa/FIDO2.py b/mfa/FIDO2.py index 98a6e5e..2a51b4b 100644 --- a/mfa/FIDO2.py +++ b/mfa/FIDO2.py @@ -1,9 +1,12 @@ +import traceback + from fido2.client import ClientData from fido2.server import Fido2Server, PublicKeyCredentialRpEntity from fido2.ctap2 import AttestationObject, AuthenticatorData from django.template.context_processors import csrf from django.views.decorators.csrf import csrf_exempt from django.shortcuts import render + # from django.template.context import RequestContext import simplejson from fido2 import cbor @@ -35,14 +38,19 @@ def getServer(): def begin_registeration(request): """Starts registering a new FIDO Device, called from API""" server = getServer() - registration_data, state = server.register_begin({ - u'id': request.user.username.encode("utf8"), - u'name': (request.user.first_name + " " + request.user.last_name), - u'displayName': request.user.username, - }, getUserCredentials(request.user.username)) - request.session['fido_state'] = state + registration_data, state = server.register_begin( + { + u"id": request.user.username.encode("utf8"), + u"name": (request.user.first_name + " " + request.user.last_name), + u"displayName": request.user.username, + }, + getUserCredentials(request.user.username), + ) + request.session["fido_state"] = state - return HttpResponse(cbor.encode(registration_data), content_type = 'application/octet-stream') + return HttpResponse( + cbor.encode(registration_data), content_type="application/octet-stream" + ) @csrf_exempt @@ -51,29 +59,30 @@ def complete_reg(request): try: data = cbor.decode(request.body) - client_data = ClientData(data['clientDataJSON']) - att_obj = AttestationObject((data['attestationObject'])) + client_data = ClientData(data["clientDataJSON"]) + att_obj = AttestationObject((data["attestationObject"])) server = getServer() auth_data = server.register_complete( - request.session['fido_state'], - client_data, - att_obj + request.session["fido_state"], client_data, att_obj ) encoded = websafe_encode(auth_data.credential_data) uk = User_Keys() uk.username = request.user.username - uk.properties = {"device": encoded, "type": att_obj.fmt, } + uk.properties = { + "device": encoded, + "type": att_obj.fmt, + } uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) uk.key_type = "FIDO2" uk.save() - return HttpResponse(simplejson.dumps({'status': 'OK'})) + return HttpResponse(simplejson.dumps({"status": "OK"})) except Exception as exp: - try: - from raven.contrib.django.raven_compat.models import client - client.captureException() - except: - pass - return HttpResponse(simplejson.dumps({'status': 'ERR', "message": "Error on server, please try again later"})) + print(traceback.format_exc()) + return HttpResponse( + simplejson.dumps( + {"status": "ERR", "message": "Error on server, please try again later"} + ) + ) def start(request): @@ -85,8 +94,10 @@ def start(request): def getUserCredentials(username): credentials = [] - for uk in User_Keys.objects.filter(username = username, key_type = "FIDO2"): - credentials.append(AttestedCredentialData(websafe_decode(uk.properties["device"]))) + for uk in User_Keys.objects.filter(username=username, key_type="FIDO2"): + credentials.append( + AttestedCredentialData(websafe_decode(uk.properties["device"])) + ) return credentials @@ -97,10 +108,12 @@ def auth(request): def authenticate_begin(request): server = getServer() - credentials = getUserCredentials(request.session.get("base_username", request.user.username)) + credentials = getUserCredentials( + request.session.get("base_username", request.user.username) + ) auth_data, state = server.authenticate_begin(credentials) - request.session['fido_state'] = state - return HttpResponse(cbor.encode(auth_data), content_type = "application/octet-stream") + request.session["fido_state"] = state + return HttpResponse(cbor.encode(auth_data), content_type="application/octet-stream") @csrf_exempt @@ -111,49 +124,71 @@ def authenticate_complete(request): server = getServer() credentials = getUserCredentials(username) data = cbor.decode(request.body) - credential_id = data['credentialId'] - client_data = ClientData(data['clientDataJSON']) - auth_data = AuthenticatorData(data['authenticatorData']) - signature = data['signature'] + credential_id = data["credentialId"] + client_data = ClientData(data["clientDataJSON"]) + auth_data = AuthenticatorData(data["authenticatorData"]) + signature = data["signature"] try: cred = server.authenticate_complete( - request.session.pop('fido_state'), + request.session.pop("fido_state"), credentials, credential_id, client_data, auth_data, - signature + signature, ) except ValueError: - return HttpResponse(simplejson.dumps({'status': "ERR", - "message": "Wrong challenge received, make sure that this is your security and try again."}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps( + { + "status": "ERR", + "message": "Wrong challenge received, make sure that this is your security and try again.", + } + ), + content_type="application/json", + ) except Exception as excep: - try: - from raven.contrib.django.raven_compat.models import client - client.captureException() - except: - pass - return HttpResponse(simplejson.dumps({'status': "ERR", - "message": excep.message}), - content_type = "application/json") + print(traceback.format_exc()) + return HttpResponse( + simplejson.dumps({"status": "ERR", "message": excep.message}), + content_type="application/json", + ) if request.session.get("mfa_recheck", False): import time + request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({'status': "OK"}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps({"status": "OK"}), content_type="application/json" + ) else: import random - keys = User_Keys.objects.filter(username = username, key_type = "FIDO2", enabled = 1) + + keys = User_Keys.objects.filter( + username=username, key_type="FIDO2", enabled=1 + ) for k in keys: - if AttestedCredentialData(websafe_decode(k.properties["device"])).credential_id == cred.credential_id: + if ( + AttestedCredentialData( + websafe_decode(k.properties["device"]) + ).credential_id + == cred.credential_id + ): k.last_used = timezone.now() k.save() - mfa = {"verified": True, "method": "FIDO2", 'id': k.id} + mfa = {"verified": True, "method": "FIDO2", "id": k.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() + datetime.timedelta( - seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, + settings.MFA_RECHECK_MAX, + ) + ) + ) + ) request.session["mfa"] = mfa try: authenticated = request.user.is_authenticated @@ -161,11 +196,20 @@ def authenticate_complete(request): authenticated = request.user.is_authenticated() if not authenticated: res = login(request) - if not "location" in res: return reset_cookie(request) - return HttpResponse(simplejson.dumps({'status': "OK", "redirect": res["location"]}), - content_type = "application/json") - return HttpResponse(simplejson.dumps({'status': "OK"}), - content_type = "application/json") + if not "location" in res: + return reset_cookie(request) + return HttpResponse( + simplejson.dumps( + {"status": "OK", "redirect": res["location"]} + ), + content_type="application/json", + ) + return HttpResponse( + simplejson.dumps({"status": "OK"}), + content_type="application/json", + ) except Exception as exp: - return HttpResponse(simplejson.dumps({'status': "ERR", "message": exp.message}), - content_type = "application/json") + return HttpResponse( + simplejson.dumps({"status": "ERR", "message": str(exp)}), + content_type="application/json", + ) diff --git a/mfa/TrustedDevice.py b/mfa/TrustedDevice.py index 94b2136..2b6dc66 100644 --- a/mfa/TrustedDevice.py +++ b/mfa/TrustedDevice.py @@ -8,127 +8,158 @@ from .models import * import user_agents from django.utils import timezone + def id_generator(size=6, chars=string.ascii_uppercase + string.digits): - x=''.join(random.choice(chars) for _ in range(size)) - if not User_Keys.objects.filter(properties__shas="$.key="+x).exists(): return x - else: return id_generator(size,chars) + x = "".join(random.choice(chars) for _ in range(size)) + if not User_Keys.objects.filter(properties__shas="$.key=" + x).exists(): + return x + else: + return id_generator(size, chars) + def getUserAgent(request): - id=id=request.session.get("td_id",None) + id = id = request.session.get("td_id", None) if id: - tk=User_Keys.objects.get(id=id) - if tk.properties.get("user_agent","")!="": + tk = User_Keys.objects.get(id=id) + if tk.properties.get("user_agent", "") != "": ua = user_agents.parse(tk.properties["user_agent"]) - res = render(None, "TrustedDevices/user-agent.html", context={"ua":ua}) + res = render(None, "TrustedDevices/user-agent.html", context={"ua": ua}) return HttpResponse(res) return HttpResponse("") + def trust_device(request): tk = User_Keys.objects.get(id=request.session["td_id"]) - tk.properties["status"]="trusted" + tk.properties["status"] = "trusted" tk.save() del request.session["td_id"] return HttpResponse("OK") + def checkTrusted(request): res = "" - id=request.session.get("td_id","") - if id!="": + id = request.session.get("td_id", "") + if id != "": try: tk = User_Keys.objects.get(id=id) - if tk.properties["status"] == "trusted": res = "OK" + if tk.properties["status"] == "trusted": + res = "OK" except: pass return HttpResponse(res) + def getCookie(request): tk = User_Keys.objects.get(id=request.session["td_id"]) if tk.properties["status"] == "trusted": - context={"added":True} - response = render(request,"TrustedDevices/Done.html", context) + context = {"added": True} + response = render(request, "TrustedDevices/Done.html", context) from datetime import datetime, timedelta + expires = datetime.now() + timedelta(days=180) - tk.expires=expires + tk.expires = expires tk.save() response.set_cookie("deviceid", tk.properties["signature"], expires=expires) return response + def add(request): - context=csrf(request) - if request.method=="GET": - return render(request,"TrustedDevices/Add.html",context) + context = csrf(request) + if request.method == "GET": + return render(request, "TrustedDevices/Add.html", context) else: - key=request.POST["key"].replace("-","").replace(" ","").upper() + key = request.POST["key"].replace("-", "").replace(" ", "").upper() context["username"] = request.POST["username"] context["key"] = request.POST["key"] - trusted_keys=User_Keys.objects.filter(username=request.POST["username"],properties__has="$.key="+key) - cookie=False + trusted_keys = User_Keys.objects.filter( + username=request.POST["username"], properties__has="$.key=" + key + ) + cookie = False if trusted_keys.exists(): - tk=trusted_keys[0] - request.session["td_id"]=tk.id - ua=request.META['HTTP_USER_AGENT'] - agent=user_agents.parse(ua) + tk = trusted_keys[0] + request.session["td_id"] = tk.id + ua = request.META["HTTP_USER_AGENT"] + agent = user_agents.parse(ua) if agent.is_pc: - context["invalid"]="This is a PC, it can't used as a trusted device." + context["invalid"] = "This is a PC, it can't used as a trusted device." else: - tk.properties["user_agent"]=ua + tk.properties["user_agent"] = ua tk.save() - context["success"]=True + context["success"] = True # tk.properties["user_agent"]=ua # tk.save() # context["success"]=True else: - context["invalid"]="The username or key is wrong, please check and try again." + context[ + "invalid" + ] = "The username or key is wrong, please check and try again." + + return render(request, "TrustedDevices/Add.html", context) - return render(request,"TrustedDevices/Add.html", context) def start(request): - if User_Keys.objects.filter(username=request.user.username,key_type="Trusted Device").count()>= 2: - return render(request,"TrustedDevices/start.html",{"not_allowed":True}) - td=None - if not request.session.get("td_id",None): - td=User_Keys() - td.username=request.user.username - td.properties={"key":id_generator(),"status":"adding"} - td.key_type="Trusted Device" + if ( + User_Keys.objects.filter( + username=request.user.username, key_type="Trusted Device" + ).count() + >= 2 + ): + return render(request, "TrustedDevices/start.html", {"not_allowed": True}) + td = None + if not request.session.get("td_id", None): + td = User_Keys() + td.username = request.user.username + td.properties = {"key": id_generator(), "status": "adding"} + td.key_type = "Trusted Device" td.save() - request.session["td_id"]=td.id + request.session["td_id"] = td.id try: - if td==None: td=User_Keys.objects.get(id=request.session["td_id"]) - context={"key":td.properties["key"]} + if td == None: + td = User_Keys.objects.get(id=request.session["td_id"]) + context = {"key": td.properties["key"]} except: del request.session["td_id"] return start(request) - return render(request,"TrustedDevices/start.html",context) + return render(request, "TrustedDevices/start.html", context) + def send_email(request): - body=render(request,"TrustedDevices/email.html",{}).content + body = render(request, "TrustedDevices/email.html", {}).content from .Common import send - e=request.user.email - if e=="": - e=request.session.get("user",{}).get("email","") - if e=="": + + e = request.user.email + if e == "": + e = request.session.get("user", {}).get("email", "") + if e == "": res = "User has no email on the system." - elif send([e],"Add Trusted Device Link",body): - res="Sent Successfully" + elif send([e], "Add Trusted Device Link", body): + res = "Sent Successfully" else: - res="Error occured, please try again later." + res = "Error occured, please try again later." return HttpResponse(res) def verify(request): - if request.COOKIES.get('deviceid',None): + if request.COOKIES.get("deviceid", None): from jose import jwt - json= jwt.decode(request.COOKIES.get('deviceid'),settings.SECRET_KEY) - if json["username"].lower()== request.session['base_username'].lower(): + + json = jwt.decode(request.COOKIES.get("deviceid"), settings.SECRET_KEY) + if json["username"].lower() == request.session["base_username"].lower(): try: - uk = User_Keys.objects.get(username=request.POST["username"].lower(), properties__has="$.key=" + json["key"]) + uk = User_Keys.objects.get( + username=request.POST["username"].lower(), + properties__has="$.key=" + json["key"], + ) if uk.enabled and uk.properties["status"] == "trusted": - uk.last_used=timezone.now() + uk.last_used = timezone.now() uk.save() - request.session["mfa"] = {"verified": True, "method": "Trusted Device","id":uk.id} + request.session["mfa"] = { + "verified": True, + "method": "Trusted Device", + "id": uk.id, + } return True except: return False diff --git a/mfa/U2F.py b/mfa/U2F.py index fe5ea19..2ad5f08 100644 --- a/mfa/U2F.py +++ b/mfa/U2F.py @@ -1,12 +1,16 @@ - -from u2flib_server.u2f import (begin_registration, begin_authentication, - complete_registration, complete_authentication) +from u2flib_server.u2f import ( + begin_registration, + begin_authentication, + complete_registration, + complete_authentication, +) from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import Encoding from django.shortcuts import render import simplejson -#from django.template.context import RequestContext + +# from django.template.context import RequestContext from django.template.context_processors import csrf from django.conf import settings from django.http import HttpResponse @@ -15,97 +19,127 @@ from .views import login import datetime from django.utils import timezone + def recheck(request): context = csrf(request) - context["mode"]="recheck" + context["mode"] = "recheck" s = sign(request.user.username) request.session["_u2f_challenge_"] = s[0] context["token"] = s[1] - request.session["mfa_recheck"]=True - return render(request,"U2F/recheck.html", context) + request.session["mfa_recheck"] = True + return render(request, "U2F/recheck.html", context) + def process_recheck(request): - x=validate(request,request.user.username) - if x==True: + x = validate(request, request.user.username) + if x == True: import time + request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({"recheck":True}),content_type="application/json") + return HttpResponse( + simplejson.dumps({"recheck": True}), content_type="application/json" + ) return x + def check_errors(request, data): if "errorCode" in data: - if data["errorCode"] == 0: return True + if data["errorCode"] == 0: + return True if data["errorCode"] == 4: return HttpResponse("Invalid Security Key") if data["errorCode"] == 1: return auth(request) return True -def validate(request,username): + + +def validate(request, username): import datetime, random data = simplejson.loads(request.POST["response"]) - res= check_errors(request,data) - if res!=True: + res = check_errors(request, data) + if res != True: return res - challenge = request.session.pop('_u2f_challenge_') + challenge = request.session.pop("_u2f_challenge_") device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID]) - key=User_Keys.objects.get(username=username,properties__shas="$.device.publicKey=%s"%device["publicKey"]) - key.last_used=timezone.now() + key = User_Keys.objects.get( + username=username, + properties__shas="$.device.publicKey=%s" % device["publicKey"], + ) + key.last_used = timezone.now() key.save() - mfa = {"verified": True, "method": "U2F","id":key.id} + mfa = {"verified": True, "method": "U2F", "id": key.id} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() - + datetime.timedelta( - seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) + ) request.session["mfa"] = mfa return True -def auth(request): - context=csrf(request) - s=sign(request.session["base_username"]) - request.session["_u2f_challenge_"]=s[0] - context["token"]=s[1] - return render(request,"U2F/Auth.html") +def auth(request): + context = csrf(request) + s = sign(request.session["base_username"]) + request.session["_u2f_challenge_"] = s[0] + context["token"] = s[1] + + return render(request, "U2F/Auth.html") + def start(request): enroll = begin_registration(settings.U2F_APPID, []) - request.session['_u2f_enroll_'] = enroll.json - context=csrf(request) - context["token"]=simplejson.dumps(enroll.data_for_client) + request.session["_u2f_enroll_"] = enroll.json + context = csrf(request) + context["token"] = simplejson.dumps(enroll.data_for_client) context.update(get_redirect_url()) - return render(request,"U2F/Add.html",context) + return render(request, "U2F/Add.html", context) def bind(request): import hashlib - enroll = request.session['_u2f_enroll_'] - data=simplejson.loads(request.POST["response"]) + + enroll = request.session["_u2f_enroll_"] + data = simplejson.loads(request.POST["response"]) device, cert = complete_registration(enroll, data, [settings.U2F_APPID]) cert = x509.load_der_x509_certificate(cert, default_backend()) - cert_hash=hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest() - q=User_Keys.objects.filter(key_type="U2F", properties__icontains= cert_hash) + cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest() + q = User_Keys.objects.filter(key_type="U2F", properties__icontains=cert_hash) if q.exists(): - return HttpResponse("This key is registered before, it can't be registered again.") - User_Keys.objects.filter(username=request.user.username,key_type="U2F").delete() + return HttpResponse( + "This key is registered before, it can't be registered again." + ) + User_Keys.objects.filter(username=request.user.username, key_type="U2F").delete() uk = User_Keys() uk.username = request.user.username uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) - uk.properties = {"device":simplejson.loads(device.json),"cert":cert_hash} + uk.properties = {"device": simplejson.loads(device.json), "cert": cert_hash} uk.key_type = "U2F" uk.save() return HttpResponse("OK") + def sign(username): - u2f_devices=[d.properties["device"] for d in User_Keys.objects.filter(username=username,key_type="U2F")] + u2f_devices = [ + d.properties["device"] + for d in User_Keys.objects.filter(username=username, key_type="U2F") + ] challenge = begin_authentication(settings.U2F_APPID, u2f_devices) - return [challenge.json,simplejson.dumps(challenge.data_for_client)] + return [challenge.json, simplejson.dumps(challenge.data_for_client)] + def verify(request): - x= validate(request,request.session["base_username"]) - if x==True: + x = validate(request, request.session["base_username"]) + if x == True: return login(request) - else: return x + else: + return x diff --git a/mfa/__init__.py b/mfa/__init__.py index d9d106a..8a124bf 100644 --- a/mfa/__init__.py +++ b/mfa/__init__.py @@ -1 +1 @@ -__version__="2.2.0" +__version__ = "2.2.0" diff --git a/mfa/apps.py b/mfa/apps.py index cb5ecca..f9816d4 100644 --- a/mfa/apps.py +++ b/mfa/apps.py @@ -1,4 +1,6 @@ from django.apps import AppConfig + + class myAppNameConfig(AppConfig): - name = 'mfa' - verbose_name = 'A Much Better Name' \ No newline at end of file + name = "mfa" + verbose_name = "A Much Better Name" diff --git a/mfa/helpers.py b/mfa/helpers.py index a61c769..e22e573 100644 --- a/mfa/helpers.py +++ b/mfa/helpers.py @@ -3,30 +3,45 @@ from .models import * from . import TrustedDevice, U2F, FIDO2, totp import simplejson from django.shortcuts import HttpResponse -from mfa.views import verify,goto -def has_mfa(request,username): - if User_Keys.objects.filter(username=username,enabled=1).count()>0: +from mfa.views import verify, goto + + +def has_mfa(request, username): + if User_Keys.objects.filter(username=username, enabled=1).count() > 0: return verify(request, username) return False -def is_mfa(request,ignore_methods=[]): - if request.session.get("mfa",{}).get("verified",False): - if not request.session.get("mfa",{}).get("method",None) in ignore_methods: + +def is_mfa(request, ignore_methods=[]): + if request.session.get("mfa", {}).get("verified", False): + if not request.session.get("mfa", {}).get("method", None) in ignore_methods: return True return False + def recheck(request): - method=request.session.get("mfa",{}).get("method",None) + method = request.session.get("mfa", {}).get("method", None) if not method: - return HttpResponse(simplejson.dumps({"res":False}),content_type="application/json") - if method=="Trusted Device": - return HttpResponse(simplejson.dumps({"res":TrustedDevice.verify(request)}),content_type="application/json") - elif method=="U2F": - return HttpResponse(simplejson.dumps({"html": U2F.recheck(request).content}), content_type="application/json") + return HttpResponse( + simplejson.dumps({"res": False}), content_type="application/json" + ) + if method == "Trusted Device": + return HttpResponse( + simplejson.dumps({"res": TrustedDevice.verify(request)}), + content_type="application/json", + ) + elif method == "U2F": + return HttpResponse( + simplejson.dumps({"html": U2F.recheck(request).content}), + content_type="application/json", + ) elif method == "FIDO2": - return HttpResponse(simplejson.dumps({"html": FIDO2.recheck(request).content}), content_type="application/json") - elif method=="TOTP": - return HttpResponse(simplejson.dumps({"html": totp.recheck(request).content}), content_type="application/json") - - - + return HttpResponse( + simplejson.dumps({"html": FIDO2.recheck(request).content}), + content_type="application/json", + ) + elif method == "TOTP": + return HttpResponse( + simplejson.dumps({"html": totp.recheck(request).content}), + content_type="application/json", + ) diff --git a/mfa/middleware.py b/mfa/middleware.py index 4923416..4acc51f 100644 --- a/mfa/middleware.py +++ b/mfa/middleware.py @@ -2,12 +2,18 @@ import time from django.http import HttpResponseRedirect from django.core.urlresolvers import reverse from django.conf import settings + + def process(request): - next_check=request.session.get('mfa',{}).get("next_check",False) - if not next_check: return None - now=int(time.time()) + next_check = request.session.get("mfa", {}).get("next_check", False) + if not next_check: + return None + now = int(time.time()) if now >= next_check: - method=request.session["mfa"]["method"] + method = request.session["mfa"]["method"] path = request.META["PATH_INFO"] - return HttpResponseRedirect(reverse(method+"_auth")+"?next=%s"%(settings.BASE_URL + path).replace("//", "/")) - return None \ No newline at end of file + return HttpResponseRedirect( + reverse(method + "_auth") + + "?next=%s" % (settings.BASE_URL + path).replace("//", "/") + ) + return None diff --git a/mfa/migrations/0001_initial.py b/mfa/migrations/0001_initial.py index c243f37..bce7c86 100644 --- a/mfa/migrations/0001_initial.py +++ b/mfa/migrations/0001_initial.py @@ -6,17 +6,24 @@ from django.db import models, migrations class Migration(migrations.Migration): - dependencies = [ - ] + dependencies = [] operations = [ migrations.CreateModel( - name='User_Keys', + name="User_Keys", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('username', models.CharField(max_length=50)), - ('secret_key', models.CharField(max_length=15)), - ('added_on', models.DateTimeField(auto_now_add=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("username", models.CharField(max_length=50)), + ("secret_key", models.CharField(max_length=15)), + ("added_on", models.DateTimeField(auto_now_add=True)), ], ), ] diff --git a/mfa/migrations/0002_user_keys_key_type.py b/mfa/migrations/0002_user_keys_key_type.py index 5cb4aef..6738181 100644 --- a/mfa/migrations/0002_user_keys_key_type.py +++ b/mfa/migrations/0002_user_keys_key_type.py @@ -7,13 +7,13 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0001_initial'), + ("mfa", "0001_initial"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='key_type', - field=models.CharField(default=b'TOTP', max_length=25), + model_name="user_keys", + name="key_type", + field=models.CharField(default=b"TOTP", max_length=25), ), ] diff --git a/mfa/migrations/0003_auto_20181114_2159.py b/mfa/migrations/0003_auto_20181114_2159.py index 49dfd5e..d9b4797 100644 --- a/mfa/migrations/0003_auto_20181114_2159.py +++ b/mfa/migrations/0003_auto_20181114_2159.py @@ -7,13 +7,13 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0002_user_keys_key_type'), + ("mfa", "0002_user_keys_key_type"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='secret_key', + model_name="user_keys", + name="secret_key", field=models.CharField(max_length=32), ), ] diff --git a/mfa/migrations/0004_user_keys_enabled.py b/mfa/migrations/0004_user_keys_enabled.py index b0bcb78..372e265 100644 --- a/mfa/migrations/0004_user_keys_enabled.py +++ b/mfa/migrations/0004_user_keys_enabled.py @@ -7,13 +7,13 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0003_auto_20181114_2159'), + ("mfa", "0003_auto_20181114_2159"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='enabled', + model_name="user_keys", + name="enabled", field=models.BooleanField(default=True), ), ] diff --git a/mfa/migrations/0005_auto_20181115_2014.py b/mfa/migrations/0005_auto_20181115_2014.py index 8f9b76d..0084cb0 100644 --- a/mfa/migrations/0005_auto_20181115_2014.py +++ b/mfa/migrations/0005_auto_20181115_2014.py @@ -7,24 +7,25 @@ import jsonfield.fields def modify_json(apps, schema_editor): from django.conf import settings + if "mysql" in settings.DATABASES.get("default", {}).get("engine", ""): migrations.RunSQL("alter table mfa_user_keys modify column properties json;") class Migration(migrations.Migration): dependencies = [ - ('mfa', '0004_user_keys_enabled'), + ("mfa", "0004_user_keys_enabled"), ] operations = [ migrations.RemoveField( - model_name='user_keys', - name='secret_key', + model_name="user_keys", + name="secret_key", ), migrations.AddField( - model_name='user_keys', - name='properties', + model_name="user_keys", + name="properties", field=jsonfield.fields.JSONField(null=True), ), - migrations.RunPython(modify_json) + migrations.RunPython(modify_json), ] diff --git a/mfa/migrations/0006_trusted_devices.py b/mfa/migrations/0006_trusted_devices.py index de14a0e..3ae6504 100644 --- a/mfa/migrations/0006_trusted_devices.py +++ b/mfa/migrations/0006_trusted_devices.py @@ -7,21 +7,29 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0005_auto_20181115_2014'), + ("mfa", "0005_auto_20181115_2014"), ] operations = [ migrations.CreateModel( - name='Trusted_Devices', + name="Trusted_Devices", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('signature', models.CharField(max_length=255)), - ('key', models.CharField(max_length=6)), - ('username', models.CharField(max_length=50)), - ('user_agent', models.CharField(max_length=255)), - ('status', models.CharField(default=b'adding', max_length=255)), - ('added_on', models.DateTimeField(auto_now_add=True)), - ('last_used', models.DateTimeField(default=None, null=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("signature", models.CharField(max_length=255)), + ("key", models.CharField(max_length=6)), + ("username", models.CharField(max_length=50)), + ("user_agent", models.CharField(max_length=255)), + ("status", models.CharField(default=b"adding", max_length=255)), + ("added_on", models.DateTimeField(auto_now_add=True)), + ("last_used", models.DateTimeField(default=None, null=True)), ], ), ] diff --git a/mfa/migrations/0007_auto_20181230_1549.py b/mfa/migrations/0007_auto_20181230_1549.py index 965bb9e..fe3350f 100644 --- a/mfa/migrations/0007_auto_20181230_1549.py +++ b/mfa/migrations/0007_auto_20181230_1549.py @@ -7,16 +7,16 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0006_trusted_devices'), + ("mfa", "0006_trusted_devices"), ] operations = [ migrations.DeleteModel( - name='Trusted_Devices', + name="Trusted_Devices", ), migrations.AddField( - model_name='user_keys', - name='expires', + model_name="user_keys", + name="expires", field=models.DateTimeField(default=None, null=True, blank=True), ), ] diff --git a/mfa/migrations/0008_user_keys_last_used.py b/mfa/migrations/0008_user_keys_last_used.py index b555762..76d42f5 100644 --- a/mfa/migrations/0008_user_keys_last_used.py +++ b/mfa/migrations/0008_user_keys_last_used.py @@ -7,13 +7,13 @@ from django.db import models, migrations class Migration(migrations.Migration): dependencies = [ - ('mfa', '0007_auto_20181230_1549'), + ("mfa", "0007_auto_20181230_1549"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='last_used', + model_name="user_keys", + name="last_used", field=models.DateTimeField(default=None, null=True, blank=True), ), ] diff --git a/mfa/migrations/0009_user_keys_owned_by_enterprise.py b/mfa/migrations/0009_user_keys_owned_by_enterprise.py index 9185fcc..adf2bd0 100644 --- a/mfa/migrations/0009_user_keys_owned_by_enterprise.py +++ b/mfa/migrations/0009_user_keys_owned_by_enterprise.py @@ -6,21 +6,23 @@ from django.conf import settings def update_owned_by_enterprise(apps, schema_editor): - user_keys = apps.get_model('mfa', 'user_keys') - user_keys.objects.filter(key_type='FIDO2').update(owned_by_enterprise=getattr(settings,"MFA_OWNED_BY_ENTERPRISE",False)) + user_keys = apps.get_model("mfa", "user_keys") + user_keys.objects.filter(key_type="FIDO2").update( + owned_by_enterprise=getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False) + ) class Migration(migrations.Migration): dependencies = [ - ('mfa', '0008_user_keys_last_used'), + ("mfa", "0008_user_keys_last_used"), ] operations = [ migrations.AddField( - model_name='user_keys', - name='owned_by_enterprise', + model_name="user_keys", + name="owned_by_enterprise", field=models.NullBooleanField(default=None), ), - migrations.RunPython(update_owned_by_enterprise) + migrations.RunPython(update_owned_by_enterprise), ] diff --git a/mfa/migrations/0010_auto_20201110_0557.py b/mfa/migrations/0010_auto_20201110_0557.py index 48190b6..de6913c 100644 --- a/mfa/migrations/0010_auto_20201110_0557.py +++ b/mfa/migrations/0010_auto_20201110_0557.py @@ -6,13 +6,13 @@ from django.db import migrations, models class Migration(migrations.Migration): dependencies = [ - ('mfa', '0009_user_keys_owned_by_enterprise'), + ("mfa", "0009_user_keys_owned_by_enterprise"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='key_type', - field=models.CharField(default='TOTP', max_length=25), + model_name="user_keys", + name="key_type", + field=models.CharField(default="TOTP", max_length=25), ), ] diff --git a/mfa/migrations/0011_auto_20210530_0622.py b/mfa/migrations/0011_auto_20210530_0622.py index 3d215c2..a270d66 100644 --- a/mfa/migrations/0011_auto_20210530_0622.py +++ b/mfa/migrations/0011_auto_20210530_0622.py @@ -6,13 +6,13 @@ from django.db import migrations, models class Migration(migrations.Migration): dependencies = [ - ('mfa', '0010_auto_20201110_0557'), + ("mfa", "0010_auto_20201110_0557"), ] operations = [ migrations.AlterField( - model_name='user_keys', - name='owned_by_enterprise', + model_name="user_keys", + name="owned_by_enterprise", field=models.BooleanField(blank=True, default=None, null=True), ), ] diff --git a/mfa/models.py b/mfa/models.py index d4eff59..b9dd673 100644 --- a/mfa/models.py +++ b/mfa/models.py @@ -2,31 +2,45 @@ from django.db import models from jsonfield import JSONField from jose import jwt from django.conf import settings -#from jsonLookup import shasLookup, hasLookup + +# from jsonLookup import shasLookup, hasLookup # JSONField.register_lookup(shasLookup) # JSONField.register_lookup(hasLookup) class User_Keys(models.Model): - username=models.CharField(max_length = 50) - properties=JSONField(null = True) - added_on=models.DateTimeField(auto_now_add = True) - key_type=models.CharField(max_length = 25,default = "TOTP") - enabled=models.BooleanField(default=True) - expires=models.DateTimeField(null=True,default=None,blank=True) - last_used=models.DateTimeField(null=True,default=None,blank=True) - owned_by_enterprise=models.BooleanField(default=None,null=True,blank=True) + username = models.CharField(max_length=50) + properties = JSONField(null=True) + added_on = models.DateTimeField(auto_now_add=True) + key_type = models.CharField(max_length=25, default="TOTP") + enabled = models.BooleanField(default=True) + expires = models.DateTimeField(null=True, default=None, blank=True) + last_used = models.DateTimeField(null=True, default=None, blank=True) + owned_by_enterprise = models.BooleanField(default=None, null=True, blank=True) - def save(self, force_insert=False, force_update=False, using=None, update_fields=None): - if self.key_type == "Trusted Device" and self.properties.get("signature","") == "": - self.properties["signature"]= jwt.encode({"username": self.username, "key": self.properties["key"]}, settings.SECRET_KEY) - super(User_Keys, self).save(force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields) + def save( + self, force_insert=False, force_update=False, using=None, update_fields=None + ): + if ( + self.key_type == "Trusted Device" + and self.properties.get("signature", "") == "" + ): + self.properties["signature"] = jwt.encode( + {"username": self.username, "key": self.properties["key"]}, + settings.SECRET_KEY, + ) + super(User_Keys, self).save( + force_insert=force_insert, + force_update=force_update, + using=using, + update_fields=update_fields, + ) def __unicode__(self): - return "%s -- %s"%(self.username,self.key_type) + return "%s -- %s" % (self.username, self.key_type) def __str__(self): return self.__unicode__() class Meta: - app_label='mfa' + app_label = "mfa" diff --git a/mfa/totp.py b/mfa/totp.py index 96617c7..2284ab1 100644 --- a/mfa/totp.py +++ b/mfa/totp.py @@ -11,66 +11,95 @@ from .views import login import datetime from django.utils import timezone import random -def verify_login(request,username,token): - for key in User_Keys.objects.filter(username=username,key_type = "TOTP"): + + +def verify_login(request, username, token): + for key in User_Keys.objects.filter(username=username, key_type="TOTP"): totp = pyotp.TOTP(key.properties["secret_key"]) - if totp.verify(token,valid_window = 30): - key.last_used=timezone.now() + if totp.verify(token, valid_window=30): + key.last_used = timezone.now() key.save() - return [True,key.id] + return [True, key.id] return [False] + def recheck(request): context = csrf(request) - context["mode"]="recheck" + context["mode"] = "recheck" if request.method == "POST": - if verify_login(request,request.user.username, token=request.POST["otp"]): + if verify_login(request, request.user.username, token=request.POST["otp"]): import time + request.session["mfa"]["rechecked_at"] = time.time() - return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json") + return HttpResponse( + simplejson.dumps({"recheck": True}), content_type="application/json" + ) else: - return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json") - return render(request,"TOTP/recheck.html", context) + return HttpResponse( + simplejson.dumps({"recheck": False}), content_type="application/json" + ) + return render(request, "TOTP/recheck.html", context) + @never_cache def auth(request): - context=csrf(request) - if request.method=="POST": - res=verify_login(request,request.session["base_username"],token = request.POST["otp"]) + context = csrf(request) + if request.method == "POST": + res = verify_login( + request, request.session["base_username"], token=request.POST["otp"] + ) if res[0]: - mfa = {"verified": True, "method": "TOTP","id":res[1]} + mfa = {"verified": True, "method": "TOTP", "id": res[1]} if getattr(settings, "MFA_RECHECK", False): - mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() - + datetime.timedelta( - seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))) + mfa["next_check"] = datetime.datetime.timestamp( + ( + datetime.datetime.now() + + datetime.timedelta( + seconds=random.randint( + settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX + ) + ) + ) + ) request.session["mfa"] = mfa return login(request) - context["invalid"]=True - return render(request,"TOTP/Auth.html", context) - + context["invalid"] = True + return render(request, "TOTP/Auth.html", context) def getToken(request): - secret_key=pyotp.random_base32() + secret_key = pyotp.random_base32() totp = pyotp.TOTP(secret_key) - request.session["new_mfa_answer"]=totp.now() - return HttpResponse(simplejson.dumps({"qr":pyotp.totp.TOTP(secret_key).provisioning_uri(str(request.user.username), issuer_name = settings.TOKEN_ISSUER_NAME), - "secret_key": secret_key})) + request.session["new_mfa_answer"] = totp.now() + return HttpResponse( + simplejson.dumps( + { + "qr": pyotp.totp.TOTP(secret_key).provisioning_uri( + str(request.user.username), issuer_name=settings.TOKEN_ISSUER_NAME + ), + "secret_key": secret_key, + } + ) + ) + + def verify(request): - answer=request.GET["answer"] - secret_key=request.GET["key"] + answer = request.GET["answer"] + secret_key = request.GET["key"] totp = pyotp.TOTP(secret_key) - if totp.verify(answer,valid_window = 60): - uk=User_Keys() - uk.username=request.user.username - uk.properties={"secret_key":secret_key} - #uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP") - uk.key_type="TOTP" + if totp.verify(answer, valid_window=60): + uk = User_Keys() + uk.username = request.user.username + uk.properties = {"secret_key": secret_key} + # uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP") + uk.key_type = "TOTP" uk.save() return HttpResponse("Success") - else: return HttpResponse("Error") + else: + return HttpResponse("Error") + @never_cache def start(request): """Start Adding Time One Time Password (TOTP)""" - return render(request,"TOTP/Add.html",get_redirect_url()) + return render(request, "TOTP/Add.html", get_redirect_url()) diff --git a/mfa/urls.py b/mfa/urls.py index 90e9432..48513c2 100644 --- a/mfa/urls.py +++ b/mfa/urls.py @@ -1,50 +1,46 @@ -from . import views,totp,U2F,TrustedDevice,helpers,FIDO2,Email -#app_name='mfa' +from . import views, totp, U2F, TrustedDevice, helpers, FIDO2, Email + +# app_name='mfa' try: - from django.urls import re_path as url + from django.urls import re_path as url except: - from django.conf.urls import url + from django.conf.urls import url urlpatterns = [ - url(r'totp/start/', totp.start , name="start_new_otop"), - url(r'totp/getToken', totp.getToken , name="get_new_otop"), - url(r'totp/verify', totp.verify, name="verify_otop"), - url(r'totp/auth', totp.auth, name="totp_auth"), - url(r'totp/recheck', totp.recheck, name="totp_recheck"), - - url(r'email/start/', Email.start , name="start_email"), - url(r'email/auth/', Email.auth , name="email_auth"), - - url(r'u2f/$', U2F.start, name="start_u2f"), - url(r'u2f/bind', U2F.bind, name="bind_u2f"), - url(r'u2f/auth', U2F.auth, name="u2f_auth"), - url(r'u2f/process_recheck', U2F.process_recheck, name="u2f_recheck"), - url(r'u2f/verify', U2F.verify, name="u2f_verify"), - - url(r'fido2/$', FIDO2.start, name="start_fido2"), - url(r'fido2/auth', FIDO2.auth, name="fido2_auth"), - url(r'fido2/begin_auth', FIDO2.authenticate_begin, name="fido2_begin_auth"), - url(r'fido2/complete_auth', FIDO2.authenticate_complete, name="fido2_complete_auth"), - url(r'fido2/begin_reg', FIDO2.begin_registeration, name="fido2_begin_reg"), - url(r'fido2/complete_reg', FIDO2.complete_reg, name="fido2_complete_reg"), - url(r'fido2/recheck', FIDO2.recheck, name="fido2_recheck"), - - - url(r'td/$', TrustedDevice.start, name="start_td"), - url(r'td/add', TrustedDevice.add, name="add_td"), - url(r'td/send_link', TrustedDevice.send_email, name="td_sendemail"), - url(r'td/get-ua', TrustedDevice.getUserAgent, name="td_get_useragent"), - url(r'td/trust', TrustedDevice.trust_device, name="td_trust_device"), - url(r'u2f/checkTrusted', TrustedDevice.checkTrusted, name="td_checkTrusted"), - url(r'u2f/secure_device', TrustedDevice.getCookie, name="td_securedevice"), - - url(r'^$', views.index, name="mfa_home"), - url(r'goto/(.*)', views.goto, name="mfa_goto"), - url(r'selct_method', views.show_methods, name="mfa_methods_list"), - url(r'recheck', helpers.recheck, name="mfa_recheck"), - url(r'toggleKey', views.toggleKey, name="toggle_key"), - url(r'delete', views.delKey, name="mfa_delKey"), - url(r'reset', views.reset_cookie, name="mfa_reset_cookie"), - - ] -# print(urlpatterns) \ No newline at end of file + url(r"totp/start/", totp.start, name="start_new_otop"), + url(r"totp/getToken", totp.getToken, name="get_new_otop"), + url(r"totp/verify", totp.verify, name="verify_otop"), + url(r"totp/auth", totp.auth, name="totp_auth"), + url(r"totp/recheck", totp.recheck, name="totp_recheck"), + url(r"email/start/", Email.start, name="start_email"), + url(r"email/auth/", Email.auth, name="email_auth"), + url(r"u2f/$", U2F.start, name="start_u2f"), + url(r"u2f/bind", U2F.bind, name="bind_u2f"), + url(r"u2f/auth", U2F.auth, name="u2f_auth"), + url(r"u2f/process_recheck", U2F.process_recheck, name="u2f_recheck"), + url(r"u2f/verify", U2F.verify, name="u2f_verify"), + url(r"fido2/$", FIDO2.start, name="start_fido2"), + url(r"fido2/auth", FIDO2.auth, name="fido2_auth"), + url(r"fido2/begin_auth", FIDO2.authenticate_begin, name="fido2_begin_auth"), + url( + r"fido2/complete_auth", FIDO2.authenticate_complete, name="fido2_complete_auth" + ), + url(r"fido2/begin_reg", FIDO2.begin_registeration, name="fido2_begin_reg"), + url(r"fido2/complete_reg", FIDO2.complete_reg, name="fido2_complete_reg"), + url(r"fido2/recheck", FIDO2.recheck, name="fido2_recheck"), + url(r"td/$", TrustedDevice.start, name="start_td"), + url(r"td/add", TrustedDevice.add, name="add_td"), + url(r"td/send_link", TrustedDevice.send_email, name="td_sendemail"), + url(r"td/get-ua", TrustedDevice.getUserAgent, name="td_get_useragent"), + url(r"td/trust", TrustedDevice.trust_device, name="td_trust_device"), + url(r"u2f/checkTrusted", TrustedDevice.checkTrusted, name="td_checkTrusted"), + url(r"u2f/secure_device", TrustedDevice.getCookie, name="td_securedevice"), + url(r"^$", views.index, name="mfa_home"), + url(r"goto/(.*)", views.goto, name="mfa_goto"), + url(r"selct_method", views.show_methods, name="mfa_methods_list"), + url(r"recheck", helpers.recheck, name="mfa_recheck"), + url(r"toggleKey", views.toggleKey, name="toggle_key"), + url(r"delete", views.delKey, name="mfa_delKey"), + url(r"reset", views.reset_cookie, name="mfa_reset_cookie"), +] +# print(urlpatterns) diff --git a/mfa/views.py b/mfa/views.py index 9039363..3d3251e 100644 --- a/mfa/views.py +++ b/mfa/views.py @@ -1,6 +1,7 @@ from django.shortcuts import render -from django.http import HttpResponse,HttpResponseRedirect +from django.http import HttpResponse, HttpResponseRedirect from .models import * + try: from django.urls import reverse except: @@ -12,79 +13,94 @@ from . import TrustedDevice from django.contrib.auth.decorators import login_required from user_agents import parse + @login_required def index(request): - keys=[] - context={"keys":User_Keys.objects.filter(username=request.user.username),"UNALLOWED_AUTHEN_METHODS":settings.MFA_UNALLOWED_METHODS - ,"HIDE_DISABLE":getattr(settings,"MFA_HIDE_DISABLE",[])} + keys = [] + context = { + "keys": User_Keys.objects.filter(username=request.user.username), + "UNALLOWED_AUTHEN_METHODS": settings.MFA_UNALLOWED_METHODS, + "HIDE_DISABLE": getattr(settings, "MFA_HIDE_DISABLE", []), + } for k in context["keys"]: - if k.key_type =="Trusted Device" : - setattr(k,"device",parse(k.properties.get("user_agent","-----"))) + if k.key_type == "Trusted Device": + setattr(k, "device", parse(k.properties.get("user_agent", "-----"))) elif k.key_type == "FIDO2": - setattr(k,"device",k.properties.get("type","----")) + setattr(k, "device", k.properties.get("type", "----")) keys.append(k) - context["keys"]=keys - return render(request,"MFA.html",context) + context["keys"] = keys + return render(request, "MFA.html", context) -def verify(request,username): + +def verify(request, username): request.session["base_username"] = username - #request.session["base_password"] = password - keys=User_Keys.objects.filter(username=username,enabled=1) - methods=list(set([k.key_type for k in keys])) + # request.session["base_password"] = password + keys = User_Keys.objects.filter(username=username, enabled=1) + methods = list(set([k.key_type for k in keys])) - if "Trusted Device" in methods and not request.session.get("checked_trusted_device",False): + if "Trusted Device" in methods and not request.session.get( + "checked_trusted_device", False + ): if TrustedDevice.verify(request): return login(request) methods.remove("Trusted Device") request.session["mfa_methods"] = methods - if len(methods)==1: - return HttpResponseRedirect(reverse(methods[0].lower()+"_auth")) + if len(methods) == 1: + return HttpResponseRedirect(reverse(methods[0].lower() + "_auth")) return show_methods(request) + def show_methods(request): - return render(request,"select_mfa_method.html", {}) + return render(request, "select_mfa_method.html", {}) + def reset_cookie(request): - response=HttpResponseRedirect(settings.LOGIN_URL) + response = HttpResponseRedirect(settings.LOGIN_URL) response.delete_cookie("base_username") return response + + def login(request): from django.contrib import auth from django.conf import settings + callable_func = __get_callable_function__(settings.MFA_LOGIN_CALLBACK) - return callable_func(request,username=request.session["base_username"]) + return callable_func(request, username=request.session["base_username"]) @login_required def delKey(request): - key=User_Keys.objects.get(id=request.GET["id"]) + key = User_Keys.objects.get(id=request.GET["id"]) if key.username == request.user.username: key.delete() return HttpResponse("Deleted Successfully") else: return HttpResponse("Error: You own this token so you can't delete it") + def __get_callable_function__(func_path): import importlib - if not '.' in func_path: + + if not "." in func_path: raise Exception("class Name should include modulename.classname") parsed_str = func_path.split(".") - module_name , func_name = ".".join(parsed_str[:-1]) , parsed_str[-1] + module_name, func_name = ".".join(parsed_str[:-1]), parsed_str[-1] imported_module = importlib.import_module(module_name) - callable_func = getattr(imported_module,func_name) + callable_func = getattr(imported_module, func_name) if not callable_func: raise Exception("Module does not have requested function") return callable_func + @login_required def toggleKey(request): - id=request.GET["id"] - q=User_Keys.objects.filter(username=request.user.username, id=id) - if q.count()==1: - key=q[0] + id = request.GET["id"] + q = User_Keys.objects.filter(username=request.user.username, id=id) + if q.count() == 1: + key = q[0] if not key.key_type in settings.MFA_HIDE_DISABLE: - key.enabled=not key.enabled + key.enabled = not key.enabled key.save() return HttpResponse("OK") else: @@ -92,5 +108,6 @@ def toggleKey(request): else: return HttpResponse("Error") -def goto(request,method): - return HttpResponseRedirect(reverse(method.lower()+"_auth")) + +def goto(request, method): + return HttpResponseRedirect(reverse(method.lower() + "_auth"))