Reactive Programming

Why Reactive Programming Matters

Reactive programming treats data as streams of events over time, enabling declarative composition of asynchronous operations with automatic error handling, backpressure management, and resource cleanup for complex event-driven systems.

Core Benefits:

  • Declarative composition: Chain operators to transform event streams
  • Automatic resource cleanup: Unsubscribe cleans up listeners and timers
  • Backpressure handling: Manage fast producers with slow consumers
  • Error propagation: Errors flow through operator chain automatically
  • Time-based operations: Debounce, throttle, delay built-in
  • Unified API: Same API for events, promises, timers, HTTP requests

Problem: EventEmitter requires manual cleanup, lacks composition operators, and has no backpressure handling. Promises handle single async values but not streams of events over time.

Solution: Use RxJS observables for event streams with declarative operators, automatic cleanup, and production patterns like retry, timeout, and error recovery.

Standard Library First: EventEmitter

Node.js provides EventEmitter for basic event-driven programming without external dependencies.

Basic EventEmitter Pattern

EventEmitter allows objects to emit named events and register listeners.

Pattern:

import { EventEmitter } from "events";
// => Import built-in EventEmitter class
// => No external dependencies

const emitter = new EventEmitter();
// => Create event emitter instance
// => Can emit and listen to events

emitter.on("data", (value: number) => {
  // => Register event listener
  // => Called each time 'data' event emitted
  console.log("Received:", value);
  // => Output: Received: 10
});

emitter.emit("data", 10);
// => Emit event with data
// => All 'data' listeners called synchronously
// => Output: Received: 10

emitter.emit("data", 20);
// => Emit second event
// => Output: Received: 20

emitter.emit("data", 30);
// => Emit third event
// => Output: Received: 30

Multiple listeners:

const emitter = new EventEmitter();

emitter.on("userLogin", (userId: string) => {
  // => First listener
  console.log(`Logging user ${userId} activity`);
});

emitter.on("userLogin", (userId: string) => {
  // => Second listener for same event
  console.log(`Sending welcome email to ${userId}`);
});

emitter.on("userLogin", (userId: string) => {
  // => Third listener
  console.log(`Updating user ${userId} last login timestamp`);
});

emitter.emit("userLogin", "user-123");
// => Output:
// Logging user user-123 activity
// Sending welcome email to user-123
// Updating user user-123 last login timestamp
// => All listeners called in registration order

One-time listeners (automatically removed):

emitter.once("appStarted", () => {
  // => Listener called only once
  // => Automatically removed after first emission
  console.log("Application started");
});

emitter.emit("appStarted");
// => Output: Application started

emitter.emit("appStarted");
// => No output (listener already removed)

Manual cleanup:

function onData(value: number) {
  console.log("Received:", value);
}

emitter.on("data", onData);
// => Register named listener

emitter.emit("data", 10);
// => Output: Received: 10

emitter.off("data", onData);
// => Remove specific listener
// => Must use same function reference

emitter.emit("data", 20);
// => No output (listener removed)

// Remove all listeners for event
emitter.removeAllListeners("data");
// => Removes ALL 'data' listeners

Error handling:

emitter.on("error", (error: Error) => {
  // => Special 'error' event
  // => If not handled, throws and crashes process
  console.error("Error occurred:", error.message);
});

emitter.emit("error", new Error("Something went wrong"));
// => Output: Error occurred: Something went wrong
// => Error handled gracefully (no crash)

// Without error listener:
const unsafeEmitter = new EventEmitter();
unsafeEmitter.emit("error", new Error("Unhandled error"));
// => Throws error and crashes process
// => ALWAYS handle 'error' events in production

Stream simulation (manual composition):

const source = new EventEmitter();
const transformed = new EventEmitter();
// => Two emitters to simulate pipeline

source.on("data", (value: number) => {
  // => Listen to source
  const doubled = value * 2;
  // => Transform: multiply by 2
  transformed.emit("data", doubled);
  // => Emit to next stage
});

transformed.on("data", (value: number) => {
  // => Listen to transformed
  console.log("Doubled:", value);
});

source.emit("data", 5);
// => Output: Doubled: 10

