import { Inject, inject, Injectable, InjectionToken } from "@angular/core";
import { Observable } from "rxjs";
import { map } from "rxjs/operators";
import { Action } from "../../event-bus/action";
import { CrudAction } from "../../event-bus/crud-action";
import { DeliveryOptions } from "../../event-bus/delivery-options";
import { Message, MessageType } from "../../event-bus/message/message";
import { Socket, SocketStates } from "../../socket/socket";
import { SocketConnector } from "../../socket/socket-connector";
import { AbstractEventBusBridge, EventBusBridgeOptions } from "../abstract-event-bus-bridge";

export const SOCKET_EVENT_BUS_BRIDGE_OPTIONS = new InjectionToken<EventBusBridgeOptions>(
  "SocketEventBusBridgeOptions");

export const SOCKET_EVENT_BUS_BRIDGE_OPTIONS_PROVIDER = {
  provide: SOCKET_EVENT_BUS_BRIDGE_OPTIONS,
  useValue: {
    allowedInbound: [],
    allowedOutbound: []
  }
};

class OutboundConsumer {
}

@Injectable({
  providedIn: "root"
})
export class SocketEventBusBridge extends AbstractEventBusBridge {

  private socket = inject(Socket);
  private socketConnector = inject(SocketConnector);
  private queue: any[] = [];

  constructor(@Inject(SOCKET_EVENT_BUS_BRIDGE_OPTIONS) options: EventBusBridgeOptions) {
    super();
    this.initAllowedInbound(options.allowedInbound);
    this.initAllowedOutbound(options.allowedOutbound);
    this.listenToInbound();
  }

  /**
   * Listen to inbound messages from the socket
   * The socketMessage received is the raw object data that is transformed here
   * to a message with an action.
   *
   * For any failed message reply, return a normal message with the error data.
   * Don't throw an error for failed messages, because those unsubscribe the rxjs pipelines.
   */
  override inboundMessageObserver(): Observable<Message<Action<any>> | Observable<never>> {
    return this.socket.messages.pipe(
      map(socketMessage => {
        if (socketMessage.failureCode != undefined) {

          return this.eventBus.createErrorMessage(
            socketMessage.address,
            socketMessage.type,
            new Action("err", {
              failureCode: socketMessage.failureCode,
              failureType: socketMessage.failureType,
              message: socketMessage.message
            }),
            socketMessage.options
          )
        }

        const socketBody = socketMessage.body as any;
        let action: Action<any>;
        if (socketBody.id !== undefined && socketBody.rev !== undefined) {
          action = new CrudAction(socketBody.type, socketBody.id, socketBody.rev, socketBody.data);
        } else {
          action = new Action(socketBody.type, socketBody.data);
        }
        const options = {
          isLocal: false
        } as DeliveryOptions;

        if (socketMessage.replyTimeout) {
          options.replyTimeout = socketMessage.replyTimeout;
        }
        if (socketMessage.headers) {
          options.headers = socketMessage.headers;
        }

        const message = this.eventBus.createMessage(socketMessage.address, MessageType.SEND, action, options);

        if (socketMessage.replyAddress) {
          message.setReplyAddress(socketMessage.replyAddress);
        }

        return message;
      })
    );
  }

  private timer: any | undefined = undefined;

  send(message: Message<any>): void {
    if (this.socket.status === SocketStates.OPEN) {
      this.socket.send(message);
      return;
    }

    this.queue.push(message);
    if (this.timer == null) {
      this.timer = setInterval(() => {
        if (this.socket.status == SocketStates.OPEN) {
          clearInterval(this.timer);
          this.timer = undefined;

          while (this.queue.length > 0) {
            const message = this.queue.pop();
            this.socket.send(message);
          }
        }
      }, 1000);
    }

    if(this.socket.status === SocketStates.CLOSED) {
      this.socketConnector.setReconnectOnClose(true);
      this.socketConnector.reconnect();
    }
  }
}
