Applied Black

This commit is contained in:
Mohamed ElKalioby
2021-06-22 17:12:12 +03:00
parent 7a154cfa34
commit 9c126f06b5
24 changed files with 630 additions and 389 deletions

View File

@@ -1,19 +1,26 @@
from django.conf import settings
from django.core.mail import EmailMessage
try:
from django.urls import reverse
except:
from django.core.urlresolver import reverse
def send(to, subject, body):
from_email_address = settings.EMAIL_HOST_USER
if '@' not in from_email_address:
if "@" not in from_email_address:
from_email_address = settings.DEFAULT_FROM_EMAIL
From = "%s <%s>" % (settings.EMAIL_FROM, from_email_address)
email = EmailMessage(subject, body, From, to)
email.content_subtype = "html"
return email.send(False)
def get_redirect_url():
return {"redirect_html": reverse(getattr(settings, 'MFA_REDIRECT_AFTER_REGISTRATION', 'mfa_home')),
"reg_success_msg":getattr(settings,"MFA_SUCCESS_REGISTRATION_MSG")}
return {
"redirect_html": reverse(
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
),
"reg_success_msg": getattr(settings, "MFA_SUCCESS_REGISTRATION_MSG"),
}

View File

@@ -4,20 +4,28 @@ from django.template.context_processors import csrf
import datetime, random
from random import randint
from .models import *
# from django.template.context import RequestContext
from .views import login
from .Common import send
def sendEmail(request, username, secret):
"""Send Email to the user after rendering `mfa_email_token_template`"""
from django.contrib.auth import get_user_model
User = get_user_model()
key = getattr(User, 'USERNAME_FIELD', 'username')
key = getattr(User, "USERNAME_FIELD", "username")
kwargs = {key: username}
user = User.objects.get(**kwargs)
res=render(request,"mfa_email_token_template.html",{"request":request,"user":user,'otp':secret})
res = render(
request,
"mfa_email_token_template.html",
{"request": request, "user": user, "otp": secret},
)
return send([user.email], "OTP", res.content.decode())
@never_cache
def start(request):
"""Start adding email as a 2nd factor"""
@@ -30,37 +38,57 @@ def start(request):
uk.enabled = 1
uk.save()
from django.http import HttpResponseRedirect
try:
from django.core.urlresolvers import reverse
except:
from django.urls import reverse
return HttpResponseRedirect(reverse(getattr(settings,'MFA_REDIRECT_AFTER_REGISTRATION','mfa_home')))
return HttpResponseRedirect(
reverse(
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
)
)
context["invalid"] = True
else:
request.session["email_secret"] = str(randint(0,100000)) #generate a random integer
request.session["email_secret"] = str(
randint(0, 100000)
) # generate a random integer
if sendEmail(request, request.user.username, request.session["email_secret"]):
context["sent"] = True
return render(request, "Email/Add.html", context)
@never_cache
def auth(request):
"""Authenticating the user by email."""
context = csrf(request)
if request.method == "POST":
if request.session["email_secret"] == request.POST["otp"].strip():
uk = User_Keys.objects.get(username=request.session["base_username"], key_type="Email")
uk = User_Keys.objects.get(
username=request.session["base_username"], key_type="Email"
)
mfa = {"verified": True, "method": "Email", "id": uk.id}
if getattr(settings, "MFA_RECHECK", False):
mfa["next_check"] = datetime.datetime.timestamp(datetime.datetime.now() + datetime.timedelta(
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))
mfa["next_check"] = datetime.datetime.timestamp(
datetime.datetime.now()
+ datetime.timedelta(
seconds=random.randint(
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
)
)
)
request.session["mfa"] = mfa
from django.utils import timezone
uk.last_used = timezone.now()
uk.save()
return login(request)
context["invalid"] = True
else:
request.session["email_secret"] = str(randint(0, 100000))
if sendEmail(request, request.session["base_username"], request.session["email_secret"]):
if sendEmail(
request, request.session["base_username"], request.session["email_secret"]
):
context["sent"] = True
return render(request, "Email/Auth.html", context)

View File

