import {Injectable} from '@angular/core';
import {HttpConstants} from '../../commons/http-constants';
import {DeviceService} from '../device/device.service';
import {RateLimit} from '../../domain/rate-limit';
import {noop, Observable, Subject} from 'rxjs';
import {EventMessage} from '../../domain/event-message';
import {filter, map, take, tap} from 'rxjs/operators';
import {isDefined, isNullOrUndefined} from '../../commons/utils';
import {IMessage, StompHeaders} from '@stomp/stompjs';
import {Platform} from '@ionic/angular';
import {RateNotification} from '../../domain/rate-notification';
import {RxStompConfig, RxStompState} from '@stomp/rx-stomp';
import {Rate} from '../../domain/rate';
import {AuthorizationService} from '../../auth/authorization.service';
import {SharedDataService} from '../shared-data/shared-data.service';
import {AppConstants} from '../../commons/app-constants';
import {LoggerService} from '../logger/logger.service';
import {FirebaseService} from '../firebase/firebase.service';
import {HedgedOrder} from '../../domain/hedged-order';
import {ClientContextService} from '../client-context/client-context.service';
import {Dictionary} from '../../commons/dictionary';
import {TradePlanNotification} from '../../domain/trade-plan-notification';
import {ResourceLock} from '../../domain/resource-lock';
import {EntityResourceLock} from '../../domain/entity-resource-lock';
import {RollableHedgedOrder} from '../../domain/rollable-hedged-order';
import {RxStompService} from '../rx-stomp/rx-stomp.service';

/**
 * Application-wide facade over {@link RxStompService}.
 *
 * Ties the STOMP connection lifecycle to authentication and platform
 * pause/resume events, keeps the connect headers in sync with the current
 * access token, and exposes typed, cached observables for the destinations
 * the application listens to.
 */
@Injectable({
    providedIn: 'root'
})
export class WebSocketService {

    /** Header carrying the channel message id that is echoed back when acknowledging a message. */
    private static readonly CHANNEL_MESSAGE_ID_HEADER = 'X-Channel-Message-Id';

    /** Cache of already-built observables, keyed by STOMP destination. */
    private readonly watchMap: Dictionary<Observable<any>> = {};

    /** Emits each time {@link connect} activates the underlying STOMP client. */
    private readonly rxStompActivatedSubject = new Subject<void>();

    /** Base client configuration; `webSocketFactory` is added in {@link connect}. */
    private readonly rxStompConfig: RxStompConfig = {
        connectHeaders: {},
        heartbeatIncoming: 0,
        heartbeatOutgoing: 0,
        reconnectDelay: 2000,
        // Refresh the connect headers right before every (re)connect attempt.
        beforeConnect: () => this.updateToken(),
        debug: (msg: string): void => console.log(msg)
    };

    /**
     * Side-effect operator that acknowledges every message carrying a
     * channel message id header. The acknowledgement echoes that id together
     * with the device id. Messages without the header pass through untouched.
     */
    private readonly ackOperation = tap((message: IMessage) => {
        const messageId = message.headers[WebSocketService.CHANNEL_MESSAGE_ID_HEADER];
        if (isDefined(messageId)) {
            const ackHeaders: StompHeaders = {
                [WebSocketService.CHANNEL_MESSAGE_ID_HEADER]: messageId,
                [HttpConstants.DEVICE_ID_HEADER_KEY]: this.deviceService.getDeviceId()
            };
            message.ack(ackHeaders);
        }
    });

    constructor(private rxStompService: RxStompService,
                private authorizationService: AuthorizationService,
                private sharedDataService: SharedDataService,
                private deviceService: DeviceService,
                private platform: Platform,
                private loggerService: LoggerService,
                private clientContextService: ClientContextService) {
        this.init();
    }

