"""Cookie-backed session helpers built on the ``tSession`` model.

Sessions are identified by a 128-character hex token stored in the
``session_id`` cookie. Stale rows are pruned opportunistically, at most once
per hour, from a background thread started by whichever request notices that
the interval has elapsed.
"""

import hashlib
import math
import random  # noqa: F401 -- no longer used here; kept so existing imports of this name keep working
import secrets
import threading
import time
from datetime import datetime  # noqa: F401 -- no longer used here; kept for backward compatibility

from django.contrib.auth.models import AbstractUser, User
from django.core.handlers.wsgi import WSGIRequest
from django.http import HttpResponse

from tauth.models import tSession
from tauth.settings import config

_SESSION_COOKIE = "session_id"
# Length of a SHA3-512 hex digest, which is what _get_session_id() produces.
_SESSION_ID_LENGTH = 128
_SECONDS_PER_HOUR = 60 * 60
# Minimum delay between two background trims.
_TRIM_INTERVAL_SECONDS = _SECONDS_PER_HOUR
# Cookie lifetime used when config["session_length"] is -1 (no fixed length).
_UNLIMITED_COOKIE_MAX_AGE = 365 * 24 * _SECONDS_PER_HOUR

# Epoch timestamp of the last time a trim was scheduled or run.
_last_trim = 0


def _get_session_id(request: WSGIRequest, /, username: str) -> str:
    """Return a fresh 128-character hex session token.

    The token is a SHA3-512 digest over the user agent, the current time,
    the username and 32 bytes from the OS CSPRNG. The CSPRNG bytes are what
    make the token unguessable; the other inputs are not secret.
    """
    # A missing User-Agent header must not prevent login.
    user_agent = request.META.get("HTTP_USER_AGENT", "")
    seed = f"{user_agent}-{time.time()}-{secrets.token_hex(32)}-{username}"
    return hashlib.sha3_512(seed.encode()).hexdigest()


def _trim_sessions():
    """Start a background trim if the last one is at least an hour old."""
    global _last_trim

    now = time.time()
    if now < _last_trim + _TRIM_INTERVAL_SECONDS:
        return

    # Record the timestamp before starting the thread so that requests
    # arriving while the worker is still spinning up do not each start
    # another trim.
    _last_trim = now
    threading.Thread(target=__th_trim_sessions).start()


def __th_trim_sessions():
    """Delete sessions that are idle for too long or past their maximum age.

    ``config["session_timeout"]`` and ``config["session_length"]`` are
    multiplied by 3600 and compared against epoch-second columns, i.e. they
    are expressed in hours. A ``session_length`` of ``-1`` disables the
    maximum-age check.
    """
    global _last_trim
    _last_trim = time.time()

    now = math.floor(time.time())

    tSession.objects.filter(
        last_use__lt=now - config["session_timeout"] * _SECONDS_PER_HOUR
    ).delete()

    if config["session_length"] != -1:
        tSession.objects.filter(
            created__lt=now - config["session_length"] * _SECONDS_PER_HOUR
        ).delete()


def _read_session_id(request: WSGIRequest, /) -> str | None:
    """Return the request's session cookie if it has the expected length."""
    session_id = request.COOKIES.get(_SESSION_COOKIE)
    if session_id is None or len(session_id) != _SESSION_ID_LENGTH:
        return None
    return session_id


def _find_session(request: WSGIRequest, /) -> tSession | None:
    """Return the stored session named by the request cookie, if any."""
    session_id = _read_session_id(request)
    if session_id is None:
        return None

    try:
        return tSession.objects.get(session_id=session_id)
    except tSession.DoesNotExist:
        return None


def create_session(request: WSGIRequest, response: HttpResponse | None, /, user: User | AbstractUser) -> tSession:
    """Create a session row for *user* and optionally set its cookie.

    Args:
        request: Incoming request; its user agent feeds the token.
        response: Response to attach the ``session_id`` cookie to, or
            ``None`` to skip setting a cookie.
        user: Owner of the new session.

    Returns:
        The newly created ``tSession``.
    """
    _trim_sessions()

    now = math.floor(time.time())
    session = tSession.objects.create(
        u_for=user,
        session_id=_get_session_id(request, user.username),
        created=now,
        last_use=now,
    )

    if response is not None:
        if config["session_length"] == -1:
            max_age = _UNLIMITED_COOKIE_MAX_AGE
        else:
            max_age = config["session_length"] * _SECONDS_PER_HOUR

        # NOTE(review): the cookie is set without httponly/secure/samesite.
        # Confirm whether client-side scripts need to read it before
        # tightening these flags.
        response.set_cookie(_SESSION_COOKIE, session.session_id, max_age=max_age)

    return session


def clear_session(request: WSGIRequest, response: HttpResponse | None=None, /, delete_session: bool=True):
    """Log the request's session out.

    Does nothing when the request carries no well-formed session cookie.

    Args:
        request: Request whose ``session_id`` cookie names the session.
        response: If given, the cookie is expired on this response.
        delete_session: If true, the matching ``tSession`` row is deleted.
            An id with no matching row is ignored.
    """
    _trim_sessions()

    session_id = _read_session_id(request)
    if session_id is None:
        return

    if response is not None:
        # delete_cookie() emits an already-expired cookie. The previous
        # datetime(0, 0, 0) is not a valid date and raised ValueError.
        response.delete_cookie(_SESSION_COOKIE)

    if not delete_session:
        return

    try:
        session = tSession.objects.get(session_id=session_id)
    except tSession.DoesNotExist:
        # Already trimmed or never existed: nothing left to delete.
        return

    session.delete()


def get_user(request: WSGIRequest, /) -> User | None:
    """Return the user owning the request's session, or ``None``.

    When a session is found, ``register_usage()`` is called on it before
    returning (presumably refreshing ``last_use`` -- see ``tSession``).

    NOTE(review): expiry is enforced only by the periodic trim, so a session
    past its timeout may still be accepted until the next trim runs.
    """
    _trim_sessions()

    session = _find_session(request)
    if session is None:
        return None

    session.register_usage()
    return session.u_for


def is_authenticated(request: WSGIRequest, /) -> bool:
    """Return whether the request carries a known session cookie.

    When a session is found, ``register_usage()`` is called on it.
    """
    _trim_sessions()

    session = _find_session(request)
    if session is None:
        return False

    session.register_usage()
    return True