import { retry } from "@lifeomic/attempt";
import { getLogger } from "@metablock/core";
import AsyncLock from "async-lock";
import { action, makeObservable, observable } from "mobx";
import { SecurityKey, messageDecoders } from "./protos";

/**
 * Plain-object description of a streaming channel.
 *
 * `ChannelId` implements this shape and also accepts it as its constructor
 * argument.
 */
interface ChannelDefinition {
  // Channel type; this file uses "order_book_top" and "snapshot"
  channel: string;
  // Optional name; order book top channels take it from SecurityKey.name
  name?: string;
  // Optional exchange; order book top channels take it from SecurityKey.exchange
  exchange?: string;
}

/**
 * Identifies a streaming channel by channel type plus an optional
 * name/exchange pair.
 */
export class ChannelId implements ChannelDefinition {
  channel: string;
  name?: string;
  exchange?: string;

  /**
   * Copies the three identifying fields from a channel definition.
   *
   * @param channel - definition to copy from
   */
  constructor(channel: ChannelDefinition) {
    const { channel: channelType, name, exchange } = channel;
    this.channel = channelType;
    this.name = name;
    this.exchange = exchange;
  }

  /**
   * Builds the id of the "order_book_top" channel for a security.
   *
   * @param securityKey - supplies the channel's name and exchange
   */
  static orderBookTop(securityKey: SecurityKey) {
    const { name, exchange } = securityKey;
    return new ChannelId({ channel: "order_book_top", name, exchange });
  }

  /** Builds the id of the "snapshot" channel, which has no name or exchange. */
  static snapshot() {
    return new ChannelId({ channel: "snapshot" });
  }

  /**
   * String key of the form `channel:name:exchange`; a missing or empty
   * name/exchange contributes an empty segment.
   */
  get key(): string {
    const name = this.name || "";
    const exchange = this.exchange || "";
    return `${this.channel}:${name}:${exchange}`;
  }

  /**
   * Object form of the id that always carries `channel` and includes
   * `name` / `exchange` only when they are non-empty.
   */
  payload(): Record<string, string> {
    return {
      channel: this.channel,
      ...(this.name ? { name: this.name } : {}),
      ...(this.exchange ? { exchange: this.exchange } : {}),
    };
  }
}

/**
 * A message received from the websocket.
 *
 * The raw event data is parsed as JSON and is expected to carry a `channel`
 * object (used to build the `ChannelId`) and a `message` payload.
 */
class WsMessage {
  id: ChannelId;
  message: any;
  timeStamp = 0;
  private _decoded?: any;
  // Tracks whether `_decoded` holds a result, so falsy results are cached too
  private _isDecoded = false;

  /**
   * @param event - websocket message event whose `data` is a JSON string
   * @throws whatever `JSON.parse` or the `ChannelId` constructor throws when
   *   the data is not valid JSON or has no `channel` object
   */
  constructor(event: MessageEvent) {
    const { channel, message } = JSON.parse(event.data);
    this.timeStamp = event.timeStamp;
    this.id = new ChannelId(channel);
    this.message = message;
  }

  /** Key of the channel this message belongs to. */
  get key(): string {
    return this.id.key;
  }

  /**
   * The message payload run through the decoder registered for its channel
   * type in `messageDecoders`, or the raw payload when none is registered.
   *
   * The result is computed on first access and cached, whatever its value.
   * If the decoder throws, nothing is cached and the next access retries.
   */
  get decoded(): any {
    if (!this._isDecoded) {
      const decoder = messageDecoders[this.id.channel];
      this._decoded = decoder ? decoder(this.message) : this.message;
      this._isDecoded = true;
    }
    return this._decoded;
  }
}

// Callback registered on a channel; it receives decoded channel messages and
// the broadcast objects passed to dispatchAll (e.g. { error: "..." })
export type WebsocketHandler = (msg: any) => void;
// Module-level logger used by Channel and StreamingStore
const logger = getLogger({ name: "fluidily.stream" });

/**
 * A subscribed channel: its id, the handlers registered on it and the
 * timestamp of the last message dispatched through it.
 */
class Channel {
  id: ChannelId;
  handlers: Set<WebsocketHandler>;
  lastUpdate = 0;

  /**
   * @param id - identifier of the channel
   */
  constructor(id: ChannelId) {
    this.id = id;
    this.lastUpdate = 0;
    this.handlers = new Set<WebsocketHandler>();
  }

