import Constants from "expo-constants";
import { combineEpics, Epic } from "redux-observable";
import { concat, EMPTY, interval, merge, Observable, of } from "rxjs";
import makeWebSocketObservable, {
  GetWebSocketResponses,
} from "rxjs-websockets";
import {
  catchError,
  delay,
  filter,
  map,
  mapTo,
  mergeMap,
  switchMap,
  takeUntil,
} from "rxjs/operators";
import { Action, AnyAction } from "typescript-fsa";
import { ofActionPayload } from "typescript-fsa-redux-observable";
import deviceActions from "../actions/Device";
import eventActions from "../actions/Event";
import loginActions from "../actions/Login";
import messageActions from "../actions/Message";
import userActions from "../actions/User";
import {
  EventPage,
  MessageEvent,
  WebSocketMessageType,
} from "../services/api/interface";
import { parseWebSocketMessage } from "../services/api/parsers";
import { State } from "../states";
import { Dependencies } from "./dependencies";
import { makeAPICallEpic, observeAppState } from "./helpers";

/**
 * Kicks off event loading once the current user or the current device has
 * finished loading.
 *
 * The emitted `loadEvents.started` action targets the event stream referenced
 * by the loaded entity and resumes from the last sequence id already stored
 * for that stream, or from 0 when the stream is not in the store yet.
 */
const startLoadingEvents: Epic<AnyAction, AnyAction, State> = (
  action$,
  state$
) =>
  action$.pipe(
    ofActionPayload(
      userActions.loadCurrentUser.done,
      deviceActions.loadCurrentDevice.done
    ),
    map(({ result: { eventStreamId } }) => {
      // Read the store at emission time so we resume from the freshest cursor.
      const knownStream = state$.value.event.streams[eventStreamId];
      return eventActions.loadEvents.started({
        eventStreamId,
        lastSequenceId: knownStream?.lastSequenceId ?? 0,
      });
    })
  );

/**
 * Epic that fetches one page of events for a stream.
 *
 * Built with `makeAPICallEpic` around `eventActions.loadEvents`; the request
 * itself is `apiClient.getStreamEvents(eventStreamId, lastSequenceId)`.
 * NOTE(review): how started/done/failed are dispatched is defined by
 * `makeAPICallEpic` in ./helpers, which is not visible from this file.
 */
const loadEvents = makeAPICallEpic<
  { eventStreamId: string; lastSequenceId: number },
  EventPage
>(eventActions.loadEvents, ({ eventStreamId, lastSequenceId }, dependencies) =>
  dependencies.apiClient.getStreamEvents(eventStreamId, lastSequenceId)
);

/**
 * Requests the next page of events after a page has been loaded.
 *
 * A `null` `lastSequenceId` in the loaded page means there is nothing further
 * to request, so no action is emitted. Otherwise a new `loadEvents.started`
 * is emitted for the same stream, continuing from the returned sequence id.
 */
const loadNextEventPage: Epic<AnyAction, AnyAction, State> = (action$) =>
  action$.pipe(
    ofActionPayload(eventActions.loadEvents.done),
    mergeMap(({ params, result }) => {
      const nextSequenceId = result.lastSequenceId;
      if (nextSequenceId === null) {
        return EMPTY;
      }
      return of(
        eventActions.loadEvents.started({
          eventStreamId: params.eventStreamId,
          lastSequenceId: nextSequenceId,
        })
      );
    })
  );

/**
 * Signals that a stream has been fully loaded.
 *
 * When a loaded page reports a `null` `lastSequenceId` (no further pages),
 * emits `eventActions.allEventsLoaded` carrying the id of the stream that was
 * being loaded.
 */
const emitAllEventsLoaded: Epic<AnyAction, Action<string>, State> = (
  action$
) =>
  action$.pipe(
    ofActionPayload(eventActions.loadEvents.done),
    filter(({ result }) => result.lastSequenceId === null),
    map(({ params }) => eventActions.allEventsLoaded(params.eventStreamId))
  );

