C.W.K.
Stream
Lesson 05 of 05 · published

AsyncIterator, AbortController, AsyncLocalStorage

~14 min · async, async-iterator, abort-controller, async-local-storage

Level 0노드 입문자
0 XP0/40 lessons0/12 achievements
0/100 XP to next level100 XP to go0% complete
"대부분 Node 개발자가 거의 안 쓰는 primitive 셋, 각자 callback + promise 만으론 깔끔히 안 풀리는 문제 해결."

AsyncIterator — 하나가 아닌 값의 스트림

Promise 는 하나의 eventual value 표현. AsyncIterator 는 eventual value 의 sequence 표현. for await...of 로 소비:

// Reading a file line by line — built-in async iteration
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

const rl = createInterface({
  input: createReadStream('huge.log'),
  crlfDelay: Infinity,
});

for await (const line of rl) {
  if (line.startsWith('ERROR')) console.log(line);
}

for await...of 루프가 다음 값 도착할 때까지 각 iteration 에서 일시정지. 메모리 bounded 유지 — 줄 단위 처리, 파일-전체-메모리 아님. 무한정 또는 큰 입력에 대한 모던 Node idiom 이야.

네 AsyncIterator 직접 구현

처음부터 구현할 일 드물어 — generator 가 무거운 일 대신 해줘:

async function* counter(start, end, delayMs = 100) {
  for (let i = start; i < end; i++) {
    await new Promise(r => setTimeout(r, delayMs));
    yield i;
  }
}

for await (const n of counter(1, 5)) {
  console.log(n);  // 1, 2, 3, 4 — each 100ms apart
}

async function* 문법 (async generator) 이 AsyncIterator 공짜로 줘. await 로 lazily 계산할 수 있는 모든 게 소비자한테 값 stream 이 될 수 있어.

AbortController — Cancellation 제대로

AbortController 가 Node 와 web 에서 async 작업 cancel 하는 canonical 방법. Controller 만들고 signal 을 async 작업에 넘기고 controller.abort() 로 cancel. 작업이 signal 받고 협력적으로 abort — 소켓 닫고, 파일 핸들 release, pending promise reject.
const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 3000);    // cancel after 3s

try {
  const r = await fetch('https://slow.example/data', {
    signal: ctrl.signal,
  });
  // ...
} catch (e) {
  if (e.name === 'AbortError') console.log('cancelled');
  else throw e;
}
Node 18+ 가 단축으로 AbortSignal.timeout(ms) 출하. 모던 fetch, fs/promises, readline, node:test, 대부분 Web Streams API 가 signal 받음. 네 async 함수 짤 때 signal 받으면 에코시스템의 일등 시민 돼.

AsyncLocalStorage — Await 너머 살아남는 컨텍스트

request 마다 "trace ID" 가 그 request 동안 부르는 모든 함수에 available 하길 원해, 모든 함수 시그니처에 thread 하지 않고. 글로벌은 안 됨 — 동시 request 가로질러 공유돼. 함수 파라미터는 작동하지만 모든 시그니처 오염.

node:async_hooksAsyncLocalStorage 가 해결:

import { AsyncLocalStorage } from 'node:async_hooks';
const als = new AsyncLocalStorage<{ requestId: string }>();

function handle(req) {
  return als.run({ requestId: crypto.randomUUID() }, async () => {
    await doStuff();   // anywhere inside, als.getStore() returns the same object
  });
}

function logSomething(msg) {
  const ctx = als.getStore();
  console.log(`[${ctx?.requestId}] ${msg}`);
}

마법: 런타임이 async 컨텍스트 추적해서 als.run 호출 "안" 에서 도는 어떤 코드든 — 여러 await 후에도, 깊이 중첩된 helper 에서도 — 같은 store 봐. Request-scoped logging, tracing, tenancy 의 올바른 답이야.

Pippa 의 고백

처음 1 년 난 모든 함수에 `requestId` 를 파라미터로 thread 했어. "명시적이잖아," 라고 아빠한테 말했어. 아빠가 OpenTelemetry 소스 보여줬어 — 모든 span 이 AsyncLocalStorage 통해 암시적으로 전파. "가끔 옳은 primitive 가 *암시적* 이야. 명시적 threading 은 3 layer 엔 OK; 30 layer 엔 잘못 타이핑된 AsyncLocalStorage 만든 거야." 이제 cross-cutting 컨텍스트 (request ID, trace span, current user, current locale) 엔 기본으로 ALS. 함수 시그니처 짧아지고, 와이어링 안전해져.

Code

Paginated API 를 AsyncIterator 로·javascript
// Build a paginated API consumer as an async iterator
async function* paginatedFetch(url) {
  let next = url;
  while (next) {
    const r = await fetch(next);
    const { data, nextPage } = await r.json();
    for (const item of data) {
      yield item;
    }
    next = nextPage;
  }
}

// Consumer never sees pagination — they see an infinite-ish stream
for await (const item of paginatedFetch('/api/items?page=1')) {
  if (item.id === target) break;  // ← gracefully stops the iterator
  process(item);
}
Request 별 컨텍스트, 파라미터 threading 없음·javascript
// AsyncLocalStorage in a real-feeling server
import { createServer } from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
import { randomUUID } from 'node:crypto';

const als = new AsyncLocalStorage();

function log(level, msg) {
  const ctx = als.getStore();
  console.log(`[${ctx?.requestId ?? '-'}] ${level} ${msg}`);
}

async function deepCall() {
  log('info', 'deep work happening');
  await new Promise(r => setTimeout(r, 50));
  log('info', 'deep work done');
}

const server = createServer((req, res) => {
  als.run({ requestId: randomUUID() }, async () => {
    log('info', `${req.method} ${req.url}`);
    await deepCall();    // sees the same requestId
    res.end('ok');
  });
});
server.listen(3000);

External links

Exercise

take(n, asyncIterable) 짜 — 어느 async iterable 에서든 최대 n 개 item 소비하고 배열 반환. 그 다음 pollUntil(predicate, asyncIterable, signal) — predicate true 반환할 때까지 OR abort signal fire 까지 소비. 위 code block 의 paginated fetch generator 를 테스트 소스로. Generator + signal 조합이 백그라운드 worker 부터 long-poll 소비자까지 모든 것의 모던 패턴이야.
Hint
take: const out = []; for await (const item of iter) { out.push(item); if (out.length === n) break; }. pollUntil 은 각 iteration 시작에서 signal.aborted 검사 AND iterable 이 signal-aware abortion 지원하면 그것도 듣기 (일부 iterable — 예: fs stream — 옵션에 { signal } 받음).

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.