import {
    APPLICATION_OCTET_STREAM,
    BufferEncoders,
    Encodable,
    encodeCompositeMetadata,
    encodeRoute,
    MESSAGE_RSOCKET_COMPOSITE_METADATA,
    MESSAGE_RSOCKET_ROUTING,
    RSocketClient
} from "rsocket-core";
import { ReactiveSocket } from "rsocket-types";
import RSocketWebSocketClient from "rsocket-websocket-client";
import { Flowable } from "rsocket-flowable";
import {
    NetworkContextMsg
} from "@sportaq/services/streaming/streaming-network/messages/requests/context/NetworkContext_pb";
import { EmptyMsg } from "@sportaq/services/streaming/streaming-network/messages/common/single/Empty_pb";
import { appLogger } from "@sportaq/common/utils/logger";

// MIME type under which the serialized NetworkContextMsg entry is placed in the composite metadata.
const contextMimeType = "application/x-protobuf";

// Passed as `setup.keepAlive` to RSocketClient: ms between keepalive frames sent to the server.
const KEEP_ALIVE = 2000;
// Passed as `setup.lifetime` to RSocketClient: ms timeout if no keepalive response is received.
const LIFETIME = 60000;

/**
 * Callback that RSocketTransport invokes (via its `onDisconnect` property) when an
 * established connection reports an ERROR status; `e` is the error carried by that status.
 */
export type DisconnectCallback = (e: Error) => void;

/**
 * Minimal shape of a request payload accepted by RSocketTransport: anything that can
 * serialize itself to bytes. Presumably satisfied by the generated protobuf message
 * classes (they expose the same `serializeBinary` method) — confirm against callers.
 */
export interface Message {
    /** Returns the binary representation that is sent as the payload `data`. */
    serializeBinary (): Uint8Array;
}

/**
 * Caller context attached to every request. Both fields are copied into a
 * NetworkContextMsg and sent as part of the request's composite metadata.
 */
export interface ContextMetadata {
    /** Value written to NetworkContextMsg via `setAppcode`. */
    appCode: string;
    /** Value written to NetworkContextMsg via `setToken` (presumably an auth/session token — confirm). */
    token: string;
}

/**
 * RSocket client over a WebSocket transport.
 *
 * Request data is sent as serialized message bytes, and every request carries composite
 * metadata containing the caller context (NetworkContextMsg) and the route.
 * Assign `onDisconnect` to be notified when an established connection reports an error.
 */
export class RSocketTransport {
    private client?: RSocketClient<Encodable, Encodable>;
    private socket?: ReactiveSocket<Encodable, Encodable>;
    private connected = false;
    /** Invoked once per established connection when that connection fails with an error. */
    public onDisconnect?: DisconnectCallback;

    /**
     * Closes any existing connection and opens a new one.
     *
     * @param baseUrl host (and optional port/path) without scheme; the scheme is chosen from `useSsl`
     * @param useSsl  `true` to connect with `wss`, `false` for `ws`
     * @returns a promise that resolves once the socket is established; it rejects with the
     *          connection error if the socket cannot be opened, or if this attempt was
     *          superseded by another `connect()`/`disconnect()` call while it was pending
     */
    public async connect (baseUrl: string, useSsl: boolean): Promise<void> {
        this.disconnect();
        const client = RSocketTransport.buildClient(baseUrl, useSsl);
        this.client = client;

        let socket: ReactiveSocket<Encodable, Encodable>;
        try {
            socket = await this.socketConnect(client);
        } catch (error) {
            // Do not keep a reference to a client that never connected; only clear the field
            // if it still belongs to this attempt (a newer connect() may have replaced it).
            if (this.client === client) {
                this.client = undefined;
            }
            client.close();
            throw error;
        }

        if (this.client !== client) {
            // The transport was disconnected or reconnected while we were waiting:
            // this socket is orphaned and must not be installed.
            socket.close();
            throw new Error("Connection attempt has been superseded");
        }

        this.socket = socket;
        this.connected = true;
        socket.connectionStatus().subscribe(socketStatus => {
            if (socketStatus.kind !== "ERROR") {
                appLogger.logger.info(`Stream server connection status has been changed to ${socketStatus.kind}`);
                return;
            }
            appLogger.logger.error(`Stream server connection status has been changed to ${socketStatus.kind}`, socketStatus.error);
            // Ignore errors from a socket that is no longer the active one; otherwise a late
            // error from a previous connection would tear down the current connection.
            if (!this.connected || this.socket !== socket) {
                return;
            }
            this.disconnect();
            appLogger.logger.warn("Client has been disconnected", socketStatus.error);
            if (this.onDisconnect) {
                this.onDisconnect(socketStatus.error);
            }
        });
    }

    /** Closes the socket and the client (if any) and marks the transport as disconnected. */
    public disconnect () {
        this.connected = false;

        const socket = this.socket;
        this.socket = undefined;
        if (socket) {
            socket.close();
        }

        const client = this.client;
        this.client = undefined;
        if (client) {
            client.close();
        }
    }