@@ -1,9 +1,12 @@
import traceback
from fido2.client import ClientData
from fido2.server import Fido2Server, PublicKeyCredentialRpEntity
from fido2.ctap2 import AttestationObject, AuthenticatorData
from django.template.context_processors import csrf
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render
# from django.template.context import RequestContext
import simplejson
from fido2 import cbor
@@ -35,14 +38,19 @@ def getServer():
def begin_registeration(request):
"""Starts registering a new FIDO Device, called from API"""
server = getServer()
registration_data, state = server.register_begin({
u'id': request.user.username.encode("utf8"),
u'name': (request.user.first_name + " " + request.user.last_name),
u'displayName': request.user.username,
}, getUserCredentials(request.user.username))
request.session['fido_state'] = state
registration_data, state = server.register_begin(
{
u"id": request.user.username.encode("utf8"),
u"name": (request.user.first_name + " " + request.user.last_name),
u"displayName": request.user.username,
},
getUserCredentials(request.user.username),
)
request.session["fido_state"] = state
return HttpResponse(cbor.encode(registration_data), content_type = 'application/octet-stream')
return HttpResponse(
cbor.encode(registration_data), content_type="application/octet-stream"
)
@csrf_exempt
@@ -51,29 +59,30 @@ def complete_reg(request):
try:
data = cbor.decode(request.body)
client_data = ClientData(data['clientDataJSON'])
att_obj = AttestationObject((data['attestationObject']))
client_data = ClientData(data["clientDataJSON"])
att_obj = AttestationObject((data["attestationObject"]))
server = getServer()
auth_data = server.register_complete(
request.session['fido_state'],
client_data,
att_obj
request.session["fido_state"], client_data, att_obj
)
encoded = websafe_encode(auth_data.credential_data)
uk = User_Keys()
uk.username = request.user.username
uk.properties = {"device": encoded, "type": att_obj.fmt, }
uk.properties = {
"device": encoded,
"type": att_obj.fmt,
}
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
uk.key_type = "FIDO2"
uk.save()
return HttpResponse(simplejson.dumps({'status': 'OK'}))
return HttpResponse(simplejson.dumps({"status": "OK"}))
except Exception as exp:
try:
from raven.contrib.django.raven_compat.models import client
client.captureException()
except:
pass
return HttpResponse(simplejson.dumps({'status': 'ERR', "message": "Error on server, please try again later"}))
print(traceback.format_exc())
return HttpResponse(
simplejson.dumps(
{"status": "ERR", "message": "Error on server, please try again later"}
)
)
def start(request):
@@ -86,7 +95,9 @@ def start(request):
def getUserCredentials(username):
credentials = []
for uk in User_Keys.objects.filter(username=username, key_type="FIDO2"):
credentials.append(AttestedCredentialData(websafe_decode(uk.properties["device"])))
credentials.append(
AttestedCredentialData(websafe_decode(uk.properties["device"]))
)
return credentials
@@ -97,9 +108,11 @@ def auth(request):
def authenticate_begin(request):
server = getServer()
credentials = getUserCredentials(request.session.get("base_username", request.user.username))
credentials = getUserCredentials(
request.session.get("base_username", request.user.username)
)
auth_data, state = server.authenticate_begin(credentials)
request.session['fido_state'] = state
request.session["fido_state"] = state
return HttpResponse(cbor.encode(auth_data), content_type="application/octet-stream")
@@ -111,49 +124,71 @@ def authenticate_complete(request):
server = getServer()
credentials = getUserCredentials(username)
data = cbor.decode(request.body)
credential_id = data['credentialId']
client_data = ClientData(data['clientDataJSON'])
auth_data = AuthenticatorData(data['authenticatorData'])
signature = data['signature']
credential_id = data["credentialId"]
client_data = ClientData(data["clientDataJSON"])
auth_data = AuthenticatorData(data["authenticatorData"])
signature = data["signature"]
try:
cred = server.authenticate_complete(
request.session.pop('fido_state'),
request.session.pop("fido_state"),
credentials,
credential_id,
client_data,
auth_data,
signature
signature,
)
except ValueError:
return HttpResponse(simplejson.dumps({'status': "ERR",
"message": "Wrong challenge received, make sure that this is your security and try again."}),
content_type = "application/json")
return HttpResponse(
simplejson.dumps(
{
"status": "ERR",
"message": "Wrong challenge received, make sure that this is your security and try again.",
}
),
content_type="application/json",
)
except Exception as excep:
try:
from raven.contrib.django.raven_compat.models import client
client.captureException()
except:
pass
return HttpResponse(simplejson.dumps({'status': "ERR",
"message": excep.message}),
content_type = "application/json")
print(traceback.format_exc())
return HttpResponse(
simplejson.dumps({"status": "ERR", "message": excep.message}),
content_type="application/json",
)
if request.session.get("mfa_recheck", False):
import time
request.session["mfa"]["rechecked_at"] = time.time()
return HttpResponse(simplejson.dumps({'status': "OK"}),
content_type = "application/json")
return HttpResponse(
simplejson.dumps({"status": "OK"}), content_type="application/json"
)
else:
import random
keys = User_Keys.objects.filter(username = username, key_type = "FIDO2", enabled = 1)
keys = User_Keys.objects.filter(
username=username, key_type="FIDO2", enabled=1
)
for k in keys:
if AttestedCredentialData(websafe_decode(k.properties["device"])).credential_id == cred.credential_id:
if (
AttestedCredentialData(
websafe_decode(k.properties["device"])
).credential_id
== cred.credential_id
):
k.last_used = timezone.now()
k.save()
mfa = {"verified": True, "method": "FIDO2", 'id': k.id}
mfa = {"verified": True, "method": "FIDO2", "id": k.id}
if getattr(settings, "MFA_RECHECK", False):
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() + datetime.timedelta(
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
mfa["next_check"] = datetime.datetime.timestamp(
(
datetime.datetime.now()
+ datetime.timedelta(
seconds=random.randint(
settings.MFA_RECHECK_MIN,
settings.MFA_RECHECK_MAX,
)
)
)
)
request.session["mfa"] = mfa
try:
authenticated = request.user.is_authenticated
@@ -161,11 +196,20 @@ def authenticate_complete(request):
authenticated = request.user.is_authenticated()
if not authenticated:
res = login(request)
if not "location" in res: return reset_cookie(request)
return HttpResponse(simplejson.dumps({'status': "OK", "redirect": res["location"]}),
content_type = "application/json")
return HttpResponse(simplejson.dumps({'status': "OK"}),
content_type = "application/json")
if not "location" in res:
return reset_cookie(request)
return HttpResponse(
simplejson.dumps(
{"status": "OK", "redirect": res["location"]}
),
content_type="application/json",
)
return HttpResponse(
simplejson.dumps({"status": "OK"}),
content_type="application/json",
)
except Exception as exp:
return HttpResponse(simplejson.dumps({'status': "ERR", "message": exp.message}),
content_type = "application/json")
return HttpResponse(
simplejson.dumps({"status": "ERR", "message": str(exp)}),
content_type="application/json",
)

View File

@@ -8,10 +8,14 @@ from .models import *
import user_agents
from django.utils import timezone
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
x=''.join(random.choice(chars) for _ in range(size))
if not User_Keys.objects.filter(properties__shas="$.key="+x).exists(): return x
else: return id_generator(size,chars)
x = "".join(random.choice(chars) for _ in range(size))
if not User_Keys.objects.filter(properties__shas="$.key=" + x).exists():
return x
else:
return id_generator(size, chars)
def getUserAgent(request):
id = id = request.session.get("td_id", None)
@@ -23,6 +27,7 @@ def getUserAgent(request):
return HttpResponse(res)
return HttpResponse("")
def trust_device(request):
tk = User_Keys.objects.get(id=request.session["td_id"])
tk.properties["status"] = "trusted"
@@ -30,17 +35,20 @@ def trust_device(request):
del request.session["td_id"]
return HttpResponse("OK")
def checkTrusted(request):
res = ""
id = request.session.get("td_id", "")
if id != "":
try:
tk = User_Keys.objects.get(id=id)
if tk.properties["status"] == "trusted": res = "OK"
if tk.properties["status"] == "trusted":
res = "OK"
except:
pass
return HttpResponse(res)
def getCookie(request):
tk = User_Keys.objects.get(id=request.session["td_id"])
@@ -48,12 +56,14 @@ def getCookie(request):
context = {"added": True}
response = render(request, "TrustedDevices/Done.html", context)
from datetime import datetime, timedelta
expires = datetime.now() + timedelta(days=180)
tk.expires = expires
tk.save()
response.set_cookie("deviceid", tk.properties["signature"], expires=expires)
return response
def add(request):
context = csrf(request)
if request.method == "GET":
@@ -62,12 +72,14 @@ def add(request):
key = request.POST["key"].replace("-", "").replace(" ", "").upper()
context["username"] = request.POST["username"]
context["key"] = request.POST["key"]
trusted_keys=User_Keys.objects.filter(username=request.POST["username"],properties__has="$.key="+key)
trusted_keys = User_Keys.objects.filter(
username=request.POST["username"], properties__has="$.key=" + key
)
cookie = False
if trusted_keys.exists():
tk = trusted_keys[0]
request.session["td_id"] = tk.id
ua=request.META['HTTP_USER_AGENT']
ua = request.META["HTTP_USER_AGENT"]
agent = user_agents.parse(ua)
if agent.is_pc:
context["invalid"] = "This is a PC, it can't used as a trusted device."
@@ -80,12 +92,20 @@ def add(request):
# context["success"]=True
else:
context["invalid"]="The username or key is wrong, please check and try again."
context[
"invalid"
] = "The username or key is wrong, please check and try again."
return render(request, "TrustedDevices/Add.html", context)
def start(request):
if User_Keys.objects.filter(username=request.user.username,key_type="Trusted Device").count()>= 2:
if (
User_Keys.objects.filter(
username=request.user.username, key_type="Trusted Device"
).count()
>= 2
):
return render(request, "TrustedDevices/start.html", {"not_allowed": True})
td = None
if not request.session.get("td_id", None):
@@ -96,16 +116,19 @@ def start(request):
td.save()
request.session["td_id"] = td.id
try:
if td==None: td=User_Keys.objects.get(id=request.session["td_id"])
if td == None:
td = User_Keys.objects.get(id=request.session["td_id"])
context = {"key": td.properties["key"]}
except:
del request.session["td_id"]
return start(request)
return render(request, "TrustedDevices/start.html", context)
def send_email(request):
body = render(request, "TrustedDevices/email.html", {}).content
from .Common import send
e = request.user.email
if e == "":
e = request.session.get("user", {}).get("email", "")
@@ -119,16 +142,24 @@ def send_email(request):
def verify(request):
if request.COOKIES.get('deviceid',None):
if request.COOKIES.get("deviceid", None):
from jose import jwt
json= jwt.decode(request.COOKIES.get('deviceid'),settings.SECRET_KEY)
if json["username"].lower()== request.session['base_username'].lower():
json = jwt.decode(request.COOKIES.get("deviceid"), settings.SECRET_KEY)
if json["username"].lower() == request.session["base_username"].lower():
try:
uk = User_Keys.objects.get(username=request.POST["username"].lower(), properties__has="$.key=" + json["key"])
uk = User_Keys.objects.get(
username=request.POST["username"].lower(),
properties__has="$.key=" + json["key"],
)
if uk.enabled and uk.properties["status"] == "trusted":
uk.last_used = timezone.now()
uk.save()
request.session["mfa"] = {"verified": True, "method": "Trusted Device","id":uk.id}
request.session["mfa"] = {
"verified": True,
"method": "Trusted Device",
"id": uk.id,
}
return True
except:
return False

View File

@@ -1,11 +1,15 @@
from u2flib_server.u2f import (begin_registration, begin_authentication,
complete_registration, complete_authentication)
from u2flib_server.u2f import (
begin_registration,
begin_authentication,
complete_registration,
complete_authentication,
)
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from django.shortcuts import render
import simplejson
# from django.template.context import RequestContext
from django.template.context_processors import csrf
from django.conf import settings
@@ -15,6 +19,7 @@ from .views import login
import datetime
from django.utils import timezone
def recheck(request):
context = csrf(request)
context["mode"] = "recheck"
@@ -24,22 +29,30 @@ def recheck(request):
request.session["mfa_recheck"] = True
return render(request, "U2F/recheck.html", context)
def process_recheck(request):
x = validate(request, request.user.username)
if x == True:
import time
request.session["mfa"]["rechecked_at"] = time.time()
return HttpResponse(simplejson.dumps({"recheck":True}),content_type="application/json")
return HttpResponse(
simplejson.dumps({"recheck": True}), content_type="application/json"
)
return x
def check_errors(request, data):
if "errorCode" in data:
if data["errorCode"] == 0: return True
if data["errorCode"] == 0:
return True
if data["errorCode"] == 4:
return HttpResponse("Invalid Security Key")
if data["errorCode"] == 1:
return auth(request)
return True
def validate(request, username):
import datetime, random
@@ -49,20 +62,31 @@ def validate(request,username):
if res != True:
return res
challenge = request.session.pop('_u2f_challenge_')
challenge = request.session.pop("_u2f_challenge_")
device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID])
key=User_Keys.objects.get(username=username,properties__shas="$.device.publicKey=%s"%device["publicKey"])
key = User_Keys.objects.get(
username=username,
properties__shas="$.device.publicKey=%s" % device["publicKey"],
)
key.last_used = timezone.now()
key.save()
mfa = {"verified": True, "method": "U2F", "id": key.id}
if getattr(settings, "MFA_RECHECK", False):
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
mfa["next_check"] = datetime.datetime.timestamp(
(
datetime.datetime.now()
+ datetime.timedelta(
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
seconds=random.randint(
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
)
)
)
)
request.session["mfa"] = mfa
return True
def auth(request):
context = csrf(request)
s = sign(request.session["base_username"])
@@ -71,9 +95,10 @@ def auth(request):
return render(request, "U2F/Auth.html")
def start(request):
enroll = begin_registration(settings.U2F_APPID, [])
request.session['_u2f_enroll_'] = enroll.json
request.session["_u2f_enroll_"] = enroll.json
context = csrf(request)
context["token"] = simplejson.dumps(enroll.data_for_client)
context.update(get_redirect_url())
@@ -82,14 +107,17 @@ def start(request):
def bind(request):
import hashlib
enroll = request.session['_u2f_enroll_']
enroll = request.session["_u2f_enroll_"]
data = simplejson.loads(request.POST["response"])
device, cert = complete_registration(enroll, data, [settings.U2F_APPID])
cert = x509.load_der_x509_certificate(cert, default_backend())
cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest()
q = User_Keys.objects.filter(key_type="U2F", properties__icontains=cert_hash)
if q.exists():
return HttpResponse("This key is registered before, it can't be registered again.")
return HttpResponse(
"This key is registered before, it can't be registered again."
)
User_Keys.objects.filter(username=request.user.username, key_type="U2F").delete()
uk = User_Keys()
uk.username = request.user.username
@@ -99,13 +127,19 @@ def bind(request):
uk.save()
return HttpResponse("OK")
def sign(username):
u2f_devices=[d.properties["device"] for d in User_Keys.objects.filter(username=username,key_type="U2F")]
u2f_devices = [
d.properties["device"]
for d in User_Keys.objects.filter(username=username, key_type="U2F")
]
challenge = begin_authentication(settings.U2F_APPID, u2f_devices)
return [challenge.json, simplejson.dumps(challenge.data_for_client)]
def verify(request):
x = validate(request, request.session["base_username"])
if x == True:
return login(request)
else: return x
else:
return x

