import { mergeWith, Observable, Observer, ReplaySubject, Subject, Subscription } from "rxjs";
import { takeUntil } from "rxjs/operators";
import { Util } from "../../util/util";
import { HandlerHolder } from "./handler-holder";
import { Message } from "./message/message";

export class MessageConsumer<T> {

	private subject : Subject<Message<T>> = new Subject<Message<T>>();
	public readonly uuId = Util.generateUUID();
	private destroyed = new ReplaySubject<boolean>(1);
  readonly observable: Observable<Message<any>>;

	constructor(
		private handlerHolder: HandlerHolder,
		public readonly isLocal: boolean | undefined = undefined,

	) {
		this.observable = this.handlerHolder.asObservable.pipe(
			mergeWith(this.subject),
			takeUntil(this.destroyed)
		)
	}

	subscribe(observerOrNext?: Partial<Observer<Message<T>>> | ((value: Message<T>) => void)): Subscription {
    if(this.isLocal) {
      return this.observable
        // .pipe(
        // filter( (message) =>
        //   this.isLocal === undefined || message.isLocal === this.isLocal
        // ))
        .subscribe(observerOrNext);
    }
		return this.observable.subscribe(observerOrNext);
	}

	unregister() {
		this.destroyed.next(true);
		this.handlerHolder.unregister(this.uuId);
	}

  /**
   * Internal method used by the
   * @param message
   */
  send(message: Message<any>): void {
    this.subject.next(message);
  }
}
