100 lines
2.8 KiB
Python
100 lines
2.8 KiB
Python
|
import hashlib
|
||
|
import math
|
||
|
import random
|
||
|
import threading
|
||
|
import time
|
||
|
from datetime import datetime
|
||
|
|
||
|
from django.contrib.auth.models import AbstractUser, User
|
||
|
from django.core.handlers.wsgi import WSGIRequest
|
||
|
from django.http import HttpResponse
|
||
|
|
||
|
from tauth.models import tSession
|
||
|
from tauth.settings import config
|
||
|
|
||
|
_last_trim = 0
|
||
|
|
||
|
def _get_session_id(request: WSGIRequest, /, username: str) -> str:
|
||
|
return hashlib.sha3_512(str.encode(f"{request.META['HTTP_USER_AGENT']}-{time.time()}-{random.random()}-{username}")).hexdigest()
|
||
|
|
||
|
def _trim_sessions():
|
||
|
if time.time() < _last_trim + 60 * 60:
|
||
|
return
|
||
|
|
||
|
thread = threading.Thread(target=__th_trim_sessions)
|
||
|
thread.start()
|
||
|
|
||
|
def __th_trim_sessions():
|
||
|
global _last_trim
|
||
|
_last_trim = time.time()
|
||
|
|
||
|
tSession.objects.filter(
|
||
|
last_use__lt=math.floor(time.time()) - config["session_timeout"] * 60 * 60
|
||
|
).delete()
|
||
|
|
||
|
if config["session_length"] != -1:
|
||
|
tSession.objects.filter(
|
||
|
created__lt=math.floor(time.time()) - config["session_length"] * 60 * 60
|
||
|
).delete()
|
||
|
|
||
|
def create_session(request: WSGIRequest, response: HttpResponse | None, /, user: User | AbstractUser) -> tSession:
|
||
|
_trim_sessions()
|
||
|
session = tSession.objects.create(
|
||
|
u_for=user,
|
||
|
session_id=_get_session_id(request, user.username),
|
||
|
created=math.floor(time.time()),
|
||
|
last_use=math.floor(time.time())
|
||
|
)
|
||
|
|
||
|
if response is not None:
|
||
|
response.set_cookie("session_id", session.session_id, max_age=365 * 24 * 60 * 60 if config["session_length"] == -1 else (config["session_length"] * 60 * 60))
|
||
|
|
||
|
return session
|
||
|
|
||
|
def clear_session(request: WSGIRequest, response: HttpResponse | None=None, /, delete_session: bool=True):
|
||
|
_trim_sessions()
|
||
|
session_id = request.COOKIES.get("session_id")
|
||
|
|
||
|
if session_id is None or len(session_id) != 128:
|
||
|
return
|
||
|
|
||
|
if response is not None:
|
||
|
response.set_cookie("session_id", "", max_age=0, expires=datetime(0, 0, 0))
|
||
|
|
||
|
try:
|
||
|
session = tSession.objects.get(session_id=session_id)
|
||
|
except tSession.DoesNotExist:
|
||
|
...
|
||
|
|
||
|
if delete_session:
|
||
|
session.delete()
|
||
|
|
||
|
def get_user(request: WSGIRequest, /) -> User | None:
|
||
|
_trim_sessions()
|
||
|
session_id = request.COOKIES.get("session_id")
|
||
|
|
||
|
if session_id is None or len(session_id) != 128:
|
||
|
return None
|
||
|
|
||
|
try:
|
||
|
session = tSession.objects.get(session_id=session_id)
|
||
|
except tSession.DoesNotExist:
|
||
|
return None
|
||
|
|
||
|
session.register_usage()
|
||
|
return session.u_for
|
||
|
|
||
|
def is_authenticated(request: WSGIRequest, /) -> bool:
|
||
|
_trim_sessions()
|
||
|
session_id = request.COOKIES.get("session_id")
|
||
|
|
||
|
if session_id is None or len(session_id) != 128:
|
||
|
return False
|
||
|
|
||
|
try:
|
||
|
tSession.objects.get(session_id=session_id).register_usage()
|
||
|
except tSession.DoesNotExist:
|
||
|
return False
|
||
|
|
||
|
return True
|