174 lines
5.4 KiB
Python
174 lines
5.4 KiB
Python
from u2flib_server.u2f import (
|
|
begin_registration,
|
|
begin_authentication,
|
|
complete_registration,
|
|
complete_authentication,
|
|
)
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.serialization import Encoding
|
|
from django.shortcuts import render
|
|
import simplejson
|
|
|
|
# from django.template.context import RequestContext
|
|
from django.template.context_processors import csrf
|
|
from django.conf import settings
|
|
from django.http import HttpResponse
|
|
from .models import *
|
|
from .views import login
|
|
from .Common import get_redirect_url
|
|
import datetime
|
|
from django.utils import timezone
|
|
|
|
|
|
def recheck(request):
|
|
context = csrf(request)
|
|
context["mode"] = "recheck"
|
|
s = sign(request.user.username)
|
|
request.session["_u2f_challenge_"] = s[0]
|
|
context["token"] = s[1]
|
|
request.session["mfa_recheck"] = True
|
|
return render(request, "U2F/recheck.html", context)
|
|
|
|
|
|
def process_recheck(request):
|
|
x = validate(request, request.user.username)
|
|
if x == True:
|
|
import time
|
|
|
|
request.session["mfa"]["rechecked_at"] = time.time()
|
|
return HttpResponse(
|
|
simplejson.dumps({"recheck": True}), content_type="application/json"
|
|
)
|
|
return x
|
|
|
|
|
|
def check_errors(request, data):
|
|
if "errorCode" in data:
|
|
if data["errorCode"] == 0:
|
|
return True
|
|
if data["errorCode"] == 4:
|
|
return HttpResponse("Invalid Security Key")
|
|
if data["errorCode"] == 1:
|
|
return auth(request)
|
|
return True
|
|
|
|
|
|
def validate(request, username):
|
|
import datetime, random
|
|
|
|
data = simplejson.loads(request.POST["response"])
|
|
|
|
res = check_errors(request, data)
|
|
if res != True:
|
|
return res
|
|
|
|
challenge = request.session.pop("_u2f_challenge_")
|
|
device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID])
|
|
try:
|
|
key = User_Keys.objects.get(
|
|
username=username,
|
|
properties__icontains='"publicKey": "%s"' % device["publicKey"],
|
|
)
|
|
key.last_used = timezone.now()
|
|
key.save()
|
|
mfa = {"verified": True, "method": "U2F", "id": key.id}
|
|
if getattr(settings, "MFA_RECHECK", False):
|
|
mfa["next_check"] = datetime.datetime.timestamp(
|
|
(
|
|
datetime.datetime.now()
|
|
+ datetime.timedelta(
|
|
seconds=random.randint(
|
|
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
|
)
|
|
)
|
|
)
|
|
)
|
|
request.session["mfa"] = mfa
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
|
|
def auth(request):
|
|
context = csrf(request)
|
|
s = sign(request.session["base_username"])
|
|
request.session["_u2f_challenge_"] = s[0]
|
|
context["token"] = s[1]
|
|
context["method"] = {
|
|
"name": getattr(settings, "MFA_RENAME_METHODS", {}).get(
|
|
"U2F", "Classical Security Key"
|
|
)
|
|
}
|
|
return render(request, "U2F/Auth.html", context)
|
|
|
|
|
|
def start(request):
|
|
enroll = begin_registration(settings.U2F_APPID, [])
|
|
request.session["_u2f_enroll_"] = enroll.json
|
|
context = csrf(request)
|
|
context["token"] = simplejson.dumps(enroll.data_for_client)
|
|
context.update(get_redirect_url())
|
|
context["method"] = {
|
|
"name": getattr(settings, "MFA_RENAME_METHODS", {}).get(
|
|
"U2F", "Classical Security Key"
|
|
)
|
|
}
|
|
context["RECOVERY_METHOD"] = getattr(settings, "MFA_RENAME_METHODS", {}).get(
|
|
"RECOVERY", "Recovery codes"
|
|
)
|
|
return render(request, "U2F/Add.html", context)
|
|
|
|
|
|
def bind(request):
|
|
import hashlib
|
|
|
|
enroll = request.session["_u2f_enroll_"]
|
|
data = simplejson.loads(request.POST["response"])
|
|
device, cert = complete_registration(enroll, data, [settings.U2F_APPID])
|
|
cert = x509.load_der_x509_certificate(cert, default_backend())
|
|
cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest()
|
|
q = User_Keys.objects.filter(key_type="U2F", properties__icontains=cert_hash)
|
|
if q.exists():
|
|
return HttpResponse(
|
|
"This key is registered before, it can't be registered again."
|
|
)
|
|
User_Keys.objects.filter(username=request.user.username, key_type="U2F").delete()
|
|
uk = User_Keys()
|
|
uk.username = request.user.username
|
|
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
|
uk.properties = {"device": simplejson.loads(device.json), "cert": cert_hash}
|
|
uk.key_type = "U2F"
|
|
uk.save()
|
|
if (
|
|
getattr(settings, "MFA_ENFORCE_RECOVERY_METHOD", False)
|
|
and not User_Keys.objects.filter(
|
|
key_type="RECOVERY", username=request.user.username
|
|
).exists()
|
|
):
|
|
request.session["mfa_reg"] = {
|
|
"method": "U2F",
|
|
"name": getattr(settings, "MFA_RENAME_METHODS", {}).get(
|
|
"U2F", "Classical Security Key"
|
|
),
|
|
}
|
|
return HttpResponse("RECOVERY")
|
|
return HttpResponse("OK")
|
|
|
|
|
|
def sign(username):
|
|
u2f_devices = [
|
|
d.properties["device"]
|
|
for d in User_Keys.objects.filter(username=username, key_type="U2F")
|
|
]
|
|
challenge = begin_authentication(settings.U2F_APPID, u2f_devices)
|
|
return [challenge.json, simplejson.dumps(challenge.data_for_client)]
|
|
|
|
|
|
def verify(request):
|
|
x = validate(request, request.session["base_username"])
|
|
if x == True:
|
|
return login(request)
|
|
else:
|
|
return x
|