import { Injectable, NgZone, OnDestroy } from '@angular/core';
import { RpcWebSocketClient } from '@deepkit/rpc';
import { UntilDestroy, untilDestroyed } from '@ngneat/until-destroy';
import { combineLatest, defer, NEVER, Observable } from 'rxjs';
import { skip, startWith, distinctUntilChanged, switchMap, shareReplay, tap, catchError } from 'rxjs/operators';
import {
  PresenceRPCInterface, ThreadsRPC, ThreadsRPCInterface, AppNotificationRPC, AppNotificationsRPCInterface, OnlineRPCInterface, OnlineRPC, AiRPCInterface, AiRPC, FilesRPCInterface, FilesRPC,
  JamsRPCInterface,
  JamsRPC,
} from 'shared/rpc-interface';
import { AuthService } from './auth.service';
import { TeamsQuery } from './team.query';
import { MB } from 'magma/common/constants';


@Injectable()
@UntilDestroy()
export class RpcService {
  private deepkitClient = RpcWebSocketClient.fromCurrentHost('/api/teams/rpc');
  private deepkitMainProcessClient = RpcWebSocketClient.fromCurrentHost('/api/main/rpc');
  presence = this.deepkitClient.controller(PresenceRPCInterface);
  threads = this.deepkitClient.controller<ThreadsRPCInterface>(ThreadsRPC);
  notifications = this.deepkitClient.controller<AppNotificationsRPCInterface>(AppNotificationRPC);
  online = this.deepkitClient.controller<OnlineRPCInterface>(OnlineRPC);
  ai = this.deepkitClient.controller<AiRPCInterface>(AiRPC);
  files = this.deepkitMainProcessClient.controller<FilesRPCInterface>(FilesRPC);
  jams = this.deepkitMainProcessClient.controller<JamsRPCInterface>(JamsRPC); // jams needs to be able to update drawing data

  private autoReconnect = false;
  private reconnecting: NodeJS.Timeout | undefined;
  private reconnectingMainProcessRpc: NodeJS.Timeout | undefined;

  reconnected$ = this.deepkitClient.transporter.reconnected;
  isConnected$ = this.deepkitClient.transporter.connection;
  disconnected$ = this.deepkitClient.transporter.disconnected;

  trackConnection$ = this.isConnected$.pipe(skip(1), startWith(true));

  constructor(authService: AuthService, private teamsQuery: TeamsQuery, private ngZone: NgZone) {
    this.deepkitMainProcessClient.transporter.writerOptions.chunkSize = 1 * MB;

    authService.token$.pipe(skip(1), untilDestroyed(this)).subscribe((newToken) => {
      if (this.reconnecting) {
        clearTimeout(this.reconnecting);
        this.reconnecting = undefined;
      }
      this.autoReconnect = false;
      void this.deepkitClient.disconnect();
      void this.deepkitMainProcessClient.disconnect();
    });

    combineLatest([this.isConnected$, this.teamsQuery.selectCount().pipe(distinctUntilChanged())])
      .pipe(
        switchMap(([status, teamCount]) => {
          if (status) {
            this.autoReconnect = true;
          }
          if (status && teamCount > 0) {
            return defer(async () => this.online.online()).pipe(switchMap(o => o));
          }
          return NEVER;
        }),
        untilDestroyed(this),
      ).subscribe();

    this.disconnected$.pipe(untilDestroyed(this)).subscribe(() => {
      if (this.autoReconnect) {
        //  this.toastService.error({
        //    message: 'We lost the connection with the server. Some features may be degraded. Reconnecting ...',
        //    timeout: 3000,
        //  });
        this.tryToConnect();
      }
    });

    this.reconnected$.pipe(untilDestroyed(this)).subscribe(() => {
      if (this.reconnecting) {
        // this.toastService.success({
        //   message: 'Connection restored',
        //   timeout: 2000,
        // });
        DEVELOPMENT && console.info('Connection restored');
        clearTimeout(this.reconnecting);
        this.reconnecting = undefined;
      }
    });

    this.deepkitMainProcessClient.transporter.disconnected.pipe(untilDestroyed(this)).subscribe(() => {
      if (this.autoReconnect) this.tryToConnectMainProcessRpc();
    });

    this.deepkitMainProcessClient.transporter.reconnected.pipe(untilDestroyed(this)).subscribe(() => {
      if (this.reconnectingMainProcessRpc) {
        DEVELOPMENT && console.info('Connection to main rpc restored');
        clearTimeout(this.reconnectingMainProcessRpc);
        this.reconnectingMainProcessRpc = undefined;
      }
    });

    this.tryToConnectMainProcessRpc();
  }

  tryToConnect() {
    this.deepkitClient.connect().catch(() => {
      this.reconnecting = setTimeout(() => {
        this.tryToConnect();
      }, 2_000);
    });
  }

  tryToConnectMainProcessRpc() {
    this.deepkitMainProcessClient.connect().catch(() => {
      this.reconnectingMainProcessRpc = setTimeout(() => {
        this.tryToConnectMainProcessRpc();
      }, 2_000);
    });
  }

  ngOnDestroy() {
    this.autoReconnect = false;
    this.reconnecting && clearTimeout(this.reconnecting);
    void this.deepkitClient.disconnect();
    void this.deepkitMainProcessClient.disconnect();
  }

  // this will resubscribe after connection loss
  // it is subscribed to main process connection so use it with caution
  watch<T>(f: () => Promise<Observable<T>>) {
    return this.deepkitMainProcessClient.transporter.connection.pipe(
      switchMap(status => {
        if (status) {
          return defer(f).pipe(switchMap(obs => obs));
        } else {
          return NEVER;
        }
      }),
      shareReplay({ refCount: true, bufferSize: 1 }),
    );
  }
}
