import { ActionsObservable, combineEpics, StateObservable } from 'redux-observable';
import { ActionTypes as Connection } from '../../../authentication/actions/connection';
import { ActionTypes as Authentication } from '../../../authentication/actions/authentication';
import { StompService, StompClient } from '../../../main/services/stompService';
import { State } from '../../../main/reducers/rootReducer';
import { receiveMessage, receiveConcatMessage } from '../actions/messenger';
import * as Rx from 'rxjs';
import { timer } from 'rxjs';
import { filter, switchMap, map, catchError, takeUntil, delay } from 'rxjs/operators';

const stompService = new StompService(StompClient);

export const errorMessageListener: any = (
  actions$: ActionsObservable<any>,
  state: StateObservable<State>
) => {
  return actions$.pipe(
    filter(action => action.type === Connection.CONNECTION_SUCCESS || action.type === Connection.RECONNECTION_SUCCESS),
    switchMap(() => {
      return stompService
        .subscribe('/user/topic/im/error').pipe(
          map((content: any) => {
            let message;
            if (content.message) {
              message = content.message;
            } else {
              message = content.name;
            }
            return receiveMessage(content.correlationId, message, true, content.translated, content.translationParams);
          }),
          catchError(error => {
            return Rx.of(null);
          }),
          takeUntil(actions$.pipe(filter(action => action.type === Authentication.AUTHENTICATION_LOGOUT)))
        );
    })
  );
};

export const messageListener: any = (
  actions$: ActionsObservable<any>,
  state: StateObservable<State>
) => {
  return actions$.pipe(
    filter(action => action.type === Connection.CONNECTION_SUCCESS ||  action.type === Connection.RECONNECTION_SUCCESS),
    switchMap(() => {
      return stompService
        .subscribe('/user/topic/im/ack').pipe(
          map((content: any) => {
            return receiveMessage(content.correlationId, content.message, false);
          }),
          catchError(error => {
            return Rx.of(receiveMessage('', error, true));
          }),
          takeUntil(actions$.ofType(Authentication.AUTHENTICATION_LOGOUT))
        );
    })
    );
};

export const concatMessageListener: any = (
  actions$: ActionsObservable<any>,
  state: StateObservable<State>
) => {
  return actions$.pipe(
    filter(action => action.type === Connection.CONNECTION_SUCCESS ||  action.type === Connection.RECONNECTION_SUCCESS),
    switchMap(() => {
      return stompService
        .subscribe('/user/topic/im/ack/concat').pipe(
          map((content: any) => {
            return receiveConcatMessage(content.acknowledgementResponses, false);
          }),
          catchError(error => {
            return Rx.of(receiveMessage('', error, true));
          }),
          takeUntil(actions$.ofType(Authentication.AUTHENTICATION_LOGOUT))
        );
    })
    );
};

export const messengerEpic = combineEpics(
  messageListener,
  concatMessageListener,
  errorMessageListener
);
