import { PromisePool } from "@supercharge/promise-pool";
import { XMLBuilder, XMLParser } from "fast-xml-parser";
import invariant from "invariant";
import { clamp, sortBy } from "lodash";
import prettyBytes from "pretty-bytes";
import { z } from "zod";
import type { Ingestion, IngestionApi } from "../../services/datastore";

// Shared parser for the XML bodies returned by the multipart upload endpoints.
// Processing-instruction tags are ignored and entity processing is turned off
// (see the fast-xml-parser option docs for the exact semantics of each flag).
const xmlParser = new XMLParser({
  ignorePiTags: true,
  processEntities: false,
});

// Shared builder used to serialise the "complete multipart upload" request
// body. Attributes are ignored and entity processing is turned off, mirroring
// the parser configuration.
const xmlBuilder = new XMLBuilder({
  ignoreAttributes: true,
  processEntities: false,
});

// Shape of a parsed XML error document: a top-level `Error` element carrying a
// string `Code` and a string `Message`.
const errorResponseSchema = z.object({
  Error: z.object({
    Code: z.string(),
    Message: z.string(),
  }),
});

// Shape of the parsed response to the "create multipart upload" request. The
// bucket, key and upload id read from it are reused by every later request
// for the same upload.
const createUploadResponseSchema = z.object({
  InitiateMultipartUploadResult: z.object({
    Bucket: z.string(),
    Key: z.string(),
    UploadId: z.string(),
  }),
});

// Shape of the parsed response to the "complete multipart upload" request.
// The body is either an error document or a success result, so both are
// accepted here and the caller checks for the `Error` key to tell them apart.
const completeUploadResponseSchema = z.union([
  errorResponseSchema,
  z.object({
    // Don't actually care about the values, just want to know it wasn't an error
    CompleteMultipartUploadResult: z.unknown(),
  }),
]);

/** Record of one successfully uploaded part, needed to complete the upload. */
interface UploadedPart {
  /** 1-indexed part number that was sent with the part upload request. */
  partNumber: number;
  /** Value of the `ETag` response header returned for the part upload. */
  eTag: string;
}

/**
 * Events passed to the `onUpdate` callback of a `MultipartUpload`.
 *
 * - `created`: the multipart upload was created
 * - `progress`: a part upload task finished
 * - `uploaded`: every part has been uploaded
 * - `completed`: the multipart upload was completed
 * - `error`: `start()` failed
 * - `cancel-started` / `cancel-succeeded` / `cancel-failed`: stages of `cancel()`
 */
export type UpdateEvent =
  | { type: "created" }
  // `percent` is a value in the range [0, 1]
  | { type: "progress"; percent: number }
  | { type: "uploaded" }
  | { type: "completed" }
  | { type: "error" }
  | { type: "cancel-started" }
  | { type: "cancel-failed" }
  | { type: "cancel-succeeded" };

/** Constructor options for `MultipartUpload`. */
export interface MultipartUploadOptions {
  /** API client used to request a presigned URL for each upload step. */
  ingestionApi: IngestionApi;
  /** File to upload. Its `name` is used as the requested object key. */
  file: File;
  /** Ingestion the presigned URLs are requested for. */
  ingestionId: Ingestion["id"];
  /** Optional listener that receives upload lifecycle events. */
  onUpdate?: (event: UpdateEvent) => void;
}

/**
 * Uploads a single `File` to S3 as a multipart upload.
 *
 * Every step (create, upload part, complete, abort) first asks the ingestion
 * API for a presigned URL and then calls that URL directly with `fetch`.
 * Progress is reported through the optional `onUpdate` callback.
 *
 * Typical usage: construct, call `start()`, and optionally call `cancel()`
 * to stop an in-flight upload.
 */
export default class MultipartUpload {
  /** Largest file accepted by the constructor: 150 GB. */
  static readonly MAX_FILE_SIZE_BYTES = 150 * 1e9;
  /** Size of each part: 150 MB. The last part may be smaller. */
  static readonly #PART_SIZE_BYTES = 150 * 1e6;

  readonly file: File;
  readonly ingestionId: Ingestion["id"];

  readonly #onUpdate: MultipartUploadOptions["onUpdate"];

  // Identifiers of the multipart upload; `null` until `#create` has succeeded
  #bucket: string | null = null;
  #key: string | null = null;
  #uploadId: string | null = null;

  // Set by the first `cancel()` call so that cancellation runs exactly once
  // and so `start()` can tell a caller-requested cancel from a real failure
  #cancelPromise: Promise<void> | null = null;

  readonly #ingestionApi: IngestionApi;
  readonly #abortController: AbortController = new AbortController();

