Compare commits
8 Commits
Better_TOT
...
v2.3.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85b4a25956 | ||
|
|
7415b169c9 | ||
|
|
3d8d970701 | ||
|
|
3f41cff8c3 | ||
|
|
1c95f196fe | ||
|
|
a841bde6cc | ||
|
|
41b7bd2929 | ||
|
|
6cfc4ff5d4 |
13
.github/workflows/main.yml
vendored
13
.github/workflows/main.yml
vendored
@@ -1,13 +0,0 @@
|
||||
on: [push]
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: python
|
||||
run: |
|
||||
sudo apt-get install python3
|
||||
- name: lint
|
||||
run: |
|
||||
pip install black
|
||||
black --check --exclude migrations mfa/ example/ setup.py
|
||||
@@ -1,12 +1,12 @@
|
||||
# Change Log
|
||||
|
||||
## 2.3.0
|
||||
* Some code cleanup thanks to @xi
|
||||
* Added: MFA_TOTP_FAILURE_WINDOW & MFA_TOTP_FAILURE_LIMIT
|
||||
* Fixed: A missing import Thanks @AndreasDickow
|
||||
* Fixed: `MFA.html` now call `{{block.super}}` for head and content blocks
|
||||
* Added: #55 introduced `mfa_base.html` which will be extended by `MFA.html` for better styling
|
||||
|
||||
## 2.2.0
|
||||
* Added: MFA_REDIRECT_AFTER_REGISTRATION settings parameter
|
||||
* Fixed: Deprecation error for NullBooleanField
|
||||
* Fixed: Deprecation error for NULBooleanField
|
||||
|
||||
## 2.1.2
|
||||
* Fixed: Getting timestamp on Python 3.7 as ("%s") is raising an exception
|
||||
|
||||
90
README.md
90
README.md
@@ -1,7 +1,6 @@
|
||||
# django-mfa2
|
||||
A Django app that handles MFA, it supports TOTP, U2F, FIDO2 U2F (Web Authn), Email Tokens , and Trusted Devices
|
||||
|
||||

