import { GrpcMessage, GrpcMetadata, GrpcStatusEvent } from '@ngx-grpc/common';
import * as googleProtobuf from '@ngx-grpc/well-known-types';
import { EventSourceHandlers, StreamBase } from '@pinnakl/shared/types';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';

/**
 * Minimal shape of a client that `BaseGrpcService` can drive.
 *
 * @typeParam T - Type of each item emitted by the stream returned from `subscribe`.
 */
interface BaseClient<T> {
  /**
   * Opens a stream of `T` values.
   *
   * @param requestData - Empty protobuf request payload.
   * @param requestMetadata - Metadata sent with the request (the service puts the
   *   `Authorization` header here).
   * @returns An observable that emits the responses.
   */
  subscribe(requestData: googleProtobuf.Empty, requestMetadata: GrpcMetadata): Observable<T>;
}

/** A `GrpcMessage` that may additionally carry a string `id`. */
interface BaseMessage extends GrpcMessage {
  /** Optional identifier of the message. */
  id?: string;
}

/**
 * Envelope of a single stream response consumed by `BaseGrpcService`.
 */
export interface BaseResponse {
  /**
   * Payload of the response. `BaseGrpcService.handleMessage` only forwards it when
   * it is truthy and not a number; numeric and `undefined` values are skipped.
   * NOTE(review): the meaning of the numeric variant is not visible here — confirm
   * against the generated client types.
   */
  message: BaseMessage | number | undefined;
}

/** Upper bound the retry counter is compared against before another resubscribe is scheduled. */
const MAX_RETRIES_COUNT = 5;
/**
 * Base delay, in milliseconds, before a resubscribe attempt. The actual delay is
 * this value multiplied by the current retry counter (with a minimum factor of 1).
 */
const TIMEOUT_INTERVAL = 2000;

/**
 * Base class for services that consume a stream from a gRPC client and re-publish
 * each message through the inherited `StreamBase` API (`emitValue` / `items$`).
 *
 * Responsibilities visible in this class:
 * - opening the stream with a bearer token and forwarding messages;
 * - notifying optional `EventSourceHandlers` callbacks about connection state;
 * - scheduling a bounded number of delayed resubscribe attempts after an error,
 *   and when the tab becomes visible again while a connection is established.
 *
 * Subclasses are expected to assign `client` and may replace `transformFn`.
 *
 * @typeParam T - Type of the values published to consumers.
 * @typeParam K - Type of the raw responses emitted by the client.
 */
export abstract class BaseGrpcService<T, K extends BaseResponse> extends StreamBase<T> {
  /** `true` once a message (or completion) was observed on the current subscription. */
  private readonly connectionEstablished$ = new BehaviorSubject<boolean>(false);
  /** Number of resubscribe attempts fired since the last established connection. */
  private readonly retriesCount$ = new BehaviorSubject<number>(0);
  /** Handle of the pending resubscribe timer, if one is scheduled. */
  private retryTimer?: ReturnType<typeof setTimeout>;
  /** Subscription to the active client stream, if any. */
  grpcClientStream?: Subscription;
  /** Arguments of the last `subscribeToStream` call; reused when resubscribing. */
  subscribeConfig: { token: string; handlers?: EventSourceHandlers } | null = null;
  /** Client used to open the stream; must be assigned by the subclass. */
  client!: BaseClient<K>;

  constructor() {
    super();
    this.handleTabVisibility();
  }

  /**
   * Converts the JSON form of an incoming message into the published type.
   * The default is an unchecked cast; subclasses may replace it.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept as `any` so subclass overrides with a concrete parameter type stay assignable
  transformFn = (message: any): T => message as T;

  /**
   * Drops any current subscription and opens a new stream authorised with `token`.
   *
   * @param token - Bearer token placed in the `Authorization` metadata header.
   * @param handlers - Optional callbacks notified about messages and connection state.
   * @returns The inherited `items$` observable.
   */
  subscribeToStream(token: string, handlers?: EventSourceHandlers): Observable<T> {
    this.unsubscribeFromStream();
    this.subscribeConfig = { token, handlers };
    const headers = new GrpcMetadata();
    headers.set('Authorization', `Bearer ${token}`);
    this.grpcClientStream = this.client.subscribe(new googleProtobuf.Empty(), headers).subscribe({
      next: response => this.handleMessage(response, handlers),
      error: error => this.handleError(error, handlers),
      // NOTE(review): completion is routed through handleMessage with no response,
      // which marks the connection as established if it was not yet — confirm intended.
      complete: () => this.handleMessage(undefined, handlers)
    });
    return this.items$;
  }

