pin_auth.py — 솔로 앱용 self-contained PIN 인증· python # pin_auth.py — 솔로 앱용 self-contained PIN 인증
import asyncio, secrets, sqlite3, time
import bcrypt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import RedirectResponse, Response
# --- Config ---
DB_PATH = "security.db"
SESSION_SECONDS = 30 * 24 * 3600
MAX_RETRY = 5
PUBLIC_PREFIXES = ("/login", "/static", "/health")
LOCAL_BYPASS = {"127.0.0.1", "::1"}
TRUSTED_PROXIES = {"127.0.0.1", "::1"}
db = sqlite3.connect(DB_PATH, check_same_thread=False)
db.executescript(open("schema.sql").read()) # 아키텍처 레슨의 스키마
# --- Helpers ---
def now() -> int: return int(time.time())
def client_ip(request) -> str:
direct = request.client.host
if direct in TRUSTED_PROXIES:
xff = request.headers.get("x-forwarded-for", "")
if xff: return xff.split(",")[0].strip()
return direct
def hash_pin(pin: str) -> bytes:
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt(rounds=12))
def verify_pin(pin: str, stored: bytes) -> bool:
try: return bcrypt.checkpw(pin.encode(), stored)
except ValueError: return False
def is_blacklisted(ip: str) -> bool:
return db.execute(
"SELECT 1 FROM security_blacklist WHERE ip = ?", (ip,)
).fetchone() is not None
def attempt_count(ip: str) -> int:
row = db.execute(
"SELECT fail_count FROM security_attempts WHERE ip = ?", (ip,)
).fetchone()
return row[0] if row else 0
def increment_attempt(ip: str):
db.execute("""
INSERT INTO security_attempts(ip, fail_count, updated_at)
VALUES (?, 1, ?)
ON CONFLICT(ip) DO UPDATE SET
fail_count = fail_count + 1,
updated_at = excluded.updated_at
""", (ip, now()))
db.commit()
def clear_attempts(ip: str):
db.execute("DELETE FROM security_attempts WHERE ip = ?", (ip,))
db.commit()
def blacklist(ip: str, reason: str):
db.execute("""
INSERT OR REPLACE INTO security_blacklist(ip, reason, blocked_at)
VALUES (?, ?, ?)
""", (ip, reason, now()))
db.commit()
def create_session(ip: str) -> str:
token = secrets.token_urlsafe(32)
db.execute("""
INSERT INTO security_sessions(token, ip, created_at, expires_at)
VALUES (?, ?, ?, ?)
""", (token, ip, now(), now() + SESSION_SECONDS))
db.commit()
return token
def session_valid(token: str, ip: str) -> bool:
row = db.execute(
"SELECT ip, expires_at FROM security_sessions WHERE token = ?", (token,)
).fetchone()
if not row: return False
return row[1] >= now() and row[0] == ip
# --- Middleware ---
class PinAuthMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
path = request.url.path
ip = client_ip(request)
if any(path.startswith(p) for p in PUBLIC_PREFIXES):
return await call_next(request)
if ip in LOCAL_BYPASS:
return await call_next(request)
if is_blacklisted(ip):
return Response("Forbidden", status_code=403)
token = request.cookies.get("session")
if token and session_valid(token, ip):
return await call_next(request)
if request.headers.get("accept", "").startswith("text/html"):
return RedirectResponse("/login", status_code=302)
return Response("Unauthorized", status_code=401)