import {QueryService} from 'services/QueryService';
import {ArcQL} from 'metadata/query/ArcQL';
import {ArcQLResponse} from 'metadata/query/ArcQLResponse';
import {ServiceProvider} from "services/ServiceProvider";
import {RestService} from "services/RestService";
import {Either, Left} from "common/Either";
import {ApiResponse, ErrorResponse} from "services/ApiResponse";
import {ExpiryCache} from "common/ExpiryCache";
import {SingleSource} from "metadata/query/ArcQLSource";
import {ArcQLFilters} from 'metadata/query/ArcQLFilters';
import {RejectedResponse} from "services/RejectedResponse";
import {Optional} from 'common/Optional';
import md5 from 'md5';
import {ArcDataset} from "metadata/dataset/ArcDataset";
import DistinctValuesArcQLRequest from "services/DistinctValuesArcQLRequest";
import RangeValuesArcQLRequest from "services/RangeValuesArcQLRequest";
import {DatasetV2Service} from "services/DatasetV2Service";

/**
 * Service for making ArcQL queries.
 *
 * Results of {@link query} and {@link querySaved} are memoized in an expiring cache and the underlying POSTs are
 * throttled so that no more than `maxConcurrent` of them are in flight at once.
 *
 * @author zuyezheng
 */
export class ApiQueryService implements QueryService {

    /** Upper bound on the number of throttled requests that may be in flight at once. */
    private readonly maxConcurrent: number;

    /** In-flight and completed query results, keyed by a hash identifying the query. */
    private readonly queryCache: ExpiryCache<
        string,
        Promise<Either<ErrorResponse, ArcQLResponse>>
    >;

    /** Number of query slots currently taken. */
    private pendingQueries: number;
    /** Resolve functions of queries waiting for a slot, in arrival order. */
    private readonly queryQueue: ((v: boolean) => void)[];

    /**
     * @param cacheSize Size handed to the underlying query cache.
     * @param defaultCacheTimeout Default timeout handed to the query cache, 10 minutes unless overridden.
     * @param maxConcurrent Maximum number of throttled queries allowed in flight at once.
     */
    constructor(
        cacheSize: number = 50,
        // invalidate query cache at 10 minutes by default
        defaultCacheTimeout: number = 10 * 60 * 1000,
        // max concurrent
        maxConcurrent: number = 4
    ) {
        this.queryCache = new ExpiryCache(cacheSize, defaultCacheTimeout);

        this.maxConcurrent = maxConcurrent;
        this.pendingQueries = 0;
        this.queryQueue = [];
    }

    /**
     * Run an ad hoc ArcQL query, caching the result under the query's own hash.
     *
     * @param arcql Query to execute.
     * @param signal Optional signal forwarded to the dataset lookup and the query request.
     * @param cacheTimeout Explicit cache timeout for this query, falls back to the cache default when omitted.
     * @returns The parsed response, or the error response if the server reported a failure.
     */
    async query(
        arcql: ArcQL,
        signal?: AbortSignal,
        // use an explicit timeout for this query
        cacheTimeout?: number
    ): Promise<Either<ErrorResponse, ArcQLResponse>> {
        // fetch the dataset which should already be cached
        const dataset = await ServiceProvider.get(DatasetV2Service).describeDataset(
            (arcql.source as SingleSource).fqn, signal, arcql.fullyQualifiedName
        );

        return this.cachedQuery(
            arcql.hash(),
            () => ServiceProvider.get(RestService).post(
                "/api/v1/arcql", arcql.toQueryPartsJSON(), signal
            ),
            json => ArcQLResponse.fromJson(json, dataset.rightOrThrow()),
            cacheTimeout
        );
    }

    /**
     * Run a previously saved query with an optional set of additional filters. The result is cached under a hash
     * of the saved query's fully qualified name and the filters.
     *
     * @param arcql Saved query to execute, must already exist on the server.
     * @param filters Filters posted along with the request.
     * @param signal Optional signal forwarded to the dataset lookup and the query request.
     * @param cacheTimeout Explicit cache timeout for this query, falls back to the cache default when omitted.
     * @returns The parsed response, or an error response if the query is not saved yet or the server reported a
     *      failure.
     */
    async querySaved(
        arcql: ArcQL,
        filters: Optional<ArcQLFilters>,
        signal?: AbortSignal,
        // use an explicit timeout for this query
        cacheTimeout?: number
    ): Promise<Either<ErrorResponse, ArcQLResponse>> {
        // can't query if not saved yet
        if (!arcql.isExisting) {
            return new Left(ErrorResponse.of({}, new RejectedResponse()));
        }

        // fetch the dataset which should already be cached
        const dataset = await ServiceProvider.get(DatasetV2Service).describeDataset(
            (arcql.source as SingleSource).fqn, signal, arcql.fullyQualifiedName
        );

        // build out the hash from the saved queries fqn and the new filters
        const hash = md5(JSON.stringify({
            fqn: arcql.fullyQualifiedName.toString(),
            filters: filters.nullable
        }));

        return this.cachedQuery(
            hash,
            () => ServiceProvider.get(RestService).post(
                // post to the full api path of the saved query
                '/api/v1/' + arcql.fullyQualifiedName.apiPath + '/query',
                // post the facets
                filters.nullable,
                signal
            ),
            json => ArcQLResponse.fromJson(json, dataset.rightOrThrow()),
            cacheTimeout
        );
    }

