import {ArcEvent} from "engine/ArcEvent";
import {ArcActor} from "engine/actor/ArcActor";
import {ArcEventFilter} from "engine/ArcEventFilter";
import {ArcMessage, ArcMessageType} from "engine/ArcMessage";
import {ArcEngineError} from "engine/ArcEngineError";
import {ArcEngineActorConnector} from "engine/ArcEngineActorConnector";
import {FacetingMessage} from "engine/FacetingMessage";
import {SelectionMessage} from "engine/SelectionMessage";
import {ActorStatus} from "engine/actor/ActorStatus";
import {ActorStatusMessage} from "engine/ActorStatusMessage";
import {SystemActor} from "engine/actor/SystemActor";
import {Optional} from "common/Optional";
import {LoadStateMessage} from "engine/actor/LoadStateMessage";
import {ArcEngineState} from "engine/ArcEngineState";


/**
 * Entry of the engine's publish queue: a message together with the actor that published it.
 */
interface PublishMessage {
    /** Actor the message was published from. */
    actor: ArcActor;
    /** The message waiting to be published. */
    message: ArcMessage;
}

/**
 * Engine for HyperArc visualizations.
 *
 * Maintains the runtime execution state including query results and user interactions vs the metadata state which
 * describes the save-able serializable state of a visualization.
 *
 * Actors are registered with the engine and communicate by publishing messages through it. The engine retains
 * stateful messages, tracks the status of every actor and forwards events to the actors whose filters accept them.
 *
 * @author zuyezheng
 */
export class ArcEngine {

    // Registered actors keyed by actor id.
    private readonly actors: Map<string, ArcActor>;
    // Status of each actor.
    public readonly actorStatuses: Map<string, ActorStatus>;

    // Stateful messages retained per actor.
    public readonly state: ArcEngineState;

    // Actor used as the sender of engine generated events such as state loads.
    private readonly systemActor: SystemActor;

    // While true, published messages are queued instead of being delivered.
    private lockPublishing: boolean;

    // Message queue to hold messages published during state loading
    private publishQueue: PublishMessage[];

    /**
     * Create an engine with the given id and register its system actor.
     */
    constructor(public readonly id: string) {
        this.actors = new Map();
        this.actorStatuses = new Map();
        this.state = new ArcEngineState();

        this.systemActor = new SystemActor();
        this.register(this.systemActor);
        this.lockPublishing = false;
        this.publishQueue = [];
    }

    /**
     * Register a new actor, returning a promise with the resulting ActorStatus.
     *
     * The returned promise resolves with INITIALIZED when the actor's initialization succeeds and with
     * INITIALIZATION_ERROR when it rejects; the rejection reason itself is not propagated.
     *
     * @param actor Actor to register.
     * @param bypassInitialState When true, the existing stateful messages are never sent to the actor, even if its
     *      initialization asks for them.
     * @throws ArcEngineError if an actor with the same id is already registered.
     */
    register(actor: ArcActor, bypassInitialState?: boolean): Promise<ActorStatus> {
        if (this.actors.has(actor.id)) {
            throw new ArcEngineError(`Actor '${actor.id}' is already registered.`);
        }

        // register actor
        this.actors.set(actor.id, actor);

        // create a connector specific to the actor
        actor.connect(new ArcEngineActorConnector(this, actor));

        // trigger any initialization steps
        this.publish(actor, new ActorStatusMessage(ActorStatus.INITIALIZING));
        const controller = new AbortController();

        return actor.initialize(controller)
            .then(
                // initialization successful
                (sendState: boolean) => {
                    // send existing stateful messages to the actor if requested and not bypassed by the caller
                    if (sendState && !bypassInitialState) {
                        actor.notify(
                            new ArcEvent(
                                this.systemActor,
                                new LoadStateMessage(actor.id, this.statefulMessages(actor))
                            )
                        );
                    }

                    this.publish(actor, new ActorStatusMessage(ActorStatus.INITIALIZED));
                    actor.postInitialize();
                    return ActorStatus.INITIALIZED;
                },
                // initialization promise rejected
                () => {
                    this.publish(actor, new ActorStatusMessage(ActorStatus.INITIALIZATION_ERROR));
                    return ActorStatus.INITIALIZATION_ERROR;
                }
            );
    }

    /**
     * Register all actors, returning a promise once all initialization is complete with the status of each actor.
     */
    registerAll(actors: ArcActor[], bypassInitialState?: boolean): Promise<ActorStatus[]> {
        return Promise.all(actors.map(a => this.register(a, bypassInitialState)));
    }

