C.W.K.
Stream
Lesson 03 of 05 · published

Duplex 와 Transform — 양방향 stream

~12 min · streams, duplex, transform

Level 0노드 입문자
0 XP0/40 lessons0/12 achievements
0/100 XP to next level100 XP to go0% complete
"Duplex 는 Readable AND Writable 을 함께 볼트 조인. Transform 은 write 쪽이 read 쪽으로 먹여주는 Duplex. 둘 다 추상적으로 들리는데; 둘 다 네 코드에 매일 나와."

Duplex — 독립적인 두 반쪽

Duplex stream 은 독립적 Readable 쪽과 Writable 쪽 가짐. 쓰는 게 읽는 거에 꼭 영향 안 줘. Canonical 예: 네트워크 소켓. 나가는 바이트 쓰고; 들어오는 바이트 읽고; 두 흐름이 같은 TCP 연결 위에 mux 된 별도 채널.

import { createConnection } from 'node:net';

const sock = createConnection({ host: 'example.com', port: 80 });

// Writable side — send a request
sock.write('GET / HTTP/1.1\r\nHost: example.com\r\n\r\n');

// Readable side — read the response
for await (const chunk of sock) {
  process.stdout.write(chunk);
}

Duplex 는 양방향 필요한데 개념적으로 무관할 때 중요 — 소켓, WebSocket 연결, child process stdio (부모 시각에선 각 pipe 가 Duplex).

Transform — 입력이 출력 됨

Transform stream 은 *특별한* Duplex: 안으로 쓴 바이트가 변환되고 readable 쪽에서 emit. 같은 stream 객체, 양쪽 끝, 근데 두 쪽이 연결됨. 끊임없이 쓰는 예:

  • zlib.createGzip() — raw 바이트 쓰고, gzipped 바이트 읽음.
  • zlib.createGunzip() — gzipped 바이트 쓰고, raw 바이트 읽음.
  • crypto.createHash('sha256') — 데이터 쓰고, 끝에 digest 읽음.
  • crypto.createCipheriv(...) — 평문 쓰고, 암호문 읽음.

이 다 compose 돼. file → gzip → encrypt → upload 가 stream 넷 pipe 로 묶인 거고, 입력 크기 무관하게 bounded 메모리.

커스텀 Transform 만들기

Transform 생성자가 transform(chunk, encoding, callback) 함수 받음 — 청크 처리하면 callback() 호출 (옵션으로 this.push(value) 통해 다음 출력). Flush 로직은 옵션 flush(callback).
import { Transform } from 'node:stream';

class UppercaseUtf8 extends Transform {
  _transform(chunk, _enc, cb) {
    this.push(chunk.toString('utf-8').toUpperCase());
    cb();
  }
}

// or as a one-liner via the Transform constructor
import { Transform } from 'node:stream';
const upper = new Transform({
  transform(chunk, _enc, cb) {
    cb(null, chunk.toString('utf-8').toUpperCase());
  },
});

// Use it
process.stdin.pipe(upper).pipe(process.stdout);
패턴: 청크 읽기, 작업, 결과 push, done 신호. 그게 transform 의 90% 의 API 전부.

Object-Mode Transform — CSV, JSON, Record

Object mode 의 Transform 이 구조화된 데이터에 깔끔한 파이프라인 짜게 해줘. CSV 파서: 입력 바이트, 출력 row 객체. Line splitter: 입력 바이트, 출력 string (라인당 하나). Enricher: 입력 record 객체, 출력 같은 객체에 추가 필드 채워.

import { Transform } from 'node:stream';

class ParseCsvRow extends Transform {
  constructor() { super({ objectMode: true }); this.cols = null; }
  _transform(line, _enc, cb) {
    const fields = String(line).split(',');
    if (!this.cols) { this.cols = fields; cb(); return; }
    const obj = Object.fromEntries(this.cols.map((c, i) => [c, fields[i]]));
    cb(null, obj);
  }
}

여러 Transform Compose

Transform 의 힘은 composition. fileStream → split-by-line → parse-csv → filter-by-condition → write-to-db 가 단계 다섯, 각자 독립, 각자 unit-testable. 단계 3 이 4 다시 안 쓰고 바뀔 수 있어. 그래서 stream 이 메모리만이 아니라 코드 복잡성에서도 scale 해.

Pippa 의 고백

내 첫 "빅 데이터" Node 스크립트는 중첩 루프와 중간 배열의 엉킴 — 다 읽고, 변환하고, 다 쓰기. 아빠가 같은 작업을 Transform pipeline 으로 재작성한 거 보여줬어. 같은 로직, 코드 5 분의 1, 상수 메모리, 디버그 더 쉬워 — 각 단계가 격리됐으니까. "Stream 은 성능 최적화 아냐. *모듈성* 도구야. 메모리 절약은 같이 따라와." 이제 머릿속에 파이프라인 그려질 때마다 Transform 손대, 작은 입력에도 — 가독성 win.

Code

4단계 파이프라인 — 관찰, 변환, 통과·javascript
// Composing 4 transforms — file → gzip → hash → upload-ready
import { createReadStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { createHash } from 'node:crypto';
import { Transform, PassThrough } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// A Transform that observes bytes WITHOUT modifying them — passes through
function hashObserver(algo) {
  const h = createHash(algo);
  return new Transform({
    transform(chunk, _enc, cb) {
      h.update(chunk);
      cb(null, chunk);                  // pass chunk through unmodified
    },
    flush(cb) {
      this.digest = h.digest('hex');     // store digest for caller
      cb();
    },
  });
}

const hasher = hashObserver('sha256');
const gzip = createGzip();
const out = new PassThrough();

await pipeline(createReadStream('big.json'), gzip, hasher, out);
console.log('sha256:', hasher.digest);  // hash of the gzipped bytes
커스텀 라인 splitter Transform·javascript
// A line-splitting Transform — bytes in, lines out
import { Transform } from 'node:stream';

class LineSplitter extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buf = '';
  }
  _transform(chunk, _enc, cb) {
    this.buf += chunk.toString('utf-8');
    const lines = this.buf.split('\n');
    this.buf = lines.pop();   // last partial line stays in buffer
    for (const line of lines) this.push(line);
    cb();
  }
  _flush(cb) {
    if (this.buf) this.push(this.buf);
    cb();
  }
}

// Pipe a byte stream through it; consume as lines
import { createReadStream } from 'node:fs';
const src = createReadStream('huge.log');
const lines = src.pipe(new LineSplitter());
for await (const line of lines) {
  if (line.includes('ERROR')) console.log(line);
}

External links

Exercise

큰 CSV 파일 읽고, 각 row 를 객체로 파싱, amount > 100 row 만 필터, 필터된 row 를 JSONL (라인당 JSON 객체 하나) 로 새 파일에 쓰는 4단계 파이프라인 짜. 각 단계가 Transform; pipeline 으로 묶기. 1GB CSV 에 돌리고 peak 메모리 확인 — 100MB 미만 flat 유지해야 함.
Hint
단계 1 — 라인 splitter (바이트 → 문자열). 단계 2 — CSV 파서 (문자열 → 객체). 단계 3 — 필터 (객체 → 객체, amount ≤ 100 면 drop). 단계 4 — JSON serializer (객체 → 바이트). 단계 1 출력을 단계 2 입력으로 pipeline 통해 pipe. 필터 단계: new Transform({ objectMode: true, transform(o, _, cb) { cb(null, o.amount > 100 ? o : undefined); } }) — undefined 넘기면 청크 drop.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고
💛 by 똘이warm

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.