/**
 * Starts the live-update subscription once every stream is ready for it.
 *
 * Each `allEventsLoaded` action triggers a check of the store: only when all
 * streams are flagged `waitingForUpdate` is `subscribeEventUpdates.started`
 * emitted. An empty stream map passes the check as well.
 */
const startWebSocketSessionAfterEventsLoaded: Epic<
  AnyAction,
  AnyAction,
  State
> = (action$, state$) =>
  action$.pipe(
    ofActionPayload(eventActions.allEventsLoaded),
    filter(() => {
      const streams = Object.values(state$.value.event.streams);
      return streams.every((stream) => stream.waitingForUpdate);
    }),
    mapTo(eventActions.subscribeEventUpdates.started())
  );

/**
 * Maintains the WebSocket session that delivers live event updates.
 *
 * For every `subscribeEventUpdates.started` action the epic:
 *  1. fetches the WebSocket configuration through the API client,
 *  2. opens a socket to `/api/v1/event_streams/updates`, passing the token
 *     from that configuration as a query parameter,
 *  3. sends one SUBSCRIBE message per stream currently in the store and, when
 *     the configuration enables it, periodic HEARTBEAT messages,
 *  4. turns incoming EVENT_UPDATE messages into `eventActions.eventUpdate`.
 *
 * `subscribeEventUpdates.done` is emitted when the socket observable
 * completes, `subscribeEventUpdates.failed` when anything in the session
 * errors (including a server message of type "error"). A logout action ends
 * the session without emitting either, and a new `started` action replaces
 * any session that is still running.
 */
const subscribeEventUpdates: Epic<AnyAction, AnyAction, State, Dependencies> = (
  action$,
  state,
  { apiClient }
) =>
  action$.pipe(
    ofActionPayload(eventActions.subscribeEventUpdates.started),
    switchMap(() =>
      apiClient.getWebSocketConfig().pipe(
        switchMap((config) =>
          concat(
            makeWebSocketObservable(
              Constants.manifest!.extra!.webSocketBaseURL +
                "/api/v1/event_streams/updates?" +
                new URLSearchParams({
                  token: config.token,
                }).toString()
            ).pipe(
              switchMap((getResponses: GetWebSocketResponses) => {
                // One SUBSCRIBE per known stream, resuming from the cursor
                // stored for it at the moment the socket opens.
                const subscribeMessages: Observable<string> = of(
                  ...Object.entries(state.value.event.streams).map(
                    ([eventStreamId, eventStream]) =>
                      JSON.stringify({
                        type: "SUBSCRIBE",
                        event_stream_id: eventStreamId,
                        last_sequence_id: eventStream.lastSequenceId,
                      })
                  )
                );
                let heartbeatMessages: Observable<string> = EMPTY;
                if (config.heartbeatEnabled) {
                  console.info(
                    "Heartbeat enabled with interval=",
                    config.heartbeatInterval,
                    "seconds"
                  );
                  // Start a heartbeat cycle right away and restart it every
                  // time the app state is reported as "active" or "inactive".
                  // NOTE(review): presumably observeAppState emits on app
                  // state changes — confirm in ./helpers.
                  heartbeatMessages = merge(
                    of(undefined),
                    observeAppState().pipe(
                      filter(
                        (status) => status === "active" || status === "inactive"
                      )
                    )
                  ).pipe(
                    // Each cycle sends one heartbeat immediately, then one per
                    // configured interval (given in seconds).
                    switchMap(() =>
                      concat(
                        of(undefined),
                        interval(config.heartbeatInterval * 1000)
                      )
                    ),
                    map(() =>
                      JSON.stringify({
                        type: "HEARTBEAT",
                        heartbeat: { timestamp: Date.now() },
                      })
                    )
                  );
                }
                return getResponses(
                  merge(subscribeMessages, heartbeatMessages)
                );
              }),
              mergeMap((webSocketPayload) => {
                const jsonPayload = JSON.parse(webSocketPayload as string);
                if ("type" in jsonPayload && jsonPayload["type"] === "error") {
                  console.warn("WebSocket error message", jsonPayload);
                  // Serialize explicitly: interpolating the parsed object
                  // directly would render as "[object Object]".
                  throw new Error(
                    `WebSocket error message ${JSON.stringify(jsonPayload)}`
                  );
                }
                const webSocketMessage = parseWebSocketMessage(jsonPayload);
                if (webSocketMessage === null) {
                  return EMPTY;
                }
                switch (webSocketMessage.type) {
                  case WebSocketMessageType.EVENT_UPDATE: {
                    return of(
                      eventActions.eventUpdate({
                        eventStreamId: webSocketMessage.eventStreamId,
                        eventPage: {
                          events: webSocketMessage.events,
                          lastSequenceId: webSocketMessage.lastSequenceId,
                        },
                      })
                    );
                  }
                  default:
                    // Ignore message types this epic does not handle. Without
                    // this branch the callback would return `undefined`, which
                    // mergeMap rejects with an error that tears the session
                    // down.
                    return EMPTY;
                }
              })
            ),
            of(eventActions.subscribeEventUpdates.done({}))
          )
        ),
        catchError((error) => {
          console.warn("Subscription ended because of WebSocket error", error);
          return of(eventActions.subscribeEventUpdates.failed({ error }));
        }),
        // Placed after catchError so a logout ends the session silently,
        // with neither `done` nor `failed` emitted.
        takeUntil(action$.pipe(ofActionPayload(loginActions.logout)))
      )
    )
  );