    /**
     * Remove an actor from the engine along with its status and retained state.
     *
     * @returns This engine, to allow chaining.
     */
    removeActor(actorId: string): ArcEngine {
        this.actors.delete(actorId);
        this.actorStatuses.delete(actorId);
        this.state.removeActor(actorId);

        return this;
    }

    /**
     * Ids of the registered actors, optionally leaving out those whose id starts with '__SYSTEM__'.
     */
    getActorIds(includeSystem: boolean): Set<string> {
        const keys = Array.from(this.actors.keys());
        // TODO ZZ make system actors less janks
        return new Set(includeSystem ? keys : keys.filter(k => !k.startsWith('__SYSTEM__')));
    }

    /**
     * Look up a registered actor by id.
     *
     * The result is cast to the requested type without any runtime check and is undefined if no actor with the id
     * is registered.
     */
    getActor<T extends ArcActor>(id: string): T {
        return this.actors.get(id) as T;
    }

    /**
     * Publish a message from the given actor.
     *
     * While publishing is locked the message is only queued. Otherwise stateful messages are retained, status
     * messages update the actor's status and the resulting event is offered to every other actor.
     */
    publish(actor: ArcActor, message: ArcMessage) {
        if (this.lockPublishing) {
            this.publishQueue.push({actor, message});
            return;
        }

        if (this.actors.get(actor.id) !== actor) {
            // due to race conditions with queries pending and actors being removed, we could get ghost messages from
            // previous versions of actors so ignore them if they don't match the current actor object
            return;
        }

        // see if we need to retain the state
        if (message.type.stateful) {
            this.state.setActorMessage(actor.id, message.type, message);
        }

        if (message instanceof ActorStatusMessage) {
            this.actorStatuses.set(actor.id, message.status);
        }

        // construct the event from the message with the actor that sent it
        const event = new ArcEvent(actor, message);

        // notify other actors that might care
        this.actors.forEach((a: ArcActor) => {
            // don't publish a message to itself
            if (actor.id === a.id) {
                return;
            }

            this.tryNotify(a, event);
        });

        // if it's a selection message, also publish faceting state for all other actors
        if (message instanceof SelectionMessage) {
            this.actors.forEach((a: ArcActor) => {
                if (actor.id !== a.id) {
                    this.tryNotify(a, new ArcEvent(actor, this.datasetFacetingState(message.datasetFqn, a)));
                }
            });
        }
    }

    /**
     * Notify if the actor is ready and is waiting for the event type.
     *
     * An actor without a recorded status (e.g. one registered while publishing was locked, or one the engine does
     * not know) is treated as not ready instead of causing an error.
     */
    tryNotify(actor: ArcActor, event: ArcEvent): void {
        // actor is not ready, or has no status yet
        if (!this.actorStatuses.get(actor.id)?.isSteady()) {
            return;
        }

        // actor doesn't care about the event
        if (!actor.eventFilters().some((filter: ArcEventFilter) => filter.of(event))) {
            return;
        }

        actor.notify(event);
    }

    /**
     * Collect on some attribute of actors of a certain type.
     *
     * Only actors whose status is steady are considered; actors without a recorded status are skipped.
     */
    collectActors<AT, RT>(
        // if it's an actor of this class
        clazz: abstract new (...args: any[]) => AT,
        // collect something about it
        collector: (actor: AT) => RT
    ): RT[] {
        return Array.from(this.actors.values())
            .filter(actor => this.actorStatuses.get(actor.id)?.isSteady())
            .flatMap(actor => Optional.ofType(actor, clazz).map(collector).array);
    }

    /**
     * Waits for all registered actors to reach the INITIALIZED or READY state.
     *
     * Statuses are polled every 100ms. The promise rejects once roughly 30 seconds worth of polling intervals have
     * passed without every actor being in one of those states.
     */
    waitActorsReady(): Promise<void> {
        return new Promise((resolve, reject) => {
            const checkIntervalMs = 100; // How often to check the actors' states, in milliseconds.
            const timeoutMs = 30000; // Maximum wait time before timing out, in milliseconds.
            // Sum of the scheduled delays, not measured wall clock time.
            let elapsedTimeMs = 0;

            const checkActorsReady = () => {
                // Check if all actors are in the INITIALIZED or READY state.
                const allInitialized = Array.from(this.actorStatuses.values())
                    .every(status => status === ActorStatus.INITIALIZED || status === ActorStatus.READY);

                if (allInitialized) {
                    resolve(); // All actors are ready.
                } else if (elapsedTimeMs >= timeoutMs) {
                    reject(new Error('Timeout waiting for all actors to be ready.'));
                } else {
                    elapsedTimeMs += checkIntervalMs;
                    setTimeout(checkActorsReady, checkIntervalMs); // Check again after a delay.
                }
            };

            checkActorsReady();
        });
    }

