Reactive Streams in Angular: From Observables to Signals

Learn how to simplify Angular state management by moving from traditional RxJS observables to the modern Signals API. Cleaner code, no BehaviorSubjects, and better readability.

Published: 6/6/2024

In these examples the data state is reactively updated whenever selectedID changes.

By using switchMap:

  • Rapid emissions are handled: only the latest inner observable stays subscribed, and earlier ones are canceled
  • Angular handles unsubscribing from the outer observable (through the async pipe or toSignal), so no manual unsubscribe is needed

The code is split into two distinct examples: one for observables and one for signals. They can be easily combined with selective copy and paste.

The selectedID$ is an observable, and the loaded data$ is also an observable.

import {Injectable} from '@angular/core';
import {BehaviorSubject, catchError, delay, filter, map, Observable, of, switchMap, tap} from 'rxjs';

/**
 * Observable-based state service.
 *
 * Keeps the currently selected ID in a BehaviorSubject and exposes `data$`,
 * which re-runs `getByID$` every time a new ID is pushed.
 */
@Injectable({
  providedIn: 'root'
})
export class AppService {

  // State: Observable
  private readonly _selectedID$ = new BehaviorSubject<string | undefined>(undefined);

  /** Read-only stream of the selected ID; its first emission is `undefined`. */
  public get selectedID$(): Observable<string | undefined> {
    return this._selectedID$.asObservable();
  }

  /** Pushes a new selected ID into the stream (assignment syntax: `service.selectedID$ = '1'`). */
  public set selectedID$(value: string) {
    this._selectedID$.next(value);
  }

  /**
   * Result for the most recently selected ID.
   *
   * Emits either the loaded text or, when the lookup fails, the error's
   * `message`. Errors are caught on the inner observable so a failed lookup
   * does not terminate the outer stream.
   */
  public data$ = this.selectedID$.pipe(
      // Explicit type guard: narrows `string | undefined` to `string` so the
      // call to getByID$ type-checks under strict mode. A plain boolean
      // callback is only narrowed automatically by TypeScript 5.5+.
      filter((id): id is string => id !== undefined),
      tap((id) => console.log('Observable ID:', id)),
      // switchMap drops the previous in-flight lookup when a new ID arrives.
      switchMap((id) => this.getByID$(id).pipe(
          catchError(error => {
            return of(error.message);
          })
      )),
  );

  // GET: Get by ID
  /**
   * Looks up the text for an ID (mocked; answers after a 200 ms delay).
   *
   * @param id ID to look up; only '1' and '2' are known.
   * @returns Observable that emits the text once, or errors with
   *          `Invalid ID: <id>` for any other ID.
   */
  public getByID$(id: string): Observable<string> {
    // Mock HTTP call
    return of(id).pipe(
        delay(200),
        map((id) => {
          switch (id) {
            case '1':
              return 'Alice loves apples!';
            case '2':
              return 'Bob eats all the bananas!';
            default:
              throw new Error(`Invalid ID: ${id}`);
          }
        })
    );
  }
}
import {Component, inject} from '@angular/core';
import {RouterOutlet} from '@angular/router';
import {AsyncPipe, JsonPipe} from '@angular/common';

import {AppService} from './app.service';

/**
 * Root component for the observable example.
 *
 * Renders three buttons that push an ID into AppService and prints the
 * latest value of `data$` through the async and json pipes.
 */
@Component({
  selector: 'app-root',
  standalone: true,
  imports: [AsyncPipe, JsonPipe],
  template: `
    <button (click)="updateObservableID('1')">Apple</button>
    <button (click)="updateObservableID('2')">Banana</button>
    <button (click)="updateObservableID('13')">Mango</button>
    <pre>{{ data$ | async | json }}</pre>
  `
})
export class AppComponent {
  /** Shared state service, resolved through Angular's `inject()`. */
  public appService = inject(AppService);

  /** Stream rendered by the template; taken directly from the service. */
  public data$ = this.appService.data$;