    /**
     * Sends a single request and resolves with the deserialized response.
     *
     * @param route       route placed into the routing metadata
     * @param message     request body; when `undefined` a serialized EmptyMsg is sent
     * @param context     caller context placed into the metadata
     * @param deserialize converts the response bytes into the result value
     * @returns a promise that rejects if the transport is not connected, if the request
     *          fails, or if `deserialize` throws
     */
    public async requestResponse<T> (route: string, message: Message | undefined, context: ContextMetadata, deserialize: (msg: Uint8Array) => T): Promise<T> {
        const socket = this.socket;
        if (!socket || !this.connected) {
            throw new Error("Transport doesn't connected");
        }
        const metadata = RSocketTransport.serializeMetadata(context, route);
        const data = RSocketTransport.serializeData(message);
        // appLogger.logger.debug(`Send ${data.byteLength} bytes to ${route}:`, message);
        return new Promise<T>((resolve, reject) => {
            socket.requestResponse({
                data,
                metadata
            }).subscribe({
                onComplete: value => {
                    try {
                        const array = new Uint8Array(value.data as Buffer);
                        const msg = deserialize(array);
                        // appLogger.logger.debug(`Receive ${array.byteLength} bytes from ${route}:`, msg);
                        resolve(msg);
                    } catch (error) {
                        // Make sure a failing deserializer settles the promise instead of leaving it pending.
                        reject(error);
                    }
                },
                onError: error => reject(error)
            });
        });
    }

    /**
     * Opens a request stream and maps every received payload through `deserialize`.
     *
     * @param route       route placed into the routing metadata
     * @param message     request body; when `undefined` a serialized EmptyMsg is sent
     * @param context     caller context placed into the metadata
     * @param deserialize converts the bytes of each stream item into the result value
     * @returns a Flowable of deserialized items, or a Flowable that errors immediately
     *          when the transport is not connected
     */
    public requestStream<T> (route: string, message: Message | undefined, context: ContextMetadata, deserialize: (msg: Uint8Array) => T): Flowable<T> {
        const socket = this.socket;
        if (!socket || !this.connected) {
            return Flowable.error(new Error("Transport doesn't connected"));
        }
        const metadata = RSocketTransport.serializeMetadata(context, route);
        const data = RSocketTransport.serializeData(message);
        // appLogger.logger.debug(`Send ${data.byteLength} bytes to ${route}:`, message);
        return socket.requestStream({
            data,
            metadata
        }).map(value => {
            const array = new Uint8Array(value.data as Buffer);
            // noinspection UnnecessaryLocalVariableJS
            const msg = deserialize(array);
            // appLogger.logger.debug(`Receive ${array.byteLength} bytes from ${route}:`, msg);
            return msg;
        });
    }

    /** Creates (but does not connect) an RSocket client for `ws(s)://<baseUrl>`. */
    private static buildClient (baseUrl: string, useSsl: boolean): RSocketClient<Encodable, Encodable> {
        const connectionUrl = (useSsl ? "wss" : "ws") + "://" + baseUrl;
        appLogger.logger.info(`Making stream server connection to ${connectionUrl}  ...`);
        return new RSocketClient({
            setup: {
                // ms between keepalive frames sent to the server
                keepAlive: KEEP_ALIVE,
                // ms timeout if no keepalive response
                lifetime: LIFETIME,
                // format of `data`
                dataMimeType: APPLICATION_OCTET_STREAM.toString(),
                // format of `metadata`
                metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.toString()
            },
            transport: new RSocketWebSocketClient({
                url: connectionUrl,
                // transport debug output is enabled everywhere except production builds
                debug: process.env.NODE_ENV !== "production"
            }, BufferEncoders)
        });
    }

    /** Adapts the client's `connect()` Single to a Promise of the connected socket. */
    private socketConnect (client: RSocketClient<Encodable, Encodable>): Promise<ReactiveSocket<Encodable, Encodable>> {
        return new Promise<ReactiveSocket<Encodable, Encodable>>((resolve, reject) => {
            client.connect().subscribe({
                onComplete: socket => resolve(socket),
                onError: error => reject(error)
            });
        });
    }

    /**
     * Builds the composite metadata for a request: the caller context serialized as a
     * NetworkContextMsg, followed by the encoded route.
     */
    private static serializeMetadata (context: ContextMetadata, route: string): Buffer {
        const meta = new NetworkContextMsg();
        meta.setAppcode(context.appCode);
        meta.setToken(context.token);
        const msg = meta.serializeBinary();
        return Buffer.from(encodeCompositeMetadata([
            [contextMimeType, Buffer.from(msg)],
            [MESSAGE_RSOCKET_ROUTING, encodeRoute(route)]
        ]));
    }

    /** Serializes the request body; a missing message is sent as a serialized EmptyMsg. */
    private static serializeData (message: Message | undefined): Buffer {
        const body = message || new EmptyMsg();
        return Buffer.from(body.serializeBinary());
    }
}