View File

@@ -1,4 +1,6 @@
from django.apps import AppConfig
class myAppNameConfig(AppConfig):
name = 'mfa'
verbose_name = 'A Much Better Name'
name = "mfa"
verbose_name = "A Much Better Name"

View File

@@ -4,29 +4,44 @@ from . import TrustedDevice, U2F, FIDO2, totp
import simplejson
from django.shortcuts import HttpResponse
from mfa.views import verify, goto
def has_mfa(request, username):
if User_Keys.objects.filter(username=username, enabled=1).count() > 0:
return verify(request, username)
return False
def is_mfa(request, ignore_methods=[]):
if request.session.get("mfa", {}).get("verified", False):
if not request.session.get("mfa", {}).get("method", None) in ignore_methods:
return True
return False
def recheck(request):
method = request.session.get("mfa", {}).get("method", None)
if not method:
return HttpResponse(simplejson.dumps({"res":False}),content_type="application/json")
return HttpResponse(
simplejson.dumps({"res": False}), content_type="application/json"
)
if method == "Trusted Device":
return HttpResponse(simplejson.dumps({"res":TrustedDevice.verify(request)}),content_type="application/json")
return HttpResponse(
simplejson.dumps({"res": TrustedDevice.verify(request)}),
content_type="application/json",
)
elif method == "U2F":
return HttpResponse(simplejson.dumps({"html": U2F.recheck(request).content}), content_type="application/json")
return HttpResponse(
simplejson.dumps({"html": U2F.recheck(request).content}),
content_type="application/json",
)
elif method == "FIDO2":
return HttpResponse(simplejson.dumps({"html": FIDO2.recheck(request).content}), content_type="application/json")
return HttpResponse(
simplejson.dumps({"html": FIDO2.recheck(request).content}),
content_type="application/json",
)
elif method == "TOTP":
return HttpResponse(simplejson.dumps({"html": totp.recheck(request).content}), content_type="application/json")
return HttpResponse(
simplejson.dumps({"html": totp.recheck(request).content}),
content_type="application/json",
)

