import { BehaviorSubject, filter, Observable } from 'rxjs';

/**
 * Optional callbacks that can be passed to `subscribeToStream`.
 *
 * Every member is optional, so callers supply only the hooks they need.
 * NOTE(review): when each callback fires is decided by the concrete stream
 * implementation, which is not visible in this file — the descriptions below
 * are inferred from the member names and should be confirmed.
 */
export interface EventSourceHandlers {
  /** Presumably invoked when a message arrives on the stream; receives no arguments. */
  onMessage?: () => void;
  /**
   * Presumably invoked when the connection is established.
   * NOTE(review): meaning of `disconnect` is not defined here — confirm with implementers.
   */
  onEstablish?: (disconnect?: boolean) => void;
  /**
   * Presumably invoked when the stream reports an error.
   * NOTE(review): meaning of `isFixed` is not defined here — confirm with implementers.
   */
  onError?: (isFixed?: boolean) => void;
}

/**
 * Base class for stream-backed sources of `T` values.
 *
 * Holds the most recently emitted value in a `BehaviorSubject` and exposes it
 * through {@link StreamBase.items$}. Subclasses implement the subscribe /
 * unsubscribe methods and push values in via {@link StreamBase.emitValue}.
 */
export abstract class StreamBase<T> {
  /** Backing subject; starts out as `undefined`, i.e. "no value yet". */
  private readonly _items$ = new BehaviorSubject<T | undefined>(undefined);

  /**
   * Read-only view of the emitted values.
   *
   * `null` and `undefined` emissions (including the subject's initial
   * `undefined`) are filtered out, so subscribers only ever see real values.
   */
  public items$: Observable<T> = this._items$
    .asObservable()
    .pipe(filter((item): item is T => item != null));

  /**
   * Opens the stream. Implemented by subclasses.
   *
   * @param token - String passed through to the implementation.
   * @param handlers - Optional callbacks; see `EventSourceHandlers`.
   * @returns An observable of `T` produced by the implementation.
   */
  abstract subscribeToStream(token: string, handlers?: EventSourceHandlers): Observable<T>;

  /** Closes the stream. Implemented by subclasses. */
  abstract unsubscribeFromStream(): void;

  /**
   * Pushes a value into the backing subject.
   *
   * @param value - Value to emit; `undefined` is accepted by the subject but
   * is not forwarded to `items$` subscribers because of the filter above.
   */
  protected emitValue(value: T | undefined): void {
    this._items$.next(value);
  }
}