  /**
   * Click handler: forwards the chosen ID to the service's `selectedID$` setter.
   *
   * @param value ID to select.
   */
  public updateObservableID(value: string): void {
    this.appService.selectedID$ = value;
  }
}

The selectedID is a signal, and the loaded data is also a signal.

The data field that loads new data uses toObservable and toSignal from rxjs-interop to convert between signals and observables.

import {Injectable, Signal, signal} from '@angular/core';
import {catchError, delay, filter, map, Observable, of, switchMap, tap} from 'rxjs';
import {toObservable, toSignal} from '@angular/core/rxjs-interop';

/**
 * Signal-based state service.
 *
 * Keeps the currently selected ID in a writable signal and exposes `data`,
 * a signal fed by an RxJS pipeline that re-runs `getByID$` whenever the ID
 * changes.
 */
@Injectable({
  providedIn: 'root'
})
export class AppService {

  // State: Signal
  private readonly _selectedID = signal<string | undefined>(undefined);

  /** Read-only view of the selected ID; `undefined` until an ID is set. */
  public get selectedID(): Signal<string | undefined> {
    return this._selectedID.asReadonly();
  }

  /** Stores a new selected ID (assignment syntax: `service.selectedID = '1'`). */
  public set selectedID(value: string) {
    this._selectedID.set(value);
  }

  /**
   * Result for the most recently selected ID, as a signal.
   *
   * Reads `null` (the configured `initialValue`) until the pipeline emits,
   * then either the loaded text or, when the lookup fails, the error's
   * `message`. Errors are caught on the inner observable so a failed lookup
   * does not terminate the outer stream.
   */
  public data = toSignal(
      toObservable<string | undefined>(this.selectedID).pipe(
          // Explicit type guard: narrows `string | undefined` to `string` so
          // the call to getByID$ type-checks under strict mode. A plain boolean
          // callback is only narrowed automatically by TypeScript 5.5+.
          filter((id): id is string => id !== undefined),
          tap((id) => console.log('Signal ID:', id)),
          // switchMap drops the previous in-flight lookup when a new ID arrives.
          switchMap((id) => this.getByID$(id).pipe(
              catchError(error => {
                return of(error.message);
              })
          )),
      ),
      {initialValue: null}
  );

  // GET: Get by ID
  /**
   * Looks up the text for an ID (mocked; answers after a 200 ms delay).
   *
   * @param id ID to look up; only '1' and '2' are known.
   * @returns Observable that emits the text once, or errors with
   *          `Invalid ID: <id>` for any other ID.
   */
  public getByID$(id: string): Observable<string> {
    // Mock HTTP call
    return of(id).pipe(
        delay(200),
        map((id) => {
          switch (id) {
            case '1':
              return 'Alice loves apples!';
            case '2':
              return 'Bob eats all the bananas!';
            default:
              throw new Error(`Invalid ID: ${id}`);
          }
        })
    );
  }
}
import {Component, inject} from '@angular/core';
import {RouterOutlet} from '@angular/router';
import {JsonPipe} from '@angular/common';

import {AppService} from './app.service';

/**
 * Root component for the signal example.
 *
 * Renders three buttons that set an ID on AppService and prints the current
 * value of the `data` signal through the json pipe.
 */
@Component({
  selector: 'app-root',
  standalone: true,
  imports: [JsonPipe],
  template: `
    <button (click)="updateSignalID('1')">Apple</button>
    <button (click)="updateSignalID('2')">Banana</button>
    <button (click)="updateSignalID('13')">Mango</button>
    <pre>{{ data() | json }}</pre>
  `
})
export class AppComponent {
  /** Shared state service, resolved through Angular's `inject()`. */
  public appService = inject(AppService);

  /** Signal read by the template; taken directly from the service. */
  public data = this.appService.data;

  /**
   * Click handler: forwards the chosen ID to the service's `selectedID` setter.
   *
   * @param value ID to select.
   */
  public updateSignalID(value: string): void {
    this.appService.selectedID = value;
  }
}