View File

@@ -2,12 +2,18 @@ import time
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.conf import settings
def process(request):
next_check=request.session.get('mfa',{}).get("next_check",False)
if not next_check: return None
next_check = request.session.get("mfa", {}).get("next_check", False)
if not next_check:
return None
now = int(time.time())
if now >= next_check:
method = request.session["mfa"]["method"]
path = request.META["PATH_INFO"]
return HttpResponseRedirect(reverse(method+"_auth")+"?next=%s"%(settings.BASE_URL + path).replace("//", "/"))
return HttpResponseRedirect(
reverse(method + "_auth")
+ "?next=%s" % (settings.BASE_URL + path).replace("//", "/")
)
return None

View File

@@ -6,17 +6,24 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
]
dependencies = []
operations = [
migrations.CreateModel(
name='User_Keys',
name="User_Keys",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('username', models.CharField(max_length=50)),
('secret_key', models.CharField(max_length=15)),
('added_on', models.DateTimeField(auto_now_add=True)),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("username", models.CharField(max_length=50)),
("secret_key", models.CharField(max_length=15)),
("added_on", models.DateTimeField(auto_now_add=True)),
],
),
]

View File

@@ -7,13 +7,13 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0001_initial'),
("mfa", "0001_initial"),
]
operations = [
migrations.AddField(
model_name='user_keys',
name='key_type',
field=models.CharField(default=b'TOTP', max_length=25),
model_name="user_keys",
name="key_type",
field=models.CharField(default=b"TOTP", max_length=25),
),
]