|
||||
### Pip Stats
|
||||
[](https://badge.fury.io/py/django-mfa2)
|
||||
[](https://pepy.tech/project/django-mfa2)
|
||||
@@ -61,9 +60,9 @@ Depends on
|
||||
'mfa',
|
||||
'......')
|
||||
```
|
||||
1. Collect Static Files
|
||||
2. Collect Static Files
|
||||
`python manage.py collectstatic`
|
||||
1. Add the following settings to your file
|
||||
3. Add the following settings to your file
|
||||
|
||||
```python
|
||||
MFA_UNALLOWED_METHODS=() # Methods that shouldn't be allowed for the user
|
||||
@@ -78,8 +77,6 @@ Depends on
|
||||
MFA_OWNED_BY_ENTERPRISE = FALSE # Who owns security keys
|
||||
|
||||
TOKEN_ISSUER_NAME="PROJECT_NAME" #TOTP Issuer name
|
||||
MFA_TOTP_FAILURE_LIMIT = 3 # Allowed TOTP Failures / user
|
||||
MFA_TOTP_FAILURE_WINDOW = 5 # The number of minutes to check failed logins against.
|
||||
|
||||
U2F_APPID="https://localhost" #URL For U2F
|
||||
FIDO_SERVER_ID=u"localehost" # Server rp id for FIDO2, it the full domain of your project
|
||||
@@ -98,9 +95,7 @@ Depends on
|
||||
* Starting version 1.7.0, Key owners can be specified.
|
||||
* Starting version 2.2.0
|
||||
* Added: `MFA_SUCCESS_REGISTRATION_MSG` & `MFA_REDIRECT_AFTER_REGISTRATION`
|
||||
* Starting version 2.3.0
|
||||
* Added: `MFA_TOTP_FAILURE_LIMIT` & `MFA_TOTP_FAILURE_WINDOW`
|
||||
1. Break your login function
|
||||
4. Break your login function
|
||||
|
||||
Usually your login function will check for username and password, log the user in if the username and password are correct and create the user session, to support mfa, this has to change
|
||||
|
||||
@@ -110,78 +105,75 @@ Depends on
|
||||
* if user doesn't have mfa then call your function to create the user session
|
||||
|
||||
```python
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
def login(request): # this function handles the login form POST
|
||||
def login(request): # this function handles the login form POST
|
||||
user = auth.authenticate(username=username, password=password)
|
||||
if user is not None: # if the user object exist
|
||||
res = has_mfa(username=username, request=request) # has_mfa returns false or HttpResponseRedirect
|
||||
if user is not None: # if the user object exist
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username = username,request=request) # has_mfa returns false or HttpResponseRedirect
|
||||
if res:
|
||||
return res
|
||||
return log_user_in(request, username=user.username)
|
||||
# log_user_in is a function that handles creating user session, it should be in the setting file as MFA_CALLBACK
|
||||
return log_user_in(request,username=user.username)
|
||||
#log_user_in is a function that handles creatung user session, it should be in the setting file as MFA_CALLBACK
|
||||
```
|
||||
1. Add mfa to urls.py
|
||||
5. Add mfa to urls.py
|
||||
```python
|
||||
import mfa
|
||||
import mfa.TrustedDevice
|
||||
|
||||
urls_patterns = [
|
||||
'...',
|
||||
url(r'^mfa/', include('mfa.urls')),
|
||||
url(r'devices/add$', mfa.TrustedDevice.add,name="mfa_add_new_trusted_device"), # This short link to add new trusted device
|
||||
'....',
|
||||
]
|
||||
```
|
||||
1. Provide `mfa_auth_base.html` in your templates with block called 'head' and 'content'
|
||||
The template will be included during the user login.
|
||||
urls_patterns= [
|
||||
'...',
|
||||
url(r'^mfa/', include('mfa.urls')),
|
||||
url(r'devices/add$', mfa.TrustedDevice.add,name="mfa_add_new_trusted_device"), # This short link to add new trusted device
|
||||
'....',
|
||||
]
|
||||
```
|
||||
6. Provide `mfa_auth_base.html` in your templates with block called 'head' and 'content', The template will be included during the user login, the template shall be close to the login template.
|
||||
If you will use Email Token method, then you have to provide template named `mfa_email_token_template.html` that will content the format of the email with parameter named `user` and `otp`.
|
||||
1. To match the look and feel of your project, MFA includes `base.html` but it needs blocks named `head` & `content` to added its content to it.
|
||||
1. Somewhere in your app, add a link to 'mfa_home'
|
||||
7. To match the look and feel of your project, MFA includes `base.html` but it needs blocks named `head` & `content` to added its content to it.
|
||||
**Note:** Starting v2.3.0, a new template `mfa_base.html` is introduced, this template is used by `MFA.html` so you can control the styling better and current `mfa_base.html` extends `base.html`
|
||||
8. Somewhere in your app, add a link to 'mfa_home'
|
||||
```<li><a href="{% url 'mfa_home' %}">Security</a> </li>```
|
||||
|
||||
For Example, See https://github.com/mkalioby/AutoDeploy/commit/5f1d94b1804e0aa33c79e9e8530ce849d9eb78cc in AutDeploy Project
|
||||
|
||||
For Example, See 'example' app
|
||||
|
||||
# Going Passwordless
|
||||
|
||||
To be able to go passwordless for returning users, create a cookie named 'base_username' containing username as shown in snippet below
|
||||
```python
|
||||
response = render(request, 'Dashboard.html', context))
|
||||
if request.session.get("mfa", {}).get("verified", False) and getattr(settings, "MFA_QUICKLOGIN", False):
|
||||
if request.session["mfa"]["method"] != "Trusted Device":
|
||||
response.set_cookie("base_username", request.user.username, path="/", max_age=15 * 24 * 60 * 60)
|
||||
return response
|
||||
response = render(request, 'Dashboard.html', context))
|
||||
if request.session.get("mfa",{}).get("verified",False) and getattr(settings,"MFA_QUICKLOGIN",False):
|
||||
if request.session["mfa"]["method"]!="Trusted Device":
|
||||
response.set_cookie("base_username", request.user.username, path="/",max_age = 15*24*60*60)
|
||||
return response
|
||||
```
|
||||
|
||||
Second, update the GET part of your login view
|
||||
```python
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
if "mfa" in settings.INSTALLED_APPS and getattr(settings, "MFA_QUICKLOGIN", False) and request.COOKIES.get('base_username'):
|
||||
username=request.COOKIES.get('base_username')
|
||||
res = has_mfa(username=username, request=request)
|
||||
if res:
|
||||
return res
|
||||
# continue and return the form.
|
||||
if "mfa" in settings.INSTALLED_APPS and getattr(settings,"MFA_QUICKLOGIN",False) and request.COOKIES.get('base_username'):
|
||||
username=request.COOKIES.get('base_username')
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username = username,request=request,)
|
||||
if res: return res
|
||||
## continue and return the form.
|
||||
```
|
||||
# Checking MFA on Client Side
|
||||
|
||||
Sometimes you like to verify that the user is still there so simple you can ask django-mfa2 to check that for you
|
||||
|
||||
```html
|
||||
{% include 'mfa_check.html' %}
|
||||
{% include 'mfa_check.html' %}
|
||||
```
|
||||
````js
|
||||
function success_func() {
|
||||
// logic if mfa check succeeds
|
||||
//logic if mfa check succeeds
|
||||
}
|
||||
function fail_func() {
|
||||
// logic if mfa check fails
|
||||
//logic if mfa check fails
|
||||
}
|
||||
function some_func() {
|
||||
recheck_mfa(success_func, fail_func, MUST_BE_MFA)
|
||||
// MUST_BE_MFA true or false, if the user must has with MFA
|
||||
}
|
||||
recheck_mfa(success_func,fail_func,MUST_BE_MFA)
|
||||
//MUST_BE_MFA true or false, if the user must has with MFA
|
||||
}
|
||||
|
||||
````
|
||||
|
||||
@@ -191,6 +183,8 @@ function some_func() {
|
||||
* [swainn](https://github.com/swainn)
|
||||
* [unramk](https://github.com/unramk)
|
||||
* [willingham](https://github.com/willingham)
|
||||
* [AndreasDickow](https://github.com/AndreasDickow)
|
||||
* [mnelson4](https://github.com/mnelson4)
|
||||
|
||||
|
||||
# Security contact information
|
||||
|
||||
@@ -48,7 +48,7 @@
|
||||
'....',
|
||||
]
|
||||
```
|
||||
1. Provide `mfa_auth_base.html` in your templates with block called 'head' and 'content'
|
||||
1. Provide `mfa_auth_base.html` in your templaes with block called 'head' and 'content'
|
||||
The template will be included during the user login.
|
||||
If you will use Email Token method, then you have to provide template named `mfa_email_token_template.html` that will content the format of the email with parameter named `user` and `otp`.
|
||||
1. To match the look and feel of your project, MFA includes `base.html` but it needs blocks named `head` & `content` to added its content to it.
|
||||
|
||||
@@ -1,36 +1,30 @@
|
||||
from django.contrib.auth import authenticate, login, logout
|
||||
from django.contrib.auth.models import User
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
|
||||
from mfa.helpers import has_mfa
|
||||
|
||||
|
||||
from django.contrib.auth import authenticate,login,logout
|
||||
from django.contrib.auth.models import User
|
||||
def loginView(request):
|
||||
context = {}
|
||||
if request.method == "POST":
|
||||
username = request.POST["username"]
|
||||
password = request.POST["password"]
|
||||
user = authenticate(username=username, password=password)
|
||||
context={}
|
||||
if request.method=="POST":
|
||||
username=request.POST["username"]
|
||||
password=request.POST["password"]
|
||||
user=authenticate(username=username,password=password)
|
||||
if user:
|
||||
res = has_mfa(
|
||||
username=username, request=request
|
||||
) # has_mfa returns false or HttpResponseRedirect
|
||||
from mfa.helpers import has_mfa
|
||||
res = has_mfa(username = username, request = request) # has_mfa returns false or HttpResponseRedirect
|
||||
if res:
|
||||
return res
|
||||
return create_session(request, user.username)
|
||||
context["invalid"] = True
|
||||
return create_session(request,user.username)
|
||||
context["invalid"]=True
|
||||
return render(request, "login.html", context)
|
||||
|
||||
|
||||
def create_session(request, username):
|
||||
user = User.objects.get(username=username)
|
||||
user.backend = "django.contrib.auth.backends.ModelBackend"
|
||||
def create_session(request,username):
|
||||
user=User.objects.get(username=username)
|
||||
user.backend='django.contrib.auth.backends.ModelBackend'
|
||||
login(request, user)
|
||||
return HttpResponseRedirect(reverse("home"))
|
||||
return HttpResponseRedirect(reverse('home'))
|
||||
|
||||
|
||||
def logoutView(request):
|
||||
logout(request)
|
||||
return render(request, "logout.html", {})
|
||||
return render(request,"logout.html",{})
|
||||
@@ -20,7 +20,7 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
# See https://docs.djangoproject.com/en/2.0/howto/deployment/checklist/
|
||||
|
||||
# SECURITY WARNING: keep the secret key used in production secret!
|
||||
SECRET_KEY = "#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g"
|
||||
SECRET_KEY = '#9)q!_i3@pr-^3oda(e^3$x!kq3b4f33#5l@+=+&vuz+p6gb3g'
|
||||
|
||||
# SECURITY WARNING: don't run with debug turned on in production!
|
||||
DEBUG = True
|
||||
@@ -31,54 +31,54 @@ ALLOWED_HOSTS = []
|
||||
# Application definition
|
||||
|
||||
INSTALLED_APPS = [
|
||||
"django.contrib.admin",
|
||||
"django.contrib.auth",
|
||||
"django.contrib.contenttypes",
|
||||
"django.contrib.sessions",
|
||||
"django.contrib.messages",
|
||||
"django.contrib.staticfiles",
|
||||
"mfa",
|
||||
"sslserver",
|
||||
'django.contrib.admin',
|
||||
'django.contrib.auth',
|
||||
'django.contrib.contenttypes',
|
||||
'django.contrib.sessions',
|
||||
'django.contrib.messages',
|
||||
'django.contrib.staticfiles',
|
||||
'mfa',
|
||||
'sslserver'
|
||||
]
|
||||
|
||||
MIDDLEWARE = [
|
||||
"django.middleware.security.SecurityMiddleware",
|
||||
"django.contrib.sessions.middleware.SessionMiddleware",
|
||||
"django.middleware.common.CommonMiddleware",
|
||||
"django.middleware.csrf.CsrfViewMiddleware",
|
||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||
"django.contrib.messages.middleware.MessageMiddleware",
|
||||
"django.middleware.clickjacking.XFrameOptionsMiddleware",
|
||||
'django.middleware.security.SecurityMiddleware',
|
||||
'django.contrib.sessions.middleware.SessionMiddleware',
|
||||
'django.middleware.common.CommonMiddleware',
|
||||
'django.middleware.csrf.CsrfViewMiddleware',
|
||||
'django.contrib.auth.middleware.AuthenticationMiddleware',
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'django.middleware.clickjacking.XFrameOptionsMiddleware',
|
||||
]
|
||||
|
||||
ROOT_URLCONF = "example.urls"
|
||||
ROOT_URLCONF = 'example.urls'
|
||||
|
||||
TEMPLATES = [
|
||||
{
|
||||
"BACKEND": "django.template.backends.django.DjangoTemplates",
|
||||
"DIRS": [os.path.join(BASE_DIR, "example", "templates")],
|
||||
"APP_DIRS": True,
|
||||
"OPTIONS": {
|
||||
"context_processors": [
|
||||
"django.template.context_processors.debug",
|
||||
"django.template.context_processors.request",
|
||||
"django.contrib.auth.context_processors.auth",
|
||||
"django.contrib.messages.context_processors.messages",
|
||||
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||
'DIRS': [os.path.join(BASE_DIR ,'example','templates' )],
|
||||
'APP_DIRS': True,
|
||||
'OPTIONS': {
|
||||
'context_processors': [
|
||||
'django.template.context_processors.debug',
|
||||
'django.template.context_processors.request',
|
||||
'django.contrib.auth.context_processors.auth',
|
||||
'django.contrib.messages.context_processors.messages',
|
||||
],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
WSGI_APPLICATION = "example.wsgi.application"
|
||||
WSGI_APPLICATION = 'example.wsgi.application'
|
||||
|
||||
|
||||
# Database
|
||||
# https://docs.djangoproject.com/en/2.0/ref/settings/#databases
|
||||
|
||||
DATABASES = {
|
||||
"default": {
|
||||
"ENGINE": "django.db.backends.sqlite3",
|
||||
"NAME": "test_db",
|
||||
'default': {
|
||||
'ENGINE': 'django.db.backends.sqlite3',
|
||||
'NAME': 'test_db',
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,28 +86,28 @@ DATABASES = {
|
||||
# Password validation
|
||||
# https://docs.djangoproject.com/en/2.0/ref/settings/#auth-password-validators
|
||||
|
||||
# AUTH_PASSWORD_VALIDATORS = [
|
||||
# {
|
||||
# "NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
|
||||
# },
|
||||
# {
|
||||
# "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
|
||||
# },
|
||||
# {
|
||||
# "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
|
||||
# },
|
||||
# {
|
||||
# "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
|
||||
# },
|
||||
# ]
|
||||
AUTH_PASSWORD_VALIDATORS = [
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
|
||||
},
|
||||
{
|
||||
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# Internationalization
|
||||
# https://docs.djangoproject.com/en/2.0/topics/i18n/
|
||||
|
||||
LANGUAGE_CODE = "en-us"
|
||||
LANGUAGE_CODE = 'en-us'
|
||||
|
||||
TIME_ZONE = "UTC"
|
||||
TIME_ZONE = 'UTC'
|
||||
|
||||
USE_I18N = True
|
||||
|
||||
@@ -119,33 +119,32 @@ USE_TZ = True
|
||||
# Static files (CSS, JavaScript, Images)
|
||||
# https://docs.djangoproject.com/en/2.0/howto/static-files/
|
||||
|
||||
STATIC_URL = "/static/"
|
||||
# STATIC_ROOT=(os.path.join(BASE_DIR,'static'))
|
||||
STATICFILES_DIRS = [os.path.join(BASE_DIR, "static")]
|
||||
LOGIN_URL = "/auth/login"
|
||||
STATIC_URL = '/static/'
|
||||
#STATIC_ROOT=(os.path.join(BASE_DIR,'static'))
|
||||
STATICFILES_DIRS=[os.path.join(BASE_DIR,'static')]
|
||||
LOGIN_URL="/auth/login"
|
||||
|
||||
EMAIL_FROM = "Test App"
|
||||
EMAIL_HOST = "smtp.gmail.com"
|
||||
EMAIL_PORT = 587
|
||||
EMAIL_HOST_USER = ""
|
||||
EMAIL_HOST_PASSWORD = ""
|
||||
EMAIL_USE_TLS = True
|
||||
EMAIL_FROM='Test App'
|
||||
EMAIL_HOST="smtp.gmail.com"
|
||||
EMAIL_PORT=587
|
||||
EMAIL_HOST_USER=""
|
||||
EMAIL_HOST_PASSWORD=''
|
||||
EMAIL_USE_TLS=True
|
||||
|
||||
|
||||
MFA_UNALLOWED_METHODS = () # Methods that shouldn't be allowed for the user
|
||||
MFA_LOGIN_CALLBACK = "example.auth.create_session" # A function that should be called by username to login the user in session
|
||||
MFA_RECHECK = True # Allow random rechecking of the user
|
||||
MFA_RECHECK_MIN = 10 # Minimum interval in seconds
|
||||
MFA_RECHECK_MAX = 30 # Maximum in seconds
|
||||
MFA_QUICKLOGIN = True # Allow quick login for returning users by provide only their 2FA
|
||||
MFA_HIDE_DISABLE = ("",) # Can the user disable his key (Added in 1.2.0).
|
||||
MFA_REDIRECT_AFTER_REGISTRATION = "registered"
|
||||
MFA_SUCCESS_REGISTRATION_MSG = "Go to Home"
|
||||
|
||||
TOKEN_ISSUER_NAME = "PROJECT_NAME" # TOTP Issuer name
|
||||
MFA_UNALLOWED_METHODS=() # Methods that shouldn't be allowed for the user
|
||||
MFA_LOGIN_CALLBACK="example.auth.create_session" # A function that should be called by username to login the user in session
|
||||
MFA_RECHECK=True # Allow random rechecking of the user
|
||||
MFA_RECHECK_MIN=10 # Minimum interval in seconds
|
||||
MFA_RECHECK_MAX=30 # Maximum in seconds
|
||||
MFA_QUICKLOGIN=True # Allow quick login for returning users by provide only their 2FA
|
||||
MFA_HIDE_DISABLE=('',) # Can the user disable his key (Added in 1.2.0).
|
||||
MFA_REDIRECT_AFTER_REGISTRATION="registered"
|
||||
MFA_SUCCESS_REGISTRATION_MSG="Go to Home"
|
||||
|
||||
U2F_APPID = "https://localhost" # URL For U2F
|
||||
FIDO_SERVER_ID = (
|
||||
u"localhost" # Server rp id for FIDO2, it the full domain of your project
|
||||
)
|
||||
FIDO_SERVER_NAME = u"PROJECT_NAME"
|
||||
TOKEN_ISSUER_NAME="PROJECT_NAME" #TOTP Issuer name
|
||||
|
||||
U2F_APPID="https://localhost" #URL For U2F
|
||||
FIDO_SERVER_ID=u"localhost" # Server rp id for FIDO2, it the full domain of your project
|
||||
FIDO_SERVER_NAME=u"PROJECT_NAME"
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
<div class="card-header">Login</div>
|
||||
<div class="card-body">
|
||||
{% block content %}
|
||||
|
||||
{% endblock %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -14,15 +14,14 @@ Including another URLconf
|
||||
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
|
||||
"""
|
||||
from django.contrib import admin
|
||||
from django.urls import include, path
|
||||
|
||||
from . import auth, views
|
||||
|
||||
from django.urls import path,re_path,include
|
||||
from . import views,auth
|
||||
urlpatterns = [
|
||||
path("admin/", admin.site.urls),
|
||||
path("mfa/", include("mfa.urls")),
|
||||
path("auth/login/", auth.loginView, name="login"),
|
||||
path("auth/logout/", auth.logoutView, name="logout"),
|
||||
path("", views.home, name="home"),
|
||||
path("registered/", views.registered, name="registered"),
|
||||
path('admin/', admin.site.urls),
|
||||
path('mfa/', include('mfa.urls')),
|
||||
path('auth/login',auth.loginView,name="login"),
|
||||
path('auth/logout',auth.logoutView,name="logout"),
|
||||
|
||||
re_path('^$',views.home,name='home'),
|
||||
path('registered/',views.registered,name='registered')
|
||||
]
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.shortcuts import render
|
||||
from django.contrib.auth.decorators import login_required
|
||||
|
||||
|
||||
@login_required()
|
||||
def home(request):
|
||||
return render(request, "home.html", {})
|
||||
|
||||
return render(request,"home.html",{})
|
||||
|
||||
@login_required()
|
||||
def registered(request):
|
||||
return render(request, "home.html", {"registered": True})
|
||||
return render(request,"home.html",{"registered":True})
|
||||
|
||||
0
mfa/ApproveLogin.py
Normal file
0
mfa/ApproveLogin.py
Normal file
@@ -1,22 +1,19 @@
|
||||
from django.conf import settings
|
||||
from django.core.mail import EmailMessage
|
||||
from django.urls import reverse
|
||||
try:
|
||||
from django.urls import reverse
|
||||
except:
|
||||
from django.core.urlresolver import reverse
|
||||
|
||||
|
||||
def send(to, subject, body):
|
||||
def send(to,subject,body):
|
||||
from_email_address = settings.EMAIL_HOST_USER
|
||||
if "@" not in from_email_address:
|
||||
if '@' not in from_email_address:
|
||||
from_email_address = settings.DEFAULT_FROM_EMAIL
|
||||
From = "%s <%s>" % (settings.EMAIL_FROM, from_email_address)
|
||||
email = EmailMessage(subject, body, From, to)
|
||||
email = EmailMessage(subject,body,From,to)
|
||||
email.content_subtype = "html"
|
||||
return email.send(False)
|
||||
|
||||
|
||||
def get_redirect_url():
|
||||
return {
|
||||
"redirect_html": reverse(
|
||||
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
|
||||
),
|
||||
"reg_success_msg": getattr(settings, "MFA_SUCCESS_REGISTRATION_MSG"),
|
||||
}
|
||||
return {"redirect_html": reverse(getattr(settings, 'MFA_REDIRECT_AFTER_REGISTRATION', 'mfa_home')),
|
||||
"reg_success_msg":getattr(settings,"MFA_SUCCESS_REGISTRATION_MSG")}
|
||||
|
||||
98
mfa/Email.py
98
mfa/Email.py
@@ -1,90 +1,66 @@
|
||||
import datetime
|
||||
import random
|
||||
from random import randint
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.cache import never_cache
|
||||
|
||||
from .Common import send
|
||||
from .models import UserKey
|
||||
from django.template.context_processors import csrf
|
||||
import datetime,random
|
||||
from random import randint
|
||||
from .models import *
|
||||
#from django.template.context import RequestContext
|
||||
from .views import login
|
||||
from .Common import send
|
||||
|
||||
|
||||
def send_email(request, username, secret):
|
||||
def sendEmail(request,username,secret):
|
||||
"""Send Email to the user after rendering `mfa_email_token_template`"""
|
||||
from django.contrib.auth import get_user_model
|
||||
User = get_user_model()
|
||||
key = getattr(User, "USERNAME_FIELD", "username")
|
||||
key = getattr(User, 'USERNAME_FIELD', 'username')
|
||||
kwargs = {key: username}
|
||||
user = User.objects.get(**kwargs)
|
||||
res = render(
|
||||
request,
|
||||
"mfa_email_token_template.html",
|
||||
{"request": request, "user": user, "otp": secret},
|
||||
)
|
||||
return send([user.email], "OTP", res.content.decode())
|
||||
|
||||
res=render(request,"mfa_email_token_template.html",{"request":request,"user":user,'otp':secret})
|
||||
return send([user.email],"OTP", res.content.decode())
|
||||
|
||||
@never_cache
|
||||
def start(request):
|
||||
"""Start adding email as a 2nd factor"""
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
if request.session["email_secret"] == request.POST["otp"]: # if successful
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.key_type = "Email"
|
||||
uk.enabled = 1
|
||||
if request.session["email_secret"] == request.POST["otp"]: #if successful
|
||||
uk=User_Keys()
|
||||
uk.username=request.user.username
|
||||
uk.key_type="Email"
|
||||
uk.enabled=1
|
||||
uk.save()
|
||||
return HttpResponseRedirect(
|
||||
reverse(
|
||||
getattr(settings, "MFA_REDIRECT_AFTER_REGISTRATION", "mfa_home")
|
||||
)
|
||||
)
|
||||
from django.http import HttpResponseRedirect
|
||||
try:
|
||||
from django.core.urlresolvers import reverse
|
||||
except:
|
||||
from django.urls import reverse
|
||||
return HttpResponseRedirect(reverse(getattr(settings,'MFA_REDIRECT_AFTER_REGISTRATION','mfa_home')))
|
||||
context["invalid"] = True
|
||||
else:
|
||||
request.session["email_secret"] = str(
|
||||
randint(0, 100000)
|
||||
) # generate a random integer
|
||||
if send_email(request, request.user.username, request.session["email_secret"]):
|
||||
request.session["email_secret"] = str(randint(0,100000)) #generate a random integer
|
||||
if sendEmail(request, request.user.username, request.session["email_secret"]):
|
||||
context["sent"] = True
|
||||
return render(request, "Email/Add.html", context)
|
||||
|
||||
|
||||
return render(request,"Email/Add.html", context)
|
||||
@never_cache
|
||||
def auth(request):
|
||||
"""Authenticating the user by email."""
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
if request.session["email_secret"] == request.POST["otp"].strip():
|
||||
uk = UserKey.objects.get(
|
||||
username=request.session["base_username"], key_type="Email"
|
||||
)
|
||||
mfa = {"verified": True, "method": "Email", "id": uk.id}
|
||||
context=csrf(request)
|
||||
if request.method=="POST":
|
||||
if request.session["email_secret"]==request.POST["otp"].strip():
|
||||
uk = User_Keys.objects.get(username=request.session["base_username"], key_type="Email")
|
||||
mfa = {"verified": True, "method": "Email","id":uk.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
mfa["next_check"] = datetime.datetime.timestamp(datetime.datetime.now() + datetime.timedelta(
|
||||
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX)))
|
||||
request.session["mfa"] = mfa
|
||||
|
||||
uk.last_used = timezone.now()
|
||||
from django.utils import timezone
|
||||
uk.last_used=timezone.now()
|
||||
uk.save()
|
||||
return login(request)
|
||||
context["invalid"] = True
|
||||
context["invalid"]=True
|
||||
else:
|
||||
request.session["email_secret"] = str(randint(0, 100000))
|
||||
if send_email(
|
||||
request, request.session["base_username"], request.session["email_secret"]
|
||||
):
|
||||
if sendEmail(request, request.session["base_username"], request.session["email_secret"]):
|
||||
context["sent"] = True
|
||||
return render(request, "Email/Auth.html", context)
|
||||
return render(request,"Email/Auth.html", context)
|
||||
|
||||
186
mfa/FIDO2.py
186
mfa/FIDO2.py
@@ -1,23 +1,21 @@
|
||||
import datetime
|
||||
import random
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from fido2 import cbor
|
||||
from fido2.client import ClientData
|
||||
from fido2.ctap2 import AttestationObject, AttestedCredentialData, AuthenticatorData
|
||||
from fido2.server import Fido2Server, PublicKeyCredentialRpEntity
|
||||
from fido2.ctap2 import AttestationObject, AuthenticatorData
|
||||
from django.template.context_processors import csrf
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.shortcuts import render
|
||||
# from django.template.context import RequestContext
|
||||
import simplejson
|
||||
from fido2 import cbor
|
||||
from django.http import HttpResponse
|
||||
from django.conf import settings
|
||||
from .models import *
|
||||
from fido2.utils import websafe_decode, websafe_encode
|
||||
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey
|
||||
from fido2.ctap2 import AttestedCredentialData
|
||||
from .views import login, reset_cookie
|
||||
import datetime
|
||||
from .Common import get_redirect_url
|
||||
from django.utils import timezone
|
||||
|
||||
|
||||
def recheck(request):
|
||||
@@ -28,7 +26,7 @@ def recheck(request):
|
||||
return render(request, "FIDO2/recheck.html", context)
|
||||
|
||||
|
||||
def get_server():
|
||||
def getServer():
|
||||
"""Get Server Info from settings and returns a Fido2Server"""
|
||||
rp = PublicKeyCredentialRpEntity(settings.FIDO_SERVER_ID, settings.FIDO_SERVER_NAME)
|
||||
return Fido2Server(rp)
|
||||
@@ -36,20 +34,15 @@ def get_server():
|
||||
|
||||
def begin_registeration(request):
|
||||
"""Starts registering a new FIDO Device, called from API"""
|
||||
server = get_server()
|
||||
registration_data, state = server.register_begin(
|
||||
{
|
||||
u"id": request.user.username.encode("utf8"),
|
||||
u"name": (request.user.first_name + " " + request.user.last_name),
|
||||
u"displayName": request.user.username,
|
||||
},
|
||||
get_user_credentials(request.user.username),
|
||||
)
|
||||
request.session["fido_state"] = state
|
||||
server = getServer()
|
||||
registration_data, state = server.register_begin({
|
||||
u'id': request.user.username.encode("utf8"),
|
||||
u'name': (request.user.first_name + " " + request.user.last_name),
|
||||
u'displayName': request.user.username,
|
||||
}, getUserCredentials(request.user.username))
|
||||
request.session['fido_state'] = state
|
||||
|
||||
return HttpResponse(
|
||||
cbor.encode(registration_data), content_type="application/octet-stream"
|
||||
)
|
||||
return HttpResponse(cbor.encode(registration_data), content_type = 'application/octet-stream')
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@@ -58,28 +51,29 @@ def complete_reg(request):
|
||||
try:
|
||||
data = cbor.decode(request.body)
|
||||
|
||||
client_data = ClientData(data["clientDataJSON"])
|
||||
att_obj = AttestationObject((data["attestationObject"]))
|
||||
server = get_server()
|
||||
client_data = ClientData(data['clientDataJSON'])
|
||||
att_obj = AttestationObject((data['attestationObject']))
|
||||
server = getServer()
|
||||
auth_data = server.register_complete(
|
||||
request.session["fido_state"], client_data, att_obj
|
||||
request.session['fido_state'],
|
||||
client_data,
|
||||
att_obj
|
||||
)
|
||||
encoded = websafe_encode(auth_data.credential_data)
|
||||
uk = UserKey()
|
||||
uk = User_Keys()
|
||||
uk.username = request.user.username
|
||||
uk.properties = {
|
||||
"device": encoded,
|
||||
"type": att_obj.fmt,
|
||||
}
|
||||
uk.properties = {"device": encoded, "type": att_obj.fmt, }
|
||||
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
uk.key_type = "FIDO2"
|
||||
uk.save()
|
||||
return JsonResponse({"status": "OK"})
|
||||
return HttpResponse(simplejson.dumps({'status': 'OK'}))
|
||||
except Exception as exp:
|
||||
print(traceback.format_exc())
|
||||
return JsonResponse(
|
||||
{"status": "ERR", "message": "Error on server, please try again later"}
|
||||
)
|
||||
try:
|
||||
from raven.contrib.django.raven_compat.models import client
|
||||
client.captureException()
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(simplejson.dumps({'status': 'ERR', "message": "Error on server, please try again later"}))
|
||||
|
||||
|
||||
def start(request):
|
||||
@@ -89,12 +83,10 @@ def start(request):
|
||||
return render(request, "FIDO2/Add.html", context)
|
||||
|
||||
|
||||
def get_user_credentials(username):
|
||||
def getUserCredentials(username):
|
||||
credentials = []
|
||||
for uk in UserKey.objects.filter(username=username, key_type="FIDO2"):
|
||||
credentials.append(
|
||||
AttestedCredentialData(websafe_decode(uk.properties["device"]))
|
||||
)
|
||||
for uk in User_Keys.objects.filter(username = username, key_type = "FIDO2"):
|
||||
credentials.append(AttestedCredentialData(websafe_decode(uk.properties["device"])))
|
||||
return credentials
|
||||
|
||||
|
||||
@@ -104,13 +96,11 @@ def auth(request):
|
||||
|
||||
|
||||
def authenticate_begin(request):
|
||||
server = get_server()
|
||||
credentials = get_user_credentials(
|
||||
request.session.get("base_username", request.user.username)
|
||||
)
|
||||
server = getServer()
|
||||
credentials = getUserCredentials(request.session.get("base_username", request.user.username))
|
||||
auth_data, state = server.authenticate_begin(credentials)
|
||||
request.session["fido_state"] = state
|
||||
return HttpResponse(cbor.encode(auth_data), content_type="application/octet-stream")
|
||||
request.session['fido_state'] = state
|
||||
return HttpResponse(cbor.encode(auth_data), content_type = "application/octet-stream")
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
@@ -118,70 +108,64 @@ def authenticate_complete(request):
|
||||
try:
|
||||
credentials = []
|
||||
username = request.session.get("base_username", request.user.username)
|
||||
server = get_server()
|
||||
credentials = get_user_credentials(username)
|
||||
server = getServer()
|
||||
credentials = getUserCredentials(username)
|
||||
data = cbor.decode(request.body)
|
||||
credential_id = data["credentialId"]
|
||||
client_data = ClientData(data["clientDataJSON"])
|
||||
auth_data = AuthenticatorData(data["authenticatorData"])
|
||||
signature = data["signature"]
|
||||
credential_id = data['credentialId']
|
||||
client_data = ClientData(data['clientDataJSON'])
|
||||
auth_data = AuthenticatorData(data['authenticatorData'])
|
||||
signature = data['signature']
|
||||
try:
|
||||
cred = server.authenticate_complete(
|
||||
request.session.pop("fido_state"),
|
||||
request.session.pop('fido_state'),
|
||||
credentials,
|
||||
credential_id,
|
||||
client_data,
|
||||
auth_data,
|
||||
signature,
|
||||
signature
|
||||
)
|
||||
except ValueError:
|
||||
return JsonResponse(
|
||||
{
|
||||
"status": "ERR",
|
||||
"message": "Wrong challenge received, make sure that this is your security and try again.",
|
||||
}
|
||||
)
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR",
|
||||
"message": "Wrong challenge received, make sure that this is your security and try again."}),
|
||||
content_type = "application/json")
|
||||
except Exception as excep:
|
||||
print(traceback.format_exc())
|
||||
return JsonResponse({"status": "ERR", "message": excep.message})
|
||||
try:
|
||||
from raven.contrib.django.raven_compat.models import client
|
||||
client.captureException()
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR",
|
||||
"message": excep.message}),
|
||||
content_type = "application/json")
|
||||
|
||||
if request.session.get("mfa_recheck", False):
|
||||
import time
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return JsonResponse({"status": "OK"})
|
||||
return HttpResponse(simplejson.dumps({'status': "OK"}),
|
||||
content_type = "application/json")
|
||||
else:
|
||||
keys = UserKey.objects.filter(
|
||||
username=username, key_type="FIDO2", enabled=1
|
||||
)
|
||||
import random
|
||||
keys = User_Keys.objects.filter(username = username, key_type = "FIDO2", enabled = 1)
|
||||
for k in keys:
|
||||
if (
|
||||
AttestedCredentialData(
|
||||
websafe_decode(k.properties["device"])
|
||||
).credential_id
|
||||
== cred.credential_id
|
||||
):
|
||||
if AttestedCredentialData(websafe_decode(k.properties["device"])).credential_id == cred.credential_id:
|
||||
k.last_used = timezone.now()
|
||||
k.save()
|
||||
mfa = {"verified": True, "method": "FIDO2", "id": k.id}
|
||||
mfa = {"verified": True, "method": "FIDO2", 'id': k.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN,
|
||||
settings.MFA_RECHECK_MAX,
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now() + datetime.timedelta(
|
||||
seconds = random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
request.session["mfa"] = mfa
|
||||
if not request.user.is_authenticated:
|
||||
try:
|
||||
authenticated = request.user.is_authenticated
|
||||
except:
|
||||
authenticated = request.user.is_authenticated()
|
||||
if not authenticated:
|
||||
res = login(request)
|
||||
if "location" not in res:
|
||||
return reset_cookie(request)
|
||||
return JsonResponse(
|
||||
{"status": "OK", "redirect": res["location"]}
|
||||
)
|
||||
return JsonResponse({"status": "OK"})
|
||||
if not "location" in res: return reset_cookie(request)
|
||||
return HttpResponse(simplejson.dumps({'status': "OK", "redirect": res["location"]}),
|
||||
content_type = "application/json")
|
||||
return HttpResponse(simplejson.dumps({'status': "OK"}),
|
||||
content_type = "application/json")
|
||||
except Exception as exp:
|
||||
return JsonResponse({"status": "ERR", "message": str(exp)})
|
||||
return HttpResponse(simplejson.dumps({'status': "ERR", "message": exp.message}),
|
||||
content_type = "application/json")
|
||||
|
||||
@@ -1,162 +1,134 @@
|
||||
import random
|
||||
import string
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import user_agents
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
import random
|
||||
from django.shortcuts import render
|
||||
from django.http import HttpResponse
|
||||
from django.template.context import RequestContext
|
||||
from django.template.context_processors import csrf
|
||||
from .models import *
|
||||
import user_agents
|
||||
from django.utils import timezone
|
||||
from jose import jwt
|
||||
|
||||
from .Common import send
|
||||
from .models import UserKey
|
||||
|
||||
|
||||
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
|
||||
x = "".join(random.choice(chars) for _ in range(size))
|
||||
if not UserKey.objects.filter(properties__shas="$.key=" + x).exists():
|
||||
return x
|
||||
else:
|
||||
return id_generator(size, chars)
|
||||
x=''.join(random.choice(chars) for _ in range(size))
|
||||
if not User_Keys.objects.filter(properties__shas="$.key="+x).exists(): return x
|
||||
else: return id_generator(size,chars)
|
||||
|
||||
|
||||
def get_user_agent(request):
|
||||
id = id = request.session.get("td_id", None)
|
||||
def getUserAgent(request):
|
||||
id=id=request.session.get("td_id",None)
|
||||
if id:
|
||||
tk = UserKey.objects.get(id=id)
|
||||
if tk.properties.get("user_agent", "") != "":
|
||||
tk=User_Keys.objects.get(id=id)
|
||||
if tk.properties.get("user_agent","")!="":
|
||||
ua = user_agents.parse(tk.properties["user_agent"])
|
||||
res = render(None, "TrustedDevices/user-agent.html", context={"ua": ua})
|
||||
res = render(None, "TrustedDevices/user-agent.html", context={"ua":ua})
|
||||
return HttpResponse(res)
|
||||
return HttpResponse("")
|
||||
|
||||
|
||||
def trust_device(request):
|
||||
tk = UserKey.objects.get(id=request.session["td_id"])
|
||||
tk.properties["status"] = "trusted"
|
||||
tk = User_Keys.objects.get(id=request.session["td_id"])
|
||||
tk.properties["status"]="trusted"
|
||||
tk.save()
|
||||
del request.session["td_id"]
|
||||
return HttpResponse("OK")
|
||||
|
||||
|
||||
def check_trusted(request):
|
||||
def checkTrusted(request):
|
||||
res = ""
|
||||
id = request.session.get("td_id", "")
|
||||
if id != "":
|
||||
id=request.session.get("td_id","")
|
||||
if id!="":
|
||||
try:
|
||||
tk = UserKey.objects.get(id=id)
|
||||
if tk.properties["status"] == "trusted":
|
||||
res = "OK"
|
||||
tk = User_Keys.objects.get(id=id)
|
||||
if tk.properties["status"] == "trusted": res = "OK"
|
||||
except:
|
||||
pass
|
||||
return HttpResponse(res)
|
||||
|
||||
|
||||
def get_cookie(request):
|
||||
tk = UserKey.objects.get(id=request.session["td_id"])
|
||||
def getCookie(request):
|
||||
tk = User_Keys.objects.get(id=request.session["td_id"])
|
||||
|
||||
if tk.properties["status"] == "trusted":
|
||||
context = {"added": True}
|
||||
response = render(request, "TrustedDevices/Done.html", context)
|
||||
|
||||
context={"added":True}
|
||||
response = render(request,"TrustedDevices/Done.html", context)
|
||||
from datetime import datetime, timedelta
|
||||
expires = datetime.now() + timedelta(days=180)
|
||||
tk.expires = expires
|
||||
tk.expires=expires
|
||||
tk.save()
|
||||
response.set_cookie("deviceid", tk.properties["signature"], expires=expires)
|
||||
return response
|
||||
|
||||
|
||||
def add(request):
|
||||
context = csrf(request)
|
||||
if request.method == "GET":
|
||||
return render(request, "TrustedDevices/Add.html", context)
|
||||
context=csrf(request)
|
||||
if request.method=="GET":
|
||||
return render(request,"TrustedDevices/Add.html",context)
|
||||
else:
|
||||
key = request.POST["key"].replace("-", "").replace(" ", "").upper()
|
||||
key=request.POST["key"].replace("-","").replace(" ","").upper()
|
||||
context["username"] = request.POST["username"]
|
||||
context["key"] = request.POST["key"]
|
||||
trusted_keys = UserKey.objects.filter(
|
||||
username=request.POST["username"], properties__has="$.key=" + key
|
||||
)
|
||||
cookie = False
|
||||
trusted_keys=User_Keys.objects.filter(username=request.POST["username"],properties__has="$.key="+key)
|
||||
cookie=False
|
||||
if trusted_keys.exists():
|
||||
tk = trusted_keys[0]
|
||||
request.session["td_id"] = tk.id
|
||||
ua = request.META["HTTP_USER_AGENT"]
|
||||
agent = user_agents.parse(ua)
|
||||
tk=trusted_keys[0]
|
||||
request.session["td_id"]=tk.id
|
||||
ua=request.META['HTTP_USER_AGENT']
|
||||
agent=user_agents.parse(ua)
|
||||
if agent.is_pc:
|
||||
context["invalid"] = "This is a PC, it can't used as a trusted device."
|
||||
context["invalid"]="This is a PC, it can't used as a trusted device."
|
||||
else:
|
||||
tk.properties["user_agent"] = ua
|
||||
tk.properties["user_agent"]=ua
|
||||
tk.save()
|
||||
context["success"] = True
|
||||
context["success"]=True
|
||||
# tk.properties["user_agent"]=ua
|
||||
# tk.save()
|
||||
# context["success"]=True
|
||||
|
||||
else:
|
||||
context[
|
||||
"invalid"
|
||||
] = "The username or key is wrong, please check and try again."
|
||||
|
||||
return render(request, "TrustedDevices/Add.html", context)
|
||||
context["invalid"]="The username or key is wrong, please check and try again."
|
||||
|
||||
return render(request,"TrustedDevices/Add.html", context)
|
||||
|
||||
def start(request):
|
||||
if (
|
||||
UserKey.objects.filter(
|
||||
username=request.user.username, key_type="Trusted Device"
|
||||
).count()
|
||||
>= 2
|
||||
):
|
||||
return render(request, "TrustedDevices/start.html", {"not_allowed": True})
|
||||
td = None
|
||||
if not request.session.get("td_id", None):
|
||||
td = UserKey()
|
||||
td.username = request.user.username
|
||||
td.properties = {"key": id_generator(), "status": "adding"}
|
||||
td.key_type = "Trusted Device"
|
||||
if User_Keys.objects.filter(username=request.user.username,key_type="Trusted Device").count()>= 2:
|
||||
return render(request,"TrustedDevices/start.html",{"not_allowed":True})
|
||||
td=None
|
||||
if not request.session.get("td_id",None):
|
||||
td=User_Keys()
|
||||
td.username=request.user.username
|
||||
td.properties={"key":id_generator(),"status":"adding"}
|
||||
td.key_type="Trusted Device"
|
||||
td.save()
|
||||
request.session["td_id"] = td.id
|
||||
request.session["td_id"]=td.id
|
||||
try:
|
||||
if td is None:
|
||||
td = UserKey.objects.get(id=request.session["td_id"])
|
||||
context = {"key": td.properties["key"]}
|
||||
if td==None: td=User_Keys.objects.get(id=request.session["td_id"])
|
||||
context={"key":td.properties["key"]}
|
||||
except:
|
||||
del request.session["td_id"]
|
||||
return start(request)
|
||||
return render(request, "TrustedDevices/start.html", context)
|
||||
|
||||
return render(request,"TrustedDevices/start.html",context)
|
||||
|
||||
def send_email(request):
|
||||
body = render(request, "TrustedDevices/email.html", {}).content
|
||||
e = request.user.email
|
||||
if e == "":
|
||||
e = request.session.get("user", {}).get("email", "")
|
||||
if e == "":
|
||||
body=render(request,"TrustedDevices/email.html",{}).content
|
||||
from .Common import send
|
||||
e=request.user.email
|
||||
if e=="":
|
||||
e=request.session.get("user",{}).get("email","")
|
||||
if e=="":
|
||||
res = "User has no email on the system."
|
||||
elif send([e], "Add Trusted Device Link", body):
|
||||
res = "Sent Successfully"
|
||||
elif send([e],"Add Trusted Device Link",body):
|
||||
res="Sent Successfully"
|
||||
else:
|
||||
res = "Error occured, please try again later."
|
||||
res="Error occured, please try again later."
|
||||
return HttpResponse(res)
|
||||
|
||||
|
||||
def verify(request):
|
||||
if request.COOKIES.get("deviceid", None):
|
||||
json = jwt.decode(request.COOKIES.get("deviceid"), settings.SECRET_KEY)
|
||||
if json["username"].lower() == request.session["base_username"].lower():
|
||||
if request.COOKIES.get('deviceid',None):
|
||||
from jose import jwt
|
||||
json= jwt.decode(request.COOKIES.get('deviceid'),settings.SECRET_KEY)
|
||||
if json["username"].lower()== request.session['base_username'].lower():
|
||||
try:
|
||||
uk = UserKey.objects.get(
|
||||
username=request.POST["username"].lower(),
|
||||
properties__has="$.key=" + json["key"],
|
||||
)
|
||||
uk = User_Keys.objects.get(username=request.POST["username"].lower(), properties__has="$.key=" + json["key"])
|
||||
if uk.enabled and uk.properties["status"] == "trusted":
|
||||
uk.last_used = timezone.now()
|
||||
uk.last_used=timezone.now()
|
||||
uk.save()
|
||||
request.session["mfa"] = {
|
||||
"verified": True,
|
||||
"method": "Trusted Device",
|
||||
"id": uk.id,
|
||||
}
|
||||
request.session["mfa"] = {"verified": True, "method": "Trusted Device","id":uk.id}
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
133
mfa/U2F.py
133
mfa/U2F.py
@@ -1,141 +1,112 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import random
|
||||
import time
|
||||
|
||||
import simplejson
|
||||
from u2flib_server.u2f import (begin_registration, begin_authentication,
|
||||
complete_registration, complete_authentication)
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
import simplejson
|
||||
#from django.template.context import RequestContext
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from u2flib_server.u2f import (
|
||||
begin_authentication,
|
||||
begin_registration,
|
||||
complete_authentication,
|
||||
complete_registration,
|
||||
)
|
||||
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse
|
||||
from .models import *
|
||||
from .views import login
|
||||
|
||||
from .Common import get_redirect_url
|
||||
import datetime
|
||||
from django.utils import timezone
|
||||
|
||||
def recheck(request):
|
||||
context = csrf(request)
|
||||
context["mode"] = "recheck"
|
||||
context["mode"]="recheck"
|
||||
s = sign(request.user.username)
|
||||
request.session["_u2f_challenge_"] = s[0]
|
||||
context["token"] = s[1]
|
||||
request.session["mfa_recheck"] = True
|
||||
return render(request, "U2F/recheck.html", context)
|
||||
|
||||
request.session["mfa_recheck"]=True
|
||||
return render(request,"U2F/recheck.html", context)
|
||||
|
||||
def process_recheck(request):
|
||||
x = validate(request, request.user.username)
|
||||
if x is True:
|
||||
x=validate(request,request.user.username)
|
||||
if x==True:
|
||||
import time
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return JsonResponse({"recheck": True})
|
||||
return HttpResponse(simplejson.dumps({"recheck":True}),content_type="application/json")
|
||||
return x
|
||||
|
||||
|
||||
def check_errors(request, data):
|
||||
if "errorCode" in data:
|
||||
if data["errorCode"] == 0:
|
||||
return True
|
||||
if data["errorCode"] == 0: return True
|
||||
if data["errorCode"] == 4:
|
||||
return HttpResponse("Invalid Security Key")
|
||||
if data["errorCode"] == 1:
|
||||
return auth(request)
|
||||
return True
|
||||
def validate(request,username):
|
||||
import datetime, random
|
||||
|
||||
|
||||
def validate(request, username):
|
||||
data = simplejson.loads(request.POST["response"])
|
||||
|
||||
res = check_errors(request, data)
|
||||
if res is not True:
|
||||
res= check_errors(request,data)
|
||||
if res!=True:
|
||||
return res
|
||||
|
||||
challenge = request.session.pop("_u2f_challenge_")
|
||||
challenge = request.session.pop('_u2f_challenge_')
|
||||
device, c, t = complete_authentication(challenge, data, [settings.U2F_APPID])
|
||||
|
||||
key = UserKey.objects.get(
|
||||
username=username,
|
||||
properties__shas="$.device.publicKey=%s" % device["publicKey"],
|
||||
)
|
||||
key.last_used = timezone.now()
|
||||
key=User_Keys.objects.get(username=username,properties__shas="$.device.publicKey=%s"%device["publicKey"])
|
||||
key.last_used=timezone.now()
|
||||
key.save()
|
||||
mfa = {"verified": True, "method": "U2F", "id": key.id}
|
||||
mfa = {"verified": True, "method": "U2F","id":key.id}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
request.session["mfa"] = mfa
|
||||
return True
|
||||
|
||||
|
||||
def auth(request):
|
||||
context = csrf(request)
|
||||
s = sign(request.session["base_username"])
|
||||
request.session["_u2f_challenge_"] = s[0]
|
||||
context["token"] = s[1]
|
||||
|
||||
return render(request, "U2F/Auth.html")
|
||||
context=csrf(request)
|
||||
s=sign(request.session["base_username"])
|
||||
request.session["_u2f_challenge_"]=s[0]
|
||||
context["token"]=s[1]
|
||||
|
||||
return render(request,"U2F/Auth.html")
|
||||
|
||||
def start(request):
|
||||
enroll = begin_registration(settings.U2F_APPID, [])
|
||||
request.session["_u2f_enroll_"] = enroll.json
|
||||
context = csrf(request)
|
||||
context["token"] = simplejson.dumps(enroll.data_for_client)
|
||||
request.session['_u2f_enroll_'] = enroll.json
|
||||
context=csrf(request)
|
||||
context["token"]=simplejson.dumps(enroll.data_for_client)
|
||||
context.update(get_redirect_url())
|
||||
return render(request, "U2F/Add.html", context)
|
||||
return render(request,"U2F/Add.html",context)
|
||||
|
||||
|
||||
def bind(request):
|
||||
enroll = request.session["_u2f_enroll_"]
|
||||
data = simplejson.loads(request.POST["response"])
|
||||
import hashlib
|
||||
enroll = request.session['_u2f_enroll_']
|
||||
data=simplejson.loads(request.POST["response"])
|
||||
device, cert = complete_registration(enroll, data, [settings.U2F_APPID])
|
||||
cert = x509.load_der_x509_certificate(cert, default_backend())
|
||||
cert_hash = hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest()
|
||||
q = UserKey.objects.filter(key_type="U2F", properties__icontains=cert_hash)
|
||||
cert_hash=hashlib.md5(cert.public_bytes(Encoding.PEM)).hexdigest()
|
||||
q=User_Keys.objects.filter(key_type="U2F", properties__icontains= cert_hash)
|
||||
if q.exists():
|
||||
return HttpResponse(
|
||||
"This key is registered before, it can't be registered again."
|
||||
)
|
||||
UserKey.objects.filter(username=request.user.username, key_type="U2F").delete()
|
||||
uk = UserKey()
|
||||
return HttpResponse("This key is registered before, it can't be registered again.")
|
||||
User_Keys.objects.filter(username=request.user.username,key_type="U2F").delete()
|
||||
uk = User_Keys()
|
||||
uk.username = request.user.username
|
||||
uk.owned_by_enterprise = getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
uk.properties = {"device": simplejson.loads(device.json), "cert": cert_hash}
|
||||
uk.properties = {"device":simplejson.loads(device.json),"cert":cert_hash}
|
||||
uk.key_type = "U2F"
|
||||
uk.save()
|
||||
return HttpResponse("OK")
|
||||
|
||||
|
||||
def sign(username):
|
||||
u2f_devices = [
|
||||
d.properties["device"]
|
||||
for d in UserKey.objects.filter(username=username, key_type="U2F")
|
||||
]
|
||||
u2f_devices=[d.properties["device"] for d in User_Keys.objects.filter(username=username,key_type="U2F")]
|
||||
challenge = begin_authentication(settings.U2F_APPID, u2f_devices)
|
||||
return [challenge.json, simplejson.dumps(challenge.data_for_client)]
|
||||
|
||||
return [challenge.json,simplejson.dumps(challenge.data_for_client)]
|
||||
|
||||
def verify(request):
|
||||
x = validate(request, request.session["base_username"])
|
||||
if x is True:
|
||||
x= validate(request,request.session["base_username"])
|
||||
if x==True:
|
||||
return login(request)
|
||||
else:
|
||||
return x
|
||||
else: return x
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "2.2.0"
|
||||
__version__="2.2.0"
|
||||
|
||||
3
mfa/admin.py
Normal file
3
mfa/admin.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
||||
@@ -1,6 +1,4 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class myAppNameConfig(AppConfig):
|
||||
name = "mfa"
|
||||
verbose_name = "A Much Better Name"
|
||||
name = 'mfa'
|
||||
verbose_name = 'A Much Better Name'
|
||||
@@ -1,32 +1,32 @@
|
||||
from django.http import JsonResponse
|
||||
|
||||
from . import FIDO2, U2F, TrustedDevice, totp
|
||||
from .models import UserKey
|
||||
from .views import verify
|
||||
|
||||
|
||||
def has_mfa(request, username):
|
||||
if UserKey.objects.filter(username=username, enabled=1).count() > 0:
|
||||
import pyotp
|
||||
from .models import *
|
||||
from . import TrustedDevice, U2F, FIDO2, totp
|
||||
import simplejson
|
||||
from django.shortcuts import HttpResponse
|
||||
from mfa.views import verify,goto
|
||||
def has_mfa(request,username):
|
||||
if User_Keys.objects.filter(username=username,enabled=1).count()>0:
|
||||
return verify(request, username)
|
||||
return False
|
||||
|
||||
|
||||
def is_mfa(request, ignore_methods=[]):
|
||||
if request.session.get("mfa", {}).get("verified", False):
|
||||
if not request.session.get("mfa", {}).get("method", None) in ignore_methods:
|
||||
def is_mfa(request,ignore_methods=[]):
|
||||
if request.session.get("mfa",{}).get("verified",False):
|
||||
if not request.session.get("mfa",{}).get("method",None) in ignore_methods:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def recheck(request):
|
||||
method = request.session.get("mfa", {}).get("method", None)
|
||||
method=request.session.get("mfa",{}).get("method",None)
|
||||
if not method:
|
||||
return JsonResponse({"res": False})
|
||||
if method == "Trusted Device":
|
||||
return JsonResponse({"res": TrustedDevice.verify(request)})
|
||||
elif method == "U2F":
|
||||
return JsonResponse({"html": U2F.recheck(request).content})
|
||||
return HttpResponse(simplejson.dumps({"res":False}),content_type="application/json")
|
||||
if method=="Trusted Device":
|
||||
return HttpResponse(simplejson.dumps({"res":TrustedDevice.verify(request)}),content_type="application/json")
|
||||
elif method=="U2F":
|
||||
return HttpResponse(simplejson.dumps({"html": U2F.recheck(request).content}), content_type="application/json")
|
||||
elif method == "FIDO2":
|
||||
return JsonResponse({"html": FIDO2.recheck(request).content})
|
||||
elif method == "TOTP":
|
||||
return JsonResponse({"html": totp.recheck(request).content})
|
||||
return HttpResponse(simplejson.dumps({"html": FIDO2.recheck(request).content}), content_type="application/json")
|
||||
elif method=="TOTP":
|
||||
return HttpResponse(simplejson.dumps({"html": totp.recheck(request).content}), content_type="application/json")
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,20 +1,13 @@
|
||||
import time
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponseRedirect
|
||||
from django.urls import reverse
|
||||
|
||||
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.conf import settings
|
||||
def process(request):
|
||||
next_check = request.session.get("mfa", {}).get("next_check", False)
|
||||
if not next_check:
|
||||
return None
|
||||
now = int(time.time())
|
||||
next_check=request.session.get('mfa',{}).get("next_check",False)
|
||||
if not next_check: return None
|
||||
now=int(time.time())
|
||||
if now >= next_check:
|
||||
method = request.session["mfa"]["method"]
|
||||
method=request.session["mfa"]["method"]
|
||||
path = request.META["PATH_INFO"]
|
||||
return HttpResponseRedirect(
|
||||
reverse(method + "_auth")
|
||||
+ "?next=%s" % (settings.BASE_URL + path).replace("//", "/")
|
||||
)
|
||||
return HttpResponseRedirect(reverse(method+"_auth")+"?next=%s"%(settings.BASE_URL + path).replace("//", "/"))
|
||||
return None
|
||||
@@ -6,24 +6,17 @@ from django.db import models, migrations
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = []
|
||||
dependencies = [
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="User_Keys",
|
||||
name='User_Keys',
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
verbose_name="ID",
|
||||
serialize=False,
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
),
|
||||
),
|
||||
("username", models.CharField(max_length=50)),
|
||||
("secret_key", models.CharField(max_length=15)),
|
||||
("added_on", models.DateTimeField(auto_now_add=True)),
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('username', models.CharField(max_length=50)),
|
||||
('secret_key', models.CharField(max_length=15)),
|
||||
('added_on', models.DateTimeField(auto_now_add=True)),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0001_initial"),
|
||||
('mfa', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="key_type",
|
||||
field=models.CharField(default=b"TOTP", max_length=25),
|
||||
model_name='user_keys',
|
||||
name='key_type',
|
||||
field=models.CharField(default=b'TOTP', max_length=25),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0002_user_keys_key_type"),
|
||||
('mfa', '0002_user_keys_key_type'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="user_keys",
|
||||
name="secret_key",
|
||||
model_name='user_keys',
|
||||
name='secret_key',
|
||||
field=models.CharField(max_length=32),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0003_auto_20181114_2159"),
|
||||
('mfa', '0003_auto_20181114_2159'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="enabled",
|
||||
model_name='user_keys',
|
||||
name='enabled',
|
||||
field=models.BooleanField(default=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,25 +7,24 @@ import jsonfield.fields
|
||||
|
||||
def modify_json(apps, schema_editor):
|
||||
from django.conf import settings
|
||||
|
||||
if "mysql" in settings.DATABASES.get("default", {}).get("engine", ""):
|
||||
migrations.RunSQL("alter table mfa_user_keys modify column properties json;")
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
dependencies = [
|
||||
("mfa", "0004_user_keys_enabled"),
|
||||
('mfa', '0004_user_keys_enabled'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name="user_keys",
|
||||
name="secret_key",
|
||||
model_name='user_keys',
|
||||
name='secret_key',
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="properties",
|
||||
model_name='user_keys',
|
||||
name='properties',
|
||||
field=jsonfield.fields.JSONField(null=True),
|
||||
),
|
||||
migrations.RunPython(modify_json),
|
||||
migrations.RunPython(modify_json)
|
||||
]
|
||||
|
||||
@@ -7,29 +7,21 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0005_auto_20181115_2014"),
|
||||
('mfa', '0005_auto_20181115_2014'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="Trusted_Devices",
|
||||
name='Trusted_Devices',
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
verbose_name="ID",
|
||||
serialize=False,
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
),
|
||||
),
|
||||
("signature", models.CharField(max_length=255)),
|
||||
("key", models.CharField(max_length=6)),
|
||||
("username", models.CharField(max_length=50)),
|
||||
("user_agent", models.CharField(max_length=255)),
|
||||
("status", models.CharField(default=b"adding", max_length=255)),
|
||||
("added_on", models.DateTimeField(auto_now_add=True)),
|
||||
("last_used", models.DateTimeField(default=None, null=True)),
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('signature', models.CharField(max_length=255)),
|
||||
('key', models.CharField(max_length=6)),
|
||||
('username', models.CharField(max_length=50)),
|
||||
('user_agent', models.CharField(max_length=255)),
|
||||
('status', models.CharField(default=b'adding', max_length=255)),
|
||||
('added_on', models.DateTimeField(auto_now_add=True)),
|
||||
('last_used', models.DateTimeField(default=None, null=True)),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,16 +7,16 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0006_trusted_devices"),
|
||||
('mfa', '0006_trusted_devices'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.DeleteModel(
|
||||
name="Trusted_Devices",
|
||||
name='Trusted_Devices',
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="expires",
|
||||
model_name='user_keys',
|
||||
name='expires',
|
||||
field=models.DateTimeField(default=None, null=True, blank=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -7,13 +7,13 @@ from django.db import models, migrations
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0007_auto_20181230_1549"),
|
||||
('mfa', '0007_auto_20181230_1549'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="last_used",
|
||||
model_name='user_keys',
|
||||
name='last_used',
|
||||
field=models.DateTimeField(default=None, null=True, blank=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -6,23 +6,21 @@ from django.conf import settings
|
||||
|
||||
|
||||
def update_owned_by_enterprise(apps, schema_editor):
|
||||
user_keys = apps.get_model("mfa", "user_keys")
|
||||
user_keys.objects.filter(key_type="FIDO2").update(
|
||||
owned_by_enterprise=getattr(settings, "MFA_OWNED_BY_ENTERPRISE", False)
|
||||
)
|
||||
user_keys = apps.get_model('mfa', 'user_keys')
|
||||
user_keys.objects.filter(key_type='FIDO2').update(owned_by_enterprise=getattr(settings,"MFA_OWNED_BY_ENTERPRISE",False))
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0008_user_keys_last_used"),
|
||||
('mfa', '0008_user_keys_last_used'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="user_keys",
|
||||
name="owned_by_enterprise",
|
||||
model_name='user_keys',
|
||||
name='owned_by_enterprise',
|
||||
field=models.NullBooleanField(default=None),
|
||||
),
|
||||
migrations.RunPython(update_owned_by_enterprise),
|
||||
migrations.RunPython(update_owned_by_enterprise)
|
||||
]
|
||||
|
||||
@@ -6,13 +6,13 @@ from django.db import migrations, models
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0009_user_keys_owned_by_enterprise"),
|
||||
('mfa', '0009_user_keys_owned_by_enterprise'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="user_keys",
|
||||
name="key_type",
|
||||
field=models.CharField(default="TOTP", max_length=25),
|
||||
model_name='user_keys',
|
||||
name='key_type',
|
||||
field=models.CharField(default='TOTP', max_length=25),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -6,13 +6,13 @@ from django.db import migrations, models
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0010_auto_20201110_0557"),
|
||||
('mfa', '0010_auto_20201110_0557'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="user_keys",
|
||||
name="owned_by_enterprise",
|
||||
model_name='user_keys',
|
||||
name='owned_by_enterprise',
|
||||
field=models.BooleanField(blank=True, default=None, null=True),
|
||||
),
|
||||
]
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
# Generated by Django 3.2.4 on 2021-06-23 07:10
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0011_auto_20210530_0622"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RenameModel(
|
||||
old_name="User_Keys",
|
||||
new_name="UserKey",
|
||||
),
|
||||
]
|
||||
@@ -1,35 +0,0 @@
|
||||
# Generated by Django 2.2 on 2021-06-24 15:05
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("mfa", "0012_rename_user_keys_userkey"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="OTPTracker",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.AutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("actor", models.CharField(max_length=50)),
|
||||
("value", models.CharField(max_length=6)),
|
||||
("success", models.BooleanField(blank=True)),
|
||||
("done_on", models.DateTimeField(auto_now=True)),
|
||||
],
|
||||
),
|
||||
migrations.AddIndex(
|
||||
model_name="otptracker",
|
||||
index=models.Index(fields=["actor"], name="mfa_otptrac_usernam_1f423f_idx"),
|
||||
),
|
||||
]
|
||||
@@ -1,45 +1,32 @@
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
from jose import jwt
|
||||
from jsonfield import JSONField
|
||||
from jose import jwt
|
||||
from django.conf import settings
|
||||
#from jsonLookup import shasLookup, hasLookup
|
||||
# JSONField.register_lookup(shasLookup)
|
||||
# JSONField.register_lookup(hasLookup)
|
||||
|
||||
|
||||
class UserKey(models.Model):
|
||||
username = models.CharField(max_length=50)
|
||||
properties = JSONField(null=True)
|
||||
added_on = models.DateTimeField(auto_now_add=True)
|
||||
key_type = models.CharField(max_length=25, default="TOTP")
|
||||
enabled = models.BooleanField(default=True)
|
||||
expires = models.DateTimeField(null=True, default=None, blank=True)
|
||||
last_used = models.DateTimeField(null=True, default=None, blank=True)
|
||||
owned_by_enterprise = models.BooleanField(default=None, null=True, blank=True)
|
||||
class User_Keys(models.Model):
|
||||
username=models.CharField(max_length = 50)
|
||||
properties=JSONField(null = True)
|
||||
added_on=models.DateTimeField(auto_now_add = True)
|
||||
key_type=models.CharField(max_length = 25,default = "TOTP")
|
||||
enabled=models.BooleanField(default=True)
|
||||
expires=models.DateTimeField(null=True,default=None,blank=True)
|
||||
last_used=models.DateTimeField(null=True,default=None,blank=True)
|
||||
owned_by_enterprise=models.BooleanField(default=None,null=True,blank=True)
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
if (
|
||||
self.key_type == "Trusted Device"
|
||||
and self.properties.get("signature", "") == ""
|
||||
):
|
||||
self.properties["signature"] = jwt.encode(
|
||||
{"username": self.username, "key": self.properties["key"]},
|
||||
settings.SECRET_KEY,
|
||||
)
|
||||
super().save(*args, **kwargs)
|
||||
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
|
||||
if self.key_type == "Trusted Device" and self.properties.get("signature","") == "":
|
||||
self.properties["signature"]= jwt.encode({"username": self.username, "key": self.properties["key"]}, settings.SECRET_KEY)
|
||||
super(User_Keys, self).save(force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields)
|
||||
|
||||
def __unicode__(self):
|
||||
return "%s -- %s"%(self.username,self.key_type)
|
||||
|
||||
def __str__(self):
|
||||
return "%s -- %s" % (self.username, self.key_type)
|
||||
return self.__unicode__()
|
||||
|
||||
class Meta:
|
||||
app_label = "mfa"
|
||||
|
||||
|
||||
class OTPTracker(models.Model):
|
||||
actor = models.CharField(
|
||||
max_length=50, help_text="Username"
|
||||
) # named this way for indexing purpose.
|
||||
value = models.CharField(max_length=6)
|
||||
success = models.BooleanField(blank=True)
|
||||
done_on = models.DateTimeField(auto_now=True)
|
||||
|
||||
class Meta:
|
||||
app_label = "mfa"
|
||||
indexes = [models.Index(fields=["actor"])]
|
||||
app_label='mfa'
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
{% extends "base.html" %}
|
||||
{% extends "mfa_base.html" %}
|
||||
{% load static %}
|
||||
{% block head %}
|
||||
{{block.super}}
|
||||
<script type="text/javascript">
|
||||
function confirmDel(id) {
|
||||
$.ajax({
|
||||
@@ -39,6 +40,7 @@
|
||||
<script src="{% static 'mfa/js/bootstrap-toggle.min.js'%}"></script>
|
||||
{% endblock %}
|
||||
{% block content %}
|
||||
{{block.super}}
|
||||
<br/>
|
||||
<br/>
|
||||
<div class="container">
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
{% csrf_token %}
|
||||
{% if invalid %}
|
||||
<div class="alert alert-danger">
|
||||
{{ invalid_msg }}
|
||||
Sorry, The provided token is not valid.
|
||||
</div>
|
||||
{% endif %}
|
||||
{% if quota %}
|
||||
|
||||
7
mfa/templates/mfa_base.html
Normal file
7
mfa/templates/mfa_base.html
Normal file
@@ -0,0 +1,7 @@
|
||||
{% extends 'base.html' %}
|
||||
{% block head %}
|
||||
{{ block.super }}
|
||||
{% endblock %}
|
||||
{% block content %}
|
||||
{{ block.super }}
|
||||
{% endblock %}
|
||||
3
mfa/tests.py
Normal file
3
mfa/tests.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
||||
137
mfa/totp.py
137
mfa/totp.py
@@ -1,120 +1,77 @@
|
||||
import datetime
|
||||
import random
|
||||
import time
|
||||
|
||||
import pyotp
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, JsonResponse
|
||||
from django.shortcuts import render
|
||||
from django.template.context_processors import csrf
|
||||
from django.utils import timezone
|
||||
from django.views.decorators.cache import never_cache
|
||||
|
||||
from django.http import HttpResponse
|
||||
from .Common import get_redirect_url
|
||||
from .models import UserKey, OTPTracker
|
||||
from .models import *
|
||||
from django.template.context_processors import csrf
|
||||
import simplejson
|
||||
from django.template.context import RequestContext
|
||||
from django.conf import settings
|
||||
import pyotp
|
||||
from .views import login
|
||||
|
||||
|
||||
def verify_login(request, username, token):
|
||||
FAILURE_LIMIT = getattr("settings", "MFA_TOTP_FAILURE_LIMIT", 3)
|
||||
start_time = timezone.now() + datetime.timedelta(
|
||||
minutes=-1 * getattr(settings, "MFA_TOTP_FAILURE_WINDOW", 5)
|
||||
)
|
||||
if (
|
||||
OTPTracker.objects.filter(
|
||||
done_on__gt=start_time, actor=username, success=0
|
||||
).count()
|
||||
>= FAILURE_LIMIT
|
||||
):
|
||||
return [
|
||||
False,
|
||||
"Using this method is temporarily suspended on your account, use another method, or later again later ",
|
||||
]
|
||||
for key in UserKey.objects.filter(username=username, key_type="TOTP"):
|
||||
import datetime
|
||||
from django.utils import timezone
|
||||
import random
|
||||
def verify_login(request,username,token):
|
||||
for key in User_Keys.objects.filter(username=username,key_type = "TOTP"):
|
||||
totp = pyotp.TOTP(key.properties["secret_key"])
|
||||
if totp.verify(token, valid_window=30):
|
||||
if OTPTracker.objects.filter(actor=username, value=token).exists():
|
||||
return [
|
||||
False,
|
||||
"This code is used before, please generate another token",
|
||||
]
|
||||
OTPTracker.objects.create(actor=username, value=token, success=True)
|
||||
key.last_used = timezone.now()
|
||||
if totp.verify(token,valid_window = 30):
|
||||
key.last_used=timezone.now()
|
||||
key.save()
|
||||
return [True, key.id]
|
||||
OTPTracker.objects.create(actor=username, value=token, success=False)
|
||||
return [False, "Invalid Token"]
|
||||
|
||||
return [True,key.id]
|
||||
return [False]
|
||||
|
||||
def recheck(request):
|
||||
context = csrf(request)
|
||||
context["mode"] = "recheck"
|
||||
context["mode"]="recheck"
|
||||
if request.method == "POST":
|
||||
if verify_login(request, request.user.username, token=request.POST["otp"]):
|
||||
if verify_login(request,request.user.username, token=request.POST["otp"]):
|
||||
import time
|
||||
request.session["mfa"]["rechecked_at"] = time.time()
|
||||
return JsonResponse({"recheck": True})
|
||||
return HttpResponse(simplejson.dumps({"recheck": True}), content_type="application/json")
|
||||
else:
|
||||
return JsonResponse({"recheck": False})
|
||||
return render(request, "TOTP/recheck.html", context)
|
||||
|
||||
return HttpResponse(simplejson.dumps({"recheck": False}), content_type="application/json")
|
||||
return render(request,"TOTP/recheck.html", context)
|
||||
|
||||
@never_cache
|
||||
def auth(request):
|
||||
context = csrf(request)
|
||||
if request.method == "POST":
|
||||
res = verify_login(
|
||||
request, request.session["base_username"], token=request.POST["otp"]
|
||||
)
|
||||
context=csrf(request)
|
||||
if request.method=="POST":
|
||||
res=verify_login(request,request.session["base_username"],token = request.POST["otp"])
|
||||
if res[0]:
|
||||
mfa = {"verified": True, "method": "TOTP", "id": res[1]}
|
||||
mfa = {"verified": True, "method": "TOTP","id":res[1]}
|
||||
if getattr(settings, "MFA_RECHECK", False):
|
||||
mfa["next_check"] = datetime.datetime.timestamp(
|
||||
(
|
||||
datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(
|
||||
settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
mfa["next_check"] = datetime.datetime.timestamp((datetime.datetime.now()
|
||||
+ datetime.timedelta(
|
||||
seconds=random.randint(settings.MFA_RECHECK_MIN, settings.MFA_RECHECK_MAX))))
|
||||
request.session["mfa"] = mfa
|
||||
return login(request)
|
||||
context["invalid"] = True
|
||||
context["invalid_msg"] = res[1]
|
||||
return render(request, "TOTP/Auth.html", context)
|
||||
context["invalid"]=True
|
||||
return render(request,"TOTP/Auth.html", context)
|
||||
|
||||
|
||||
def get_token(request):
|
||||
secret_key = pyotp.random_base32()
|
||||
|
||||
def getToken(request):
|
||||
secret_key=pyotp.random_base32()
|
||||
totp = pyotp.TOTP(secret_key)
|
||||
request.session["new_mfa_answer"] = totp.now()
|
||||
return JsonResponse(
|
||||
{
|
||||
"qr": pyotp.totp.TOTP(secret_key).provisioning_uri(
|
||||
str(request.user.username), issuer_name=settings.TOKEN_ISSUER_NAME
|
||||
),
|
||||
"secret_key": secret_key,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
request.session["new_mfa_answer"]=totp.now()
|
||||
return HttpResponse(simplejson.dumps({"qr":pyotp.totp.TOTP(secret_key).provisioning_uri(str(request.user.username), issuer_name = settings.TOKEN_ISSUER_NAME),
|
||||
"secret_key": secret_key}))
|
||||
def verify(request):
|
||||
answer = request.GET["answer"]
|
||||
secret_key = request.GET["key"]
|
||||
answer=request.GET["answer"]
|
||||
secret_key=request.GET["key"]
|
||||
totp = pyotp.TOTP(secret_key)
|
||||
if totp.verify(answer, valid_window=60):
|
||||
uk = UserKey()
|
||||
uk.username = request.user.username
|
||||
uk.properties = {"secret_key": secret_key}
|
||||
uk.key_type = "TOTP"
|
||||
if totp.verify(answer,valid_window = 60):
|
||||
uk=User_Keys()
|
||||
uk.username=request.user.username
|
||||
uk.properties={"secret_key":secret_key}
|
||||
#uk.name="Authenticatior #%s"%User_Keys.objects.filter(username=user.username,type="TOTP")
|
||||
uk.key_type="TOTP"
|
||||
uk.save()
|
||||
return HttpResponse("Success")
|
||||
else:
|
||||
return HttpResponse("Error")
|
||||
|
||||
else: return HttpResponse("Error")
|
||||
|
||||
@never_cache
|
||||
def start(request):
|
||||
"""Start Adding Time One Time Password (TOTP)"""
|
||||
return render(request, "TOTP/Add.html", get_redirect_url())
|
||||
return render(request,"TOTP/Add.html",get_redirect_url())
|
||||
|
||||
87
mfa/urls.py
87
mfa/urls.py
@@ -1,41 +1,50 @@
|
||||
from django.urls import path
|
||||
|
||||
from . import FIDO2, U2F, Email, TrustedDevice, helpers, totp, views
|
||||
from . import views,totp,U2F,TrustedDevice,helpers,FIDO2,Email
|
||||
#app_name='mfa'
|
||||
|
||||
try:
|
||||
from django.urls import re_path as url
|
||||
except:
|
||||
from django.conf.urls import url
|
||||
urlpatterns = [
|
||||
path("totp/start/", totp.start, name="start_new_otop"),
|
||||
path("totp/getToken/", totp.get_token, name="get_new_otop"),
|
||||
path("totp/verify/", totp.verify, name="verify_otop"),
|
||||
path("totp/auth/", totp.auth, name="totp_auth"),
|
||||
path("totp/recheck/", totp.recheck, name="totp_recheck"),
|
||||
path("email/start/", Email.start, name="start_email"),
|
||||
path("email/auth/", Email.auth, name="email_auth"),
|
||||
path("u2f/", U2F.start, name="start_u2f"),
|
||||
path("u2f/bind/", U2F.bind, name="bind_u2f"),
|
||||
path("u2f/auth/", U2F.auth, name="u2f_auth"),
|
||||
path("u2f/process_recheck/", U2F.process_recheck, name="u2f_recheck"),
|
||||
path("u2f/verify/", U2F.verify, name="u2f_verify"),
|
||||
path("fido2/", FIDO2.start, name="start_fido2"),
|
||||
path("fido2/auth/", FIDO2.auth, name="fido2_auth"),
|
||||
path("fido2/begin_auth/", FIDO2.authenticate_begin, name="fido2_begin_auth"),
|
||||
path(
|
||||
"fido2/complete_auth/", FIDO2.authenticate_complete, name="fido2_complete_auth"
|
||||
),
|
||||
path("fido2/begin_reg/", FIDO2.begin_registeration, name="fido2_begin_reg"),
|
||||
path("fido2/complete_reg/", FIDO2.complete_reg, name="fido2_complete_reg"),
|
||||
path("fido2/recheck/", FIDO2.recheck, name="fido2_recheck"),
|
||||
path("td/", TrustedDevice.start, name="start_td"),
|
||||
path("td/add/", TrustedDevice.add, name="add_td"),
|
||||
path("td/send_link/", TrustedDevice.send_email, name="td_sendemail"),
|
||||
path("td/get-ua/", TrustedDevice.get_user_agent, name="td_get_useragent"),
|
||||
path("td/trust/", TrustedDevice.trust_device, name="td_trust_device"),
|
||||
path("u2f/checkTrusted/", TrustedDevice.check_trusted, name="td_checkTrusted"),
|
||||
path("u2f/secure_device", TrustedDevice.get_cookie, name="td_securedevice"),
|
||||
path("", views.index, name="mfa_home"),
|
||||
path("goto/<method>/", views.goto, name="mfa_goto"),
|
||||
path("selct_method/", views.show_methods, name="mfa_methods_list"),
|
||||
path("recheck/", helpers.recheck, name="mfa_recheck"),
|
||||
path("toggleKey/", views.toggle_key, name="toggle_key"),
|
||||
path("delete/", views.del_key, name="mfa_delKey"),
|
||||
path("reset/", views.reset_cookie, name="mfa_reset_cookie"),
|
||||
]
|
||||
url(r'totp/start/', totp.start , name="start_new_otop"),
|
||||
url(r'totp/getToken', totp.getToken , name="get_new_otop"),
|
||||
url(r'totp/verify', totp.verify, name="verify_otop"),
|
||||
url(r'totp/auth', totp.auth, name="totp_auth"),
|
||||
url(r'totp/recheck', totp.recheck, name="totp_recheck"),
|
||||
|
||||
url(r'email/start/', Email.start , name="start_email"),
|
||||
url(r'email/auth/', Email.auth , name="email_auth"),
|
||||
|
||||
url(r'u2f/$', U2F.start, name="start_u2f"),
|
||||
url(r'u2f/bind', U2F.bind, name="bind_u2f"),
|
||||
url(r'u2f/auth', U2F.auth, name="u2f_auth"),
|
||||
url(r'u2f/process_recheck', U2F.process_recheck, name="u2f_recheck"),
|
||||
url(r'u2f/verify', U2F.verify, name="u2f_verify"),
|
||||
|
||||
url(r'fido2/$', FIDO2.start, name="start_fido2"),
|
||||
url(r'fido2/auth', FIDO2.auth, name="fido2_auth"),
|
||||
url(r'fido2/begin_auth', FIDO2.authenticate_begin, name="fido2_begin_auth"),
|
||||
url(r'fido2/complete_auth', FIDO2.authenticate_complete, name="fido2_complete_auth"),
|
||||
url(r'fido2/begin_reg', FIDO2.begin_registeration, name="fido2_begin_reg"),
|
||||
url(r'fido2/complete_reg', FIDO2.complete_reg, name="fido2_complete_reg"),
|
||||
url(r'fido2/recheck', FIDO2.recheck, name="fido2_recheck"),
|
||||
|
||||
|
||||
url(r'td/$', TrustedDevice.start, name="start_td"),
|
||||
url(r'td/add', TrustedDevice.add, name="add_td"),
|
||||
url(r'td/send_link', TrustedDevice.send_email, name="td_sendemail"),
|
||||
url(r'td/get-ua', TrustedDevice.getUserAgent, name="td_get_useragent"),
|
||||
url(r'td/trust', TrustedDevice.trust_device, name="td_trust_device"),
|
||||
url(r'u2f/checkTrusted', TrustedDevice.checkTrusted, name="td_checkTrusted"),
|
||||
url(r'u2f/secure_device', TrustedDevice.getCookie, name="td_securedevice"),
|
||||
|
||||
url(r'^$', views.index, name="mfa_home"),
|
||||
url(r'goto/(.*)', views.goto, name="mfa_goto"),
|
||||
url(r'selct_method', views.show_methods, name="mfa_methods_list"),
|
||||
url(r'recheck', helpers.recheck, name="mfa_recheck"),
|
||||
url(r'toggleKey', views.toggleKey, name="toggle_key"),
|
||||
url(r'delete', views.delKey, name="mfa_delKey"),
|
||||
url(r'reset', views.reset_cookie, name="mfa_reset_cookie"),
|
||||
|
||||
]
|
||||
# print(urlpatterns)
|
||||
100
mfa/views.py
100
mfa/views.py
@@ -1,97 +1,90 @@
|
||||
import importlib
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.http import HttpResponse, HttpResponseRedirect
|
||||
from django.shortcuts import render
|
||||
from django.urls import reverse
|
||||
from user_agents import parse
|
||||
|
||||
from django.http import HttpResponse,HttpResponseRedirect
|
||||
from .models import *
|
||||
try:
|
||||
from django.urls import reverse
|
||||
except:
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.template.context_processors import csrf
|
||||
from django.template.context import RequestContext
|
||||
from django.conf import settings
|
||||
from . import TrustedDevice
|
||||
from .models import UserKey
|
||||
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from user_agents import parse
|
||||
|
||||
@login_required
|
||||
def index(request):
|
||||
keys = []
|
||||
context = {
|
||||
"keys": UserKey.objects.filter(username=request.user.username),
|
||||
"UNALLOWED_AUTHEN_METHODS": settings.MFA_UNALLOWED_METHODS,
|
||||
"HIDE_DISABLE": getattr(settings, "MFA_HIDE_DISABLE", []),
|
||||
}
|
||||
keys=[]
|
||||
context={"keys":User_Keys.objects.filter(username=request.user.username),"UNALLOWED_AUTHEN_METHODS":settings.MFA_UNALLOWED_METHODS
|
||||
,"HIDE_DISABLE":getattr(settings,"MFA_HIDE_DISABLE",[])}
|
||||
for k in context["keys"]:
|
||||
if k.key_type == "Trusted Device":
|
||||
setattr(k, "device", parse(k.properties.get("user_agent", "-----")))
|
||||
if k.key_type =="Trusted Device" :
|
||||
setattr(k,"device",parse(k.properties.get("user_agent","-----")))
|
||||
elif k.key_type == "FIDO2":
|
||||
setattr(k, "device", k.properties.get("type", "----"))
|
||||
setattr(k,"device",k.properties.get("type","----"))
|
||||
keys.append(k)
|
||||
context["keys"] = keys
|
||||
return render(request, "MFA.html", context)
|
||||
context["keys"]=keys
|
||||
return render(request,"MFA.html",context)
|
||||
|
||||
|
||||
def verify(request, username):
|
||||
def verify(request,username):
|
||||
request.session["base_username"] = username
|
||||
keys = UserKey.objects.filter(username=username, enabled=1)
|
||||
methods = list(set([k.key_type for k in keys]))
|
||||
#request.session["base_password"] = password
|
||||
keys=User_Keys.objects.filter(username=username,enabled=1)
|
||||
methods=list(set([k.key_type for k in keys]))
|
||||
|
||||
if "Trusted Device" in methods and not request.session.get(
|
||||
"checked_trusted_device", False
|
||||
):
|
||||
if "Trusted Device" in methods and not request.session.get("checked_trusted_device",False):
|
||||
if TrustedDevice.verify(request):
|
||||
return login(request)
|
||||
methods.remove("Trusted Device")
|
||||
request.session["mfa_methods"] = methods
|
||||
if len(methods) == 1:
|
||||
return HttpResponseRedirect(reverse(methods[0].lower() + "_auth"))
|
||||
if len(methods)==1:
|
||||
return HttpResponseRedirect(reverse(methods[0].lower()+"_auth"))
|
||||
return show_methods(request)
|
||||
|
||||
|
||||
def show_methods(request):
|
||||
return render(request, "select_mfa_method.html", {})
|
||||
|
||||
return render(request,"select_mfa_method.html", {})
|
||||
|
||||
def reset_cookie(request):
|
||||
response = HttpResponseRedirect(settings.LOGIN_URL)
|
||||
response=HttpResponseRedirect(settings.LOGIN_URL)
|
||||
response.delete_cookie("base_username")
|
||||
return response
|
||||
|
||||
|
||||
def login(request):
|
||||
from django.contrib import auth
|
||||
from django.conf import settings
|
||||
callable_func = __get_callable_function__(settings.MFA_LOGIN_CALLBACK)
|
||||
return callable_func(request, username=request.session["base_username"])
|
||||
return callable_func(request,username=request.session["base_username"])
|
||||
|
||||
|
||||
@login_required
|
||||
def del_key(request):
|
||||
key = UserKey.objects.get(id=request.GET["id"])
|
||||
def delKey(request):
|
||||
key=User_Keys.objects.get(id=request.GET["id"])
|
||||
if key.username == request.user.username:
|
||||
key.delete()
|
||||
return HttpResponse("Deleted Successfully")
|
||||
else:
|
||||
return HttpResponse("Error: You own this token so you can't delete it")
|
||||
|
||||
|
||||
def __get_callable_function__(func_path):
|
||||
if "." not in func_path:
|
||||
import importlib
|
||||
if not '.' in func_path:
|
||||
raise Exception("class Name should include modulename.classname")
|
||||
|
||||
parsed_str = func_path.split(".")
|
||||
module_name, func_name = ".".join(parsed_str[:-1]), parsed_str[-1]
|
||||
module_name , func_name = ".".join(parsed_str[:-1]) , parsed_str[-1]
|
||||
imported_module = importlib.import_module(module_name)
|
||||
callable_func = getattr(imported_module, func_name)
|
||||
callable_func = getattr(imported_module,func_name)
|
||||
if not callable_func:
|
||||
raise Exception("Module does not have requested function")
|
||||
return callable_func
|
||||
|
||||
|
||||
@login_required
|
||||
def toggle_key(request):
|
||||
id = request.GET["id"]
|
||||
q = UserKey.objects.filter(username=request.user.username, id=id)
|
||||
if q.count() == 1:
|
||||
key = q[0]
|
||||
if key.key_type not in settings.MFA_HIDE_DISABLE:
|
||||
key.enabled = not key.enabled
|
||||
def toggleKey(request):
|
||||
id=request.GET["id"]
|
||||
q=User_Keys.objects.filter(username=request.user.username, id=id)
|
||||
if q.count()==1:
|
||||
key=q[0]
|
||||
if not key.key_type in settings.MFA_HIDE_DISABLE:
|
||||
key.enabled=not key.enabled
|
||||
key.save()
|
||||
return HttpResponse("OK")
|
||||
else:
|
||||
@@ -99,6 +92,5 @@ def toggle_key(request):
|
||||
else:
|
||||
return HttpResponse("Error")
|
||||
|
||||
|
||||
def goto(request, method):
|
||||
return HttpResponseRedirect(reverse(method.lower() + "_auth"))
|
||||
def goto(request,method):
|
||||
return HttpResponseRedirect(reverse(method.lower()+"_auth"))
|
||||
|
||||
43
setup.py
43
setup.py
@@ -3,32 +3,33 @@
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
setup(
|
||||
name="django-mfa2",
|
||||
version="2.2.0",
|
||||
description="Allows user to add 2FA to their accounts",
|
||||
name='django-mfa2',
|
||||
version='2.3.0',
|
||||
description='Allows user to add 2FA to their accounts',
|
||||
long_description=open("README.md").read(),
|
||||
long_description_content_type="text/markdown",
|
||||
author="Mohamed El-Kalioby",
|
||||
author_email="mkalioby@mkalioby.com",
|
||||
url="https://github.com/mkalioby/django-mfa2/",
|
||||
download_url="https://github.com/mkalioby/django-mfa2/",
|
||||
license="MIT",
|
||||
|
||||
author='Mohamed El-Kalioby',
|
||||
author_email = 'mkalioby@mkalioby.com',
|
||||
url = 'https://github.com/mkalioby/django-mfa2/',
|
||||
download_url='https://github.com/mkalioby/django-mfa2/',
|
||||
license='MIT',
|
||||
packages=find_packages(),
|
||||
install_requires=[
|
||||
"django >= 2.0",
|
||||
"jsonfield",
|
||||
"simplejson",
|
||||
"pyotp",
|
||||
"python-u2flib-server",
|
||||
"ua-parser",
|
||||
"user-agents",
|
||||
"python-jose",
|
||||
"fido2 == 0.9.1",
|
||||
"jsonLookup",
|
||||
],
|
||||
'django >= 2.0',
|
||||
'jsonfield',
|
||||
'simplejson',
|
||||
'pyotp',
|
||||
'python-u2flib-server',
|
||||
'ua-parser',
|
||||
'user-agents',
|
||||
'python-jose',
|
||||
'fido2 == 0.9.1',
|
||||
'jsonLookup'
|
||||
],
|
||||
python_requires=">=3.5",
|
||||
include_package_data=True,
|
||||
zip_safe=False, # because we're including static files
|
||||
zip_safe=False, # because we're including static files
|
||||
classifiers=[
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Environment :: Web Environment",
|
||||
@@ -47,5 +48,5 @@ setup(
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
],
|
||||
]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user