import { all, call, put, select, takeEvery, takeLatest, takeLeading } from 'redux-saga/effects';
import delay from '@redux-saga/delay-p';
import {
  connect,
  WEBSOCKET_MESSAGE,
  DEFAULT_PREFIX as WEBSOCKET_PREFIX,
  send,
  WEBSOCKET_CLOSED,
  WEBSOCKET_OPEN
} from '@giantmachines/redux-websocket';
import { RootState } from '../rootReducer';
import {
  setIncomingMessage,
  subRequestToKey,
  websocketActions,
  WebsocketActionType,
  WebsocketRequestPayload,
  WebsocketResponsePayload
} from './reducer';
import { WebsocketDto, WebsocketResponse } from '@pclocs/platform-sdk';
import _ from 'lodash';
import { getAuthTokens } from '../../helpers/auth-token-helper';
import { Awaited } from '../../typescript-utils';

const WEBSOCKET_PING_INTERVAL = 60000;
const WEBSOCKET_PING_ACTION = 'WEBSOCKET_PING';

function* websocketConnect() {
  yield takeLeading(websocketActions.connect.type, function*() {
    try {
      const authTokens: Awaited<ReturnType<typeof getAuthTokens>> = yield call(getAuthTokens);
      if (!authTokens) {
        throw Error('No session found for websocket connection');
      }
      const websocketAuthToken: string = authTokens.type === 'cognito' ? authTokens.accessToken : authTokens.idToken;
      const socketUrl = `${process.env.SOCKET_API_ENDPOINT}?token=${websocketAuthToken}`;
      yield put(connect(socketUrl));
      yield put({ type: WEBSOCKET_PING_ACTION });
    } catch (e) {
      console.error(e);
    }
  });
}

function* websocketReconnectOnClose() {
  yield takeLeading([`${WEBSOCKET_PREFIX}::${WEBSOCKET_CLOSED}`], function*() {
    do {
      yield put(websocketActions.connect());
      yield delay(10000);
    } while (!((yield select((state: RootState) => state.websocket.isConnected)) as boolean));
  });
}

function* websocketPing() {
  yield takeLatest(WEBSOCKET_PING_ACTION, function*() {
    const accountId: string = yield select(state => state.auth.accountContext);
    while (true) {
      try {
        const isConnected: boolean = yield select((state: RootState) => state.websocket.isConnected);
        if (isConnected) {
          yield put(send({ accountId, route: 'ping' }));
        }
        yield delay(WEBSOCKET_PING_INTERVAL);
      } catch (e) {
        console.error(`Failed a websocket ping: `, e);
      }
    }
  });
}

function* websocketIncomingMessage() {
  interface LibMessageAction {
    type: 'REDUX_WEBSOCKET::MESSAGE';
    meta: { timestamp: Date };
    payload: {
      event: MessageEvent;
      message: string;
      origin: string;
    };
  }
  yield takeEvery(`${WEBSOCKET_PREFIX}::${WEBSOCKET_MESSAGE}`, function*(action: LibMessageAction) {
    try {
      const message = JSON.parse(action.payload.message) as WebsocketResponse | WebsocketDto['streamResponse'];
      if (message.type === 'status') {
        if (message.data?.statusCode > 204) {
          console.error(`Websocket error response: `, message?.data);
        }
      } else {
        yield put(
          setIncomingMessage({
            ..._.omit(message, 'itemId'),
            id: 'itemId' in message ? message.itemId : undefined
          } as WebsocketResponsePayload)
        );
      }
    } catch (e) {
      console.error(`Failed to parse incoming websocket message:`, e);
    }
  });
}

const convertRequestForMessage = (req: WebsocketRequestPayload): WebsocketDto['request'] => ({
  ..._.omit(req, 'id'),
  itemId: 'id' in req ? req.id : undefined
});

function* websocketHandleSubscriptions() {
  yield takeEvery(
    [websocketActions.subscribe.type, websocketActions.unsubscribe.type, websocketActions.resubscribe.type],
    function*(action: WebsocketActionType<'subscribe' | 'unsubscribe' | 'resubscribe'>) {
      try {
        const isConnected: boolean = yield select((state: RootState) => state.websocket.isConnected);
        if (!isConnected) {
          yield put(websocketActions.connect());
        }
        const {
          accountId,
          subscriptionCounts
        }: {
          accountId: RootState['auth']['accountContext'];
          subscriptionCounts: RootState['websocket']['subscriptionCounts'];
        } = yield select((state: RootState) => ({
          accountId: state.auth.accountContext,
          subscriptionCounts: state.websocket.subscriptionCounts
        }));

        const subKey = subRequestToKey(action.payload);
        const subToAction = subscriptionCounts[subKey];

        let subRoute: 'subscribe' | 'unsubscribe';
        if (
          (action.type === websocketActions.subscribe.type && subToAction.count === 1) ||
          (action.type === websocketActions.resubscribe.type && subToAction.count > 0)
        ) {
          subRoute = 'subscribe';
        } else if (action.type === websocketActions.unsubscribe.type && !subToAction?.count) {
          subRoute = 'unsubscribe';
        } else {
          return;
        }

        yield put(
          send({ accountId, route: subRoute, timestamp: Date.now(), ...convertRequestForMessage(action.payload) })
        );
      } catch (e) {
        console.error(e);
      }
    }
  );
}

function* websocketResubscribe() {
  yield takeEvery(`${WEBSOCKET_PREFIX}::${WEBSOCKET_OPEN}`, function*() {
    const {
      accountId,
      subscriptionCounts
    }: {
      accountId: RootState['auth']['accountContext'];
      subscriptionCounts: RootState['websocket']['subscriptionCounts'];
    } = yield select((state: RootState) => ({
      accountId: state.auth.accountContext,
      subscriptionCounts: state.websocket.subscriptionCounts
    }));

    yield all(
      Object.values(subscriptionCounts).map(sub => {
        return put(
          send({
            accountId,
            route: 'subscribe',
            timestamp: new Date().getTime(),
            ...convertRequestForMessage(sub.request)
          })
        );
      })
    );
  });
}

export function* websocketSaga() {
  yield all([
    websocketConnect(),
    websocketReconnectOnClose(),
    websocketPing(),
    websocketHandleSubscriptions(),
    websocketResubscribe(),
    websocketIncomingMessage()
  ]);
}