View File

@@ -7,13 +7,13 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0002_user_keys_key_type'),
("mfa", "0002_user_keys_key_type"),
]
operations = [
migrations.AlterField(
model_name='user_keys',
name='secret_key',
model_name="user_keys",
name="secret_key",
field=models.CharField(max_length=32),
),
]

View File

@@ -7,13 +7,13 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0003_auto_20181114_2159'),
("mfa", "0003_auto_20181114_2159"),
]
operations = [
migrations.AddField(
model_name='user_keys',
name='enabled',
model_name="user_keys",
name="enabled",
field=models.BooleanField(default=True),
),
]

View File

@@ -7,24 +7,25 @@ import jsonfield.fields
def modify_json(apps, schema_editor):
from django.conf import settings
if "mysql" in settings.DATABASES.get("default", {}).get("engine", ""):
migrations.RunSQL("alter table mfa_user_keys modify column properties json;")
class Migration(migrations.Migration):
dependencies = [
('mfa', '0004_user_keys_enabled'),
("mfa", "0004_user_keys_enabled"),
]
operations = [
migrations.RemoveField(
model_name='user_keys',
name='secret_key',
model_name="user_keys",
name="secret_key",
),
migrations.AddField(
model_name='user_keys',
name='properties',
model_name="user_keys",
name="properties",
field=jsonfield.fields.JSONField(null=True),
),
migrations.RunPython(modify_json)
migrations.RunPython(modify_json),
]

View File

@@ -7,21 +7,29 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0005_auto_20181115_2014'),
("mfa", "0005_auto_20181115_2014"),
]
operations = [
migrations.CreateModel(
name='Trusted_Devices',
name="Trusted_Devices",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('signature', models.CharField(max_length=255)),
('key', models.CharField(max_length=6)),
('username', models.CharField(max_length=50)),
('user_agent', models.CharField(max_length=255)),
('status', models.CharField(default=b'adding', max_length=255)),
('added_on', models.DateTimeField(auto_now_add=True)),
('last_used', models.DateTimeField(default=None, null=True)),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("signature", models.CharField(max_length=255)),
("key", models.CharField(max_length=6)),
("username", models.CharField(max_length=50)),
("user_agent", models.CharField(max_length=255)),
("status", models.CharField(default=b"adding", max_length=255)),
("added_on", models.DateTimeField(auto_now_add=True)),
("last_used", models.DateTimeField(default=None, null=True)),
],
),
]

