C.W.K.
Stream
Lesson 06 of 06 · published

전부 합치기 — 풀 모듈

~15 min · full-module, drop-in, fastapi

Level 0Greenhorn
0 XP0/53 lessons0/14 achievements
0/100 XP to next level100 XP to go0% complete

모든 조각 한 곳에, FastAPI / Starlette 앱에 drop-in 가능. 약 100줄, bcrypt 외 외부 의존성 없음.

빠진 거 (다음 트랙에서)

  • Brute force 방어 — lockout/blacklist 사이클, exponential backoff. Track 6.
  • Killswitch UI — 어떤 디바이스에서든 누를 admin "Revoke All" 버튼. Track 7.
  • 가시성 — session, blacklist, 최근 실패 리스트하는 dashboard. Track 8.

Code

pin_auth.py — 솔로 앱용 self-contained PIN 인증·python
# pin_auth.py — 솔로 앱용 self-contained PIN 인증
import asyncio, secrets, sqlite3, time
import bcrypt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import RedirectResponse, Response

# --- Config ---
DB_PATH         = "security.db"
SESSION_SECONDS = 30 * 24 * 3600
MAX_RETRY       = 5
PUBLIC_PREFIXES = ("/login", "/static", "/health")
LOCAL_BYPASS    = {"127.0.0.1", "::1"}
TRUSTED_PROXIES = {"127.0.0.1", "::1"}

db = sqlite3.connect(DB_PATH, check_same_thread=False)
db.executescript(open("schema.sql").read())   # 아키텍처 레슨의 스키마

# --- Helpers ---
def now() -> int: return int(time.time())

def client_ip(request) -> str:
    direct = request.client.host
    if direct in TRUSTED_PROXIES:
        xff = request.headers.get("x-forwarded-for", "")
        if xff: return xff.split(",")[0].strip()
    return direct

def hash_pin(pin: str) -> bytes:
    return bcrypt.hashpw(pin.encode(), bcrypt.gensalt(rounds=12))

def verify_pin(pin: str, stored: bytes) -> bool:
    try: return bcrypt.checkpw(pin.encode(), stored)
    except ValueError: return False

def is_blacklisted(ip: str) -> bool:
    return db.execute(
      "SELECT 1 FROM security_blacklist WHERE ip = ?", (ip,)
    ).fetchone() is not None

def attempt_count(ip: str) -> int:
    row = db.execute(
      "SELECT fail_count FROM security_attempts WHERE ip = ?", (ip,)
    ).fetchone()
    return row[0] if row else 0

def increment_attempt(ip: str):
    db.execute("""
      INSERT INTO security_attempts(ip, fail_count, updated_at)
      VALUES (?, 1, ?)
      ON CONFLICT(ip) DO UPDATE SET
        fail_count = fail_count + 1,
        updated_at = excluded.updated_at
    """, (ip, now()))
    db.commit()

def clear_attempts(ip: str):
    db.execute("DELETE FROM security_attempts WHERE ip = ?", (ip,))
    db.commit()

def blacklist(ip: str, reason: str):
    db.execute("""
      INSERT OR REPLACE INTO security_blacklist(ip, reason, blocked_at)
      VALUES (?, ?, ?)
    """, (ip, reason, now()))
    db.commit()

def create_session(ip: str) -> str:
    token = secrets.token_urlsafe(32)
    db.execute("""
      INSERT INTO security_sessions(token, ip, created_at, expires_at)
      VALUES (?, ?, ?, ?)
    """, (token, ip, now(), now() + SESSION_SECONDS))
    db.commit()
    return token

def session_valid(token: str, ip: str) -> bool:
    row = db.execute(
      "SELECT ip, expires_at FROM security_sessions WHERE token = ?", (token,)
    ).fetchone()
    if not row: return False
    return row[1] >= now() and row[0] == ip

# --- Middleware ---
class PinAuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        path = request.url.path
        ip   = client_ip(request)
        if any(path.startswith(p) for p in PUBLIC_PREFIXES):
            return await call_next(request)
        if ip in LOCAL_BYPASS:
            return await call_next(request)
        if is_blacklisted(ip):
            return Response("Forbidden", status_code=403)
        token = request.cookies.get("session")
        if token and session_valid(token, ip):
            return await call_next(request)
        if request.headers.get("accept", "").startswith("text/html"):
            return RedirectResponse("/login", status_code=302)
        return Response("Unauthorized", status_code=401)
main.py 에서 wire·python
from fastapi import FastAPI
from pin_auth import PinAuthMiddleware

app = FastAPI()
app.add_middleware(PinAuthMiddleware)

@app.get("/")
async def home():
    return {"hello": "authed user"}

External links

Exercise

전체 pin_auth.py 를 새 FastAPI 앱에 drop, /health (public) 와 / (보호) 추가. End-to-end 테스트: GET /health (200), GET / (/login 으로 302), 맞는 PIN 으로 POST /login (쿠키 set, / 로 redirect), GET / 다시 (200), POST /admin/security/revoke-all (session clear), GET / (/login 으로 302). 일주일짜리 디버깅 사가가 이제 네가 컨트롤하는 100줄 파일.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.