import {
  HttpTransportType,
  HubConnectionBuilder,
  LogLevel,
} from '@aspnet/signalr'
import { MessagePackHubProtocol } from '@microsoft/signalr-protocol-msgpack'
import { BondSignalrConfig } from '../../configs/Signalr'
import authProvider from '../AuthenticationProvider'
import EventBus from './EventBus'
import { formatMessageData } from './helper'

class SignalrMessageHub extends EventBus {
  messageChannelToHubs = {} // map hubs to a label
  remoteChannelToHubs = {} // rpc
  queuedRemoteCalls = [] // if a rpc posted while connection down, push it to a queue
  runQueueThread = null

  constructor() {
    super()
    this.init()
  }

  // iterate all signalr config and create connections
  init() {
    BondSignalrConfig.Hubs.forEach((hub) => {
      const connection = new HubConnectionBuilder()
        .withUrl(hub.ServerUrl, {
          transport: HttpTransportType.WebSockets, //HttpTransportType.LongPolling, // | HttpTransportType.WebSockets // websocket will throw error, can not resolve this yet
          accessTokenFactory: () =>
            authProvider.getUser().then((user) => user.access_token),
          skipNegotiation: true,
        })
        .withHubProtocol(new MessagePackHubProtocol())
        .configureLogging(LogLevel.Error)
        .build()

      //map hub connection to a label
      this.messageChannelToHubs[hub.Prefix] = connection

      connection.on('ReceivedMessage', (message) => {
        this.publish(
          message.Channel,
          formatMessageData(message.Channel, message.Data),
        )
      })

      //do reconnect
      connection.onclose((e) =>
        setTimeout(() => this.connect(connection), 5000),
      )

      this.connect(connection)

      //register rpc, map a rpc to a hub
      hub.RegisteredRemoteChannels.forEach((channel, index) => {
        if (this.remoteChannelToHubs[channel]) {
          throw new Error('Remote channel is duplicated: ' + channel)
        }
        this.remoteChannelToHubs[channel] = connection
      })
    })
  }

  async connect(hub) {
    let self = this
    await hub
      .start()
      .then(() => self.runQueue())
      .catch((err) => {
        console.log(err)

        //try re-connecting after 5s
        setTimeout(() => {
          console.log('reconnect')
          this.connect(hub)
        }, 5000)
      })
  }

  //push an action to the queue
  //it is posible for a race condition
  ensure(f) {
    this.queuedRemoteCalls.push(f)
  }

  runQueue() {
    var fails = []
    while (this.queuedRemoteCalls && this.queuedRemoteCalls.length > 0) {
      var f = this.queuedRemoteCalls.shift()
      if (!f()) {
        fails.push(f)
      }
    }
    for (let f of fails) {
      this.queuedRemoteCalls.push(f)
    }
    if (this.runQueueThread != null) {
      clearTimeout(this.runQueueThread)
    }
    this.runQueueThread = setTimeout(() => {
      this.runQueue()
    }, 1000)
  }

  invoke(channel, message, fail) {
    this.remoteChannelToHubs[channel]
      .invoke(channel, message)
      .catch((error) => fail(error))
  }

  subscribe(channel, action) {
    if (!this.channels[channel]) {
      this.channels[channel] = []

      //register group on signalr server
      const hubPrefix = channel.split('.')[0]
      if (this.messageChannelToHubs[hubPrefix]) {
        if (
          this.messageChannelToHubs[hubPrefix].connection.connectionState === 1
        ) {
          this.messageChannelToHubs[hubPrefix].invoke('JoinGroup', channel)
        } else {
          let self = this
          this.ensure(() => {
            if (
              self.messageChannelToHubs[hubPrefix] &&
              self.messageChannelToHubs[hubPrefix].connection
                .connectionState === 1
            ) {
              self.messageChannelToHubs[hubPrefix].invoke('JoinGroup', channel)
              return true
            }
            return false
          })
        }
      }
    }
    this.channels[channel].push(action)
  }

  unsubscribe(channel, action) {
    if (this.channels[channel]) {
      this.channels[channel] = this.channels[channel].filter(
        (a) => a !== action,
      )

      if (this.channels[channel].length === 0) {
        delete this.channels[channel]
      }

      //unregister group on signalr server
      const hubPrefix = channel.split('.')[0]
      if (!this.channels[channel] && this.messageChannelToHubs[hubPrefix]) {
        if (
          this.messageChannelToHubs[hubPrefix].connection.connectionState === 1
        ) {
          this.messageChannelToHubs[hubPrefix].invoke('LeaveGroup', channel)
        } else {
          let self = this
          this.ensure(() => {
            if (
              this.messageChannelToHubs[hubPrefix] &&
              this.messageChannelToHubs[hubPrefix].connection
                .connectionState === 1
            ) {
              self.messageChannelToHubs[hubPrefix].invoke('LeaveGroup', channel)
              return true
            }
            return false
          })
        }
      }
    }
  }
}

const MessageHub = new SignalrMessageHub()
export default MessageHub
