Adding Black
This commit is contained in:
106
mfa/recovery.py
106
mfa/recovery.py
@@ -13,105 +13,135 @@ from django.utils import timezone
|
||||
|
||||
USER_FRIENDLY_NAME = "Recovery Codes"
|
||||
|
||||
|
||||
class Hash(PBKDF2PasswordHasher):
|
||||
algorithm = 'pbkdf2_sha256_custom'
|
||||
iterations = getattr(settings,"RECOVERY_ITERATION",1)
|
||||
algorithm = "pbkdf2_sha256_custom"
|
||||
iterations = getattr(settings, "RECOVERY_ITERATION", 1)
|
||||
|
||||
|
||||
def delTokens(request):
|
||||
#Only when all MFA have been deactivated, or to generate new !
|
||||
#We iterate only to clean if any error happend and multiple entry of RECOVERY created for one user
|
||||
for key in User_Keys.objects.filter(username=request.user.username, key_type = "RECOVERY"):
|
||||
# Only when all MFA have been deactivated, or to generate new !
|
||||
# We iterate only to clean if any error happend and multiple entry of RECOVERY created for one user
|
||||
for key in User_Keys.objects.filter(
|
||||
username=request.user.username, key_type="RECOVERY"
|
||||
):
|
||||
if key.username == request.user.username:
|
||||
key.delete()
|
||||
|
||||
|
||||
def randomGen(n):
|
||||
return ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(n))
|
||||
return "".join(
|
||||
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
|
||||
for _ in range(n)
|
||||
)
|
||||
|
||||
|
||||
@never_cache
|
||||
def genTokens(request):
|
||||
#Delete old ones
|
||||
# Delete old ones
|
||||
delTokens(request)
|
||||
#Then generate new one
|
||||
# Then generate new one
|
||||
salt = randomGen(15)
|
||||
hashedKeys = []
|
||||
clearKeys = []
|
||||
for i in range(5):
|
||||
token = randomGen(5) + "-" + randomGen(5)
|
||||
hashedToken = make_password(token, salt, 'pbkdf2_sha256_custom')
|
||||
hashedKeys.append(hashedToken)
|
||||
clearKeys.append(token)
|
||||
uk=User_Keys()
|
||||
token = randomGen(5) + "-" + randomGen(5)
|
||||
hashedToken = make_password(token, salt, "pbkdf2_sha256_custom")
|
||||
hashedKeys.append(hashedToken)
|
||||
clearKeys.append(token)
|
||||
uk = User_Keys()
|
||||
|
||||
uk.username = request.user.username
|
||||
uk.properties={"secret_keys":hashedKeys, "salt":salt}
|
||||
uk.key_type="RECOVERY"
|
||||
uk.properties = {"secret_keys": hashedKeys, "salt": salt}
|
||||
uk.key_type = "RECOVERY"
|
||||
uk.enabled = True
|
||||
uk.save()
|
||||
return HttpResponse(simplejson.dumps({"keys":clearKeys}))
|
||||
return HttpResponse(simplejson.dumps({"keys": clearKeys}))
|
||||
|
||||
|
||||
def verify_login(request, username, token):
|
||||
for key in User_Keys.objects.filter(username=username, key_type = "RECOVERY"):
|
||||
for key in User_Keys.objects.filter(username=username, key_type="RECOVERY"):
|
||||
secret_keys = key.properties["secret_keys"]
|
||||
salt = key.properties["salt"]
|
||||
hashedToken = make_password(token, salt, "pbkdf2_sha256_custom")
|
||||
for i,token in enumerate(secret_keys):
|
||||
for i, token in enumerate(secret_keys):
|
||||
if hashedToken == token:
|
||||
secret_keys.pop(i)
|
||||
key.properties["secret_keys"] = secret_keys
|
||||
key.last_used= timezone.now()
|
||||
key.last_used = timezone.now()
|
||||
key.save()
|
||||
return [True, key.id, len(secret_keys) == 0]
|
||||
return [False]
|
||||
|
||||
|
||||
def getTokenLeft(request):
|
||||
uk = User_Keys.objects.filter(username=request.user.username, key_type = "RECOVERY")
|
||||
keyLeft=0
|
||||
uk = User_Keys.objects.filter(username=request.user.username, key_type="RECOVERY")
|
||||
keyLeft = 0
|
||||
for key in uk:
|
||||
keyLeft += len(key.properties["secret_keys"])
|
||||
return HttpResponse(simplejson.dumps({"left":keyLeft}))
|
||||
return HttpResponse(simplejson.dumps({"left": keyLeft}))
|
||||
|
||||
|
||||
def recheck(request):
|
||||
context = csrf(request)
|
||||
context["mode"]="recheck"
|
||||
context["mode"] = "recheck"
|
||||
if request.method == "POST":
|
||||
if verify_login(request,request.user.username, token=request.POST["recovery"])[0]:
|
||||
if verify_login(request, request.user.username, token=request.POST["recovery"])[
|
||||
0
|
||||
]:
|
||||
import time
|
||||
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json")
|
||||
return HttpResponse(
|
||||
simplejson.dumps({"recheck": True}), content_type="application/json"
|
||||
)
|
||||
else:
|
||||
return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json")
|
||||
return render(request,"RECOVERY/recheck.html", context)
|
||||
return HttpResponse(
|
||||
simplejson.dumps({"recheck": False}), content_type="application/json"
|
||||
)
|
||||
return render(request, "RECOVERY/recheck.html", context)
|
||||
|
||||
|
||||
@never_cache
|
||||
def auth(request):
|
||||
from .views import login
|
||||
context=csrf(request)
|
||||
if request.method=="POST":
|
||||
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
tokenLength = len(request.POST["recovery"])
|
||||
if tokenLength == 11 and "RECOVERY" not in settings.MFA_UNALLOWED_METHODS:
|
||||
#Backup code check
|
||||
resBackup=verify_login(request, request.session["base_username"], token=request.POST["recovery"])
|
||||
# Backup code check
|
||||
resBackup = verify_login(
|
||||
request,
|
||||
request.session["base_username"],
|
||||
token=request.POST["recovery"],
|
||||
)
|
||||
if resBackup[0]:
|
||||
mfa = {"verified": True, "method": "RECOVERY","id":resBackup[1], "lastBackup":resBackup[2]}
|
||||
mfa = {
|
||||
"verified": True,
|
||||
"method": "RECOVERY",
|
||||
"id": resBackup[1],
|
||||
"lastBackup": resBackup[2],
|
||||
}
|
||||
# if getattr(settings, "MFA_RECHECK", False):
|
||||
# mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
|
||||
# + datetime.timedelta(
|
||||
# seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
request.session["mfa"] = mfa
|
||||
if resBackup[2]:
|
||||
#If the last bakup code has just been used, we return a response insead of redirecting to login
|
||||
# If the last bakup code has just been used, we return a response insead of redirecting to login
|
||||
context["lastBackup"] = True
|
||||
return render(request,"RECOVERY/Auth.html", context)
|
||||
return render(request, "RECOVERY/Auth.html", context)
|
||||
return login(request)
|
||||
context["invalid"]=True
|
||||
context["invalid"] = True
|
||||
|
||||
elif request.method=="GET":
|
||||
elif request.method == "GET":
|
||||
mfa = request.session.get("mfa")
|
||||
if mfa and mfa["verified"] and mfa["lastBackup"]:
|
||||
return login(request)
|
||||
|
||||
return render(request,"RECOVERY/Auth.html", context)
|
||||
return render(request, "RECOVERY/Auth.html", context)
|
||||
|
||||
|
||||
@never_cache
|
||||
def start(request):
|
||||
@@ -119,4 +149,4 @@ def start(request):
|
||||
context = get_redirect_url()
|
||||
if "mfa_reg" in request.session:
|
||||
context["mfa_redirect"] = request.session["mfa_reg"]["name"]
|
||||
return render(request,"RECOVERY/Add.html",context)
|
||||
return render(request, "RECOVERY/Add.html", context)
|
||||
|
||||
Reference in New Issue
Block a user