JS Tower
Node.jsМодуль 3: Файловая система

Потоки (Streams)

Readable, Writable, Transform, pipe

Цель урока

В этом уроке ты научишься:

  • Понимать типы потоков
  • Использовать pipe для передачи данных
  • Создавать свои потоки

Зачем нужны потоки

Потоки позволяют обрабатывать данные по частям, не загружая всё в память.

const fs = require('fs');

// Плохо: загружает весь файл в память
const data = fs.readFileSync('huge-file.txt');

// Хорошо: читает по частям
const stream = fs.createReadStream('huge-file.txt');
stream.on('data', (chunk) => {
  console.log('Получен кусок:', chunk.length, 'байт');
});

Типы потоков

ТипОписаниеПример
ReadableЧтение данныхfs.createReadStream
WritableЗапись данныхfs.createWriteStream
DuplexЧтение и записьnet.Socket
TransformПреобразование данныхzlib.createGzip

Readable Stream

Создание

const fs = require('fs');

const readable = fs.createReadStream('file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024  // Размер буфера (64KB)
});

События

readable.on('data', (chunk) => {
  console.log('Данные:', chunk);
});

readable.on('end', () => {
  console.log('Чтение завершено');
});

readable.on('error', (err) => {
  console.error('Ошибка:', err.message);
});

Режимы чтения

// Flowing mode (автоматическое чтение)
readable.on('data', (chunk) => {});

// Paused mode (ручное чтение)
readable.on('readable', () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log(chunk);
  }
});

// Управление потоком
readable.pause();
readable.resume();

Writable Stream

Создание

const fs = require('fs');

const writable = fs.createWriteStream('output.txt', {
  encoding: 'utf8'
});

Запись

writable.write('Первая строка\n');
writable.write('Вторая строка\n');
writable.end('Последняя строка');  // Завершает поток

writable.on('finish', () => {
  console.log('Запись завершена');
});

writable.on('error', (err) => {
  console.error('Ошибка:', err.message);
});

Backpressure

const writable = fs.createWriteStream('output.txt');

function write(data, callback) {
  // write() возвращает false если буфер переполнен
  if (!writable.write(data)) {
    // Ждём освобождения буфера
    writable.once('drain', callback);
  } else {
    process.nextTick(callback);
  }
}

pipe

Соединяет потоки:

const fs = require('fs');

const readable = fs.createReadStream('input.txt');
const writable = fs.createWriteStream('output.txt');

// Копирование файла
readable.pipe(writable);

// С обработкой завершения
readable
  .pipe(writable)
  .on('finish', () => console.log('Копирование завершено'));

Цепочка pipe

const fs = require('fs');
const zlib = require('zlib');

// Сжатие файла
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.txt.gz'));

// Распаковка
fs.createReadStream('file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('file.txt'));

Transform Stream

Преобразует данные на лету:

const { Transform } = require('stream');

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin
  .pipe(upperCase)
  .pipe(process.stdout);

Практический пример

const { Transform } = require('stream');
const fs = require('fs');

// Нумерация строк
const lineNumberer = new Transform({
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    const numbered = lines
      .map((line, i) => `${this.lineNum++}: ${line}`)
      .join('\n');
    this.push(numbered);
    callback();
  }
});
lineNumberer.lineNum = 1;

fs.createReadStream('input.txt')
  .pipe(lineNumberer)
  .pipe(fs.createWriteStream('numbered.txt'));

pipeline (рекомендуется)

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compress(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Сжатие завершено');
}

compress('file.txt', 'file.txt.gz');

Преимущество pipeline

pipeline автоматически обрабатывает ошибки и закрывает потоки.


Практика

Задание 1: Копирование файла

Задача: Скопируй большой файл используя потоки.

Решение:

const fs = require('fs');
const { pipeline } = require('stream/promises');

async function copyFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    fs.createWriteStream(dest)
  );
}

Задание 2: Transform

Задача: Создай поток, который заменяет слово.

Решение:

const { Transform } = require('stream');

function createReplacer(search, replace) {
  return new Transform({
    transform(chunk, encoding, callback) {
      const result = chunk.toString().replaceAll(search, replace);
      this.push(result);
      callback();
    }
  });
}

Проверь себя

  1. Когда использовать потоки вместо readFile?
  2. Что такое backpressure?
  3. Чем pipeline лучше pipe?