import { ChatAdapterExtras, StreamingAdapterObserver } from '@nlux/core';
import { NluxError, NluxUsageError } from '../types/error';
import { warn } from '../utils/warn';
import { parseChunk } from '../parser/parseChunk';
import { ChatAdapterOptions } from '../types/adapterOptions';
import { adapterErrorToExceptionId } from '../utils/adapterErrorToExceptionId';
import { LangGraphAbstractAdapter } from './adapter';
import { parseStreamedEvent } from '../parser/parseStreamedEvent';

/**
 * Streaming chat adapter: POSTs the user message to the configured endpoint
 * and forwards streamed `messages` events to an observer as they arrive.
 *
 * Only streaming is supported; {@link LangGraphStreamAdapter.batchText}
 * always rejects.
 */
export class LangGraphStreamAdapter<AiMsg> extends LangGraphAbstractAdapter<AiMsg> {
  constructor(options: ChatAdapterOptions<AiMsg>) {
    super(options);
  }

  /**
   * Not supported by the stream adapter.
   *
   * @throws NluxUsageError always (surfaced as a rejected promise, since the
   * method is `async`).
   */
  async batchText(
    message: string,
    extras: ChatAdapterExtras<AiMsg>
  ): Promise<string | object | undefined> {
    throw new NluxUsageError({
      source: this.constructor.name,
      message: 'Cannot fetch text using the stream adapter!',
    });
  }

  /**
   * Sends `message` to the endpoint and streams the response to `observer`.
   *
   * Each network chunk is decoded as text and handed to `parseStreamedEvent`.
   * When the parsed result has `event === 'messages'` and defined `data`, the
   * data is emitted through `observer.next`. `observer.complete` is called
   * once the response stream ends. Any failure (network error, non-OK status,
   * missing body, error while reading) is logged via `warn` and reported
   * through `observer.error`; nothing is thrown to the caller.
   *
   * @param message The user prompt to send.
   * @param observer Receives streamed data, completion, and errors.
   * @param extras Adapter extras; `conversationHistory` is included in the
   * request body.
   */
  streamText(
    message: string,
    observer: StreamingAdapterObserver<string | object | undefined>,
    extras: ChatAdapterExtras<AiMsg>
  ): void {
    const body = this.getRequestBody(message, this.config, extras.conversationHistory);

    fetch(this.endpointUrl, {
      method: 'POST',
      headers: {
        ...this.headers,
        // Listed last so it takes precedence over any Content-Type in this.headers.
        'Content-Type': 'application/json',
      },
      body,
    })
      .then(async (response) => {
        if (!response.ok) {
          throw new NluxError({
            source: this.constructor.name,
            message: `LangGraph runnable returned status code: ${response.status}`,
          });
        }

        if (!response.body) {
          // The status is OK here, so report the real problem: there is no
          // body to stream from.
          throw new NluxError({
            source: this.constructor.name,
            message: `LangGraph runnable returned no response body (status code: ${response.status})`,
          });
        }

        // Read the streamed response and feed events to the observer as they
        // are being generated.
        const reader = response.body.getReader();
        const textDecoder = new TextDecoder();

        while (true) {
          const { value, done } = await reader.read();
          if (done) {
            break;
          }

          // `stream: true` makes the decoder hold back an incomplete multi-byte
          // UTF-8 sequence at the end of a chunk and finish it with the next
          // chunk, instead of emitting U+FFFD replacement characters.
          const event = textDecoder.decode(value, { stream: true });

          // NOTE(review): network chunk boundaries are not guaranteed to align
          // with event boundaries; this assumes parseStreamedEvent copes with
          // whatever a single chunk contains — confirm against that helper.
          const eventData = parseStreamedEvent(event);
          if (eventData && eventData.event === 'messages' && eventData.data !== undefined) {
            observer.next(eventData.data as string | object | undefined);
          }
        }
        observer.complete();
      })
      .catch((error) => {
        warn(error);
        // Rejection values are not guaranteed to be Error instances (or even
        // non-null), so derive the message defensively.
        const message = error instanceof Error ? error.message : String(error);
        observer.error(
          new NluxUsageError({
            source: this.constructor.name,
            message,
            exceptionId: adapterErrorToExceptionId(error) ?? undefined,
          })
        );
      });
  }
}