    /**
     *  This will clear all existing stateful messages and notify all actors with the loaded new state.
     *
     *  Publishing is locked for the duration of the load. The lock is always released and the queued messages are
     *  always propagated, even if notifying an actor throws, so that a failed load cannot leave the engine
     *  queueing messages forever.
     */
    loadState(newActorsState: ArcEngineState): void {
        // lock publishing to prevent any messages from propagating while state loading is taking place
        this.lockPublishing = true;

        try {
            // clear current state
            this.state.clear();

            // load new stateful messages for each known actor
            newActorsState.allMessages
                .forEach(
                    ([actorId, state]: [string, Map<string, ArcMessage>]) => {
                        if (this.actors.has(actorId)) {
                            this.state.setActorState(actorId, state);
                        }
                    }
                );

            // notify all actors
            this.actors.forEach((a: ArcActor) => {
                this.tryNotify(a, new ArcEvent(
                    this.systemActor,
                    new LoadStateMessage(a.id, this.statefulMessages(a))
                ));
            });
        } finally {
            this.lockPublishing = false;

            // now propagate any messages that were enqueued during state loading
            this.processMessageQueue();
        }
    }


    /**
     * Create a faceting message from selection state for the given dataset, excluding the given actor.
     */
    private datasetFacetingState(datasetFqn: string, actor: ArcActor): FacetingMessage {
        const messages: SelectionMessage[] = this.selectionsState(actor, true)
            // only care about selections for the given dataset
            .filter((message: SelectionMessage) => message.datasetFqn === datasetFqn);

        return this.toFacetingMessage(datasetFqn, messages);
    }

    /**
     * Build the faceting message of a dataset by flattening the facets of the given selections.
     */
    private toFacetingMessage(datasetFqn: string, selections: SelectionMessage[]): FacetingMessage {
        return new FacetingMessage(
            datasetFqn,
            // flatten into a list of facets
            selections.flatMap((selectionMessage: SelectionMessage) => selectionMessage.facet)
        );
    }

    /**
     * Get all stateful messages for the given actor.
     *
     * The result holds the non-selection state, the actor's own selection and global filters messages (if any),
     * followed by one faceting message per dataset built from the selections of all other actors.
     */
    private statefulMessages(actor: ArcActor): ArcMessage[] {
        // faceting state requires some collating of selection messages across actors
        const messagesByDataset = new Map<string, SelectionMessage[]>();
        for (const message of this.selectionsState(actor, true)) {
            const messages = messagesByDataset.get(message.datasetFqn);
            if (messages == null) {
                messagesByDataset.set(message.datasetFqn, [message]);
            } else {
                messages.push(message);
            }
        }

        const facetingMessages = Array.from(messagesByDataset.entries())
            .map(([datasetFqn, messages]: [string, SelectionMessage[]]) =>
                this.toFacetingMessage(datasetFqn, messages)
            );

        // all other state we can just clobber together, copied so the array handed out by the state is never mutated
        const otherMessages = [...this.state.getStateMessagesExcluding(actor.id, ArcMessageType.SELECTION)];

        // also propagate the original selection stateful message designated to this actor (if any)
        otherMessages.push(...this.selectionsState(actor));

        // and same with global filters stateful message to original actor
        this.state.getActorMessage(actor.id, ArcMessageType.GLOBAL_FILTERS)
            .map(s => otherMessages.push(s));

        // merge them together
        return [...otherMessages, ...facetingMessages];
    }

    /**
     * Return all the selection state either including or excluding the given actor.
     *
     * @param targetActor Actor to select on.
     * @param excluding When true, return the selections of every actor but the target; otherwise only the
     *      selections of the target.
     */
    private selectionsState(targetActor: ArcActor, excluding: boolean = false): SelectionMessage[] {
        return this.state.messagesOfType(ArcMessageType.SELECTION)
            .filter(([actorId, _]: [string, ArcMessage]) => excluding ? actorId !== targetActor.id : actorId === targetActor.id)
            .map(([_, message]: [string, ArcMessage]) => message as SelectionMessage);
    }


    /**
     * Process all the messages enqueued during state loading, in the order they were published.
     */
    private processMessageQueue(): void {
        while (this.publishQueue.length > 0) {
            const message = this.publishQueue.shift();
            this.publish(message.actor, message.message);
        }
    }
}