import { ServerStreamingCall } from '@protobuf-ts/runtime-rpc';
import { QueryKey, useQuery } from '@tanstack/react-query';
import { queryClient } from 'app/queryClient';
import { differenceInSeconds } from 'date-fns';

// Longest gap, in seconds, tolerated between two stream messages before
// useStream aborts the stream.
const maxSecondsWithoutMessage = 10;
// Pause, in seconds, between two consecutive checks of the gap above.
const abortLoopInterval = 4; // secs

/**
 * Options accepted by `useStream`.
 */
interface StreamArguments<TRequest extends object, TResponse extends object> {
  /**
   * Base query key. `useStream` derives two react-query keys from it:
   * `[key, 'watch']` for the stream itself and `[key, 'state']` for the data.
   */
  key: QueryKey;
  /**
   * Opens the server-streaming call. It is handed an abort signal that fires
   * when react-query cancels the query or when the stream is considered stale.
   */
  queryFn: (abort: AbortSignal) => ServerStreamingCall<TRequest, TResponse>;
  /** Invoked with the error reported by the stream's `onError` callback. */
  onError?: (err: Error) => void;
  /** Invoked for every received message, after the cached state was updated. */
  onMessage?: (message: TResponse) => void;
  /** Forwarded as `enabled` to both underlying queries. */
  enabled?: boolean;
  /**
   * Combines the cached state with a newly received message. When omitted, the
   * message is shallow-merged over the previous state with object spread.
   */
  stateMerger?: (oldState: TResponse | undefined, newState: TResponse) => TResponse;
}

/**
 * Subscribes to a server-streaming RPC and exposes the accumulated stream
 * state through react-query.
 *
 * Two queries are registered:
 * - `[key, 'watch']` owns the stream. Its query function never completes
 *   successfully: once the stream ends or fails it throws, so react-query
 *   (`retry: true`) opens the stream again.
 * - `[key, 'state']` holds the data. Its own query function never settles; the
 *   data is written with `queryClient.setQueryData` whenever a message arrives.
 *
 * While a stream is open, a watchdog checks every `abortLoopInterval` seconds
 * whether a message arrived within the last `maxSecondsWithoutMessage` seconds
 * and otherwise aborts the signal that was handed to `queryFn`.
 *
 * @param args - See `StreamArguments`.
 * @returns The react-query result of the `[key, 'state']` query.
 */
export const useStream = <TRequest extends object, TResponse extends object>({
  key,
  queryFn,
  onError,
  onMessage,
  enabled,
  stateMerger,
}: StreamArguments<TRequest, TResponse>) => {
  useQuery({
    queryKey: [key, 'watch'],
    queryFn: async ({ signal: querySignal }) => {
      // The stream is cancelled either by react-query (querySignal) or by this
      // controller, which the watchdog and the cleanup below use.
      const controller = new AbortController();
      const signal = anySignal(querySignal, controller.signal);

      console.log('[event stream] starting');
      const call = queryFn(signal);

      let lastMessage = new Date();

      // Watchdog: runs detached until the combined signal is aborted.
      (async () => {
        while (!signal.aborted) {
          // Abort the stream if no message arrived within the allowed window.
          if (differenceInSeconds(new Date(), lastMessage) > maxSecondsWithoutMessage) {
            console.error(
              `[event stream] aborting due to no message in ${maxSecondsWithoutMessage} seconds`,
            );
            controller.abort(`No message in ${maxSecondsWithoutMessage} seconds`);
            return;
          }
          await sleep(abortLoopInterval);
        }
      })().catch(err => {
        console.error('[event stream] Error in timeout loop:', err);
      });

      try {
        call.responses.onMessage(message => {
          lastMessage = new Date();
          queryClient.setQueryData([key, 'state'], (data: TResponse | undefined) =>
            stateMerger ? stateMerger(data, message) : { ...(data ?? {}), ...message },
          );
          onMessage?.(message);
          console.debug('[event stream] message:', message);
        });

        call.responses.onError(err => {
          console.warn('[event stream] errored:', err);
          onError?.(err);
        });

        const { status, trailers } = await call;
        console.warn('[event stream] ended:', { status, trailers });
        throw new Error('Stream ended'); // Ensures that react-query will retry
      } finally {
        // Stop the watchdog as soon as this stream is over, however it ended.
        // Otherwise it would keep polling next to the retry's own watchdog and
        // eventually log a misleading "no message" error for a dead stream.
        controller.abort();
      }
    },
    retry: true,
    retryDelay: 1000,
    refetchIntervalInBackground: true,
    enabled,
  });

  return useQuery({
    queryKey: [key, 'state'],
    queryFn: () => new Promise<TResponse>(() => undefined), // never resolve
    refetchInterval: false,
    enabled,
  });
};

/**
 * Resolves (with no value) after the given delay.
 *
 * @param secs - Delay in seconds; converted to milliseconds for `setTimeout`.
 */
const sleep = async (secs: number) => {
  const delayMs = secs * 1000;
  return new Promise(wake => {
    setTimeout(wake, delayMs);
  });
};

/**
 * Builds a signal that aborts as soon as any of the given signals aborts.
 * `undefined` entries are ignored. If one of the inputs is already aborted,
 * the returned signal is aborted on return.
 *
 * @param signals - Source signals to observe.
 * @returns A new signal mirroring the first abort among `signals`.
 */
const anySignal = (...signals: (AbortSignal | undefined)[]) => {
  const merged = new AbortController();

  function forwardAbort() {
    merged.abort();
    // Detach from every source once the merged signal has fired.
    signals.forEach(source => source?.removeEventListener('abort', forwardAbort));
  }

  for (const source of signals) {
    if (!source) {
      continue;
    }
    if (source.aborted) {
      forwardAbort();
      break;
    }
    source.addEventListener('abort', forwardAbort);
  }

  return merged.signal;
};