    /**
     * Builds the STOMP connect headers.
     *
     * @param accessToken bearer token; the authorization header is omitted when it is not defined
     * @param deviceId identifier of the current device
     * @returns headers containing the version, the device id and, if available, the authorization
     */
    private static getHeaders(accessToken: string, deviceId: string): StompHeaders {
        const headers = {} as StompHeaders;
        if (isDefined(accessToken)) {
            headers[HttpConstants.AUTHORIZATION] = HttpConstants.BEARER + accessToken;
        }
        headers[HttpConstants.VERSION] = '2';
        headers[HttpConstants.DEVICE_ID_HEADER_KEY] = deviceId;
        return headers;
    }

    /**
     * Wires the connection lifecycle to platform and authorization events and
     * sets up logging of STOMP errors and connection state changes.
     */
    private init(): void {
        this.platform.pause.subscribe(() => this.disconnect());
        this.platform.resume.subscribe(() => {
            // Only reconnect on resume while the token is still valid.
            if (this.authorizationService.isNotExpiredToken()) {
                this.connect();
            }
        });

        this.authorizationService.onAuthenticate().subscribe(authenticated =>
            authenticated ? this.connect() : this.disconnect()
        );
        this.authorizationService.onTokenUpdate().subscribe(() => {
            console.log('ws token update');
            this.updateToken();
        });

        this.rxStompService.stompErrors$
            .pipe(
                filter(error => {
                    // Guard against a missing header: throwing inside the predicate would
                    // error the stream and end this subscription. `includes` also matches
                    // a message that starts with the marker (index 0).
                    const message = error.headers['message'];
                    return error.command === 'ERROR'
                        && isDefined(message)
                        && message.includes('invalid_token');
                })
            )
            .subscribe(error => this.loggerService.info('WebSocket error ' + error));

        this.rxStompService.connectionState$.subscribe(state =>
            this.loggerService.info('WebSocket connection state changed to ' + RxStompState[state])
        );
    }

    /** Connects only when an access token is present in the shared data. */
    tryConnect(): void {
        if (isDefined(this.sharedDataService.getSharedData(AppConstants.ACCESS_TOKEN))) {
            this.connect();
        }
    }

    /**
     * (Re)configures the client and activates it once the connection state is
     * `CLOSED`. An already connected client is asked to disconnect first.
     * Emits on the activated subject right after activation.
     */
    private connect(): void {
        this.rxStompConfig.webSocketFactory = (): WebSocket => {
            try {
                return new WebSocket(FirebaseService.getWebSocketPath());
            } catch (e) {
                // Keep the original error (stack and type) instead of stringifying it.
                throw e instanceof Error ? e : new Error(String(e));
            }
        };
        this.rxStompService.configure(this.rxStompConfig);
        if (this.rxStompService.connected()) {
            this.disconnect();
        }
        this.rxStompService.connectionState$
            .pipe(
                filter(state => state === RxStompState.CLOSED),
                take(1)
            )
            .subscribe(() => {
                this.rxStompService.activate();
                this.rxStompActivatedSubject.next();
            });
    }

    /** Rebuilds the connect headers from the current access token and device id. */
    private updateToken(): void {
        const accessToken = this.sharedDataService.getSharedData<string>(AppConstants.ACCESS_TOKEN);
        const deviceId = this.deviceService.getDeviceId();
        const headers = WebSocketService.getHeaders(accessToken, deviceId);
        const config = {connectHeaders: headers};
        this.rxStompService.configure(config);
    }

    /**
     * Deactivates the client when the first connection state observed is
     * `CONNECTING` or `OPEN`; does nothing for any other state.
     */
    public disconnect(): void {
        this.rxStompService.connectionState$
            .pipe(
                // take(1) comes first on purpose: only the first emitted state is inspected.
                take(1),
                filter(state => state === RxStompState.CONNECTING || state === RxStompState.OPEN)
            )
            .subscribe(() => this.rxStompService.deactivate().then());
    }

    /** @returns a stream that is `true` while the connection state is `OPEN` and `false` otherwise */
    getConnectionState(): Observable<boolean> {
        return this.rxStompService.connectionState$.pipe(
            map(state => state === RxStompState.OPEN)
        );
    }

