Merge pull request #50 from xi/cleanup
This commit is contained in:
13
.github/workflows/main.yml
vendored
Normal file
13
.github/workflows/main.yml
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
on: [push]
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: python
|
||||
run: |
|
||||
sudo apt-get install python3
|
||||
- name: lint
|
||||
run: |
|
||||
pip install black
|
||||
black --check --exclude migrations mfa/ example/ setup.py
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
## 2.2.0
|
||||
* Added: MFA_REDIRECT_AFTER_REGISTRATION settings parameter
|
||||
* Fixed: Deprecation error for NULBooleanField
|
||||
* Fixed: Deprecation error for NullBooleanField
|
||||
|
||||
## 2.1.2
|
||||
* Fixed: Getting timestamp on Python 3.7 as ("%s") is raising an exception
|
||||
|
||||
19
README.md
19
README.md
@@ -1,6 +1,7 @@
|
||||
# django-mfa2
|
||||
A Django app that handles MFA, it supports TOTP, U2F, FIDO2 U2F (Web Authn), Email Tokens , and Trusted Devices
|
||||
|
||||

|
||||
### Pip Stats
|
||||
[](https://badge.fury.io/py/django-mfa2)
|
||||
[](https://pepy.tech/project/django-mfa2)
|
||||
@@ -105,20 +106,22 @@ Depends on
|
||||
* if user doesn't have mfa then call your function to create the user session
|
||||
|
||||
```python
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
def login(request): # this function handles the login form POST
|
||||
user = auth.authenticate(username=username, password=password)
|
||||
if user is not None: # if the user object exist
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username=username, request=request) # has_mfa returns false or HttpResponseRedirect
|
||||
if res:
|
||||
return res
|
||||
return log_user_in(request, username=user.username)
|
||||
#log_user_in is a function that handles creatung user session, it should be in the setting file as MFA_CALLBACK
|
||||
# log_user_in is a function that handles creating user session, it should be in the setting file as MFA_CALLBACK
|
||||
```
|
||||
1. Add mfa to urls.py
|
||||
```python
|
||||
import mfa
|
||||
import mfa.TrustedDevice
|
||||
|
||||
urls_patterns = [
|
||||
'...',
|
||||
url(r'^mfa/', include('mfa.urls')),
|
||||
@@ -126,7 +129,7 @@ Depends on
|
||||
'....',
|
||||
]
|
||||
```
|
||||
1. Provide `mfa_auth_base.html` in your templaes with block called 'head' and 'content'
|
||||
1. Provide `mfa_auth_base.html` in your templates with block called 'head' and 'content'
|
||||
The template will be included during the user login.
|
||||
If you will use Email Token method, then you have to provide template named `mfa_email_token_template.html` that will content the format of the email with parameter named `user` and `otp`.
|
||||
1. To match the look and feel of your project, MFA includes `base.html` but it needs blocks named `head` & `content` to added its content to it.
|
||||
@@ -148,12 +151,14 @@ To be able to go passwordless for returning users, create a cookie named 'base_
|
||||
|
||||
Second, update the GET part of your login view
|
||||
```python
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
if "mfa" in settings.INSTALLED_APPS and getattr(settings, "MFA_QUICKLOGIN", False) and request.COOKIES.get('base_username'):
|
||||
username=request.COOKIES.get('base_username')
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username = username,request=request,)
|
||||
if res: return res
|
||||
## continue and return the form.
|
||||
res = has_mfa(username=username, request=request)
|
||||
if res:
|
||||
return res
|
||||
# continue and return the form.
|
||||
```
|
||||
# Checking MFA on Client Side
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@
|
||||
'....',
|
||||
]
|
||||
```
|
||||
1. Provide `mfa_auth_base.html` in your templaes with block called 'head' and 'content'
|
||||
1. Provide `mfa_auth_base.html` in your templates with block called 'head' and 'content'
|
||||
The template will be included during the user login.
|
||||
If you will use Email Token method, then you have to provide template named `mfa_email_token_template.html` that will content the format of the email with parameter named `user` and `otp`.
|
||||
1. To match the look and feel of your project, MFA includes `base.html` but it needs blocks named `head` & `content` to added its content to it.
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
from django.shortcuts import render
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
from django.contrib.auth import authenticate, login, logout
|
||||
from django.contrib.auth.models import User
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.urls import reverse
|
||||
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
|
||||
def loginView(request):
|
||||
context = {}
|
||||
if request.method == "POST":
|
||||
@@ -10,19 +14,21 @@ def loginView(request):
|
||||
password = request.POST["password"]
|
||||
user = authenticate(username=username, password=password)
|
||||
if user:
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username = username, request = request) # has_mfa returns false or HttpResponseRedirect
|
||||
res = has_mfa(
|
||||
username=username, request=request
|
||||
) # has_mfa returns false or HttpResponseRedirect
|
||||
if res:
|
||||
return res
|
||||
return create_session(request, user.username)
|
||||
context["invalid"] = True
|
||||
return render(request, "login.html", context)
|
||||
|
||||
|
||||
def create_session(request, username):
|
||||
user = User.objects.get(username=username)
|
||||
user.backend='django.contrib.auth.backends.ModelBackend'
|
||||
user.backend = "django.contrib.auth.backends.ModelBackend"
|
||||
login(request, user)
|
||||
return HttpResponseRedirect(reverse('home'))
|
||||
return HttpResponseRedirect(reverse("home"))
|
||||
|
||||
|
||||
def logoutView(request):
|
||||
|
||||
@@ -20,7 +20,7 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
# See https://docs.djangoproject.com/en/2.0/howto/deployment/checklist/
|
||||
|
||||
# SECURITY WARNING: keep the secret key used in production secret!
|
||||
SECRET_KEY = '#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g'
|
||||
SECRET_KEY = "#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g"
|
||||
|
||||
# SECURITY WARNING: don't run with debug turned on in production!
|
||||
DEBUG = True
|
||||
@@ -31,54 +31,54 @@ ALLOWED_HOSTS = []
|
||||
# Application definition
|
||||
|
||||
INSTALLED_APPS = [
|
||||
'django.contrib.admin',
|
||||
'django.contrib.auth',
|
||||
'django.contrib.contenttypes',
|
||||
'django.contrib.sessions',
|
||||
'django.contrib.messages',
|
||||
'django.contrib.staticfiles',
|
||||
'mfa',
|
||||
'sslserver'
|
||||
"django.contrib.admin",
|
||||
"django.contrib.auth",
|
||||
"django.contrib.contenttypes",
|
||||
"django.contrib.sessions",
|
||||
"django.contrib.messages",
|
||||
"django.contrib.staticfiles",
|
||||
"mfa",
|
||||
"sslserver",
|
||||
]
|
||||
|
||||
MIDDLEWARE = [
|
||||
'django.middleware.security.SecurityMiddleware',
|
||||
'django.contrib.sessions.middleware.SessionMiddleware',
|
||||
'django.middleware.common.CommonMiddleware',
|
||||
'django.middleware.csrf.CsrfViewMiddleware',
|
||||
'django.contrib.auth.middleware.AuthenticationMiddleware',
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'django.middleware.clickjacking.XFrameOptionsMiddleware',
|
||||
"django.middleware.security.SecurityMiddleware",
|
||||
"django.contrib.sessions.middleware.SessionMiddleware",
|
||||
"django.middleware.common.CommonMiddleware",
|
||||
"django.middleware.csrf.CsrfViewMiddleware",
|
||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||
"django.contrib.messages.middleware.MessageMiddleware",
|
||||
"django.middleware.clickjacking.XFrameOptionsMiddleware",
|
||||
]
|
||||
|
||||
ROOT_URLCONF = 'example.urls'
|
||||
ROOT_URLCONF = "example.urls"
|
||||
|
||||
TEMPLATES = [
|
||||
{
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [os.path.join(BASE_DIR ,'example','templates' )],
|
||||
'APP_DIRS': True,
|
||||
'OPTIONS': {
|
||||
'context_processors': [
|
||||
'django.template.context_processors.debug',
|
||||
'django.template.context_processors.request',
|
||||
'django.contrib.auth.context_processors.auth',
|
||||
'django.contrib.messages.context_processors.messages',
|
||||
"BACKEND": "django.template.backends.django.DjangoTemplates",
|
||||
"DIRS": [os.path.join(BASE_DIR, "example", "templates")],
|
||||
"APP_DIRS": True,
|
||||
"OPTIONS": {
|
||||
"context_processors": [
|
||||
"django.template.context_processors.debug",
|
||||
"django.template.context_processors.request",
|
||||
"django.contrib.auth.context_processors.auth",
|
||||
"django.contrib.messages.context_processors.messages",
|
||||
],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
WSGI_APPLICATION = 'example.wsgi.application'
|
||||
WSGI_APPLICATION = "example.wsgi.application"
|
||||
|
||||
|
||||
# Database
|
||||
# https://docs.djangoproject.com/en/2.0/ref/settings/#databases
|
||||
|
||||
DATABASES = {
|
||||
'default': {
|
||||
'ENGINE': 'django.db.backends.sqlite3',
|
||||
'NAME': 'test_db',
|
||||
"default": {
|
||||
"ENGINE": "django.db.backends.sqlite3",
|
||||
"NAME": "test_db",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,16 +88,16 @@ DATABASES = {
|
||||
|
||||
AUTH_PASSWORD_VALIDATORS = [
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
|
||||
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
|
||||
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
|
||||
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
|
||||
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -105,9 +105,9 @@ AUTH_PASSWORD_VALIDATORS = [
|
||||
# Internationalization
|
||||
# https://docs.djangoproject.com/en/2.0/topics/i18n/
|
||||
|
||||
LANGUAGE_CODE = 'en-us'
|
||||
LANGUAGE_CODE = "en-us"
|
||||
|
||||
TIME_ZONE = 'UTC'
|
||||
TIME_ZONE = "UTC"
|
||||
|
||||
USE_I18N = True
|
||||
|
||||
@@ -119,32 +119,33 @@ USE_TZ = True
|
||||
# Static files (CSS, JavaScript, Images)
|
||||
# https://docs.djangoproject.com/en/2.0/howto/static-files/
|
||||
|
||||
STATIC_URL = '/static/'
|
||||
STATIC_URL = "/static/"
|
||||
# STATIC_ROOT=(os.path.join(BASE_DIR,'static'))
|
||||
STATICFILES_DIRS=[os.path.join(BASE_DIR,'static')]
|
||||
STATICFILES_DIRS = [os.path.join(BASE_DIR, "static")]
|
||||
LOGIN_URL = "/auth/login"
|
||||
|
||||
EMAIL_FROM='Test App'
|
||||
EMAIL_FROM = "Test App"
|
||||
EMAIL_HOST = "smtp.gmail.com"
|
||||
EMAIL_PORT = 587
|
||||
EMAIL_HOST_USER = ""
|
||||
EMAIL_HOST_PASSWORD=''
|
||||
EMAIL_HOST_PASSWORD = ""
|
||||
EMAIL_USE_TLS = True
|
||||
|
||||
|
||||
|
||||
MFA_UNALLOWED_METHODS = () # Methods that shouldn't be allowed for the user
|
||||
MFA_LOGIN_CALLBACK = "example.auth.create_session" # A function that should be called by username to login the user in session
|
||||
MFA_RECHECK = True # Allow random rechecking of the user
|
||||
MFA_RECHECK_MIN = 10 # Minimum interval in seconds
|
||||
MFA_RECHECK_MAX = 30 # Maximum in seconds
|
||||
MFA_QUICKLOGIN = True # Allow quick login for returning users by provide only their 2FA
|
||||
MFA_HIDE_DISABLE=('',) # Can the user disable his key (Added in 1.2.0).
|
||||
MFA_HIDE_DISABLE = ("",) # Can the user disable his key (Added in 1.2.0).
|
||||
MFA_REDIRECT_AFTER_REGISTRATION = "registered"
|
||||
MFA_SUCCESS_REGISTRATION_MSG = "Go to Home"
|
||||
|
||||
TOKEN_ISSUER_NAME = "PROJECT_NAME" # TOTP Issuer name
|
||||
|
||||
U2F_APPID = "https://localhost" # URL For U2F
|
||||
FIDO_SERVER_ID=u"localhost" # Server rp id for FIDO2, it the full domain of your project
|
||||
FIDO_SERVER_ID = (
|
||||
u"localhost" # Server rp id for FIDO2, it the full domain of your project
|
||||
)
|
||||
FIDO_SERVER_NAME = u"PROJECT_NAME"
|
||||
|
||||
@@ -14,14 +14,15 @@ Including another URLconf
|
||||
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
|
||||
"""
|
||||
from django.contrib import admin
|
||||
from django.urls import path,re_path,include
|
||||
from . import views,auth
|
||||
urlpatterns = [
|
||||
path('admin/', admin.site.urls),
|
||||
path('mfa/', include('mfa.urls')),
|
||||
path('auth/login',auth.loginView,name="login"),
|
||||
path('auth/logout',auth.logoutView,name="logout"),
|
||||
from django.urls import include, path
|
||||
|
||||
re_path('^$',views.home,name='home'),
|
||||
path('registered/',views.registered,name='registered')
|
||||
from . import auth, views
|
||||
|
||||
urlpatterns = [
|
||||
path("admin/", admin.site.urls),
|
||||
path("mfa/", include("mfa.urls")),
|
||||
path("auth/login/", auth.loginView, name="login"),
|
||||
path("auth/logout/", auth.logoutView, name="logout"),
|
||||
path("", views.home, name="home"),
|
||||
path("registered/", views.registered, name="registered"),
|
||||
]
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
from django.shortcuts import render
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.shortcuts import render
|
||||
|
||||
|
||||
@login_required()
|
||||
def home(request):
|
||||
return render(request, "home.html", {})
|
||||
|
||||
|
||||
@login_required()
|
||||
def registered(request):
|
||||
return render(request, "home.html", {"registered": True})
|
||||
|
||||
@@ -1,19 +1,22 @@
|
||||
from django.conf import settings
|
||||
from django.core.mail import EmailMessage
|
||||
try:
|
||||
from django.urls import reverse
|
||||
except:
|
||||
from django.core.urlresolver import reverse
|
||||
|
||||
|
||||
def send(to, subject, body):
|
||||
from_email_address = settings.EMAIL_HOST_USER
|
||||
if '@' not in from_email_address:
|
||||
if "@" not in from_email_address:
|
||||
from_email_address = settings.DEFAULT_FROM_EMAIL
|
||||
From = "%s <%s>" % (settings.EMAIL_FROM, from_email_address)
|
||||
email = EmailMessage(subject, body, From, to)
|
||||
email.content_subtype = "html"
|
||||
return email.send(False)
|
||||
|
||||
|
||||
def get_redirect_url():
|
||||
return {"redirect_html": reverse(getattr(settings, 'MFA_REDIRECT_AFTER_REGISTRATION', 'mfa_home')),
|
||||
"reg_success_msg":getattr(settings,"MFA_SUCCESS_REGISTRATION_MSG")}
|
||||
return {
|
||||
"redirect_html": reverse(
|
||||
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
|
||||
),
|
||||
"reg_success_msg": getattr(settings, "MFA_SUCCESS_REGISTRATION_MSG"),
|
||||
}
|
||||
|
||||
76
mfa/Email.py
76
mfa/Email.py
@@ -1,66 +1,90 @@
|
||||
from django.shortcuts import render
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django.template.context_processors import csrf
|
||||
import datetime,random
|
||||
import datetime
|
||||
import random
|
||||
from random import randint
|
||||
from .models import *
|
||||
#from django.template.context import RequestContext
|
||||
from .views import login
|
||||
from .Common import send
|
||||
|
||||
def sendEmail(request,username,secret):
|
||||
"""Send Email to the user after rendering `mfa_email_token_template`"""
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.cache import never_cache
|
||||
|
||||
from .Common import send
|
||||
from .models import UserKey
|
||||
from .views import login
|
||||
|
||||
|
||||
def send_email(request, username, secret):
|
||||
"""Send Email to the user after rendering `mfa_email_token_template`"""
|
||||
User = get_user_model()
|
||||
key = getattr(User, 'USERNAME_FIELD', 'username')
|
||||
key = getattr(User, "USERNAME_FIELD", "username")
|
||||
kwargs = {key: username}
|
||||
user = User.objects.get(**kwargs)
|
||||
res=render(request,"mfa_email_token_template.html",{"request":request,"user":user,'otp':secret})
|
||||
res = render(
|
||||
request,
|
||||
"mfa_email_token_template.html",
|
||||
{"request": request, "user": user, "otp": secret},
|
||||
)
|
||||
return send([user.email], "OTP", res.content.decode())
|
||||
|
||||
|
||||
@never_cache
|
||||
def start(request):
|
||||
"""Start adding email as a 2nd factor"""
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
if request.session["email_secret"] == request.POST["otp"]: # if successful
|
||||
uk=User_Keys()
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.key_type = "Email"
|
||||
uk.enabled = 1
|
||||
uk.save()
|
||||
from django.http import HttpResponseRedirect
|
||||
try:
|
||||
from django.core.urlresolvers import reverse
|
||||
except:
|
||||
from django.urls import reverse
|
||||
return HttpResponseRedirect(reverse(getattr(settings,'MFA_REDIRECT_AFTER_REGISTRATION','mfa_home')))
|
||||
return HttpResponseRedirect(
|
||||
reverse(
|
||||
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
|
||||
)
|
||||
)
|
||||
context["invalid"] = True
|
||||
else:
|
||||
request.session["email_secret"] = str(randint(0,100000)) #generate a random integer
|
||||
if sendEmail(request, request.user.username, request.session["email_secret"]):
|
||||
request.session["email_secret"] = str(
|
||||
randint(0, 100000)
|
||||
) # generate a random integer
|
||||
if send_email(request, request.user.username, request.session["email_secret"]):
|
||||
context["sent"] = True
|
||||
return render(request, "Email/Add.html", context)
|
||||
|
||||
|
||||
@never_cache
|
||||
def auth(request):
|
||||
"""Authenticating the user by email."""
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
if request.session["email_secret"] == request.POST["otp"].strip():
|
||||
uk = User_Keys.objects.get(username=request.session["base_username"], key_type="Email")
|
||||
uk = UserKey.objects.get(
|
||||
username=request.session["base_username"], key_type="Email"
|
||||
)
|
||||
mfa = {"verified": True, "method": "Email", "id": uk.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp(datetime.datetime.now() + datetime.timedelta(
|
||||
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
request.session["mfa"] = mfa
|
||||
|
||||
from django.utils import timezone
|
||||
uk.last_used = timezone.now()
|
||||
uk.save()
|
||||
return login(request)
|
||||
context["invalid"] = True
|
||||
else:
|
||||
request.session["email_secret"] = str(randint(0, 100000))
|
||||
if sendEmail(request, request.session["base_username"], request.session["email_secret"]):
|
||||
if send_email(
|
||||
request, request.session["base_username"], request.session["email_secret"]
|
||||
):
|
||||
context["sent"] = True
|
||||
return render(request, "Email/Auth.html", context)
|
||||
|
||||
188
mfa/FIDO2.py
188
mfa/FIDO2.py
@@ -1,21 +1,23 @@
|
||||
from fido2.client import ClientData
|
||||
from fido2.server import Fido2Server, PublicKeyCredentialRpEntity
|
||||
from fido2.ctap2 import AttestationObject, AuthenticatorData
|
||||
from django.template.context_processors import csrf
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.shortcuts import render
|
||||
# from django.template.context import RequestContext
|
||||
import simplejson
|
||||
from fido2 import cbor
|
||||
from django.http import HttpResponse
|
||||
from django.conf import settings
|
||||
from .models import *
|
||||
from fido2.utils import websafe_decode, websafe_encode
|
||||
from fido2.ctap2 import AttestedCredentialData
|
||||
from .views import login, reset_cookie
|
||||
import datetime
|
||||
from .Common import get_redirect_url
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from fido2 import cbor
|
||||
from fido2.client import ClientData
|
||||
from fido2.ctap2 import AttestationObject, AttestedCredentialData, AuthenticatorData
|
||||
from fido2.server import Fido2Server, PublicKeyCredentialRpEntity
|
||||
from fido2.utils import websafe_decode, websafe_encode
|
||||
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey
|
||||
from .views import login, reset_cookie
|
||||
|
||||
|
||||
def recheck(request):
|
||||
@@ -26,7 +28,7 @@ def recheck(request):
|
||||
return render(request, "FIDO2/recheck.html", context)
|
||||
|
||||
|
||||
def getServer():
|
||||
def get_server():
|
||||
"""Get Server Info from settings and returns a Fido2Server"""
|
||||
rp = PublicKeyCredentialRpEntity(settings.FIDO_SERVER_ID, settings.FIDO_SERVER_NAME)
|
||||
return Fido2Server(rp)
|
||||
@@ -34,15 +36,20 @@ def getServer():
|
||||
|
||||
def begin_registeration(request):
|
||||
"""Starts registering a new FIDO Device, called from API"""
|
||||
server = getServer()
|
||||
registration_data, state = server.register_begin({
|
||||
u'id': request.user.username.encode("utf8"),
|
||||
u'name': (request.user.first_name + " " + request.user.last_name),
|
||||
u'displayName': request.user.username,
|
||||
}, getUserCredentials(request.user.username))
|
||||
request.session['fido_state'] = state
|
||||
server = get_server()
|
||||
registration_data, state = server.register_begin(
|
||||
{
|
||||
u"id": request.user.username.encode("utf8"),
|
||||
u"name": (request.user.first_name + " " + request.user.last_name),
|
||||
u"displayName": request.user.username,
|
||||
},
|
||||
get_user_credentials(request.user.username),
|
||||
)
|
||||
request.session["fido_state"] = state
|
||||
|
||||
return HttpResponse(cbor.encode(registration_data), content_type = 'application/octet-stream')
|
||||
return HttpResponse(
|
||||
cbor.encode(registration_data), content_type="application/octet-stream"
|
||||
)
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@@ -51,29 +58,28 @@ def complete_reg(request):
|
||||
try:
|
||||
data = cbor.decode(request.body)
|
||||
|
||||
client_data = ClientData(data['clientDataJSON'])
|
||||
att_obj = AttestationObject((data['attestationObject']))
|
||||
server = getServer()
|
||||
client_data = ClientData(data["clientDataJSON"])
|
||||
att_obj = AttestationObject((data["attestationObject"]))
|
||||
server = get_server()
|
||||
auth_data = server.register_complete(
|
||||
request.session['fido_state'],
|
||||
client_data,
|
||||
att_obj
|
||||
request.session["fido_state"], client_data, att_obj
|
||||
)
|
||||
encoded = websafe_encode(auth_data.credential_data)
|
||||
uk = User_Keys()
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.properties = {"device": encoded, "type": att_obj.fmt, }
|
||||
uk.properties = {
|
||||
"device": encoded,
|
||||
"type": att_obj.fmt,
|
||||
}
|
||||
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
uk.key_type = "FIDO2"
|
||||
uk.save()
|
||||
return HttpResponse(simplejson.dumps({'status': 'OK'}))
|
||||
return JsonResponse({"status": "OK"})
|
||||
except Exception as exp:
|
||||
try:
|
||||
from raven.contrib.django.raven_compat.models import client
|
||||
client.captureException()
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(simplejson.dumps({'status': 'ERR', "message": "Error on server, please try again later"}))
|
||||
print(traceback.format_exc())
|
||||
return JsonResponse(
|
||||
{"status": "ERR", "message": "Error on server, please try again later"}
|
||||
)
|
||||
|
||||
|
||||
def start(request):
|
||||
@@ -83,10 +89,12 @@ def start(request):
|
||||
return render(request, "FIDO2/Add.html", context)
|
||||
|
||||
|
||||
def getUserCredentials(username):
|
||||
def get_user_credentials(username):
|
||||
credentials = []
|
||||
for uk in User_Keys.objects.filter(username = username, key_type = "FIDO2"):
|
||||
credentials.append(AttestedCredentialData(websafe_decode(uk.properties["device"])))
|
||||
for uk in UserKey.objects.filter(username=username, key_type="FIDO2"):
|
||||
credentials.append(
|
||||
AttestedCredentialData(websafe_decode(uk.properties["device"]))
|
||||
)
|
||||
return credentials
|
||||
|
||||
|
||||
@@ -96,10 +104,12 @@ def auth(request):
|
||||
|
||||
|
||||
def authenticate_begin(request):
|
||||
server = getServer()
|
||||
credentials = getUserCredentials(request.session.get("base_username", request.user.username))
|
||||
server = get_server()
|
||||
credentials = get_user_credentials(
|
||||
request.session.get("base_username", request.user.username)
|
||||
)
|
||||
auth_data, state = server.authenticate_begin(credentials)
|
||||
request.session['fido_state'] = state
|
||||
request.session["fido_state"] = state
|
||||
return HttpResponse(cbor.encode(auth_data), content_type="application/octet-stream")
|
||||
|
||||
|
||||
@@ -108,64 +118,70 @@ def authenticate_complete(request):
|
||||
try:
|
||||
credentials = []
|
||||
username = request.session.get("base_username", request.user.username)
|
||||
server = getServer()
|
||||
credentials = getUserCredentials(username)
|
||||
server = get_server()
|
||||
credentials = get_user_credentials(username)
|
||||
data = cbor.decode(request.body)
|
||||
credential_id = data['credentialId']
|
||||
client_data = ClientData(data['clientDataJSON'])
|
||||
auth_data = AuthenticatorData(data['authenticatorData'])
|
||||
signature = data['signature']
|
||||
credential_id = data["credentialId"]
|
||||
client_data = ClientData(data["clientDataJSON"])
|
||||
auth_data = AuthenticatorData(data["authenticatorData"])
|
||||
signature = data["signature"]
|
||||
try:
|
||||
cred = server.authenticate_complete(
|
||||
request.session.pop('fido_state'),
|
||||
request.session.pop("fido_state"),
|
||||
credentials,
|
||||
credential_id,
|
||||
client_data,
|
||||
auth_data,
|
||||
signature
|
||||
signature,
|
||||
)
|
||||
except ValueError:
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR",
|
||||
"message": "Wrong challenge received, make sure that this is your security and try again."}),
|
||||
content_type = "application/json")
|
||||
return JsonResponse(
|
||||
{
|
||||
"status": "ERR",
|
||||
"message": "Wrong challenge received, make sure that this is your security and try again.",
|
||||
}
|
||||
)
|
||||
except Exception as excep:
|
||||
try:
|
||||
from raven.contrib.django.raven_compat.models import client
|
||||
client.captureException()
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR",
|
||||
"message": excep.message}),
|
||||
content_type = "application/json")
|
||||
print(traceback.format_exc())
|
||||
return JsonResponse({"status": "ERR", "message": excep.message})
|
||||
|
||||
if request.session.get("mfa_recheck", False):
|
||||
import time
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return HttpResponse(simplejson.dumps({'status': "OK"}),
|
||||
content_type = "application/json")
|
||||
return JsonResponse({"status": "OK"})
|
||||
else:
|
||||
import random
|
||||
keys = User_Keys.objects.filter(username = username, key_type = "FIDO2", enabled = 1)
|
||||
keys = UserKey.objects.filter(
|
||||
username=username, key_type="FIDO2", enabled=1
|
||||
)
|
||||
for k in keys:
|
||||
if AttestedCredentialData(websafe_decode(k.properties["device"])).credential_id == cred.credential_id:
|
||||
if (
|
||||
AttestedCredentialData(
|
||||
websafe_decode(k.properties["device"])
|
||||
).credential_id
|
||||
== cred.credential_id
|
||||
):
|
||||
k.last_used = timezone.now()
|
||||
k.save()
|
||||
mfa = {"verified": True, "method": "FIDO2", 'id': k.id}
|
||||
mfa = {"verified": True, "method": "FIDO2", "id": k.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() + datetime.timedelta(
|
||||
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN,
|
||||
settings.MFA_RECHECK_MAX,
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
request.session["mfa"] = mfa
|
||||
try:
|
||||
authenticated = request.user.is_authenticated
|
||||
except:
|
||||
authenticated = request.user.is_authenticated()
|
||||
if not authenticated:
|
||||
if not request.user.is_authenticated:
|
||||
res = login(request)
|
||||
if not "location" in res: return reset_cookie(request)
|
||||
return HttpResponse(simplejson.dumps({'status': "OK", "redirect": res["location"]}),
|
||||
content_type = "application/json")
|
||||
return HttpResponse(simplejson.dumps({'status': "OK"}),
|
||||
content_type = "application/json")
|
||||
if "location" not in res:
|
||||
return reset_cookie(request)
|
||||
return JsonResponse(
|
||||
{"status": "OK", "redirect": res["location"]}
|
||||
)
|
||||
return JsonResponse({"status": "OK"})
|
||||
except Exception as exp:
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR", "message": exp.message}),
|
||||
content_type = "application/json")
|
||||
return JsonResponse({"status": "ERR", "message": str(exp)})
|
||||
|
||||
@@ -1,59 +1,73 @@
|
||||
import string
|
||||
import random
|
||||
from django.shortcuts import render
|
||||
from django.http import HttpResponse
|
||||
from django.template.context import RequestContext
|
||||
from django.template.context_processors import csrf
|
||||
from .models import *
|
||||
import string
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import user_agents
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from jose import jwt
|
||||
|
||||
from .Common import send
|
||||
from .models import UserKey
|
||||
|
||||
|
||||
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
|
||||
x=''.join(random.choice(chars) for _ in range(size))
|
||||
if not User_Keys.objects.filter(properties__shas="$.key="+x).exists(): return x
|
||||
else: return id_generator(size,chars)
|
||||
x = "".join(random.choice(chars) for _ in range(size))
|
||||
if not UserKey.objects.filter(properties__shas="$.key=" + x).exists():
|
||||
return x
|
||||
else:
|
||||
return id_generator(size, chars)
|
||||
|
||||
def getUserAgent(request):
|
||||
|
||||
def get_user_agent(request):
|
||||
id = id = request.session.get("td_id", None)
|
||||
if id:
|
||||
tk=User_Keys.objects.get(id=id)
|
||||
tk = UserKey.objects.get(id=id)
|
||||
if tk.properties.get("user_agent", "") != "":
|
||||
ua = user_agents.parse(tk.properties["user_agent"])
|
||||
res = render(None, "TrustedDevices/user-agent.html", context={"ua": ua})
|
||||
return HttpResponse(res)
|
||||
return HttpResponse("")
|
||||
|
||||
|
||||
def trust_device(request):
|
||||
tk = User_Keys.objects.get(id=request.session["td_id"])
|
||||
tk = UserKey.objects.get(id=request.session["td_id"])
|
||||
tk.properties["status"] = "trusted"
|
||||
tk.save()
|
||||
del request.session["td_id"]
|
||||
return HttpResponse("OK")
|
||||
|
||||
def checkTrusted(request):
|
||||
|
||||
def check_trusted(request):
|
||||
res = ""
|
||||
id = request.session.get("td_id", "")
|
||||
if id != "":
|
||||
try:
|
||||
tk = User_Keys.objects.get(id=id)
|
||||
if tk.properties["status"] == "trusted": res = "OK"
|
||||
tk = UserKey.objects.get(id=id)
|
||||
if tk.properties["status"] == "trusted":
|
||||
res = "OK"
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(res)
|
||||
|
||||
def getCookie(request):
|
||||
tk = User_Keys.objects.get(id=request.session["td_id"])
|
||||
|
||||
def get_cookie(request):
|
||||
tk = UserKey.objects.get(id=request.session["td_id"])
|
||||
|
||||
if tk.properties["status"] == "trusted":
|
||||
context = {"added": True}
|
||||
response = render(request, "TrustedDevices/Done.html", context)
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
expires = datetime.now() + timedelta(days=180)
|
||||
tk.expires = expires
|
||||
tk.save()
|
||||
response.set_cookie("deviceid", tk.properties["signature"], expires=expires)
|
||||
return response
|
||||
|
||||
|
||||
def add(request):
|
||||
context = csrf(request)
|
||||
if request.method == "GET":
|
||||
@@ -62,12 +76,14 @@ def add(request):
|
||||
key = request.POST["key"].replace("-", "").replace(" ", "").upper()
|
||||
context["username"] = request.POST["username"]
|
||||
context["key"] = request.POST["key"]
|
||||
trusted_keys=User_Keys.objects.filter(username=request.POST["username"],properties__has="$.key="+key)
|
||||
trusted_keys = UserKey.objects.filter(
|
||||
username=request.POST["username"], properties__has="$.key=" + key
|
||||
)
|
||||
cookie = False
|
||||
if trusted_keys.exists():
|
||||
tk = trusted_keys[0]
|
||||
request.session["td_id"] = tk.id
|
||||
ua=request.META['HTTP_USER_AGENT']
|
||||
ua = request.META["HTTP_USER_AGENT"]
|
||||
agent = user_agents.parse(ua)
|
||||
if agent.is_pc:
|
||||
context["invalid"] = "This is a PC, it can't used as a trusted device."
|
||||
@@ -75,37 +91,43 @@ def add(request):
|
||||
tk.properties["user_agent"] = ua
|
||||
tk.save()
|
||||
context["success"] = True
|
||||
# tk.properties["user_agent"]=ua
|
||||
# tk.save()
|
||||
# context["success"]=True
|
||||
|
||||
else:
|
||||
context["invalid"]="The username or key is wrong, please check and try again."
|
||||
context[
|
||||
"invalid"
|
||||
] = "The username or key is wrong, please check and try again."
|
||||
|
||||
return render(request, "TrustedDevices/Add.html", context)
|
||||
|
||||
|
||||
def start(request):
|
||||
if User_Keys.objects.filter(username=request.user.username,key_type="Trusted Device").count()>= 2:
|
||||
if (
|
||||
UserKey.objects.filter(
|
||||
username=request.user.username, key_type="Trusted Device"
|
||||
).count()
|
||||
>= 2
|
||||
):
|
||||
return render(request, "TrustedDevices/start.html", {"not_allowed": True})
|
||||
td = None
|
||||
if not request.session.get("td_id", None):
|
||||
td=User_Keys()
|
||||
td = UserKey()
|
||||
td.username = request.user.username
|
||||
td.properties = {"key": id_generator(), "status": "adding"}
|
||||
td.key_type = "Trusted Device"
|
||||
td.save()
|
||||
request.session["td_id"] = td.id
|
||||
try:
|
||||
if td==None: td=User_Keys.objects.get(id=request.session["td_id"])
|
||||
if td is None:
|
||||
td = UserKey.objects.get(id=request.session["td_id"])
|
||||
context = {"key": td.properties["key"]}
|
||||
except:
|
||||
del request.session["td_id"]
|
||||
return start(request)
|
||||
return render(request, "TrustedDevices/start.html", context)
|
||||
|
||||
|
||||
def send_email(request):
|
||||
body = render(request, "TrustedDevices/email.html", {}).content
|
||||
from .Common import send
|
||||
e = request.user.email
|
||||
if e == "":
|
||||
e = request.session.get("user", {}).get("email", "")
|
||||
@@ -119,16 +141,22 @@ def send_email(request):
|
||||
|
||||
|
||||
def verify(request):
|
||||
if request.COOKIES.get('deviceid',None):
|
||||
from jose import jwt
|
||||
json= jwt.decode(request.COOKIES.get('deviceid'),settings.SECRET_KEY)
|
||||
if json["username"].lower()== request.session['base_username'].lower():
|
||||
if request.COOKIES.get("deviceid", None):
|
||||
json = jwt.decode(request.COOKIES.get("deviceid"), settings.SECRET_KEY)
|
||||
if json["username"].lower() == request.session["base_username"].lower():
|
||||
try:
|
||||
uk = User_Keys.objects.get(username=request.POST["username"].lower(), properties__has="$.key=" + json["key"])
|
||||
uk = UserKey.objects.get(
|
||||
username=request.POST["username"].lower(),
|
||||
properties__has="$.key=" + json["key"],
|
||||
)
|
||||
if uk.enabled and uk.properties["status"] == "trusted":
|
||||
uk.last_used = timezone.now()
|
||||
uk.save()
|
||||
request.session["mfa"] = {"verified": True, "method": "Trusted Device","id":uk.id}
|
||||
request.session["mfa"] = {
|
||||
"verified": True,
|
||||
"method": "Trusted Device",
|
||||
"id": uk.id,
|
||||
}
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
92
mfa/U2F.py
92
mfa/U2F.py
@@ -1,19 +1,28 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
|
||||
from u2flib_server.u2f import (begin_registration, begin_authentication,
|
||||
complete_registration, complete_authentication)
|
||||
import simplejson
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from django.shortcuts import render
|
||||
import simplejson
|
||||
#from django.template.context import RequestContext
|
||||
from django.template.context_processors import csrf
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from .models import *
|
||||
from .views import login
|
||||
import datetime
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from u2flib_server.u2f import (
|
||||
begin_authentication,
|
||||
begin_registration,
|
||||
complete_authentication,
|
||||
complete_registration,
|
||||
)
|
||||
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey
|
||||
from .views import login
|
||||
|
||||
|
||||
def recheck(request):
|
||||
context = csrf(request)
|
||||
@@ -24,45 +33,58 @@ def recheck(request):
|
||||
request.session["mfa_recheck"] = True
|
||||
return render(request, "U2F/recheck.html", context)
|
||||
|
||||
|
||||
def process_recheck(request):
|
||||
x = validate(request, request.user.username)
|
||||
if x==True:
|
||||
import time
|
||||
if x is True:
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return HttpResponse(simplejson.dumps({"recheck":True}),content_type="application/json")
|
||||
return JsonResponse({"recheck": True})
|
||||
return x
|
||||
|
||||
|
||||
def check_errors(request, data):
|
||||
if "errorCode" in data:
|
||||
if data["errorCode"] == 0: return True
|
||||
if data["errorCode"] == 0:
|
||||
return True
|
||||
if data["errorCode"] == 4:
|
||||
return HttpResponse("Invalid Security Key")
|
||||
if data["errorCode"] == 1:
|
||||
return auth(request)
|
||||
return True
|
||||
def validate(request,username):
|
||||
import datetime, random
|
||||
|
||||
|
||||
def validate(request, username):
|
||||
data = simplejson.loads(request.POST["response"])
|
||||
|
||||
res = check_errors(request, data)
|
||||
if res!=True:
|
||||
if res is not True:
|
||||
return res
|
||||
|
||||
challenge = request.session.pop('_u2f_challenge_')
|
||||
challenge = request.session.pop("_u2f_challenge_")
|
||||
device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID])
|
||||
|
||||
key=User_Keys.objects.get(username=username,properties__shas="$.device.publicKey=%s"%device["publicKey"])
|
||||
key = UserKey.objects.get(
|
||||
username=username,
|
||||
properties__shas="$.device.publicKey=%s" % device["publicKey"],
|
||||
)
|
||||
key.last_used = timezone.now()
|
||||
key.save()
|
||||
mfa = {"verified": True, "method": "U2F", "id": key.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
request.session["mfa"] = mfa
|
||||
return True
|
||||
|
||||
|
||||
def auth(request):
|
||||
context = csrf(request)
|
||||
s = sign(request.session["base_username"])
|
||||
@@ -71,9 +93,10 @@ def auth(request):
|
||||
|
||||
return render(request, "U2F/Auth.html")
|
||||
|
||||
|
||||
def start(request):
|
||||
enroll = begin_registration(settings.U2F_APPID, [])
|
||||
request.session['_u2f_enroll_'] = enroll.json
|
||||
request.session["_u2f_enroll_"] = enroll.json
|
||||
context = csrf(request)
|
||||
context["token"] = simplejson.dumps(enroll.data_for_client)
|
||||
context.update(get_redirect_url())
|
||||
@@ -81,17 +104,18 @@ def start(request):
|
||||
|
||||
|
||||
def bind(request):
|
||||
import hashlib
|
||||
enroll = request.session['_u2f_enroll_']
|
||||
enroll = request.session["_u2f_enroll_"]
|
||||
data = simplejson.loads(request.POST["response"])
|
||||
device, cert = complete_registration(enroll, data, [settings.U2F_APPID])
|
||||
cert = x509.load_der_x509_certificate(cert, default_backend())
|
||||
cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest()
|
||||
q=User_Keys.objects.filter(key_type="U2F", properties__icontains= cert_hash)
|
||||
q = UserKey.objects.filter(key_type="U2F", properties__icontains=cert_hash)
|
||||
if q.exists():
|
||||
return HttpResponse("This key is registered before, it can't be registered again.")
|
||||
User_Keys.objects.filter(username=request.user.username,key_type="U2F").delete()
|
||||
uk = User_Keys()
|
||||
return HttpResponse(
|
||||
"This key is registered before, it can't be registered again."
|
||||
)
|
||||
UserKey.objects.filter(username=request.user.username, key_type="U2F").delete()
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
uk.properties = {"device": simplejson.loads(device.json), "cert": cert_hash}
|
||||
@@ -99,13 +123,19 @@ def bind(request):
|
||||
uk.save()
|
||||
return HttpResponse("OK")
|
||||
|
||||
|
||||
def sign(username):
|
||||
u2f_devices=[d.properties["device"] for d in User_Keys.objects.filter(username=username,key_type="U2F")]
|
||||
u2f_devices = [
|
||||
d.properties["device"]
|
||||
for d in UserKey.objects.filter(username=username, key_type="U2F")
|
||||
]
|
||||
challenge = begin_authentication(settings.U2F_APPID, u2f_devices)
|
||||
return [challenge.json, simplejson.dumps(challenge.data_for_client)]
|
||||
|
||||
|
||||
def verify(request):
|
||||
x = validate(request, request.session["base_username"])
|
||||
if x==True:
|
||||
if x is True:
|
||||
return login(request)
|
||||
else: return x
|
||||
else:
|
||||
return x
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
||||
@@ -1,4 +1,6 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class myAppNameConfig(AppConfig):
|
||||
name = 'mfa'
|
||||
verbose_name = 'A Much Better Name'
|
||||
name = "mfa"
|
||||
verbose_name = "A Much Better Name"
|
||||
|
||||
@@ -1,32 +1,32 @@
|
||||
import pyotp
|
||||
from .models import *
|
||||
from . import TrustedDevice, U2F, FIDO2, totp
|
||||
import simplejson
|
||||
from django.shortcuts import HttpResponse
|
||||
from mfa.views import verify,goto
|
||||
from django.http import JsonResponse
|
||||
|
||||
from . import FIDO2, U2F, TrustedDevice, totp
|
||||
from .models import UserKey
|
||||
from .views import verify
|
||||
|
||||
|
||||
def has_mfa(request, username):
|
||||
if User_Keys.objects.filter(username=username,enabled=1).count()>0:
|
||||
if UserKey.objects.filter(username=username, enabled=1).count() > 0:
|
||||
return verify(request, username)
|
||||
return False
|
||||
|
||||
|
||||
def is_mfa(request, ignore_methods=[]):
|
||||
if request.session.get("mfa", {}).get("verified", False):
|
||||
if not request.session.get("mfa", {}).get("method", None) in ignore_methods:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def recheck(request):
|
||||
method = request.session.get("mfa", {}).get("method", None)
|
||||
if not method:
|
||||
return HttpResponse(simplejson.dumps({"res":False}),content_type="application/json")
|
||||
return JsonResponse({"res": False})
|
||||
if method == "Trusted Device":
|
||||
return HttpResponse(simplejson.dumps({"res":TrustedDevice.verify(request)}),content_type="application/json")
|
||||
return JsonResponse({"res": TrustedDevice.verify(request)})
|
||||
elif method == "U2F":
|
||||
return HttpResponse(simplejson.dumps({"html": U2F.recheck(request).content}), content_type="application/json")
|
||||
return JsonResponse({"html": U2F.recheck(request).content})
|
||||
elif method == "FIDO2":
|
||||
return HttpResponse(simplejson.dumps({"html": FIDO2.recheck(request).content}), content_type="application/json")
|
||||
return JsonResponse({"html": FIDO2.recheck(request).content})
|
||||
elif method == "TOTP":
|
||||
return HttpResponse(simplejson.dumps({"html": totp.recheck(request).content}), content_type="application/json")
|
||||
|
||||
|
||||
|
||||
return JsonResponse({"html": totp.recheck(request).content})
|
||||
|
||||
@@ -1,13 +1,20 @@
|
||||
import time
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
|
||||
|
||||
def process(request):
|
||||
next_check=request.session.get('mfa',{}).get("next_check",False)
|
||||
if not next_check: return None
|
||||
next_check = request.session.get("mfa", {}).get("next_check", False)
|
||||
if not next_check:
|
||||
return None
|
||||
now = int(time.time())
|
||||
if now >= next_check:
|
||||
method = request.session["mfa"]["method"]
|
||||
path = request.META["PATH_INFO"]
|
||||
return HttpResponseRedirect(reverse(method+"_auth")+"?next=%s"%(settings.BASE_URL + path).replace("//", "/"))
|
||||
return HttpResponseRedirect(
|
||||
reverse(method + "_auth")
|
||||
+ "?next=%s" % (settings.BASE_URL + path).replace("//", "/")
|
||||
)
|
||||
return None
|
||||
@@ -6,17 +6,24 @@ from django.db import models, migrations
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
]
|
||||
dependencies = []
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='User_Keys',
|
||||
name="User_Keys",
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('username', models.CharField(max_length=50)),
|
||||
('secret_key', models.CharField(max_length=15)),
|
||||
('added_on', models.DateTimeField(auto_now_add=True)),
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
verbose_name="ID",
|
||||
serialize=False,
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
),
|
||||
),
|
||||
("username", models.CharField(max_length=50)),
|
||||
("secret_key", models.CharField(max_length=15)),
|
||||
("added_on", models.DateTimeField(auto_now_add=True)),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0001_initial'),
|
||||
("mfa", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='key_type',
|
||||
field=models.CharField(default=b'TOTP', max_length=25),
|
||||
model_name="user_keys",
|
||||
name="key_type",
|
||||
field=models.CharField(default=b"TOTP", max_length=25),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0002_user_keys_key_type'),
|
||||
("mfa", "0002_user_keys_key_type"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='user_keys',
|
||||
name='secret_key',
|
||||
model_name="user_keys",
|
||||
name="secret_key",
|
||||
field=models.CharField(max_length=32),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0003_auto_20181114_2159'),
|
||||
("mfa", "0003_auto_20181114_2159"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='enabled',
|
||||
model_name="user_keys",
|
||||
name="enabled",
|
||||
field=models.BooleanField(default=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,24 +7,25 @@ import jsonfield.fields
|
||||
|
||||
def modify_json(apps, schema_editor):
|
||||
from django.conf import settings
|
||||
|
||||
if "mysql" in settings.DATABASES.get("default", {}).get("engine", ""):
|
||||
migrations.RunSQL("alter table mfa_user_keys modify column properties json;")
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
('mfa', '0004_user_keys_enabled'),
|
||||
("mfa", "0004_user_keys_enabled"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name='user_keys',
|
||||
name='secret_key',
|
||||
model_name="user_keys",
|
||||
name="secret_key",
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='properties',
|
||||
model_name="user_keys",
|
||||
name="properties",
|
||||
field=jsonfield.fields.JSONField(null=True),
|
||||
),
|
||||
migrations.RunPython(modify_json)
|
||||
migrations.RunPython(modify_json),
|
||||
]
|
||||
|
||||
@@ -7,21 +7,29 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0005_auto_20181115_2014'),
|
||||
("mfa", "0005_auto_20181115_2014"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='Trusted_Devices',
|
||||
name="Trusted_Devices",
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('signature', models.CharField(max_length=255)),
|
||||
('key', models.CharField(max_length=6)),
|
||||
('username', models.CharField(max_length=50)),
|
||||
('user_agent', models.CharField(max_length=255)),
|
||||
('status', models.CharField(default=b'adding', max_length=255)),
|
||||
('added_on', models.DateTimeField(auto_now_add=True)),
|
||||
('last_used', models.DateTimeField(default=None, null=True)),
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
verbose_name="ID",
|
||||
serialize=False,
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
),
|
||||
),
|
||||
("signature", models.CharField(max_length=255)),
|
||||
("key", models.CharField(max_length=6)),
|
||||
("username", models.CharField(max_length=50)),
|
||||
("user_agent", models.CharField(max_length=255)),
|
||||
("status", models.CharField(default=b"adding", max_length=255)),
|
||||
("added_on", models.DateTimeField(auto_now_add=True)),
|
||||
("last_used", models.DateTimeField(default=None, null=True)),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,16 +7,16 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0006_trusted_devices'),
|
||||
("mfa", "0006_trusted_devices"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.DeleteModel(
|
||||
name='Trusted_Devices',
|
||||
name="Trusted_Devices",
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='expires',
|
||||
model_name="user_keys",
|
||||
name="expires",
|
||||
field=models.DateTimeField(default=None, null=True, blank=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0007_auto_20181230_1549'),
|
||||
("mfa", "0007_auto_20181230_1549"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='last_used',
|
||||
model_name="user_keys",
|
||||
name="last_used",
|
||||
field=models.DateTimeField(default=None, null=True, blank=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -6,21 +6,23 @@ from django.conf import settings
|
||||
|
||||
|
||||
def update_owned_by_enterprise(apps, schema_editor):
|
||||
user_keys = apps.get_model('mfa', 'user_keys')
|
||||
user_keys.objects.filter(key_type='FIDO2').update(owned_by_enterprise=getattr(settings,"MFA_OWNED_BY_ENTERPRISE",False))
|
||||
user_keys = apps.get_model("mfa", "user_keys")
|
||||
user_keys.objects.filter(key_type="FIDO2").update(
|
||||
owned_by_enterprise=getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
)
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0008_user_keys_last_used'),
|
||||
("mfa", "0008_user_keys_last_used"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='user_keys',
|
||||
name='owned_by_enterprise',
|
||||
model_name="user_keys",
|
||||
name="owned_by_enterprise",
|
||||
field=models.NullBooleanField(default=None),
|
||||
),
|
||||
migrations.RunPython(update_owned_by_enterprise)
|
||||
migrations.RunPython(update_owned_by_enterprise),
|
||||
]
|
||||
|
||||
@@ -6,13 +6,13 @@ from django.db import migrations, models
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0009_user_keys_owned_by_enterprise'),
|
||||
("mfa", "0009_user_keys_owned_by_enterprise"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='user_keys',
|
||||
name='key_type',
|
||||
field=models.CharField(default='TOTP', max_length=25),
|
||||
model_name="user_keys",
|
||||
name="key_type",
|
||||
field=models.CharField(default="TOTP", max_length=25),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -6,13 +6,13 @@ from django.db import migrations, models
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0010_auto_20201110_0557'),
|
||||
("mfa", "0010_auto_20201110_0557"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name='user_keys',
|
||||
name='owned_by_enterprise',
|
||||
model_name="user_keys",
|
||||
name="owned_by_enterprise",
|
||||
field=models.BooleanField(blank=True, default=None, null=True),
|
||||
),
|
||||
]
|
||||
|
||||
17
mfa/migrations/0012_rename_user_keys_userkey.py
Normal file
17
mfa/migrations/0012_rename_user_keys_userkey.py
Normal file
@@ -0,0 +1,17 @@
|
||||
# Generated by Django 3.2.4 on 2021-06-23 07:10
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('mfa', '0011_auto_20210530_0622'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RenameModel(
|
||||
old_name='User_Keys',
|
||||
new_name='UserKey',
|
||||
),
|
||||
]
|
||||
@@ -1,13 +1,10 @@
|
||||
from django.db import models
|
||||
from jsonfield import JSONField
|
||||
from jose import jwt
|
||||
from django.conf import settings
|
||||
#from jsonLookup import shasLookup, hasLookup
|
||||
# JSONField.register_lookup(shasLookup)
|
||||
# JSONField.register_lookup(hasLookup)
|
||||
from django.db import models
|
||||
from jose import jwt
|
||||
from jsonfield import JSONField
|
||||
|
||||
|
||||
class User_Keys(models.Model):
|
||||
class UserKey(models.Model):
|
||||
username = models.CharField(max_length=50)
|
||||
properties = JSONField(null=True)
|
||||
added_on = models.DateTimeField(auto_now_add=True)
|
||||
@@ -17,16 +14,19 @@ class User_Keys(models.Model):
|
||||
last_used = models.DateTimeField(null=True, default=None, blank=True)
|
||||
owned_by_enterprise = models.BooleanField(default=None, null=True, blank=True)
|
||||
|
||||
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
|
||||
if self.key_type == "Trusted Device" and self.properties.get("signature","") == "":
|
||||
self.properties["signature"]= jwt.encode({"username": self.username, "key": self.properties["key"]}, settings.SECRET_KEY)
|
||||
super(User_Keys, self).save(force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s -- %s"%(self.username,self.key_type)
|
||||
def save(self, *args, **kwargs):
|
||||
if (
|
||||
self.key_type == "Trusted Device"
|
||||
and self.properties.get("signature", "") == ""
|
||||
):
|
||||
self.properties["signature"] = jwt.encode(
|
||||
{"username": self.username, "key": self.properties["key"]},
|
||||
settings.SECRET_KEY,
|
||||
)
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
def __str__(self):
|
||||
return self.__unicode__()
|
||||
return "%s -- %s" % (self.username, self.key_type)
|
||||
|
||||
class Meta:
|
||||
app_label='mfa'
|
||||
app_label = "mfa"
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
70
mfa/totp.py
70
mfa/totp.py
@@ -1,18 +1,22 @@
|
||||
from django.shortcuts import render
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django.http import HttpResponse
|
||||
from .models import *
|
||||
from django.template.context_processors import csrf
|
||||
import simplejson
|
||||
from django.template.context import RequestContext
|
||||
from django.conf import settings
|
||||
import pyotp
|
||||
from .views import login
|
||||
import datetime
|
||||
from django.utils import timezone
|
||||
import random
|
||||
import time
|
||||
|
||||
import pyotp
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.cache import never_cache
|
||||
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey
|
||||
from .views import login
|
||||
|
||||
|
||||
def verify_login(request, username, token):
|
||||
for key in User_Keys.objects.filter(username=username,key_type = "TOTP"):
|
||||
for key in UserKey.objects.filter(username=username, key_type="TOTP"):
|
||||
totp = pyotp.TOTP(key.properties["secret_key"])
|
||||
if totp.verify(token, valid_window=30):
|
||||
key.last_used = timezone.now()
|
||||
@@ -20,55 +24,73 @@ def verify_login(request,username,token):
|
||||
return [True, key.id]
|
||||
return [False]
|
||||
|
||||
|
||||
def recheck(request):
|
||||
context = csrf(request)
|
||||
context["mode"] = "recheck"
|
||||
if request.method == "POST":
|
||||
if verify_login(request, request.user.username, token=request.POST["otp"]):
|
||||
import time
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json")
|
||||
return JsonResponse({"recheck": True})
|
||||
else:
|
||||
return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json")
|
||||
return JsonResponse({"recheck": False})
|
||||
return render(request, "TOTP/recheck.html", context)
|
||||
|
||||
|
||||
@never_cache
|
||||
def auth(request):
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
res=verify_login(request,request.session["base_username"],token = request.POST["otp"])
|
||||
res = verify_login(
|
||||
request, request.session["base_username"], token=request.POST["otp"]
|
||||
)
|
||||
if res[0]:
|
||||
mfa = {"verified": True, "method": "TOTP", "id": res[1]}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
request.session["mfa"] = mfa
|
||||
return login(request)
|
||||
context["invalid"] = True
|
||||
return render(request, "TOTP/Auth.html", context)
|
||||
|
||||
|
||||
|
||||
def getToken(request):
|
||||
secret_key = pyotp.random_base32()
|
||||
totp = pyotp.TOTP(secret_key)
|
||||
request.session["new_mfa_answer"] = totp.now()
|
||||
return HttpResponse(simplejson.dumps({"qr":pyotp.totp.TOTP(secret_key).provisioning_uri(str(request.user.username), issuer_name = settings.TOKEN_ISSUER_NAME),
|
||||
"secret_key": secret_key}))
|
||||
return JsonResponse(
|
||||
{
|
||||
"qr": pyotp.totp.TOTP(secret_key).provisioning_uri(
|
||||
str(request.user.username), issuer_name=settings.TOKEN_ISSUER_NAME
|
||||
),
|
||||
"secret_key": secret_key,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def verify(request):
|
||||
answer = request.GET["answer"]
|
||||
secret_key = request.GET["key"]
|
||||
totp = pyotp.TOTP(secret_key)
|
||||
if totp.verify(answer, valid_window=60):
|
||||
uk=User_Keys()
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.properties = {"secret_key": secret_key}
|
||||
#uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP")
|
||||
uk.key_type = "TOTP"
|
||||
uk.save()
|
||||
return HttpResponse("Success")
|
||||
else: return HttpResponse("Error")
|
||||
else:
|
||||
return HttpResponse("Error")
|
||||
|
||||
|
||||
@never_cache
|
||||
def start(request):
|
||||
|
||||
85
mfa/urls.py
85
mfa/urls.py
@@ -1,50 +1,41 @@
|
||||
from . import views,totp,U2F,TrustedDevice,helpers,FIDO2,Email
|
||||
#app_name='mfa'
|
||||
from django.urls import path
|
||||
|
||||
from . import FIDO2, U2F, Email, TrustedDevice, helpers, totp, views
|
||||
|
||||
try:
|
||||
from django.urls import re_path as url
|
||||
except:
|
||||
from django.conf.urls import url
|
||||
urlpatterns = [
|
||||
url(r'totp/start/', totp.start , name="start_new_otop"),
|
||||
url(r'totp/getToken', totp.getToken , name="get_new_otop"),
|
||||
url(r'totp/verify', totp.verify, name="verify_otop"),
|
||||
url(r'totp/auth', totp.auth, name="totp_auth"),
|
||||
url(r'totp/recheck', totp.recheck, name="totp_recheck"),
|
||||
|
||||
url(r'email/start/', Email.start , name="start_email"),
|
||||
url(r'email/auth/', Email.auth , name="email_auth"),
|
||||
|
||||
url(r'u2f/$', U2F.start, name="start_u2f"),
|
||||
url(r'u2f/bind', U2F.bind, name="bind_u2f"),
|
||||
url(r'u2f/auth', U2F.auth, name="u2f_auth"),
|
||||
url(r'u2f/process_recheck', U2F.process_recheck, name="u2f_recheck"),
|
||||
url(r'u2f/verify', U2F.verify, name="u2f_verify"),
|
||||
|
||||
url(r'fido2/$', FIDO2.start, name="start_fido2"),
|
||||
url(r'fido2/auth', FIDO2.auth, name="fido2_auth"),
|
||||
url(r'fido2/begin_auth', FIDO2.authenticate_begin, name="fido2_begin_auth"),
|
||||
url(r'fido2/complete_auth', FIDO2.authenticate_complete, name="fido2_complete_auth"),
|
||||
url(r'fido2/begin_reg', FIDO2.begin_registeration, name="fido2_begin_reg"),
|
||||
url(r'fido2/complete_reg', FIDO2.complete_reg, name="fido2_complete_reg"),
|
||||
url(r'fido2/recheck', FIDO2.recheck, name="fido2_recheck"),
|
||||
|
||||
|
||||
url(r'td/$', TrustedDevice.start, name="start_td"),
|
||||
url(r'td/add', TrustedDevice.add, name="add_td"),
|
||||
url(r'td/send_link', TrustedDevice.send_email, name="td_sendemail"),
|
||||
url(r'td/get-ua', TrustedDevice.getUserAgent, name="td_get_useragent"),
|
||||
url(r'td/trust', TrustedDevice.trust_device, name="td_trust_device"),
|
||||
url(r'u2f/checkTrusted', TrustedDevice.checkTrusted, name="td_checkTrusted"),
|
||||
url(r'u2f/secure_device', TrustedDevice.getCookie, name="td_securedevice"),
|
||||
|
||||
url(r'^$', views.index, name="mfa_home"),
|
||||
url(r'goto/(.*)', views.goto, name="mfa_goto"),
|
||||
url(r'selct_method', views.show_methods, name="mfa_methods_list"),
|
||||
url(r'recheck', helpers.recheck, name="mfa_recheck"),
|
||||
url(r'toggleKey', views.toggleKey, name="toggle_key"),
|
||||
url(r'delete', views.delKey, name="mfa_delKey"),
|
||||
url(r'reset', views.reset_cookie, name="mfa_reset_cookie"),
|
||||
|
||||
path("totp/start/", totp.start, name="start_new_otop"),
|
||||
path("totp/getToken/", totp.get_token, name="get_new_otop"),
|
||||
path("totp/verify/", totp.verify, name="verify_otop"),
|
||||
path("totp/auth/", totp.auth, name="totp_auth"),
|
||||
path("totp/recheck/", totp.recheck, name="totp_recheck"),
|
||||
path("email/start/", Email.start, name="start_email"),
|
||||
path("email/auth/", Email.auth, name="email_auth"),
|
||||
path("u2f/", U2F.start, name="start_u2f"),
|
||||
path("u2f/bind/", U2F.bind, name="bind_u2f"),
|
||||
path("u2f/auth/", U2F.auth, name="u2f_auth"),
|
||||
path("u2f/process_recheck/", U2F.process_recheck, name="u2f_recheck"),
|
||||
path("u2f/verify/", U2F.verify, name="u2f_verify"),
|
||||
path("fido2/", FIDO2.start, name="start_fido2"),
|
||||
path("fido2/auth/", FIDO2.auth, name="fido2_auth"),
|
||||
path("fido2/begin_auth/", FIDO2.authenticate_begin, name="fido2_begin_auth"),
|
||||
path(
|
||||
"fido2/complete_auth/", FIDO2.authenticate_complete, name="fido2_complete_auth"
|
||||
),
|
||||
path("fido2/begin_reg/", FIDO2.begin_registeration, name="fido2_begin_reg"),
|
||||
path("fido2/complete_reg/", FIDO2.complete_reg, name="fido2_complete_reg"),
|
||||
path("fido2/recheck/", FIDO2.recheck, name="fido2_recheck"),
|
||||
path("td/", TrustedDevice.start, name="start_td"),
|
||||
path("td/add/", TrustedDevice.add, name="add_td"),
|
||||
path("td/send_link/", TrustedDevice.send_email, name="td_sendemail"),
|
||||
path("td/get-ua/", TrustedDevice.get_user_agent, name="td_get_useragent"),
|
||||
path("td/trust/", TrustedDevice.trust_device, name="td_trust_device"),
|
||||
path("u2f/checkTrusted/", TrustedDevice.check_trusted, name="td_checkTrusted"),
|
||||
path("u2f/secure_device", TrustedDevice.get_cookie, name="td_securedevice"),
|
||||
path("", views.index, name="mfa_home"),
|
||||
path("goto/<method>/", views.goto, name="mfa_goto"),
|
||||
path("selct_method/", views.show_methods, name="mfa_methods_list"),
|
||||
path("recheck/", helpers.recheck, name="mfa_recheck"),
|
||||
path("toggleKey/", views.toggle_key, name="toggle_key"),
|
||||
path("delete/", views.del_key, name="mfa_delKey"),
|
||||
path("reset/", views.reset_cookie, name="mfa_reset_cookie"),
|
||||
]
|
||||
# print(urlpatterns)
|
||||
56
mfa/views.py
56
mfa/views.py
@@ -1,22 +1,24 @@
|
||||
from django.shortcuts import render
|
||||
from django.http import HttpResponse,HttpResponseRedirect
|
||||
from .models import *
|
||||
try:
|
||||
from django.urls import reverse
|
||||
except:
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.template.context_processors import csrf
|
||||
from django.template.context import RequestContext
|
||||
import importlib
|
||||
|
||||
from django.conf import settings
|
||||
from . import TrustedDevice
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.http import HttpResponse, HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.urls import reverse
|
||||
from user_agents import parse
|
||||
|
||||
from . import TrustedDevice
|
||||
from .models import UserKey
|
||||
|
||||
|
||||
@login_required
|
||||
def index(request):
|
||||
keys = []
|
||||
context={"keys":User_Keys.objects.filter(username=request.user.username),"UNALLOWED_AUTHEN_METHODS":settings.MFA_UNALLOWED_METHODS
|
||||
,"HIDE_DISABLE":getattr(settings,"MFA_HIDE_DISABLE",[])}
|
||||
context = {
|
||||
"keys": UserKey.objects.filter(username=request.user.username),
|
||||
"UNALLOWED_AUTHEN_METHODS": settings.MFA_UNALLOWED_METHODS,
|
||||
"HIDE_DISABLE": getattr(settings, "MFA_HIDE_DISABLE", []),
|
||||
}
|
||||
for k in context["keys"]:
|
||||
if k.key_type == "Trusted Device":
|
||||
setattr(k, "device", parse(k.properties.get("user_agent", "-----")))
|
||||
@@ -26,13 +28,15 @@ def index(request):
|
||||
context["keys"] = keys
|
||||
return render(request, "MFA.html", context)
|
||||
|
||||
|
||||
def verify(request, username):
|
||||
request.session["base_username"] = username
|
||||
#request.session["base_password"] = password
|
||||
keys=User_Keys.objects.filter(username=username,enabled=1)
|
||||
keys = UserKey.objects.filter(username=username, enabled=1)
|
||||
methods = list(set([k.key_type for k in keys]))
|
||||
|
||||
if "Trusted Device" in methods and not request.session.get("checked_trusted_device",False):
|
||||
if "Trusted Device" in methods and not request.session.get(
|
||||
"checked_trusted_device", False
|
||||
):
|
||||
if TrustedDevice.verify(request):
|
||||
return login(request)
|
||||
methods.remove("Trusted Device")
|
||||
@@ -41,32 +45,34 @@ def verify(request,username):
|
||||
return HttpResponseRedirect(reverse(methods[0].lower() + "_auth"))
|
||||
return show_methods(request)
|
||||
|
||||
|
||||
def show_methods(request):
|
||||
return render(request, "select_mfa_method.html", {})
|
||||
|
||||
|
||||
def reset_cookie(request):
|
||||
response = HttpResponseRedirect(settings.LOGIN_URL)
|
||||
response.delete_cookie("base_username")
|
||||
return response
|
||||
|
||||
|
||||
def login(request):
|
||||
from django.contrib import auth
|
||||
from django.conf import settings
|
||||
callable_func = __get_callable_function__(settings.MFA_LOGIN_CALLBACK)
|
||||
return callable_func(request, username=request.session["base_username"])
|
||||
|
||||
|
||||
@login_required
|
||||
def delKey(request):
|
||||
key=User_Keys.objects.get(id=request.GET["id"])
|
||||
def del_key(request):
|
||||
key = UserKey.objects.get(id=request.GET["id"])
|
||||
if key.username == request.user.username:
|
||||
key.delete()
|
||||
return HttpResponse("Deleted Successfully")
|
||||
else:
|
||||
return HttpResponse("Error: You own this token so you can't delete it")
|
||||
|
||||
|
||||
def __get_callable_function__(func_path):
|
||||
import importlib
|
||||
if not '.' in func_path:
|
||||
if "." not in func_path:
|
||||
raise Exception("class Name should include modulename.classname")
|
||||
|
||||
parsed_str = func_path.split(".")
|
||||
@@ -77,13 +83,14 @@ def __get_callable_function__(func_path):
|
||||
raise Exception("Module does not have requested function")
|
||||
return callable_func
|
||||
|
||||
|
||||
@login_required
|
||||
def toggleKey(request):
|
||||
def toggle_key(request):
|
||||
id = request.GET["id"]
|
||||
q=User_Keys.objects.filter(username=request.user.username, id=id)
|
||||
q = UserKey.objects.filter(username=request.user.username, id=id)
|
||||
if q.count() == 1:
|
||||
key = q[0]
|
||||
if not key.key_type in settings.MFA_HIDE_DISABLE:
|
||||
if key.key_type not in settings.MFA_HIDE_DISABLE:
|
||||
key.enabled = not key.enabled
|
||||
key.save()
|
||||
return HttpResponse("OK")
|
||||
@@ -92,5 +99,6 @@ def toggleKey(request):
|
||||
else:
|
||||
return HttpResponse("Error")
|
||||
|
||||
|
||||
def goto(request, method):
|
||||
return HttpResponseRedirect(reverse(method.lower() + "_auth"))
|
||||
|
||||
39
setup.py
39
setup.py
@@ -3,29 +3,28 @@
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
setup(
|
||||
name='django-mfa2',
|
||||
version='2.2.0',
|
||||
description='Allows user to add 2FA to their accounts',
|
||||
name="django-mfa2",
|
||||
version="2.2.0",
|
||||
description="Allows user to add 2FA to their accounts",
|
||||
long_description=open("README.md").read(),
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
author='Mohamed El-Kalioby',
|
||||
author_email = 'mkalioby@mkalioby.com',
|
||||
url = 'https://github.com/mkalioby/django-mfa2/',
|
||||
download_url='https://github.com/mkalioby/django-mfa2/',
|
||||
license='MIT',
|
||||
author="Mohamed El-Kalioby",
|
||||
author_email="mkalioby@mkalioby.com",
|
||||
url="https://github.com/mkalioby/django-mfa2/",
|
||||
download_url="https://github.com/mkalioby/django-mfa2/",
|
||||
license="MIT",
|
||||
packages=find_packages(),
|
||||
install_requires=[
|
||||
'django >= 2.0',
|
||||
'jsonfield',
|
||||
'simplejson',
|
||||
'pyotp',
|
||||
'python-u2flib-server',
|
||||
'ua-parser',
|
||||
'user-agents',
|
||||
'python-jose',
|
||||
'fido2 == 0.9.1',
|
||||
'jsonLookup'
|
||||
"django >= 2.0",
|
||||
"jsonfield",
|
||||
"simplejson",
|
||||
"pyotp",
|
||||
"python-u2flib-server",
|
||||
"ua-parser",
|
||||
"user-agents",
|
||||
"python-jose",
|
||||
"fido2 == 0.9.1",
|
||||
"jsonLookup",
|
||||
],
|
||||
python_requires=">=3.5",
|
||||
include_package_data=True,
|
||||
@@ -48,5 +47,5 @@ setup(
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user