import {
  BulkWriteRow,
  EventBulk,
  getChangedDocumentsSince,
  getPrimaryFieldOfPrimaryKey,
  getQueryPlan,
  PreparedQuery,
  PROMISE_RESOLVE_TRUE,
  promiseWait,
  RX_META_LWT_MINIMUM,
  RxConflictResultionTask,
  RxConflictResultionTaskSolution,
  RxDocumentData,
  RxJsonSchema,
  RxStorageBulkWriteResponse,
  RxStorageChangeEvent,
  RxStorageCountResult,
  RxStorageDefaultCheckpoint,
  RxStorageInstance,
  RxStorageQueryResult,
  StringKeys,
} from 'rxdb';
import { now } from 'rxdb/plugins/utils';
import { Observable } from 'rxjs';
import { filter, mergeWith } from 'rxjs/operators';

import { DeferredPromise } from '../../../../utils/DeferredPromise';
import {
  type HybridStorageSettings,
  HybridInstanceCreationOptions,
  HybridStorageInternals,
} from './hybrid-types';

/**
 * Context strings this storage passes to `bulkWrite` for writes it performs
 * on its own behalf, so that `changeStream()` can filter the resulting events
 * out again.
 *
 * - `Downstream`: used for writes into the fork instance during replication.
 * - `Upstream`: used for writes that are mirrored into the master instance.
 */
const ReplicationContext = {
  Downstream: 'hybrid-storage-downstream',
  Upstream: 'hybrid-storage-upstream',
} as const;

/**
 * Storage instance that sits in front of two underlying storage instances,
 * `internals.masterInstance` and `internals.forkInstance`.
 *
 * Until `replicate()` has copied every document from the master into the
 * fork, all reads and writes are served by the master. Once replication is
 * done, reads are served by the fork and writes go to the fork first and are
 * then mirrored into the master.
 */