    /**
     * Returns the cached observable for a destination, creating it on first use.
     * The message body is parsed as JSON and cast to `T` without validation.
     *
     * @param destination STOMP destination to watch
     * @param acknowledge whether messages are passed through the acknowledge operator before parsing
     */
    private watchDestination<T>(destination: string, acknowledge: boolean): Observable<T> {
        if (isNullOrUndefined(this.watchMap[destination])) {
            const messages$ = this.rxStompService.watch(destination);
            const source$ = acknowledge ? messages$.pipe(this.ackOperation) : messages$;
            this.watchMap[destination] = source$.pipe(
                map(message => JSON.parse(message.body) as T)
            );
        }
        return this.watchMap[destination];
    }

    /** Watches rates for a currency pair; these messages are not acknowledged. */
    watchRates(tradingCurrency: string, contraCurrency: string): Observable<Rate> {
        return this.watchDestination<Rate>(`/topic/rates/${tradingCurrency}/${contraCurrency}`, false);
    }

    /** Watches the user's rate-limit events for a currency pair. */
    watchRateLimits(tradingCurrency: string, contraCurrency: string): Observable<EventMessage<RateLimit>> {
        return this.watchDestination<EventMessage<RateLimit>>(
            `/user/queue/rate-limits/${tradingCurrency}/${contraCurrency}`, true
        );
    }

    /** Watches the user's quote events for the current client context id. */
    watchQuotes<T>(): Observable<EventMessage<T>> {
        return this.watchDestination<EventMessage<T>>(
            `/user/queue/quotes/${this.clientContextService.getClientContextId()}`, true
        );
    }

    /** Watches rate notifications, keeping only those for the given currency pair. */
    watchRateNotifications(tradingCurrency: string,
                           contraCurrency: string): Observable<EventMessage<RateNotification>> {
        return this.watchAllRateNotifications()
            .pipe(filter(eventMessage => {
                const currencyPair = eventMessage.data.currencyPair;
                return currencyPair.tradingCurrency === tradingCurrency
                    && currencyPair.contraCurrency === contraCurrency;
            }));
    }

    /** Watches all of the user's rate notifications. */
    watchAllRateNotifications(): Observable<EventMessage<RateNotification>> {
        return this.watchDestination<EventMessage<RateNotification>>(`/user/queue/rate-notifications`, true);
    }

    /** Watches the user's upcoming-settlement notifications. */
    watchUpcomingSettlementNotifications(): Observable<EventMessage<HedgedOrder[]>> {
        return this.watchDestination<EventMessage<HedgedOrder[]>>(`/user/queue/upcoming-settlement`, true);
    }

    /** Watches the user's upcoming trade-plan notifications. */
    watchUpcomingTradePlanNotifications(): Observable<EventMessage<TradePlanNotification>> {
        return this.watchDestination<EventMessage<TradePlanNotification>>(`/user/queue/upcoming-trade-plan`, true);
    }

    /** Watches entity lock events. */
    watchEntityLock(): Observable<EventMessage<EntityResourceLock<RollableHedgedOrder>>> {
        return this.watchDestination<EventMessage<EntityResourceLock<RollableHedgedOrder>>>(
            `/queue/entity-lock`, true
        );
    }

    /** Publishes a lock request for the given resource, serialized as JSON. */
    sendResourceLock(resourceLock: ResourceLock): void {
        const destination = '/app/resource/lock';
        this.rxStompService.publish({destination: destination, body: JSON.stringify(resourceLock)});
    }

    /** Publishes an unlock request for the given resource, serialized as JSON. */
    sendResourceUnlock(resourceLock: ResourceLock): void {
        const destination = '/app/resource/unlock';
        this.rxStompService.publish({destination: destination, body: JSON.stringify(resourceLock)});
    }

    /** @returns the subject that emits whenever the STOMP client has been activated */
    public getActivatedSubject(): Subject<any> {
        return this.rxStompActivatedSubject;
    }
}
