import { Observable, Observer, ReplaySubject, Subscription, throwError, timeout } from "rxjs";
import { finalize, first } from "rxjs/operators";
import { Action } from "./action";
import { DeliveryOptions } from "./delivery-options";
import { EventBus } from "./event-bus";
import { MessageConsumer } from "./message-consumer";
import { ErrorMessage } from "./message/error-message";
import { DEFAULT_MESSAGE_TIMEOUT, Message } from "./message/message";

/**
 * Holds the pending reply for a single request message.
 *
 * Replies pushed in through {@link RequestReplyHolder.send} (or failures pushed
 * through {@link RequestReplyHolder.error}) are buffered in a ReplaySubject and
 * exposed via {@link RequestReplyHolder.asObservable}, which yields only the
 * first reply, fails with a TIMEOUT {@link ErrorMessage} when nothing arrives
 * within the reply timeout, and unregisters this holder from the event bus
 * once the stream is finalized.
 *
 * @author R.v.Raaphorst
 */
export class RequestReplyHolder {

	/** Buffers every reply/error so subscribers that attach late still receive them. */
	private subject = new ReplaySubject<any>();
	/** Maximum wait for a reply, in milliseconds (reported in seconds in the timeout error). */
	private readonly replyTimeout: number;
	/** Reply address taken from the request message. */
	public readonly address: string;

	/**
	 * @param eventBus bus on which this holder is unregistered when the reply stream finalizes
	 * @param message the request message whose reply is awaited
	 * @param options optional delivery options; a falsy `replyTimeout` (missing or 0)
	 *                falls back to DEFAULT_MESSAGE_TIMEOUT
	 * @throws Error when the message carries no reply address
	 */
	constructor(private eventBus: EventBus, private message: Message<any>, options?: DeliveryOptions) {
		this.replyTimeout = options?.replyTimeout || DEFAULT_MESSAGE_TIMEOUT;

		const replyAddress = message.getReplyAddress();
		if (!replyAddress) {
			throw new Error("Cannot reply to message without replyAddress");
		}
		this.address = replyAddress;
	}

	/**
	 * Subscribes to the reply stream returned by {@link RequestReplyHolder.asObservable}.
	 *
	 * @param observerOrNext observer object or `next` callback for the reply
	 * @returns the subscription on the reply stream
	 */
	subscribe<T>(observerOrNext?: Partial<Observer<Message<Action<T>>>> | ((value: Message<Action<T>>) => void)): Subscription {
		const reply$ = this.asObservable;
		return reply$.subscribe(observerOrNext);
	}

	/**
	 * Builds a fresh reply pipeline on every access: timeout guard, take the
	 * first reply, then unregister this holder from the event bus on finalize.
	 */
	public get asObservable(): Observable<Message<any>> {
		// Built lazily so the error reflects the values at the moment the timeout fires.
		const timeoutError = () => {
			const seconds = this.replyTimeout / 1000;
			return new ErrorMessage({
				message: `Event bus timeout: no response received within ${seconds} seconds.` + ` message address: ${this.message.address}`,
				type: this.message.type,
				address: this.message.address,
				failureCode: 500,
				failureType: "TIMEOUT",
				name: "TIMEOUT ERROR"
			});
		};
		const onTimeout = () => throwError(timeoutError);
		const releaseHolder = () => {
			this.eventBus.unregisterHolder(this);
		};

		const guarded$ = this.subject.pipe(
			timeout({ each: this.replyTimeout, with: onTimeout }),
			first(),
			finalize(releaseHolder)
		);
		return guarded$ as Observable<Message<any>>;
	}

	/**
	 * Registering consumers is not supported on a request/reply holder.
	 *
	 * @throws Error always
	 */
	register(): MessageConsumer<any> {
		throw new Error("Unsupported feature 'register' on RequestReplyHolder");
	}

	/**
	 * Fails the reply stream with the given error.
	 *
	 * @param error the failure to forward to subscribers
	 */
	error(error: Error | ErrorMessage | Message<any>): void {
		this.subject.error(error);
	}

	/**
	 * Pushes a reply into the stream; falsy messages are ignored.
	 *
	 * @param message the reply message
	 */
	send(message: Message<any>): void {
		if (!message) {
			return;
		}
		this.subject.next(message);
	}
}

