import { Injectable, inject } from '@angular/core';
import { UserState } from '@core/services/user-state/user.state';
import { environment } from '@environment';
import {
	AppSyncEvent,
	AppSyncEventHeader,
	AppSyncEventType,
	AppSyncMessageEnum,
	AppSyncMessageTypes
} from '@interfaces/app-sync.interface';
import { EMPTY, Observable, ReplaySubject, Subject, distinctUntilChanged, filter, map, merge, share, switchMap, take } from 'rxjs';
import { WebSocketSubject, webSocket } from 'rxjs/webSocket';
import { v4 as uuidv4 } from 'uuid';

// TODO: handle connection timeout
// TODO: implement error handling
@Injectable({
	providedIn: 'root'
})
export class AppSyncEventService {
	private userState = inject(UserState);

	private readonly PROTOCOL = 'aws-appsync-event-ws';
	private readonly appSyncHost = environment.appSyncEvent.host;
	private readonly appSyncRealtimeEndpoint = environment.appSyncEvent.realtimeEndpoint;

	private readonly messages$ = new ReplaySubject<AppSyncMessageTypes>(10);
	private readonly events$ = new Subject<AppSyncEvent>();
	private connectionSubject?: WebSocketSubject<AppSyncEvent>;
	private subscriptions = new Map<string, string>();

	private connectionTime?: number;
	private connectionTimeout?: number;

	public connect() {
		const token$ = this.userState.getJwtToken();

		const connection$ = token$.pipe(
			distinctUntilChanged(),
			switchMap((token?: string | null) => {
				this.disconnect();

				if (!token) return EMPTY;

				const header = this.createHeader(token);
				const encodedHeader = this.encodeHeader(header);

				const connectionSubject = webSocket<AppSyncEvent>({
					url: this.appSyncRealtimeEndpoint,
					protocol: [this.PROTOCOL, encodedHeader],
					openObserver: {
						next: () => {
							console.log('Connection created.');

							connectionSubject.next({ type: AppSyncEventType.CONNECTION_INIT });
						}
					},
					closeObserver: {
						next: () => {
							console.log('Connection closed.');
						}
					}
				});

				this.connectionSubject = connectionSubject;

				return connectionSubject;
			})
		);

		const shared$ = connection$.pipe(share());

		shared$.subscribe({
			next: event => this.events$.next(event),
			error: err => console.error(err),
			complete: () => console.info('Connection completed.')
		});

		this.onConnection().subscribe(({ connectionTimeoutMs }) => {
			this.connectionTime = Date.now();
			this.connectionTimeout = connectionTimeoutMs;
		});

		this.onData().subscribe(({ event }) => {
			this.messages$.next(JSON.parse(event! as string));
		});
	}

	public disconnect() {
		if (!this.connectionSubject) return;

		this.connectionSubject.complete();

		delete this.connectionSubject;

		this.subscriptions = new Map<string, string>();
	}

	public subscribeToChannel(channel: string) {
		const id = uuidv4();

		const token$ = this.userState.getJwtToken().pipe(take(1));

		const subscribe$ = token$.pipe(
			switchMap(token => {
				if (!this.connectionSubject || !token) return EMPTY;

				const header = this.createHeader(token);

				this.connectionSubject.next({
					type: AppSyncEventType.SUBSCRIBE,
					channel,
					id: id,
					authorization: header
				});

				return merge(this.onSubscribeSuccess(), this.onSubscribeError()).pipe(
					filter(({ id: eventId }) => eventId === id),
					take(1)
				);
			})
		);

		const shared$ = subscribe$.pipe(share());

		shared$.subscribe(({ type }) => {
			if (type !== AppSyncEventType.SUBSCRIBE_SUCCESS) return;

			this.subscriptions.set(id, channel);
		});

		return shared$;
	}

	public unsubscribeFromChannel(idOrChannel: string): Observable<AppSyncEvent> {
		if (!this.connectionSubject) return EMPTY;

		const id = this.getChannelId(idOrChannel);

		if (!id) return EMPTY;

		this.connectionSubject.next({
			type: AppSyncEventType.UNSUBSCRIBE,
			id: id
		});

		const unsubscribe$ = merge(this.onUnSubscribeSuccess(), this.onUnSubscribeError()).pipe(
			filter(({ id: eventId }) => eventId === id),
			take(1),
			share()
		);

		unsubscribe$.subscribe(({ type }) => {
			if (type !== AppSyncEventType.UNSUBSCRIBE_SUCCESS) return;

			this.subscriptions.delete(id);
		});

		return unsubscribe$;
	}

	public publish(channel: string, events: unknown | unknown[]) {
		const id = uuidv4();

		const token$ = this.userState.getJwtToken().pipe(take(1));

		const publish$ = token$.pipe(
			switchMap(token => {
				if (!this.connectionSubject || !token) return EMPTY;

				const header = this.createHeader(token);

				this.connectionSubject.next({
					type: AppSyncEventType.PUBLISH,
					id,
					channel,
					event: (Array.isArray(events) ? events : [events]).map(event => JSON.stringify(event)),
					authorization: header
				});

				return merge(this.onPublishSuccess(), this.onPublishError()).pipe(
					filter(({ id: eventId }) => eventId === id),
					take(1)
				);
			})
		);

		const shared$ = publish$.pipe(share());

		shared$.subscribe();

		return shared$;
	}

	public onConnection() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.CONNECTION_ACK));
	}

	public watchForMessage<T extends AppSyncMessageEnum>(message: T): Observable<AppSyncMessageTypes[T]> {
		return this.messages$.pipe(
			filter(msg => message in msg),
			map(msg => msg[message])
		);
	}

	private createHeader(token: string): AppSyncEventHeader {
		return {
			host: this.appSyncHost,
			Authorization: token
		};
	}

	private encodeHeader(header: AppSyncEventHeader): string {
		const encodedHeader = btoa(JSON.stringify(header)).replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/, '');

		return `header-${encodedHeader}`;
	}

	private getChannelId(idOrChannel: string) {
		return [...this.subscriptions!].find(([key, value]) => idOrChannel === value || idOrChannel === key)?.[0];
	}

	private onData() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.DATA));
	}

	// AWS AppSync sends "ka" messages periodically. The client keeps track of the time that it received
	// each "ka" message. If the client doesn't receive a "ka" message within connectionTimeoutMs
	// milliseconds, the client should close the connection.
	private onKeepAlive() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.KA));
	}

	private onPublishSuccess() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.PUBLISH_SUCCESS));
	}

	private onPublishError() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.PUBLISH_ERROR));
	}

	private onSubscribeSuccess() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.SUBSCRIBE_SUCCESS));
	}

	private onSubscribeError() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.SUBSCRIBE_ERROR));
	}

	private onUnSubscribeSuccess() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.UNSUBSCRIBE_SUCCESS));
	}

	private onUnSubscribeError() {
		return this.events$.pipe(filter(({ type }) => type === AppSyncEventType.UNSUBSCRIBE_ERROR));
	}
}