View File

@@ -7,16 +7,16 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0006_trusted_devices'),
("mfa", "0006_trusted_devices"),
]
operations = [
migrations.DeleteModel(
name='Trusted_Devices',
name="Trusted_Devices",
),
migrations.AddField(
model_name='user_keys',
name='expires',
model_name="user_keys",
name="expires",
field=models.DateTimeField(default=None, null=True, blank=True),
),
]

View File

@@ -7,13 +7,13 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('mfa', '0007_auto_20181230_1549'),
("mfa", "0007_auto_20181230_1549"),
]
operations = [
migrations.AddField(
model_name='user_keys',
name='last_used',
model_name="user_keys",
name="last_used",
field=models.DateTimeField(default=None, null=True, blank=True),
),
]

View File

@@ -6,21 +6,23 @@ from django.conf import settings
def update_owned_by_enterprise(apps, schema_editor):
user_keys = apps.get_model('mfa', 'user_keys')
user_keys.objects.filter(key_type='FIDO2').update(owned_by_enterprise=getattr(settings,"MFA_OWNED_BY_ENTERPRISE",False))
user_keys = apps.get_model("mfa", "user_keys")
user_keys.objects.filter(key_type="FIDO2").update(
owned_by_enterprise=getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
)
class Migration(migrations.Migration):
dependencies = [
('mfa', '0008_user_keys_last_used'),
("mfa", "0008_user_keys_last_used"),
]
operations = [
migrations.AddField(
model_name='user_keys',
name='owned_by_enterprise',
model_name="user_keys",
name="owned_by_enterprise",
field=models.NullBooleanField(default=None),
),
migrations.RunPython(update_owned_by_enterprise)
migrations.RunPython(update_owned_by_enterprise),
]

View File

@@ -6,13 +6,13 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mfa', '0009_user_keys_owned_by_enterprise'),
("mfa", "0009_user_keys_owned_by_enterprise"),
]
operations = [
migrations.AlterField(
model_name='user_keys',
name='key_type',
field=models.CharField(default='TOTP', max_length=25),
model_name="user_keys",
name="key_type",
field=models.CharField(default="TOTP", max_length=25),
),
]

View File

@@ -6,13 +6,13 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('mfa', '0010_auto_20201110_0557'),
("mfa", "0010_auto_20201110_0557"),
]
operations = [
migrations.AlterField(
model_name='user_keys',
name='owned_by_enterprise',
model_name="user_keys",
name="owned_by_enterprise",
field=models.BooleanField(blank=True, default=None, null=True),
),
]

View File