  /**
   * Tears down the active stream: cancels a pending resubscribe timer, unsubscribes
   * from the client stream, emits `undefined` and marks the connection as not
   * established. `subscribeConfig` is left untouched.
   */
  unsubscribeFromStream(): void {
    // Without this, a retry scheduled earlier would silently reopen the stream
    // after the caller asked for it to be closed.
    this.clearRetryTimer();
    this.grpcClientStream?.unsubscribe();
    this.emitValue(undefined);
    this.grpcClientStream = undefined;
    this.connectionEstablished$.next(false);
  }

  /**
   * Handles a stream error: logs it, calls `onEstablish(false)` and `onError(false)`
   * on the handlers, tears the stream down and schedules a resubscribe attempt.
   *
   * @param error - Status event reported by the client stream.
   * @param handlers - Callbacks supplied to `subscribeToStream`.
   */
  protected handleError(error?: GrpcStatusEvent, handlers?: EventSourceHandlers): void {
    console.warn(error);
    handlers?.onEstablish?.(false);
    // NOTE(review): the meaning of the boolean passed to onError is defined by
    // EventSourceHandlers, which is not visible here.
    handlers?.onError?.(false);
    this.unsubscribeFromStream();
    this.tryToResubscribe();
  }

  /**
   * Handles a stream notification. On the first notification of a subscription it
   * marks the connection as established, calls `onEstablish(true)` and
   * `onError(true)`, and resets the retry counter. A truthy, non-numeric `message`
   * is converted with `toJSON()` + `transformFn`, emitted, and `onMessage` is called.
   *
   * @param response - Raw response, or `undefined` when invoked on completion.
   * @param handlers - Callbacks supplied to `subscribeToStream`.
   */
  protected handleMessage(response?: K, handlers?: EventSourceHandlers): void {
    if (!this.connectionEstablished$.getValue()) {
      this.connectionEstablished$.next(true);
      handlers?.onEstablish?.(true);
      handlers?.onError?.(true);
      this.retriesCount$.next(0);
    }
    if (response) {
      const { message } = response;
      if (message && typeof message !== 'number') {
        this.emitValue(this.transformFn(message.toJSON()));
        handlers?.onMessage?.();
      }
    }
  }

  /**
   * Schedules a resubscribe whenever the document becomes visible while a
   * connection is established.
   *
   * The listener is registered with `addEventListener` so it does not replace
   * visibility handlers installed by other instances or other code. It is never
   * removed; the class exposes no teardown hook.
   */
  protected handleTabVisibility(): void {
    // Skip in environments without a DOM (e.g. server-side rendering).
    if (typeof document === 'undefined') {
      return;
    }
    document.addEventListener('visibilitychange', () => {
      if (document.visibilityState === 'visible' && this.connectionEstablished$.getValue()) {
        this.tryToResubscribe();
      }
    });
  }

  /**
   * Schedules one delayed resubscribe using the stored `subscribeConfig`, unless
   * `MAX_RETRIES_COUNT` attempts have already been made. The delay grows linearly:
   * `max(retries, 1) * TIMEOUT_INTERVAL` milliseconds. At most one timer is pending
   * at a time.
   */
  private tryToResubscribe(): void {
    const prevCount = this.retriesCount$.getValue();
    // Stop once the counter reaches the limit, giving exactly MAX_RETRIES_COUNT attempts.
    if (prevCount >= MAX_RETRIES_COUNT) {
      return;
    }
    // Replace rather than stack: an earlier pending attempt is superseded.
    this.clearRetryTimer();
    this.retryTimer = setTimeout(
      () => {
        this.retryTimer = undefined;
        this.retriesCount$.next(prevCount + 1);
        this.subscribeConfig?.handlers?.onError?.(true);
        if (this.subscribeConfig) {
          this.subscribeToStream(this.subscribeConfig.token, this.subscribeConfig.handlers);
        }
      },
      Math.max(prevCount, 1) * TIMEOUT_INTERVAL
    );
  }

  /** Cancels the pending resubscribe timer, if any. */
  private clearRetryTimer(): void {
    if (this.retryTimer !== undefined) {
      clearTimeout(this.retryTimer);
      this.retryTimer = undefined;
    }
  }
}