source.emit("data", 10);
// => Output: Doubled: 20

// Manual composition is verbose and error-prone
// No built-in operators for map, filter, reduce, etc.

Limitations for production:

  • Manual cleanup: Must track and remove listeners (memory leaks)
  • No composition operators: No map, filter, merge, debounce, etc.
  • No backpressure: Fast producer overwhelms slow consumer
  • No error recovery: Errors must be handled manually
  • No automatic unsubscribe: Listeners remain until explicitly removed
  • Synchronous only: Cannot handle async operations in pipeline
  • No time-based operators: Must implement debounce/throttle manually

When standard library suffices:

  • Simple pub/sub within single module
  • No need for operator composition
  • Low event frequency (no backpressure needed)
  • Short-lived listeners (process lifetime)

Production Framework: RxJS

RxJS provides powerful observables with declarative operators, automatic resource cleanup, and production patterns for complex event streams.

Installation and Basic Setup

npm install rxjs
# => Install RxJS library
# => Industry standard for reactive programming
# => Used by Angular, TypeScript, Node.js ecosystem

Basic observable creation:

import { Observable } from "rxjs";
// => Import Observable class
// => Core primitive for reactive programming

const observable = new Observable<number>((subscriber) => {
  // => Create observable with subscriber function
  // => subscriber has next(), error(), complete() methods
  subscriber.next(10);
  // => Emit value 10
  // => Synchronous emission

  subscriber.next(20);
  // => Emit value 20

  subscriber.next(30);
  // => Emit value 30

  subscriber.complete();
  // => Signal completion (no more values)
  // => Triggers completion callback in subscriber
});

observable.subscribe({
  // => Subscribe to observable
  // => Starts execution
  next: (value) => {
    // => Handle emitted values
    console.log("Value:", value);
    // => Output: Value: 10, Value: 20, Value: 30
  },
  error: (err) => {
    // => Handle errors
    console.error("Error:", err);
  },
  complete: () => {
    // => Handle completion
    console.log("Complete");
    // => Output: Complete
  },
});

Observable from events (EventEmitter conversion):

import { fromEvent } from "rxjs";
// => Create observable from DOM/Node events

const emitter = new EventEmitter();

const observable = fromEvent<number>(emitter, "data");
// => Convert EventEmitter to Observable
// => Type-safe with generics

const subscription = observable.subscribe((value) => {
  // => Subscribe to event stream
  console.log("Received:", value);
});

emitter.emit("data", 10);
// => Output: Received: 10

emitter.emit("data", 20);
// => Output: Received: 20

subscription.unsubscribe();
// => Automatic cleanup
// => Removes EventEmitter listener
// => No memory leak

emitter.emit("data", 30);
// => No output (unsubscribed)

Observable from Promise:

import { from } from "rxjs";
// => Convert Promise to Observable

const promise = fetch("https://api.example.com/users/123").then((r) => r.json());
// => Promise that resolves to User

const observable = from(promise);
// => Convert to Observable
// => Emits single value then completes

observable.subscribe({
  next: (user) => console.log("User:", user),
  // => Output: User: { id: 123, name: "Alice" }
  complete: () => console.log("Complete"),
  // => Output: Complete
});

Declarative Operators

RxJS provides 100+ operators for transforming event streams declaratively.

map (transform values):

import { of } from "rxjs";
import { map } from "rxjs/operators";
// => Import operators

const source = of(1, 2, 3, 4, 5);
// => Create observable from values
// => Emits 1, 2, 3, 4, 5 then completes

const doubled = source.pipe(
  // => pipe applies operators
  map((value) => value * 2),
  // => Transform each value
  // => Multiply by 2
);

doubled.subscribe((value) => console.log(value));
// => Output: 2, 4, 6, 8, 10

filter (select values):

import { filter } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

const evens = source.pipe(
  filter((value) => value % 2 === 0),
  // => Keep only even numbers
  // => Predicate function
);

evens.subscribe((value) => console.log(value));
// => Output: 2, 4

Chaining operators:

const result = source.pipe(
  // => Chain multiple operators
  filter((value) => value % 2 === 0),
  // => Step 1: Keep evens (2, 4)
  map((value) => value * 10),
  // => Step 2: Multiply by 10 (20, 40)
  map((value) => `Value: ${value}`),
  // => Step 3: Format as string
);

result.subscribe((value) => console.log(value));
// => Output: Value: 20, Value: 40
// => Operators applied in order

reduce (accumulate values):

import { reduce } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

const sum = source.pipe(
  reduce((acc, value) => acc + value, 0),
  // => Accumulate sum
  // => Initial value: 0
  // => Emits single value when source completes
);

sum.subscribe((value) => console.log("Sum:", value));
// => Output: Sum: 15
// => Emitted after source completes

debounceTime (ignore rapid events):

import { debounceTime } from "rxjs/operators";

const searchInput = fromEvent<Event>(inputElement, "input");
// => Observable from input events

const debouncedSearch = searchInput.pipe(
  debounceTime(300),
  // => Wait 300ms after last event
  // => Ignores rapid typing
  map((event) => (event.target as HTMLInputElement).value),
  // => Extract input value
);

debouncedSearch.subscribe((query) => {
  // => Called only after user stops typing for 300ms
  console.log("Search:", query);
  performSearch(query);
  // => Prevents excessive API calls
});

throttleTime (limit event rate):

import { throttleTime } from "rxjs/operators";

const clicks = fromEvent(button, "click");
// => Observable from button clicks

const throttled = clicks.pipe(
  throttleTime(1000),
  // => Emit at most once per 1000ms
  // => Ignores clicks during throttle period
);

throttled.subscribe(() => {
  console.log("Button clicked (throttled)");
  // => Called at most once per second
  // => Prevents button spam
});

switchMap (switch to new observable):

import { switchMap } from "rxjs/operators";

const searchInput = fromEvent<Event>(inputElement, "input");

const results = searchInput.pipe(
  debounceTime(300),
  map((event) => (event.target as HTMLInputElement).value),
  switchMap((query) => {
    // => Switch to new observable
    // => Cancels previous HTTP request if new one starts
    return from(fetch(`/api/search?q=${query}`).then((r) => r.json()));
    // => Returns observable from Promise
  }),
);

results.subscribe((data) => {
  console.log("Search results:", data);
  // => Only receives results from latest search
  // => Previous requests cancelled automatically
});

mergeMap (merge multiple observables):

import { mergeMap } from "rxjs/operators";

const userIds = of("1", "2", "3");

const users = userIds.pipe(
  mergeMap((id) => {
    // => Transform each id to observable
    // => All observables run concurrently
    return from(fetchUser(id));
    // => Fetch user (returns Promise)
  }),
);

users.subscribe((user) => console.log("User:", user));
// => Output: User: {...}, User: {...}, User: {...}
// => Fetches all users concurrently
// => Results may arrive in any order

catchError (error recovery):

import { catchError } from "rxjs/operators";

const source = from(fetchUser("123"));

const withFallback = source.pipe(
  catchError((error) => {
    // => Handle error
    console.error("Fetch failed:", error);
    return of({ id: "123", name: "Unknown" });
    // => Return fallback observable
    // => Stream continues with fallback value
  }),
);

withFallback.subscribe((user) => console.log("User:", user));
// => Output: User: { id: "123", name: "Unknown" } (if fetch fails)

retry (automatic retry):

import { retry } from "rxjs/operators";

const source = from(fetchUser("123"));

const withRetry = source.pipe(
  retry(3),
  // => Retry up to 3 times on error
  // => Re-subscribes to source observable
  catchError((error) => {
    // => After 3 failed retries
    console.error("Failed after 3 retries:", error);
    return of(null);
    // => Return null as fallback
  }),
);

withRetry.subscribe((user) => console.log("User:", user));
// => Retries 3 times before giving up

Subjects (Hot Observables)

Subjects are both observable and observer, allowing multicasting to multiple subscribers.

Subject (multicast):

import { Subject } from "rxjs";

const subject = new Subject<number>();
// => Create subject
// => Can emit values AND be subscribed to

subject.subscribe((value) => console.log("Subscriber A:", value));
// => First subscriber

subject.subscribe((value) => console.log("Subscriber B:", value));
// => Second subscriber