@@ -2,6 +2,7 @@ from django.db import models
from jsonfield import JSONField
from jose import jwt
from django.conf import settings
# from jsonLookup import shasLookup, hasLookup
# JSONField.register_lookup(shasLookup)
# JSONField.register_lookup(hasLookup)
@@ -17,10 +18,23 @@ class User_Keys(models.Model):
last_used = models.DateTimeField(null=True, default=None, blank=True)
owned_by_enterprise = models.BooleanField(default=None, null=True, blank=True)
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
if self.key_type == "Trusted Device" and self.properties.get("signature","") == "":
self.properties["signature"]= jwt.encode({"username": self.username, "key": self.properties["key"]}, settings.SECRET_KEY)
super(User_Keys, self).save(force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
def save(
self, force_insert=False, force_update=False, using=None, update_fields=None
):
if (
self.key_type == "Trusted Device"
and self.properties.get("signature", "") == ""
):
self.properties["signature"] = jwt.encode(
{"username": self.username, "key": self.properties["key"]},
settings.SECRET_KEY,
)
super(User_Keys, self).save(
force_insert=force_insert,
force_update=force_update,
using=using,
update_fields=update_fields,
)
def __unicode__(self):
return "%s -- %s" % (self.username, self.key_type)
@@ -29,4 +43,4 @@ class User_Keys(models.Model):
return self.__unicode__()
class Meta:
app_label='mfa'
app_label = "mfa"

View File

@@ -11,6 +11,8 @@ from .views import login
import datetime
from django.utils import timezone
import random
def verify_login(request, username, token):
for key in User_Keys.objects.filter(username=username, key_type="TOTP"):
totp = pyotp.TOTP(key.properties["secret_key"])
@@ -20,42 +22,67 @@ def verify_login(request,username,token):
return [True, key.id]
return [False]
def recheck(request):
context = csrf(request)
context["mode"] = "recheck"
if request.method == "POST":
if verify_login(request, request.user.username, token=request.POST["otp"]):
import time
request.session["mfa"]["rechecked_at"] = time.time()
return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json")
return HttpResponse(
simplejson.dumps({"recheck": True}), content_type="application/json"
)
else:
return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json")
return HttpResponse(
simplejson.dumps({"recheck": False}), content_type="application/json"
)
return render(request, "TOTP/recheck.html", context)
@never_cache
def auth(request):
context = csrf(request)
if request.method == "POST":
res=verify_login(request,request.session["base_username"],token = request.POST["otp"])
res = verify_login(
request, request.session["base_username"], token=request.POST["otp"]
)
if res[0]:
mfa = {"verified": True, "method": "TOTP", "id": res[1]}
if getattr(settings, "MFA_RECHECK", False):
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
mfa["next_check"] = datetime.datetime.timestamp(
(
datetime.datetime.now()
+ datetime.timedelta(
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
seconds=random.randint(
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
)
)
)
)
request.session["mfa"] = mfa
return login(request)
context["invalid"] = True
return render(request, "TOTP/Auth.html", context)
def getToken(request):
secret_key = pyotp.random_base32()
totp = pyotp.TOTP(secret_key)
request.session["new_mfa_answer"] = totp.now()
return HttpResponse(simplejson.dumps({"qr":pyotp.totp.TOTP(secret_key).provisioning_uri(str(request.user.username), issuer_name = settings.TOKEN_ISSUER_NAME),
"secret_key": secret_key}))
return HttpResponse(
simplejson.dumps(
{
"qr": pyotp.totp.TOTP(secret_key).provisioning_uri(
str(request.user.username), issuer_name=settings.TOKEN_ISSUER_NAME
),
"secret_key": secret_key,
}
)
)
def verify(request):
answer = request.GET["answer"]
secret_key = request.GET["key"]
@@ -68,7 +95,9 @@ def verify(request):
uk.key_type = "TOTP"
uk.save()
return HttpResponse("Success")
else: return HttpResponse("Error")
else:
return HttpResponse("Error")
@never_cache
def start(request):

View File

@@ -1,4 +1,5 @@
from . import views, totp, U2F, TrustedDevice, helpers, FIDO2, Email
# app_name='mfa'
try:
@@ -6,45 +7,40 @@ try:
except:
from django.conf.urls import url
urlpatterns = [
url(r'totp/start/', totp.start , name="start_new_otop"),
url(r'totp/getToken', totp.getToken , name="get_new_otop"),
url(r'totp/verify', totp.verify, name="verify_otop"),
url(r'totp/auth', totp.auth, name="totp_auth"),
url(r'totp/recheck', totp.recheck, name="totp_recheck"),
url(r'email/start/', Email.start , name="start_email"),
url(r'email/auth/', Email.auth , name="email_auth"),
url(r'u2f/$', U2F.start, name="start_u2f"),
url(r'u2f/bind', U2F.bind, name="bind_u2f"),
url(r'u2f/auth', U2F.auth, name="u2f_auth"),
url(r'u2f/process_recheck', U2F.process_recheck, name="u2f_recheck"),
url(r'u2f/verify', U2F.verify, name="u2f_verify"),
url(r'fido2/$', FIDO2.start, name="start_fido2"),
url(r'fido2/auth', FIDO2.auth, name="fido2_auth"),
url(r'fido2/begin_auth', FIDO2.authenticate_begin, name="fido2_begin_auth"),
url(r'fido2/complete_auth', FIDO2.authenticate_complete, name="fido2_complete_auth"),
url(r'fido2/begin_reg', FIDO2.begin_registeration, name="fido2_begin_reg"),
url(r'fido2/complete_reg', FIDO2.complete_reg, name="fido2_complete_reg"),
url(r'fido2/recheck', FIDO2.recheck, name="fido2_recheck"),
url(r'td/$', TrustedDevice.start, name="start_td"),
url(r'td/add', TrustedDevice.add, name="add_td"),
url(r'td/send_link', TrustedDevice.send_email, name="td_sendemail"),
url(r'td/get-ua', TrustedDevice.getUserAgent, name="td_get_useragent"),
url(r'td/trust', TrustedDevice.trust_device, name="td_trust_device"),
url(r'u2f/checkTrusted', TrustedDevice.checkTrusted, name="td_checkTrusted"),
url(r'u2f/secure_device', TrustedDevice.getCookie, name="td_securedevice"),
url(r'^$', views.index, name="mfa_home"),
url(r'goto/(.*)', views.goto, name="mfa_goto"),
url(r'selct_method', views.show_methods, name="mfa_methods_list"),
url(r'recheck', helpers.recheck, name="mfa_recheck"),
url(r'toggleKey', views.toggleKey, name="toggle_key"),
url(r'delete', views.delKey, name="mfa_delKey"),
url(r'reset', views.reset_cookie, name="mfa_reset_cookie"),
url(r"totp/start/", totp.start, name="start_new_otop"),
url(r"totp/getToken", totp.getToken, name="get_new_otop"),
url(r"totp/verify", totp.verify, name="verify_otop"),
url(r"totp/auth", totp.auth, name="totp_auth"),
url(r"totp/recheck", totp.recheck, name="totp_recheck"),
url(r"email/start/", Email.start, name="start_email"),
url(r"email/auth/", Email.auth, name="email_auth"),
url(r"u2f/$", U2F.start, name="start_u2f"),
url(r"u2f/bind", U2F.bind, name="bind_u2f"),
url(r"u2f/auth", U2F.auth, name="u2f_auth"),
url(r"u2f/process_recheck", U2F.process_recheck, name="u2f_recheck"),
url(r"u2f/verify", U2F.verify, name="u2f_verify"),
url(r"fido2/$", FIDO2.start, name="start_fido2"),
url(r"fido2/auth", FIDO2.auth, name="fido2_auth"),
url(r"fido2/begin_auth", FIDO2.authenticate_begin, name="fido2_begin_auth"),
url(
r"fido2/complete_auth", FIDO2.authenticate_complete, name="fido2_complete_auth"
),
url(r"fido2/begin_reg", FIDO2.begin_registeration, name="fido2_begin_reg"),
url(r"fido2/complete_reg", FIDO2.complete_reg, name="fido2_complete_reg"),
url(r"fido2/recheck", FIDO2.recheck, name="fido2_recheck"),
url(r"td/$", TrustedDevice.start, name="start_td"),
url(r"td/add", TrustedDevice.add, name="add_td"),
url(r"td/send_link", TrustedDevice.send_email, name="td_sendemail"),
url(r"td/get-ua", TrustedDevice.getUserAgent, name="td_get_useragent"),
url(r"td/trust", TrustedDevice.trust_device, name="td_trust_device"),
url(r"u2f/checkTrusted", TrustedDevice.checkTrusted, name="td_checkTrusted"),
url(r"u2f/secure_device", TrustedDevice.getCookie, name="td_securedevice"),
url(r"^$", views.index, name="mfa_home"),
url(r"goto/(.*)", views.goto, name="mfa_goto"),
url(r"selct_method", views.show_methods, name="mfa_methods_list"),
url(r"recheck", helpers.recheck, name="mfa_recheck"),
url(r"toggleKey", views.toggleKey, name="toggle_key"),
url(r"delete", views.delKey, name="mfa_delKey"),
url(r"reset", views.reset_cookie, name="mfa_reset_cookie"),
]
# print(urlpatterns)

View File

@@ -1,6 +1,7 @@
from django.shortcuts import render
from django.http import HttpResponse, HttpResponseRedirect
from .models import *
try:
from django.urls import reverse
except:
@@ -12,11 +13,15 @@ from . import TrustedDevice
from django.contrib.auth.decorators import login_required
from user_agents import parse
@login_required
def index(request):
keys = []
context={"keys":User_Keys.objects.filter(username=request.user.username),"UNALLOWED_AUTHEN_METHODS":settings.MFA_UNALLOWED_METHODS
,"HIDE_DISABLE":getattr(settings,"MFA_HIDE_DISABLE",[])}
context = {
"keys": User_Keys.objects.filter(username=request.user.username),
"UNALLOWED_AUTHEN_METHODS": settings.MFA_UNALLOWED_METHODS,
"HIDE_DISABLE": getattr(settings, "MFA_HIDE_DISABLE", []),
}
for k in context["keys"]:
if k.key_type == "Trusted Device":
setattr(k, "device", parse(k.properties.get("user_agent", "-----")))
@@ -26,13 +31,16 @@ def index(request):
context["keys"] = keys
return render(request, "MFA.html", context)
def verify(request, username):
request.session["base_username"] = username
# request.session["base_password"] = password
keys = User_Keys.objects.filter(username=username, enabled=1)
methods = list(set([k.key_type for k in keys]))
if "Trusted Device" in methods and not request.session.get("checked_trusted_device",False):
if "Trusted Device" in methods and not request.session.get(
"checked_trusted_device", False
):
if TrustedDevice.verify(request):
return login(request)
methods.remove("Trusted Device")
@@ -41,16 +49,21 @@ def verify(request,username):
return HttpResponseRedirect(reverse(methods[0].lower() + "_auth"))
return show_methods(request)
def show_methods(request):
return render(request, "select_mfa_method.html", {})
def reset_cookie(request):
response = HttpResponseRedirect(settings.LOGIN_URL)
response.delete_cookie("base_username")
return response
def login(request):
from django.contrib import auth
from django.conf import settings
callable_func = __get_callable_function__(settings.MFA_LOGIN_CALLBACK)
return callable_func(request, username=request.session["base_username"])
@@ -64,9 +77,11 @@ def delKey(request):
else:
return HttpResponse("Error: You own this token so you can't delete it")
def __get_callable_function__(func_path):
import importlib
if not '.' in func_path:
if not "." in func_path:
raise Exception("class Name should include modulename.classname")
parsed_str = func_path.split(".")
@@ -77,6 +92,7 @@ def __get_callable_function__(func_path):
raise Exception("Module does not have requested function")
return callable_func
@login_required
def toggleKey(request):
id = request.GET["id"]
@@ -92,5 +108,6 @@ def toggleKey(request):
else:
return HttpResponse("Error")
def goto(request, method):
return HttpResponseRedirect(reverse(method.lower() + "_auth"))