export class HybridRxStorageInstance<RxDocType>
  implements
    RxStorageInstance<
      RxDocType,
      HybridStorageInternals<RxDocType>,
      HybridInstanceCreationOptions,
      RxStorageDefaultCheckpoint
    > {
  /**
   * Counters of operations that are currently in flight. They are static and
   * therefore shared by every instance of this class. `replicate()` only
   * starts a new batch while all of them are zero.
   *
   * Note that `writes` and the `findDocumentsById` share of `queries` count
   * rows/ids, not calls.
   */
  private static stats = {
    writes: 0,
    queries: 0,
    counts: 0,
    changes: 0,
  };

  /** Delay (ms) before re-checking whether the storage is idle. */
  private static readonly idleRetryDelayMs = 1250;

  /** Pause (ms) between two replication batches: two frames at 60 fps. */
  private static readonly batchPauseMs = (2 * 1000) / 60;

  /** Duration (ms) a single replication batch should aim for. */
  private static readonly targetBatchDurationMs = 50;

  /** Lower and upper bound for the adaptive replication batch size. */
  private static readonly minBatchSize = 100;
  private static readonly maxBatchSize = 10000;

  public readonly primaryPath: StringKeys<RxDocType>;

  /** Set to `true` once `close()` starts closing the underlying instances. */
  public closed = false;

  /** `true` once the fork instance holds all documents of the master. */
  public replicationDone = false;

  /** Resolved by `replicate()` at the moment `replicationDone` is set. */
  private replicationDonePromise = new DeferredPromise<void>();

  /** Cached result of the first `close()` call, to make `close()` idempotent. */
  private closePromise?: Promise<void>;

  constructor(
    public readonly databaseName: string,
    public readonly collectionName: string,
    public readonly schema: Readonly<RxJsonSchema<RxDocumentData<RxDocType>>>,
    public readonly internals: HybridStorageInternals<RxDocType>,
    public readonly options: Readonly<HybridInstanceCreationOptions>,
  ) {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    this.primaryPath = getPrimaryFieldOfPrimaryKey(this.schema.primaryKey) as any;
  }

  /**
   * Writes the given rows to the currently active instance. If replication
   * was already done when the call started, the same rows are afterwards
   * mirrored into the master instance with the upstream context.
   *
   * @param documentWrites rows to write
   * @param context context string forwarded to the active instance
   * @returns the response of the active instance; the response of the
   *   mirrored master write is not returned
   */
  async bulkWrite(
    documentWrites: BulkWriteRow<RxDocType>[],
    context: string,
  ): Promise<RxStorageBulkWriteResponse<RxDocType>> {
    return HybridRxStorageInstance.tracked('writes', documentWrites.length, async () => {
      /* Decide once, before awaiting anything, where this write goes.
       * Replication may finish while the first write is in flight; reading
       * the flag again afterwards would write the same rows to the master a
       * second time.
       */
      const mirrorToMaster = this.replicationDone;
      const target = this.instance;

      const result = await target.bulkWrite(documentWrites, context);
      // we write synchronously to the master instance when replication is done
      if (mirrorToMaster) {
        await this.internals.masterInstance.bulkWrite(documentWrites, ReplicationContext.Upstream);
      }
      return result;
    });
  }

  /**
   * Looks up documents by primary key on the currently active instance.
   *
   * @param ids primary keys to look up
   * @param withDeleted forwarded unchanged to the active instance
   */
  async findDocumentsById(ids: string[], withDeleted: boolean): Promise<RxDocumentData<RxDocType>[]> {
    return HybridRxStorageInstance.tracked('queries', ids.length, () =>
      this.instance.findDocumentsById(ids, withDeleted),
    );
  }

  /**
   * Runs a query on the currently active instance. While replication is not
   * done the query plan is recomputed for the master schema first.
   *
   * @param preparedQuery the prepared query to run
   */
  async query(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageQueryResult<RxDocType>> {
    const sanitizedQuery = this.queryForActiveInstance(preparedQuery);
    return HybridRxStorageInstance.tracked('queries', 1, () => this.instance.query(sanitizedQuery));
  }

  /**
   * Runs a count on the currently active instance. While replication is not
   * done the query plan is recomputed for the master schema first.
   *
   * @param preparedQuery the prepared query to count matches for
   */
  async count(preparedQuery: PreparedQuery<RxDocType>): Promise<RxStorageCountResult> {
    const sanitizedQuery = this.queryForActiveInstance(preparedQuery);
    return HybridRxStorageInstance.tracked('counts', 1, () => this.instance.count(sanitizedQuery));
  }

  /** Forwards the attachment lookup to the currently active instance. */
  async getAttachmentData(documentId: string, attachmentId: string, digest: string): Promise<string> {
    return this.instance.getAttachmentData(documentId, attachmentId, digest);
  }

  /**
   * Returns up to `limit` documents of the currently active instance that
   * changed since `checkpoint`, together with the new checkpoint.
   *
   * @param limit maximum number of documents to return
   * @param checkpoint checkpoint to continue from; omit to start from the beginning
   */
  async getChangedDocumentsSince(
    limit: number,
    checkpoint?: RxStorageDefaultCheckpoint,
  ): Promise<{
    documents: RxDocumentData<RxDocType>[];
    checkpoint: RxStorageDefaultCheckpoint;
  }> {
    return HybridRxStorageInstance.tracked('changes', 1, () =>
      getChangedDocumentsSince(this.instance, limit, checkpoint),
    );
  }

  /**
   * Returns the merged change streams of the master and the fork instance,
   * without the events caused by this storage's own internal writes.
   */
  changeStream(): Observable<
    EventBulk<RxStorageChangeEvent<RxDocumentData<RxDocType>>, RxStorageDefaultCheckpoint>
  > {

    /* We merge both change streams, to cover cases when events are in flight,
     * but not yet emitted. This is fine as we only ever write to one storage at
     * a time. We still don't want to re-emit change events originating from
     * the fork instance during replication, as we'd otherwise emit events that
     * aren't real.
     */
    const forkChangeStream = this.internals.forkInstance
      .changeStream()
      .pipe(filter((e) => e.context !== ReplicationContext.Downstream));

    // Mirrored writes were already announced by the fork stream; drop them here.
    const masterChangeStream = this.internals.masterInstance
      .changeStream()
      .pipe(filter((e) => e.context !== ReplicationContext.Upstream));

    const finalChangeStream = masterChangeStream.pipe(mergeWith(forkChangeStream));
    // NOTE(review): `as never` sidesteps a mismatch between the merged stream
    // type and the declared return type; the underlying types are not visible here.
    return finalChangeStream as never;
  }

  /**
   * Waits for replication to finish, then runs `cleanup` on both underlying
   * instances.
   *
   * @param minimumDeletedTime forwarded unchanged to both instances
   * @returns `true` only if both instances returned a truthy result
   */
  async cleanup(minimumDeletedTime: number): Promise<boolean> {
    await this.replicationDonePromise;
    const results = await Promise.all([
      this.internals.forkInstance.cleanup(minimumDeletedTime),
      this.internals.masterInstance.cleanup(minimumDeletedTime),
    ]);
    return results.every((result) => Boolean(result));
  }

  /**
   * Waits for replication to finish, marks this instance as closed and closes
   * both underlying instances. Repeated calls return the promise of the first
   * call instead of closing the underlying instances again.
   *
   * Note: this does not settle before replication has finished.
   */
  close(): Promise<void> {
    if (!this.closePromise) {
      this.closePromise = (async () => {
        await this.replicationDonePromise;
        this.closed = true;
        await Promise.all([this.internals.forkInstance.close(), this.internals.masterInstance.close()]);
      })();
    }
    return this.closePromise;
  }

  /**
   * Waits for replication to finish, then removes both underlying instances.
   */
  async remove(): Promise<void> {
    await this.replicationDonePromise;
    await Promise.all([this.internals.forkInstance.remove(), this.internals.masterInstance.remove()]);
  }

  /**
   * Replicate all data from the master into the fork storage.
   *
   * Copies one batch per invocation and re-invokes itself with the next
   * checkpoint until the master returns fewer documents than requested or the
   * checkpoint stops advancing. A batch is only started while no other
   * operation is in flight; otherwise the call waits and retries.
   *
   * @param settings only passed on to the follow-up invocations
   * @param batchSize number of documents to request for this batch
   * @param lastCheckpoint checkpoint returned by the previous batch, if any
   * @returns resolves to `true` once replication is done
   */
  async replicate(
    settings: HybridStorageSettings,
    batchSize = 1000,
    lastCheckpoint?: { id: string; lwt: number; },
  ): Promise<boolean> {
    // already replicated
    if (this.replicationDone) {
      return PROMISE_RESOLVE_TRUE;
    }

    /* Check if database is more or less idle, before we start replicating the
     * next batch of documents, otherwise wait a bit
     */
    const { stats } = HybridRxStorageInstance;
    if (stats.changes > 0 || stats.queries > 0 || stats.counts > 0 || stats.writes > 0) {
      await promiseWait(HybridRxStorageInstance.idleRetryDelayMs);
      return this.replicate(settings, batchSize, lastCheckpoint);
    }
    const start = now();
    const masterInstance = this.internals.masterInstance;

    const { documents, checkpoint } = await getChangedDocumentsSince(
      masterInstance,
      batchSize,
      lastCheckpoint,
    );

    const forkInstance = this.internals.forkInstance;
    const writeRows: BulkWriteRow<RxDocType>[] = documents.map((document) => ({
      document,
    }));
    // The write response is not inspected; rows the fork rejects are skipped.
    await forkInstance.bulkWrite(writeRows, ReplicationContext.Downstream);
    const duration = Math.round(now() - start);

    // if number of documents is smaller than the limit, we are done replicating
    if (documents.length < batchSize || this.isInvalidCheckpoint(checkpoint, lastCheckpoint)) {
      this.replicationDone = true;
      this.replicationDonePromise.resolve();
      return PROMISE_RESOLVE_TRUE;
    }

    // let the db calm down a bit (we skip 2 frames); promiseWait takes milliseconds
    await promiseWait(HybridRxStorageInstance.batchPauseMs);

    // adjust batch size to create short, but efficient tasks (~50ms)
    // https://web.dev/articles/optimize-long-tasks
    // Scale proportionally and round afterwards, so that slow batches shrink
    // gradually instead of dropping straight to the minimum. A measured
    // duration of 0ms is treated as 1ms to avoid a division by zero.
    const scaledBatchSize = Math.round(
      (batchSize * HybridRxStorageInstance.targetBatchDurationMs) / Math.max(1, duration),
    );
    // make sure we never do less than a 100, or more than 10000
    const nextBatchSize = Math.min(
      HybridRxStorageInstance.maxBatchSize,
      Math.max(HybridRxStorageInstance.minBatchSize, scaledBatchSize),
    );

    // enqueue next batch of documents
    return this.replicate(settings, nextBatchSize, checkpoint);
  }

  /** Forwards to the currently active instance. */
  conflictResultionTasks(): Observable<RxConflictResultionTask<RxDocType>> {
    return this.instance.conflictResultionTasks();
  }

  /** Forwards to the currently active instance. */
  resolveConflictResultionTask(taskSolution: RxConflictResultionTaskSolution<RxDocType>) {
    return this.instance.resolveConflictResultionTask(taskSolution);
  }

  /**
   * The instance that currently serves reads and receives writes first: the
   * fork once replication is done, the master before that.
   */
  private get instance() {
    if (this.replicationDone) {
      return this.internals.forkInstance;
    }
    return this.internals.masterInstance;
  }

  /**
   * Runs `operation` while the given in-flight counter is raised by `amount`.
   * The counter is lowered again when the operation settles, whether it
   * succeeded or failed.
   *
   * @param counter name of the counter in `stats` to adjust
   * @param amount value to add for the duration of the operation
   * @param operation the work to run; invoked synchronously after the counter
   *   has been raised
   */
  private static async tracked<T>(
    counter: 'writes' | 'queries' | 'counts' | 'changes',
    amount: number,
    operation: () => Promise<T>,
  ): Promise<T> {
    HybridRxStorageInstance.stats[counter] += amount;
    try {
      return await operation();
    } finally {
      HybridRxStorageInstance.stats[counter] -= amount;
    }
  }

  /**
   * Returns the query to hand to the currently active instance.
   *
   * This covers the special case of prepared queries knowing of more indices
   * than available on the master storage. We need to correct the optimal
   * index in case replication isn't done.
   */
  private queryForActiveInstance(preparedQuery: PreparedQuery<RxDocType>) {
    return this.replicationDone
      ? preparedQuery
      : this.sanitizePreparedQueryForMasterInstance(preparedQuery);
  }

  /**
   * Prepare the query (again) for the master instance, as it can have a
   * completely different set of indexes. This adds some extra cost to queries,
   * but this only happens for queries while replication is still going on.
   *
   * @param preparedQuery the query as prepared by the caller
   * @returns the same query with a query plan computed from the master schema
   * @private
   */
  private sanitizePreparedQueryForMasterInstance(preparedQuery: PreparedQuery<RxDocType>) {
    const masterSchema = this.internals.masterInstance.schema;
    const queryPlan = getQueryPlan(masterSchema, preparedQuery.query);
    return {
      query: preparedQuery.query,
      queryPlan,
    };
  }

  /**
   * Tells whether `checkpoint` cannot be used to continue replication: it is
   * missing, it is the empty checkpoint (`id === ''` and the minimum `lwt`),
   * or it is identical to the previous checkpoint.
   *
   * @param checkpoint checkpoint returned by the latest batch
   * @param lastCheckpoint checkpoint the latest batch started from
   */
  private isInvalidCheckpoint(
    checkpoint?: { id: string; lwt: number; },
    lastCheckpoint?: { id: string; lwt: number; },
  ) {
    if (!checkpoint) {
      return true;
    }

    if (checkpoint.id === '' && checkpoint.lwt === RX_META_LWT_MINIMUM) {
      return true;
    }

    return checkpoint.id === lastCheckpoint?.id && checkpoint.lwt === lastCheckpoint.lwt;
  }
}
