import { timeout } from "@sportaq/common/utils/time-utils";
import { Flowable } from "rsocket-flowable";
import { ISubscriber } from "rsocket-types";
import { BaseSettingsService } from "@sportaq/services/base-setting-service/base-settings-service";
import {
    DisconnectCallback,
    Message,
    RSocketTransport
} from "@sportaq/services/streaming/streaming-network/transport/rsocket-transport";
import { StreamingNetworkRoutes } from "@sportaq/services/streaming/streaming-network/streaming-network-routes";
import { resolveLanguage } from "@sportaq/i18n/index";
import {
    StartApplicationRequestMsg
} from "@sportaq/services/streaming/streaming-network/messages/requests/system/StartApplicationRequest_pb";
import {
    StartApplicationResponseMsg
} from "@sportaq/services/streaming/streaming-network/messages/responses/system/StartApplicationResponse_pb";
import { STREAMING_NETWORK_PROTOCOL_VERSION } from "@sportaq/common/consts/streaming-network-protocol-version";

// Create interface, assignable from Flowable because Cypress doesn't want to work with Flowable (has written that tests not present)
export interface Flow<T> {
    subscribe (subscriberOrCallback?: Partial<ISubscriber<T>> | ((a: T) => void)): void;
}

export interface StreamingNetwork {
    connect (baseUrl: string, appCode: string): Promise<StreamingNetworkConnectResponse>;

    requestResponse<T> (route: string, message: Message | undefined, deserialize: (msg: Uint8Array) => T): Promise<T>;

    requestStream<T> (route: string, message: Message | undefined, deserialize: (msg: Uint8Array) => T): Promise<Flow<T>>;

    setDisconnectCallback (callback: DisconnectCallback): void;

    disconnect (): void;
}

export interface StreamingNetworkConnectResponse {
    serverName: string;
    serverVersion: string;
    protocolVersion: number;
    isSupportedClientProtocolVersion: boolean;
    language: string;
}

interface TokenInfo {
    token: string;
    expiration: Date;
}

interface StartApplicationResponse {
    tokenInfo: TokenInfo;
    connectResponse: StreamingNetworkConnectResponse;
}

class StreamingNetworkImpl implements StreamingNetwork {
    private readonly transport: RSocketTransport;
    private startApplicationResponse?: StartApplicationResponse;
    private waitForStartApplicationResponse: boolean = false;

    constructor (readonly settingsService: BaseSettingsService) {
        this.transport = new RSocketTransport();
    }

    setDisconnectCallback (callback: DisconnectCallback): void {
        this.transport.onDisconnect = e => {
            this.startApplicationResponse = undefined;
            this.waitForStartApplicationResponse = false;
            callback(e);
        };
    }

    async connect (): Promise<StreamingNetworkConnectResponse> {
        this.startApplicationResponse = undefined;
        await this.transport.connect(this.settingsService.getStreamServerAddress(resolveLanguage()), this.settingsService.useSSL);
        const startApplicationResponse = await this.getToken();
        return startApplicationResponse.connectResponse;
    }

    async requestResponse<T> (route: string, message: Message | undefined, deserialize: (msg: Uint8Array) => T): Promise<T> {
        const startApplicationResponse = await this.getToken();
        return this.transport.requestResponse(route, message, {
            appCode: this.settingsService.appCode,
            token: startApplicationResponse.tokenInfo.token
        }, deserialize);
    }

    async requestStream<T> (route: string, message: Message | undefined, deserialize: (msg: Uint8Array) => T): Promise<Flowable<T>> {
        const startApplicationResponse = await this.getToken();
        return this.transport.requestStream(route, message, {
            appCode: this.settingsService.appCode,
            token: startApplicationResponse.tokenInfo.token
        }, deserialize);
    }

    disconnect () {
        this.transport.disconnect();
    }

    private async getToken (): Promise<StartApplicationResponse> {
        while (this.waitForStartApplicationResponse) {
            await timeout(100);
        }
        if (this.startApplicationResponse) {
            if (this.startApplicationResponse.tokenInfo.expiration.getTime() > new Date().getTime()) {
                return this.startApplicationResponse;
            }
        }
        return this.refreshToken();
    }

    private async refreshToken (): Promise<StartApplicationResponse> {
        this.startApplicationResponse = undefined;
        this.waitForStartApplicationResponse = true;
        const appCode = this.settingsService.appCode;
        try {
            const requestMsg = new StartApplicationRequestMsg();
            requestMsg.setClientchannel(process.env.__CHANNEL__ as string);
            requestMsg.setClientversion(process.env.__VERSION__ as string);
            requestMsg.setProtocolversion(STREAMING_NETWORK_PROTOCOL_VERSION);
            const response = await this.transport.requestResponse<StartApplicationResponseMsg>(
                StreamingNetworkRoutes.START_APPLICATION,
                requestMsg,
                {
                    appCode,
                    token: "000-000"
                },
                msg => StartApplicationResponseMsg.deserializeBinary(msg)
            );
            const expiration = new Date();
            expiration.setSeconds(response.getExpiration());
            const tokenInfo: TokenInfo = {
                token: response.getToken(),
                expiration
            };
            const connectResponse: StreamingNetworkConnectResponse = {
                serverName: response.getServername(),
                serverVersion: response.getServerversion(),
                protocolVersion: response.getProtocolversion(),
                isSupportedClientProtocolVersion: response.getIssupportedclientprotocolversion(),
                language: response.getLanguage()
            };
            const startApplicationResponse = {
                tokenInfo,
                connectResponse
            };
            this.startApplicationResponse = startApplicationResponse;
            return startApplicationResponse;
        } finally {
            this.waitForStartApplicationResponse = false;
        }
    }
}

export function createStreamingNetwork (settingsService: BaseSettingsService): StreamingNetwork {
    return new StreamingNetworkImpl(settingsService);
}