  /**
   * Records the message timestamp and passes the decoded message to every
   * handler. A handler that throws is logged and does not stop the others.
   *
   * @param msg - message received for this channel
   */
  dispatch(msg: WsMessage) {
    this.lastUpdate = msg.timeStamp;
    this.handlers.forEach((deliver) => {
      try {
        deliver(msg.decoded);
      } catch (err) {
        logger.error("could not deliver", msg.id.key, err);
      }
    });
  }

  /**
   * Passes an arbitrary object to every handler as-is, without touching
   * `lastUpdate`. A handler that throws is logged and does not stop the others.
   *
   * @param msg - object to hand to each handler
   */
  dispatchAll(msg: any) {
    this.handlers.forEach((deliver) => {
      try {
        deliver(msg);
      } catch (err) {
        logger.error("could not deliver", msg, err);
      }
    });
  }

  /**
   * Registers a handler and sends a "subscribe" request for this channel.
   *
   * @param store - store used to allocate the request id and send it
   * @param handler - callback to register
   */
  async subscribe(store: StreamingStore, handler: WebsocketHandler) {
    this.handlers.add(handler);
    await store.sendJson(this.request(store, "subscribe"));
  }

  /**
   * Sends a "subscribe" request for this channel without changing handlers.
   *
   * @param store - store used to allocate the request id and send it
   */
  async reSubscribe(store: StreamingStore) {
    await store.sendJson(this.request(store, "subscribe"));
  }

  /**
   * Removes a handler. When none remain, sends an "unsubscribe" request and
   * then drops this channel from the store.
   *
   * @param store - store used to send the request and holding the channel map
   * @param handler - callback to remove
   */
  async unsubscribe(store: StreamingStore, handler: WebsocketHandler) {
    this.handlers.delete(handler);
    if (this.handlers.size > 0) return;
    await store.sendJson(this.request(store, "unsubscribe"));
    // handlers may have been added while the request was in flight
    if (this.handlers.size === 0) store.channels.delete(this.id.key);
  }

  // Builds the request object for `method` with a fresh id from the store
  private request(store: StreamingStore, method: string) {
    return {
      id: store.newId(),
      method,
      channel: this.id.payload(),
    };
  }
}

/**
 * Websocket streaming client that multiplexes channel subscriptions over a
 * single connection.
 *
 * The connection is opened lazily by the first call that needs the socket.
 * When an established connection closes, handlers are notified, a new
 * connection is opened and every channel still registered is re-subscribed.
 */
class StreamingStore {
  url: string;
  // Registered channels, keyed by ChannelId.key
  channels: Map<string, Channel>;
  // Counter backing newId()
  messageId = 0;
  // Declared observable in the constructor; not modified by this class
  streaming = false;
  private _socket?: WebSocket = undefined;
  // Serialises socket creation so only one connection attempt runs at a time
  private _lock: AsyncLock;
  // Last websocket "error" event since the most recent successful open
  private _socketError: any = undefined;
  // Last connection failure since the most recent successful open
  private _connectionError: any = undefined;
  // Allocated here; not otherwise used in this class
  private _requests: Map<string, any>;

  /**
   * @param url - websocket endpoint to connect to
   */
  constructor(url: string) {
    this.url = url;
    this.channels = new Map<string, Channel>();
    this._lock = new AsyncLock();
    this._requests = new Map<string, any>();
    makeObservable(this, {
      streaming: observable,
      subscribe: action,
      unsubscribe: action,
    });
  }

  /**
   * True when a websocket error event has been recorded and no connection
   * has been successfully opened since.
   */
  get error(): boolean {
    return this._socketError !== undefined;
  }

  /**
   * Registers a handler on a channel (creating the channel if needed) and
   * sends the subscribe request.
   *
   * @param handler - callback for messages on the channel
   * @param channelId - channel to subscribe to
   */
  async subscribe(handler: WebsocketHandler, channelId: ChannelId) {
    const channel = this.getChannel(channelId);
    await channel.subscribe(this, handler);
  }

  /**
   * Removes a handler from a channel; does nothing if the channel is not
   * registered.
   *
   * @param handler - callback to remove
   * @param channelId - channel the handler was registered on
   */
  async unsubscribe(handler: WebsocketHandler, channelId: ChannelId) {
    const channel = this.channels.get(channelId.key);
    if (channel) await channel.unsubscribe(this, handler);
  }