subject.next(10);
// => Emit to all subscribers
// => Output:
// Subscriber A: 10
// Subscriber B: 10

subject.next(20);
// => Output:
// Subscriber A: 20
// Subscriber B: 20

// Subjects are hot (emit whether subscribers exist or not)

BehaviorSubject (current value):

import { BehaviorSubject } from "rxjs";

const subject = new BehaviorSubject<number>(0);
// => Create BehaviorSubject with initial value 0
// => Always has current value

console.log("Current value:", subject.value);
// => Output: Current value: 0
// => Can access current value synchronously

subject.subscribe((value) => console.log("Subscriber A:", value));
// => Output: Subscriber A: 0
// => New subscribers immediately receive current value

subject.next(10);
// => Output: Subscriber A: 10

subject.subscribe((value) => console.log("Subscriber B:", value));
// => Output: Subscriber B: 10
// => Receives current value (10) immediately

subject.next(20);
// => Output:
// Subscriber A: 20
// Subscriber B: 20

// Use case: State management, current user, configuration

ReplaySubject (buffer emissions):

import { ReplaySubject } from "rxjs";

const subject = new ReplaySubject<number>(3);
// => Replay last 3 emissions to new subscribers
// => Buffer size: 3

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
// => Emitted to existing subscribers

subject.subscribe((value) => console.log("Subscriber A:", value));
// => Output: Subscriber A: 2, Subscriber A: 3, Subscriber A: 4
// => Receives last 3 values (buffer size)

subject.next(5);
// => Output: Subscriber A: 5

// Use case: Recent messages, event history, logging

AsyncSubject (last value only):

import { AsyncSubject } from "rxjs";

const subject = new AsyncSubject<number>();
// => Emits only last value when complete

subject.next(10);
subject.next(20);
subject.next(30);
// => Values buffered (not emitted yet)

subject.subscribe((value) => console.log("Subscriber A:", value));
// => No output yet (not completed)

subject.complete();
// => Output: Subscriber A: 30
// => Emits last value (30) on completion

// Use case: HTTP requests, async computations

Hot vs Cold Observables

Cold observable (unicast, lazy):

const cold = new Observable<number>((subscriber) => {
  // => Producer created on each subscription
  // => Each subscriber gets independent execution
  console.log("Producer started");
  subscriber.next(Math.random());
  // => Random value per subscriber
});

cold.subscribe((value) => console.log("Subscriber A:", value));
// => Output: Producer started, Subscriber A: 0.123

cold.subscribe((value) => console.log("Subscriber B:", value));
// => Output: Producer started, Subscriber B: 0.456
// => Different value (independent producer)

// Cold observables:
// - Lazy (start on subscription)
// - Unicast (one producer per subscriber)
// - Independent (each subscriber gets own execution)

Hot observable (multicast, eager):

const subject = new Subject<number>();
// => Hot observable
// => Single shared producer

subject.subscribe((value) => console.log("Subscriber A:", value));
subject.subscribe((value) => console.log("Subscriber B:", value));

subject.next(Math.random());
// => Output:
// Subscriber A: 0.789
// Subscriber B: 0.789
// => Same value (shared producer)

// Hot observables:
// - Eager (producer active before subscription)
// - Multicast (single producer, many subscribers)
// - Shared (all subscribers receive same values)

Backpressure Handling

Control fast producers with slow consumers to prevent memory overflow.

throttleTime (sampling):

import { interval } from "rxjs";
import { throttleTime } from "rxjs/operators";

const fast = interval(10);
// => Emit every 10ms (fast producer)

const throttled = fast.pipe(
  throttleTime(1000),
  // => Emit at most once per 1000ms
  // => Drops intermediate values
);

throttled.subscribe((value) => console.log("Throttled:", value));
// => Output: Throttled: 0, Throttled: 100, Throttled: 200, ...
// => Only ~10% of values (sampling)

bufferTime (batching):

import { bufferTime } from "rxjs/operators";

const fast = interval(10);

const buffered = fast.pipe(
  bufferTime(1000),
  // => Collect values for 1000ms
  // => Emit array of collected values
);