  /**
   * @throws Error if `file` is larger than `MAX_FILE_SIZE_BYTES`.
   */
  constructor({
    ingestionApi,
    file,
    ingestionId,
    onUpdate,
  }: MultipartUploadOptions) {
    if (file.size > MultipartUpload.MAX_FILE_SIZE_BYTES) {
      const maxSizePretty = prettyBytes(MultipartUpload.MAX_FILE_SIZE_BYTES);
      const givenSizePretty = prettyBytes(file.size);

      throw new Error(
        `File is too large. Maximum size: ${maxSizePretty}. Given size: ${givenSizePretty}`
      );
    }

    this.file = file;
    this.ingestionId = ingestionId;

    this.#onUpdate = onUpdate;

    this.#ingestionApi = ingestionApi;
  }

  /**
   * Forwards `event` to the `onUpdate` listener, if one was given.
   *
   * Progress-style events are dropped once requests have been aborted.
   * "error" and the "cancel-*" events are always forwarded: they are
   * dispatched after the abort by design, so filtering them on the abort
   * signal would mean they are never delivered.
   */
  #dispatchUpdate(event: UpdateEvent) {
    switch (event.type) {
      case "created":
      case "progress":
      case "uploaded":
      case "completed":
        if (this.#getSignal().aborted) {
          // Possible upload was cancelled after request was made but prior
          // to event being dispatched. In that case, just ignore event
          // since it's moot
          return;
        }
        break;
      default:
        break;
    }