/**
 * Re-opens the live-update subscription after a session ends.
 *
 * Whenever `subscribeEventUpdates` reports `done` or `failed`, a new
 * `subscribeEventUpdates.started` is emitted after a random delay between
 * 3 and 8 seconds. A later end-of-session action replaces a retry that is
 * still pending.
 */
const retrySubscribeEventUpdates: Epic<AnyAction, AnyAction, State> = (
  action$
) => {
  const sessionEnded$ = action$.pipe(
    ofActionPayload(
      eventActions.subscribeEventUpdates.done,
      eventActions.subscribeEventUpdates.failed
    )
  );

  return sessionEnded$.pipe(
    switchMap(() => {
      // Randomize the wait so the delay differs from one retry to the next.
      const delaySeconds = 3 + Math.random() * 5;
      console.log(`Retry subscribe event updates in ${delaySeconds}`);
      const restartAction = eventActions.subscribeEventUpdates.started();
      return of(restartAction).pipe(delay(delaySeconds * 1000));
    })
  );
};

/**
 * Scrolls the message list to the newest message when a live update arrives.
 *
 * The scroll is requested only when the user is viewing the latest messages,
 * the update belongs to the stream currently shown, and at least one event in
 * the update is not recognized as originating from this client.
 */
const scrollToLatestMessage: Epic<AnyAction, Action<void>, State> = (
  action$,
  state$
) =>
  action$.pipe(
    ofActionPayload(eventActions.eventUpdate),
    filter(({ eventStreamId, eventPage }) => {
      const messages = state$.value.messages;
      if (!messages.viewingLatest) {
        return false;
      }
      if (eventStreamId !== messages.eventStreamId) {
        return false;
      }
      // Skip the scroll if every event is a message from this client.
      // NOTE(review): the membership test looks up `localId` among the keys of
      // the whole `messages` slice — confirm this is the intended lookup table.
      const allFromThisClient = eventPage.events.every(
        (event) => (event as MessageEvent).localId! in messages
      );
      return !allFromThisClient;
    }),
    // Delay slightly so the message-appending cycle finishes first; only then
    // can the scroll method be called on the list.
    delay(0),
    mapTo(messageActions.scrollToLatestMessage())
  );

// Root epic for the event feature: combines the initial paged load, the
// WebSocket subscription with its retry loop, and the scroll-to-latest
// behavior defined above into a single epic.
const eventEpic = combineEpics(
  startLoadingEvents,
  loadEvents,
  loadNextEventPage,
  emitAllEventsLoaded,
  startWebSocketSessionAfterEventsLoaded,
  subscribeEventUpdates,
  retrySubscribeEventUpdates,
  scrollToLatestMessage
);

export default eventEpic;