buffered.subscribe((batch) => console.log("Batch:", batch));
// => Output: Batch: [0, 1, 2, ..., 99]
// => Processes batches instead of individual values
// => Reduces consumer load

bufferCount (batch by count):

import { bufferCount } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

const batched = source.pipe(
  bufferCount(3),
  // => Collect 3 values per batch
  // => Emit array when batch full
);

batched.subscribe((batch) => console.log("Batch:", batch));
// => Output:
// Batch: [1, 2, 3]
// Batch: [4, 5, 6]
// Batch: [7, 8, 9]
// Batch: [10]

Production Benefits

  • Declarative composition: 100+ operators for stream transformation
  • Automatic cleanup: Unsubscribe removes all listeners and timers
  • Error propagation: Errors flow through operator chain
  • Backpressure: Throttle, debounce, buffer for flow control
  • Time-based operators: Delay, timeout, interval built-in
  • Hot and cold: Support both unicast and multicast patterns
  • Type safety: Full TypeScript support with generics

Trade-offs

  • Learning curve: Many operators and concepts to master
  • Bundle size: ~20KB minified (can tree-shake)
  • Complexity: Overkill for simple event handling
  • Debugging: Stack traces can be unclear

When to use RxJS

  • Complex event streams (UI interactions, WebSockets)
  • Operator composition (map, filter, merge, debounce)
  • Automatic cleanup required (prevent memory leaks)
  • Backpressure handling (fast producer, slow consumer)
  • Time-based operations (debounce, throttle, retry)
  • Reactive state management (Redux Observable, NGRX)

Reactive Pattern Progression Diagram

  %% Color Palette: Blue #0173B2, Orange #DE8F05, Teal #029E73, Purple #CC78BC
%% All colors are color-blind friendly and meet WCAG AA contrast standards

graph TB
    A[EventEmitter] -->|Need operators| B[RxJS Observables]
    A -->|Need cleanup| B
    A -->|Need backpressure| B

    A:::standard
    B:::framework

    classDef standard fill:#CC78BC,stroke:#000000,color:#FFFFFF,stroke-width:2px
    classDef framework fill:#029E73,stroke:#000000,color:#FFFFFF,stroke-width:2px

    subgraph Standard[" Standard Library "]
        A
    end

    subgraph Production[" Production Framework "]
        B
    end

    style Standard fill:#F0F0F0,stroke:#0173B2,stroke-width:3px
    style Production fill:#F0F0F0,stroke:#029E73,stroke-width:3px

Production Patterns and Best Practices

Unsubscribe on Component Destroy

Prevent memory leaks by unsubscribing when component destroyed.

Pattern (manual unsubscribe):

import { Subscription } from "rxjs";

class UserComponent {
  private subscription = new Subscription();
  // => Subscription container

  constructor() {
    const user$ = this.userService.getCurrentUser();

    this.subscription.add(
      user$.subscribe((user) => {
        // => Subscribe to user stream
        this.displayUser(user);
      }),
    );

    const notifications$ = this.notificationService.getNotifications();

    this.subscription.add(
      notifications$.subscribe((notification) => {
        // => Subscribe to notifications
        this.showNotification(notification);
      }),
    );
  }

  destroy() {
    // => Called when component destroyed
    this.subscription.unsubscribe();
    // => Unsubscribe from ALL subscriptions
    // => Prevents memory leaks
  }
}

Pattern (takeUntil):

import { Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

class UserComponent {
  private destroy$ = new Subject<void>();
  // => Subject to signal destruction

  constructor() {
    this.userService
      .getCurrentUser()
      .pipe(
        takeUntil(this.destroy$),
        // => Unsubscribe when destroy$ emits
        // => Declarative cleanup
      )
      .subscribe((user) => this.displayUser(user));

    this.notificationService
      .getNotifications()
      .pipe(takeUntil(this.destroy$))
      .subscribe((notification) => this.showNotification(notification));
  }

  destroy() {
    this.destroy$.next();
    // => Emit value to trigger unsubscribe
    this.destroy$.complete();
    // => Complete subject
  }
}

Error Recovery Strategies

retry with delay:

import { retryWhen, delay, take } from "rxjs/operators";

const source = from(fetchUser("123"));

const withRetry = source.pipe(
  retryWhen((errors) =>
    errors.pipe(
      // => Error stream
      delay(1000),
      // => Wait 1 second between retries
      take(3),
      // => Retry max 3 times
    ),
  ),
  catchError((error) => {
    console.error("Failed after retries:", error);
    return of(null);
  }),
);

withRetry.subscribe((user) => console.log("User:", user));
// => Retries 3 times with 1 second delay

exponential backoff:

import { retryWhen, scan, delayWhen, timer } from "rxjs/operators";

const source = from(fetchUser("123"));

const withBackoff = source.pipe(
  retryWhen((errors) =>
    errors.pipe(
      scan((retryCount) => {
        // => Track retry count
        if (retryCount >= 5) {
          throw new Error("Max retries exceeded");
        }
        return retryCount + 1;
      }, 0),
      delayWhen((retryCount) => {
        // => Exponential delay
        const delayMs = Math.pow(2, retryCount) * 1000;
        // => 2s, 4s, 8s, 16s, 32s
        console.log(`Retry ${retryCount} after ${delayMs}ms`);
        return timer(delayMs);
      }),
    ),
  ),
);

withBackoff.subscribe({
  next: (user) => console.log("User:", user),
  error: (err) => console.error("Failed:", err),
});

Combining Multiple Streams

combineLatest (wait for all, emit on any change):

import { combineLatest } from "rxjs";

const user$ = this.userService.getCurrentUser();
const settings$ = this.settingsService.getSettings();

const combined$ = combineLatest([user$, settings$]);
// => Waits for both to emit at least once
// => Then emits on any change

combined$.subscribe(([user, settings]) => {
  console.log("User:", user, "Settings:", settings);
  // => Receives latest value from both
});

forkJoin (wait for all, emit once when all complete):

import { forkJoin } from "rxjs";

const users$ = from(fetchUsers());
const posts$ = from(fetchPosts());
const comments$ = from(fetchComments());

const all$ = forkJoin({ users: users$, posts: posts$, comments: comments$ });
// => Waits for ALL observables to complete
// => Emits single object with results

all$.subscribe(({ users, posts, comments }) => {
  console.log(`Loaded ${users.length} users, ${posts.length} posts, ${comments.length} comments`);
});
// => Like Promise.all but for observables

merge (combine multiple streams):

import { merge } from "rxjs";

const clicks$ = fromEvent(button, "click");
const keypresses$ = fromEvent(document, "keypress");

const interactions$ = merge(clicks$, keypresses$);
// => Combines both streams
// => Emits from either source

interactions$.subscribe(() => {
  console.log("User interaction detected");
});

State Management with BehaviorSubject

Centralized state with reactive updates.

Pattern:

class UserStore {
  private userSubject = new BehaviorSubject<User | null>(null);
  // => Current user state
  // => Initial value: null (not logged in)

  public user$ = this.userSubject.asObservable();
  // => Expose as observable (read-only)
  // => Components subscribe to this

  async login(email: string, password: string): Promise<void> {
    const user = await authService.login(email, password);
    // => Authenticate user

    this.userSubject.next(user);
    // => Update state
    // => All subscribers receive new user
  }

  logout(): void {
    this.userSubject.next(null);
    // => Clear user state
    // => All subscribers receive null
  }

  getCurrentUser(): User | null {
    return this.userSubject.value;
    // => Synchronous access to current value
  }
}

const userStore = new UserStore();

// Component 1: Header
userStore.user$.subscribe((user) => {
  if (user) {
    console.log(`Welcome, ${user.name}`);
  } else {
    console.log("Not logged in");
  }
});

// Component 2: Sidebar
userStore.user$.subscribe((user) => {
  // => Receives same state updates
  updateSidebar(user);
});

await userStore.login("alice@example.com", "password");
// => Both components receive updated user

Trade-offs and When to Use Each

EventEmitter (Standard Library)

Use when:

  • Simple pub/sub within single module
  • No operator composition needed
  • Event-driven architecture (simple)
  • No automatic cleanup required

Avoid when:

  • Need declarative operators (map, filter, debounce)
  • Automatic cleanup required (prevent leaks)
  • Backpressure handling needed
  • Complex stream transformations

RxJS Observables (Production)

Use when:

  • Complex event streams (UI, WebSockets, timers)
  • Operator composition (map, filter, merge, switchMap)
  • Automatic cleanup critical (prevent memory leaks)
  • Backpressure handling (throttle, debounce, buffer)
  • Time-based operations (delay, timeout, interval)
  • State management with reactive updates

Avoid when:

  • Simple event handling (EventEmitter simpler)
  • Team unfamiliar with reactive programming
  • Bundle size constraints (use tree-shaking)

Common Pitfalls

Pitfall 1: Forgetting to Unsubscribe

Problem: Subscriptions remain active, causing memory leaks.

Solution: Always unsubscribe or use takeUntil.

// Bad: No unsubscribe
interval(1000).subscribe((value) => console.log(value));
// => Runs forever (memory leak)

// Good: Manual unsubscribe
const subscription = interval(1000).subscribe((value) => console.log(value));
setTimeout(() => subscription.unsubscribe(), 5000);
// => Unsubscribe after 5 seconds

// Better: takeUntil
const destroy$ = new Subject<void>();
interval(1000)
  .pipe(takeUntil(destroy$))
  .subscribe((value) => console.log(value));
setTimeout(() => destroy$.next(), 5000);
// => Declarative cleanup

Pitfall 2: Nested Subscriptions

Problem: Nested subscriptions are hard to manage and unsubscribe.

Solution: Use switchMap, mergeMap, or concatMap.

// Bad: Nested subscriptions
userService.getCurrentUser().subscribe((user) => {
  orderService.getOrders(user.id).subscribe((orders) => {
    // => Nested subscription (hard to cleanup)
    console.log(orders);
  });
});

// Good: switchMap
userService
  .getCurrentUser()
  .pipe(
    switchMap((user) => orderService.getOrders(user.id)),
    // => Flat composition
  )
  .subscribe((orders) => console.log(orders));

Pitfall 3: Not Handling Errors

Problem: Unhandled errors terminate observable stream.

Solution: Use catchError to recover.

// Bad: No error handling
source.subscribe((value) => console.log(value));
// => Error terminates stream (no more emissions)

// Good: catchError
source
  .pipe(
    catchError((error) => {
      console.error("Error:", error);
      return of(null);
      // => Recover with fallback
    }),
  )
  .subscribe((value) => console.log(value));

Pitfall 4: Hot vs Cold Confusion

Problem: Cold observable creates new producer per subscriber.

Solution: Use share() or Subject for multicast.

// Cold: Each subscriber gets separate HTTP request
const cold$ = from(fetchUser("123"));
cold$.subscribe((user) => console.log("A:", user)); // => HTTP request 1
cold$.subscribe((user) => console.log("B:", user)); // => HTTP request 2

// Hot: Single HTTP request, shared result
const hot$ = cold$.pipe(share());
// => Multicast to all subscribers
hot$.subscribe((user) => console.log("A:", user)); // => HTTP request 1
hot$.subscribe((user) => console.log("B:", user)); // => Reuses request 1

Summary

Reactive programming treats data as streams of events over time with declarative operators. Standard library EventEmitter lacks composition and cleanup, while RxJS provides observables with automatic resource management and production patterns.

Progression path:

  1. Start with EventEmitter: Simple pub/sub fundamentals
  2. Add RxJS: Declarative operators and automatic cleanup
  3. Master patterns: Subjects, backpressure, error recovery

Production checklist:

  • ✅ Unsubscribe or use takeUntil (prevent memory leaks)
  • ✅ Error handling with catchError (prevent stream termination)
  • ✅ Backpressure management (throttle, debounce, buffer)
  • ✅ Flatten nested subscriptions (use switchMap/mergeMap)
  • ✅ Choose hot vs cold appropriately (multicast vs unicast)
  • ✅ State management with BehaviorSubject (reactive state)
  • ✅ Retry with backoff (handle transient failures)

Choose reactive strategy based on complexity: EventEmitter for simple events, RxJS for complex event streams.

Last updated