  /**
   * Returns the channel registered under the id's key, creating and
   * registering it first if absent.
   *
   * @param channelId - channel to look up
   */
  getChannel(channelId: ChannelId): Channel {
    let channel = this.channels.get(channelId.key);
    if (!channel) {
      channel = new Channel(channelId);
      this.channels.set(channelId.key, channel);
    }
    return channel;
  }

  /**
   * Sends a request with a fresh id. No response is awaited or returned.
   *
   * @param method - method name to put in the request
   * @param payload - request payload
   */
  async rpc(method: string, payload: any) {
    const id = this.newId();
    await this.sendJson({
      id,
      method,
      payload,
    });
  }

  /**
   * Serialises a value as JSON and sends it over the socket.
   *
   * @param message - value to serialise
   */
  async sendJson(message: any) {
    await this.send(JSON.stringify(message));
  }

  /**
   * Sends a raw string, connecting first if there is no socket yet.
   *
   * @param msg - text to send
   */
  async send(msg: string) {
    const socket = await this.getSocket();
    socket.send(msg);
  }

  /**
   * Returns the current socket, opening one if necessary.
   *
   * Connection attempts are made under the "websocket" lock and retried
   * with a growing delay. Each failed attempt is recorded, broadcast to
   * every handler as `{ error: "Websocket cannot connect" }` and logged.
   */
  async getSocket(): Promise<WebSocket> {
    return await this._lock.acquire(
      "websocket",
      async () =>
        await retry(
          async () => {
            if (this._socket) return this._socket;
            try {
              return await this.connect();
            } catch (e) {
              this._connectionError = e;
              this.dispatchAll({ error: "Websocket cannot connect" });
              logger.error("Cannot connect to streaming endpoint", e);
              throw e;
            }
          },
          // maxAttempts: 0 means "no limit" in @lifeomic/attempt
          { delay: 200, factor: 2, maxAttempts: 0 },
        ),
    );
  }

  /** Sends a subscribe request for every registered channel, in parallel. */
  async reSubscribe() {
    return await Promise.all(
      Array.from(this.channels.values()).map((channel: Channel) =>
        channel.reSubscribe(this),
      ),
    );
  }

  /** Returns a new request id of the form `fluidily-<n>`, n increasing. */
  newId(): string {
    this.messageId += 1;
    return `fluidily-${this.messageId}`;
  }

  // Private methods

  /**
   * Opens a websocket to `url` and wires its event handlers.
   *
   * Resolves with the socket once it is open; rejects with the close event
   * when the socket closes while no established socket is recorded.
   */
  private connect(): Promise<WebSocket> {
    return new Promise((resolve, reject) => {
      const socket = new WebSocket(this.url);
      socket.onopen = () => {
        // a successful open supersedes errors recorded by earlier attempts,
        // otherwise `error` would stay true for the lifetime of the store
        this._socketError = undefined;
        this._connectionError = undefined;
        this._socket = socket;
        resolve(socket);
      };
      socket.onerror = (event) => {
        this._socketError = event;
      };
      socket.onclose = (event) => {
        if (this._socket) {
          logger.warn("lost websocket connection, reconnecting");
          this.dispatchAll({ error: "Websocket lost connection" });
          // reconnect() runs detached from this event handler, so its
          // failure must be handled here to avoid an unhandled rejection
          this.reconnect().catch((e) => {
            logger.error("could not restore websocket subscriptions", e);
          });
        } else reject(event);
      };
      socket.onmessage = (event: MessageEvent) => {
        let msg: WsMessage;
        try {
          msg = new WsMessage(event);
        } catch (e) {
          // frames that are not JSON or carry no channel cannot be routed
          logger.warn("discarding unreadable websocket message", e);
          return;
        }
        const channel = this.channels.get(msg.key);
        if (channel && msg.message) channel.dispatch(msg);
      };
    });
  }

  // Broadcasts an object to the handlers of every registered channel
  private dispatchAll(msg: any) {
    this.channels.forEach((channel: Channel) => {
      channel.dispatchAll(msg);
    });
  }

  /**
   * Forgets the current socket, obtains a new one and re-subscribes every
   * registered channel.
   */
  private async reconnect(): Promise<WebSocket> {
    this._socket = undefined;
    const sock = await this.getSocket();
    logger.info("new websocket connection", sock);
    await this.reSubscribe();
    return sock;
  }
}

// StreamingStore is exposed as this module's default export
export default StreamingStore;
