import Peer from "peerjs";
import { fromEvent, Observable, Subject, merge } from "rxjs";
import { shareReplay, take, map, mergeMap, switchMap } from "rxjs/operators";

/**
 * Thin reactive wrapper around a PeerJS `Peer`.
 *
 * Exposes the peer's and its data-connections' events as RxJS observables
 * and offers helpers to dial a remote peer and to broadcast to every open
 * connection.
 */
export default class PeerToPeer {
  private _peer: Peer;
  private _outgoingConnection$: Subject<IConnectionInfo>;

  /**
   * Emitted when a connection to the PeerServer is established.
   * Providing the brokering ID of the peer.
   */
  public readonly peerOpen$: Observable<string>;

  /**
   * Emitted when a new data-connection is established.
   */
  public readonly connectionOpen$: Observable<IConnectionInfo>;

  /**
   * Emitted when a data-connection fires its "close" event.
   */
  public readonly connectionClose$: Observable<IConnectionInfo>;

  /**
   * Emitted when data is received.
   */
  public readonly receive$: Observable<IReceiveInfo>;

  /**
   * Array of all open connections.
   */
  public get connections() {
    // `_peer.connections` maps a key to a list of connections; flatten the
    // lists and keep only the connections whose `open` flag is set.
    return Object.keys(this._peer.connections)
      .map<Peer.DataConnection[]>((key) => this._peer.connections[key])
      .reduce((acc, cur) => {
        acc.push(...cur.filter((x) => x.open));
        return acc;
      }, []);
  }

  /**
   * Creates the underlying peer and wires up all event streams.
   *
   * @param options Optional `host`, `port` and `path` forwarded to the
   * `Peer` constructor.
   */
  constructor(options?: IPeerToPeerOptions) {
    this._peer = new Peer(undefined, {
      host: options?.host,
      port: options?.port,
      path: options?.path,
    });
    this._outgoingConnection$ = new Subject<IConnectionInfo>();

    // Replay the brokering ID so `connect()` works even when it is called
    // after the peer has already opened.
    this.peerOpen$ = fromEvent<string>(<any>this._peer, "open").pipe(
      shareReplay(1)
    );
    const incomingConnection$ = fromEvent<Peer.DataConnection>(
      <any>this._peer,
      "connection"
    ).pipe(
      map<Peer.DataConnection, IConnectionInfo>((x) => ({
        connection: x,
        direction: ConnectDirection.Incoming,
      })),
      shareReplay(1)
    );

    // mergeMap (not switchMap): every connection must keep its own listener.
    // switchMap would drop the listener of the previous connection as soon
    // as another connection shows up, losing its "open"/"close" event.
    // take(1) lets each one-shot inner stream complete and release its
    // listener once the event has been delivered.
    this.connectionOpen$ = merge(
      incomingConnection$,
      this._outgoingConnection$
    ).pipe(
      mergeMap((x) =>
        fromEvent(<any>x.connection, "open").pipe(
          take(1),
          map(() => x)
        )
      )
    );
    this.connectionClose$ = merge(
      incomingConnection$,
      this._outgoingConnection$
    ).pipe(
      mergeMap((x) =>
        fromEvent(<any>x.connection, "close").pipe(
          take(1),
          map(() => x)
        )
      )
    );

    // shareReplay() without a buffer size remembers every opened connection,
    // so a late subscriber still gets "data" listeners attached to the
    // connections that opened before it subscribed.
    this.receive$ = this.connectionOpen$.pipe(
      shareReplay(),
      mergeMap((x) => {
        return fromEvent<any>(<any>x.connection, "data").pipe(
          map<any, IReceiveInfo>((data) => ({ sender: x.connection, data }))
        );
      })
    );
  }

  /**
   * Dials the peer with the given brokering ID.
   *
   * The connection attempt is deferred until `peerOpen$` has emitted; the
   * resulting connection is then fed into the outgoing-connection stream so
   * that it shows up on `connectionOpen$` / `connectionClose$`.
   *
   * @param remoteId Brokering ID of the remote peer.
   */
  public connect(remoteId: string) {
    this.peerOpen$
      .pipe(
        take(1),
        map<string, IConnectionInfo>(() => ({
          connection: this._peer.connect(remoteId, {
            serialization: "json",
            reliable: true,
          }),
          direction: ConnectDirection.Outgoing,
        }))
      )
      .subscribe((x) => {
        this._outgoingConnection$.next(x);
      });
  }

  /**
   * Sends `data` over every currently open connection.
   *
   * @param data Payload handed to each connection's `send`.
   * @returns `true` when at least one open connection existed, `false` when
   * there was nobody to send to.
   */
  public broadcast<T>(data: T): boolean {
    const connections = this.connections;
    if (connections.length === 0) {
      return false;
    }
    connections.forEach((x) => x.send(data));
    return true;
  }
}

/**
 * Options accepted by the `PeerToPeer` constructor.
 *
 * Each field is forwarded unchanged to the options object of the `Peer`
 * constructor; an omitted field is passed on as `undefined`.
 */
export interface IPeerToPeerOptions {
  /** Forwarded as the `host` option of the `Peer` constructor. */
  host?: string;
  /** Forwarded as the `port` option of the `Peer` constructor. */
  port?: number;
  /** Forwarded as the `path` option of the `Peer` constructor. */
  path?: string;
}

/**
 * Tells which side initiated a data-connection.
 */
export enum ConnectDirection {
  /** The connection was announced by the peer's "connection" event. */
  Incoming = 0,
  /** The connection was created locally through `connect()`. */
  Outgoing = 1,
}

/**
 * A data-connection together with the side that initiated it.
 * This is the value emitted by `connectionOpen$` and `connectionClose$`.
 */
export interface IConnectionInfo {
  /** The PeerJS data-connection the event refers to. */
  connection: Peer.DataConnection;
  /** Whether the connection was received from or dialed to the remote peer. */
  direction: ConnectDirection;
}

/**
 * Value emitted by `receive$` for every "data" event of a connection.
 */
export interface IReceiveInfo {
  /** The data-connection on which the "data" event fired. */
  sender: Peer.DataConnection;
  /** The payload delivered with the "data" event; not validated here. */
  data: any;
}