    /**
     * Fetch distinct values, bypassing both the cache and the concurrency queue.
     *
     * @param dataset Dataset used to parse the response.
     * @param request Request whose JSON form is posted to the server.
     * @param signal Optional signal forwarded to the request.
     */
    async distinctValues(dataset: ArcDataset, request: DistinctValuesArcQLRequest, signal?: AbortSignal): Promise<Either<ErrorResponse, ArcQLResponse>> {
        return ServiceProvider.get(RestService)
            .post(`/api/v1/arcql/values/distinct`, request.toJSON(), signal)
            .then(r => ApiResponse.custom(
                r,
                json => ArcQLResponse.fromJson(json, dataset),
                ErrorResponse.of
            ));
    }

    /**
     * Fetch range values, bypassing both the cache and the concurrency queue.
     *
     * @param dataset Dataset used to parse the response.
     * @param request Request whose JSON form is posted to the server.
     * @param signal Optional signal forwarded to the request.
     */
    async rangeValues(dataset: ArcDataset, request: RangeValuesArcQLRequest, signal?: AbortSignal): Promise<Either<ErrorResponse, ArcQLResponse>> {
        return ServiceProvider.get(RestService)
            .post(`/api/v1/arcql/values/range`, request.toJSON(), signal)
            .then(r => ApiResponse.custom(
                r,
                json => ArcQLResponse.fromJson(json, dataset),
                ErrorResponse.of
            ));
    }

    /**
     * Shared plumbing for cached queries: look up `cacheKey`, and on a miss issue the throttled request and parse
     * its response.
     *
     * Error responses are evicted using the very same key they were cached under so a transient failure is retried
     * on the next call rather than being served from the cache.
     *
     * @param cacheKey Key identifying the query in the cache.
     * @param post Issues the actual request, only invoked on a cache miss once a query slot is free.
     * @param toResponse Parses the JSON of a successful response.
     * @param cacheTimeout Explicit cache timeout, falls back to the cache default when omitted.
     */
    private async cachedQuery(
        cacheKey: string,
        post: () => Promise<Response>,
        toResponse: (json: any) => ArcQLResponse,
        cacheTimeout?: number
    ): Promise<Either<ErrorResponse, ArcQLResponse>> {
        return this.queryCache.getOr(
            cacheKey,
            () => this.queryWithQueue(post)
                .then(r => ApiResponse.custom(
                    r,
                    // query success
                    toResponse,
                    // query error
                    (json: any, r: Response) => {
                        // don't cache query errors since they might be transient
                        this.queryCache.delete(cacheKey);
                        return ErrorResponse.of(json, r);
                    }
                )),
            cacheTimeout
        );
    }

    /**
     * Queue up queries to avoid overloading the server.
     *
     * @param query Issues the actual request once a slot is available.
     * @returns Whatever `query` resolves with. Should `query` reject, the slot is still released and the returned
     *      promise resolves (not rejects) with the rejection reason.
     */
    private queryWithQueue(query: () => Promise<Response>): Promise<Response> {
        // the first promise helps with blocking the actual query until we're no longer at max concurrent queries
        const queuePromise = this.pendingQueries >= this.maxConcurrent ?
            // we've hit the max, queue up the resolve function which will stall the promise until we're below the max
            new Promise<boolean>(resolve => this.queryQueue.push(resolve)) :
            // we're not at the max, resolve immediately which will invoke the chained then to issue the actual query
            new Promise<boolean>(resolve => {
                this.pendingQueries++;
                resolve(true);
            });

        return queuePromise
            // once the blocking promise resolves, make the actual query
            .then(query)
            // query finished
            .then(response => {
                this.queryDone();
                return response;
            })
            // free up the slot and check the queue even if query has failed, this shouldn't ever happen since we
            // should be resolving a RejectedResponse vs rejecting the promise in the query call
            .catch(reason => {
                this.queryDone();
                return reason;
            });
    }

    /**
     * Free up a query slot and check the queue.
     */
    private queryDone(): void {
        this.pendingQueries--;

        // hand the freed slot to the longest waiting query, if there is one
        const next = this.queryQueue.shift();
        if (next !== undefined) {
            this.pendingQueries++;
            next(true);
        }
    }
}