    this.#onUpdate?.(event);
  }

  /** Signal shared by every request that should stop when aborted. */
  #getSignal() {
    return this.#abortController.signal;
  }

  /** Aborts every in-flight request that was given the shared signal. */
  #abortRequests() {
    this.#abortController.abort();
  }

  /**
   * Creates the multipart upload and records its bucket, key and upload id.
   *
   * @throws The `Response` if the create request was not OK.
   */
  async #create() {
    const urlResponse = await this.#ingestionApi.createIngestionPresignedUrl(
      {
        ingestionId: this.ingestionId,
        createPresignedURLRequest: {
          method: "create_multipart_upload",
          params: {
            key: this.file.name,
          },
        },
      },
      { signal: this.#getSignal() }
    );

    const createUploadResponse = await fetch(urlResponse.url, {
      method: "POST",
      signal: this.#getSignal(),
    });

    if (!createUploadResponse.ok) {
      throw createUploadResponse;
    }

    const responseBody = xmlParser.parse(await createUploadResponse.text());

    const {
      InitiateMultipartUploadResult: { Bucket, Key, UploadId },
    } = createUploadResponseSchema.parse(responseBody);

    // Record the identifiers before notifying the listener so that a
    // `cancel()` issued from the "created" handler can abort the upload
    this.#bucket = Bucket;
    this.#key = Key;
    this.#uploadId = UploadId;

    this.#dispatchUpdate({ type: "created" });

    return { bucket: Bucket, key: Key, uploadId: UploadId };
  }

  /**
   * Uploads the part at zero-based `partIndex`.
   *
   * @returns The 1-indexed part number and the `ETag` returned for the part.
   * @throws The `Response` if the part upload request was not OK.
   */
  async #uploadPart(
    key: string,
    uploadId: string,
    partIndex: number
  ): Promise<UploadedPart> {
    // S3 part numbers need to be 1-indexed
    const partNumber = partIndex + 1;

    const presignedUrlResponse =
      await this.#ingestionApi.createIngestionPresignedUrl(
        {
          ingestionId: this.ingestionId,
          createPresignedURLRequest: {
            method: "upload_part",
            params: {
              key,
              partNumber,
              uploadId,
            },
          },
        },
        { signal: this.#getSignal() }
      );

    // `slice` clamps the end offset, so the final part may be short
    const offset = partIndex * MultipartUpload.#PART_SIZE_BYTES;
    const part = this.file.slice(
      offset,
      offset + MultipartUpload.#PART_SIZE_BYTES,
      this.file.type
    );

    const partUploadResponse = await fetch(presignedUrlResponse.url, {
      method: "PUT",
      body: part,
      signal: this.#getSignal(),
    });

    // TODO: Retrying?
    if (!partUploadResponse.ok) {
      throw partUploadResponse;
    }

    const eTag = partUploadResponse.headers.get("ETag");

    invariant(eTag !== null, "ETag header was missing");

    return { partNumber, eTag };
  }

  /**
   * Uploads every part of the file, at most 10 at a time, dispatching a
   * "progress" event as each part finishes. The first failure aborts all
   * remaining requests and is rethrown.
   *
   * @returns The uploaded parts sorted by part number.
   */
  async #uploadParts(key: string, uploadId: string) {
    const numParts = Math.ceil(
      this.file.size / MultipartUpload.#PART_SIZE_BYTES
    );

    // Works like `_.range(numParts)`
    const partIndices = [...Array(numParts).keys()];

    const { results } = await PromisePool.for(partIndices)
      .withConcurrency(10)
      .onTaskFinished((_, pool) => {
        this.#dispatchUpdate({
          type: "progress",
          percent: clamp(pool.processedPercentage() / 100, 0, 1),
        });
      })
      // Will cause the `await PromisePool` statement above to throw
      .handleError((error) => {
        this.#abortRequests();

        throw error;
      })
      .process(this.#uploadPart.bind(this, key, uploadId));

    this.#dispatchUpdate({ type: "uploaded" });

    // Tasks finish out of order; the completion request lists parts in order
    return sortBy(results, "partNumber");
  }

  /**
   * Completes the multipart upload from the list of uploaded parts.
   *
   * @throws The `Response` if the request was not OK or if its body parsed
   *   as an error document.
   */
  async #completeUpload(key: string, uploadId: string, parts: UploadedPart[]) {
    const { url: presignedUrl } =
      await this.#ingestionApi.createIngestionPresignedUrl(
        {
          ingestionId: this.ingestionId,
          createPresignedURLRequest: {
            method: "complete_multipart_upload",
            params: {
              key,
              uploadId,
            },
          },
        },
        { signal: this.#getSignal() }
      );

    const requestBody = {
      CompleteMultipartUpload: {
        Part: parts.map((part) => ({
          PartNumber: part.partNumber,
          ETag: part.eTag,
        })),
      },
    };

    const completeUploadResponse = await fetch(presignedUrl, {
      method: "POST",
      body: xmlBuilder.build(requestBody),
      signal: this.#getSignal(),
    });

    if (!completeUploadResponse.ok) {
      throw completeUploadResponse;
    }

    const responseBody = xmlParser.parse(await completeUploadResponse.text());

    const parsedResponse = completeUploadResponseSchema.parse(responseBody);

    // An OK response can still carry an error document in its body
    if ("Error" in parsedResponse) {
      console.error("Error completing multipart upload:\n", parsedResponse);
      throw completeUploadResponse;
    }

    this.#dispatchUpdate({ type: "completed" });
  }

  /**
   * Runs the whole upload: create, upload all parts, complete.
   *
   * On failure an "error" event is dispatched, unless the failure follows a
   * `cancel()` requested by the caller. If the upload had already been
   * created it is cancelled before the original error is rethrown.
   *
   * @returns The bucket and key of the uploaded object.
   * @throws Whatever caused the upload to fail.
   */
  async start() {
    let shouldCallCancel = false;
    try {
      const { bucket, key, uploadId } = await this.#create();

      shouldCallCancel = true;

      const parts = await this.#uploadParts(key, uploadId);

      await this.#completeUpload(key, uploadId, parts);

      return { bucket, key };
    } catch (e) {
      // If the caller already asked to cancel, the failure is just the
      // aborted requests rejecting, so it is not reported as an error
      if (this.#cancelPromise === null) {
        this.#dispatchUpdate({ type: "error" });
      }

      if (shouldCallCancel) {
        // Resolves with the in-progress cancellation if there already is one
        await this.cancel();
      }

      throw e;
    }
  }

  /**
   * Aborts the multipart upload in S3 if it has been created. These requests
   * deliberately do not use the shared abort signal, which is already
   * aborted by the time this runs.
   *
   * @throws The `Response` if the abort request was not OK.
   */
  async #abortUpload() {
    if (
      this.#key === null ||
      this.#bucket === null ||
      this.#uploadId === null
    ) {
      // Upload hasn't been created yet so don't attempt to abort it in S3
      return;
    }

    // Upload has been created in S3 so it also needs to be aborted
    const { url: presignedUrl } =
      await this.#ingestionApi.createIngestionPresignedUrl({
        ingestionId: this.ingestionId,
        createPresignedURLRequest: {
          method: "abort_multipart_upload",
          params: {
            key: this.#key,
            bucket: this.#bucket,
            uploadId: this.#uploadId,
          },
        },
      });

    const abortResponse = await fetch(presignedUrl, { method: "DELETE" });

    if (!abortResponse.ok) {
      throw abortResponse;
    }
  }

  /**
   * Performs the cancellation: aborts in-flight requests, aborts the upload
   * in S3 and reports the outcome through "cancel-*" events.
   */
  async #runCancel() {
    this.#abortRequests();

    this.#dispatchUpdate({ type: "cancel-started" });

    try {
      await this.#abortUpload();

      this.#dispatchUpdate({ type: "cancel-succeeded" });
    } catch {
      // The `cancel` method should never reject. The user will need to
      // listen for the "cancel-failed" event to know if it failed
      this.#dispatchUpdate({ type: "cancel-failed" });
    }
  }

  /**
   * Cancels the upload. Cancellation runs once: later calls, including the
   * one `start()` makes while unwinding, wait for the first to finish
   * instead of sending another abort request.
   *
   * A failed abort is reported through the "cancel-failed" event rather
   * than by rejecting.
   */
  async cancel() {
    if (this.#cancelPromise === null) {
      this.#cancelPromise = this.#runCancel();
    }

    await this.#cancelPromise;
  }
}
