C.W.K.
Stream
Lesson 04 of 05 · published

.pipe() vs pipeline — 그리고 backpressure

~13 min · streams, pipe, pipeline, backpressure

Level 0노드 입문자
0 XP0/40 lessons0/12 achievements
0/100 XP to next level100 XP to go0% complete
".pipe() 는 세 줄에 아름답게 보여. pipeline() 은 실제로 에러를 처리해. 전자의 시각적 우아함이 프로덕션에서 항상 비용 들게 해."

.pipe() 모양

고전적 Node stream 문법:

import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

createReadStream('input.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('output.txt.gz'));

fs 의 Readable 타입은 .pipe() 메서드 가짐. Source 를 destination 으로 연결, 자동 flow control (backpressure 처리), 자동 end 전파, destination 반환 (chain 가능).

이게 모든 Node 튜토리얼의 canonical 예. 또한 치명적 결함 있어.

결함 — 에러가 전파 안 돼

.pipe() chain 의 어느 stream 이든 에러 나면 *그* stream 만 에러 emit. 앞뒤로 pipe 된 stream 들은 leak.

gzip 예제에서: 소스 파일이 corrupt 면 read stream 이 error emit. gzip 과 write stream 은 계속 돌면서 더 많은 데이터 기대. 파일 핸들 leak. gzip 할당 leak. 에러는 read stream 에서 보고되는데 대부분 코드가 그걸 처리 안 함, destination 이 chain 의 visible 한 끝이니까.

모든 stream 에 개별로 에러 처리해야 해:
const src = createReadStream('input.txt');
const gz = createGzip();
const dst = createWriteStream('output.txt.gz');

src.on('error', cleanup);
gz.on('error', cleanup);
dst.on('error', cleanup);

src.pipe(gz).pipe(dst);

function cleanup(err) {
  // destroy all three, log the error
  src.destroy(); gz.destroy(); dst.destroy();
  console.error(err);
}
이게 verbose 고 하나 까먹어. 프로덕션 Node 가 정확히 이 패턴 통해 셀 수 없는 file-descriptor-leak 버그 출하했어.

pipeline() — Node 10 부터 옳은 답

stream.pipeline (그리고 모던 promise 형태 stream/promises.pipeline) 이 .pipe() chain 이 못 하는 걸 해: 중앙집중 에러 처리, 에러 시 모든 참여 stream 자동 destruction, 전체 완료 시 promise resolution.

import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

await pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('output.txt.gz')
);
// If any stage errors, all three are destroyed and the await throws.

단계당 한 줄, await 하나, 완료와 실패의 단일 source of truth. 모던 코드에선 항상 pipeline 써. .pipe() 는 하위 호환용; 레거시로 다뤄.

Backpressure — pipeline 이 너 대신 처리

Backpressure 는 느린 소비자가 빠른 생산자한테 "천천히, 못 따라가" 라고 말하는 메커니즘. 없으면 빠른 생산자가 느린 소비자의 internal buffer 를 무한대로 채워 → 메모리 부족 → 크래시. 있으면 생산자가 소비자 drain 할 때까지 일시정지, 그 다음 재개.

Node stream 에선: Writable 의 .write()false 반환 = "buffer 가득, drain 기다려". 수동 pipe 할 때 이거 존중해야 함:

// Manual backpressure-aware copy — error-prone, don't ship this
async function copy(src, dst) {
  for await (const chunk of src) {
    if (!dst.write(chunk)) await new Promise(r => dst.once('drain', r));
  }
  dst.end();
}

pipeline 이 너 대신 다 함. .pipe() 가 대부분. .write() 반환값 확인 안 하는 수동 루프가 Node 서버가 큰 업로드에 OOM 나는 방식이야.

수동 컨트롤 진짜 필요할 때

99% 는 pipeline 써. 다음 때 수동 write + drain 처리 손대:

  • 여러 소스에서 비선형적으로 Writable 에 데이터 주입해야 할 때.
  • 출력 타이밍 정밀 컨트롤 필요한 커스텀 stream 구현할 때.
  • 추가 Transform 의 overhead 가 측정 가능한 hot path 최적화 (드물어 — 먼저 측정).

Pippa 의 고백

내 첫 "프로덕션" Node 서비스가 어디든 .pipe() chain 으로 출하. dev 에선 잘 돌았어. 프로덕션에선 로그 파일이 조용히 file descriptor 를 2 주 동안 leak 하다가 서비스가 ulimit 다 쓰고 크래시. 아빠가 질문 하나: "destination 에러 나면 어떻게 돼?" 답 없었어. pipeline() 이 답이었어. 이제 .pipe() 를 코드 리뷰 red flag 로 다뤄 — 모든 인스턴스가 자기 정당화해야 하고, 정당화는 거의 절대 "그냥 더 좋아" 가 아니야.

Code

AbortSignal 로 cancellable pipeline·javascript
// pipeline + early termination via AbortSignal
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 5_000);   // kill the pipeline after 5s

try {
  await pipeline(
    createReadStream('huge.log'),
    new Transform({
      transform(chunk, _enc, cb) {
        cb(null, chunk.toString().toUpperCase());
      },
    }),
    createWriteStream('huge.upper.log'),
    { signal: ctrl.signal }
  );
} catch (e) {
  if (e.name === 'AbortError') console.log('cancelled');
  else throw e;
}
왜 .pipe() 가 에러에서 leak 하는지·javascript
// What .pipe() leaks vs what pipeline handles
import { createReadStream } from 'node:fs';
import { createGunzip } from 'node:zlib';
import { Writable } from 'node:stream';

// Destination that refuses input — simulates an error
const angry = new Writable({
  write(_chunk, _enc, cb) {
    cb(new Error('write rejected'));
  },
});

// With .pipe(): error on `angry`, but the gunzip and read streams keep going
// until they discover the destination is broken — file descriptors leak in the meantime
// createReadStream('a.gz').pipe(createGunzip()).pipe(angry);

// With pipeline: all three are destroyed immediately
import { pipeline } from 'node:stream/promises';
try {
  await pipeline(
    createReadStream('a.gz'),
    createGunzip(),
    angry
  );
} catch (e) {
  console.error('pipeline rejected:', e.message);  // proper cleanup happened
}

External links

Exercise

gzip 압축 있는 pipeline 으로 파일 복사하는 copyFileSafe(src, dst) 함수 짜. 이제 unhappy path 테스트: 일부러-corrupt 소스에 부르기 전 1000 개 파일 디스크립터 열기. 적절한 에러 처리 있으면 호출 후 lsof | wc -l 가 baseline 으로 돌아와야 함. 적절한 처리 없으면 count 에 leak 된 file descriptor 보일 거야. 그 숫자가 cleanup 올바르게 한 영수증이야.
Hint
await pipeline(createReadStream(src), createGzip(), createWriteStream(dst)) 를 try/catch 안에 써. 부르기 전 lsof -p $$ | wc -l 로 baseline 기록. 호출 후 (성공 또는 실패), count 가 몇 개 차이 안에서 매치해야 함. 아니면 에러 처리에 leak 있는 거 — 아마 destroyed-but-not-finalized stream.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고
💛 by 똘이warm

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.