import { Injectable } from '@angular/core';
import { EnvironmentService } from '@services/environment.service';
import { Observable, BehaviorSubject, of, throwError, combineLatest } from 'rxjs';
import { ResilientHttpService } from '@services/http/resilient-http.service';
import {
  ActivityStream,
  ConfigDataActivityResource,
  ActivityResource,
  ActivityBatch,
  Activity,
  ActivityActionType,
  ConfigDataAction,
  ActivitySubmission,
  ActivityProcessEntry,
  ActivityProcess,
  ConfigActivityEventChannels,
  ActivityEventMessage,
  ConfigDataActivityEvent,
  ConfigBinaryAction,
  ActivityEventMessageType,
  StreamItem,
} from '@models/activities-events/activities';
import { map, mergeMap, catchError, take, switchMap, filter, tap } from 'rxjs/operators';

import { DataElementType } from '@constants/data-element-types';
import { Config } from '@models/fabrication/config';
import { Store } from '@ngrx/store';
import { FDMState } from '@store/reducers';
import { AuthService } from '@services/auth.service';
import {
  getReferenceSelector,
  selectAllConfigs,
  selectConfigByUrn,
  selectCurrentConfig,
} from '@store/selectors/configs.selectors';
import { LoggingService } from '@services/logging.service';
import { groupBy, uniq } from 'lodash';
import { HttpErrorResponse } from '@angular/common/http';
import { ApiError } from '@models/errors/api-error';
import { v4 as uuidv4 } from 'uuid';
import Pubnub from 'pubnub';
import { CacheService } from '@services/cache.service';
import { UpsertRealTimeActivityMarker } from '@store/actions/activity-real-time.action';
import { NotificationService } from '@services/notification.service';
import { NotificationType } from '@models/notification/notification';
import { BroadcastChannel } from 'broadcast-channel';
import { RemoveConfigDataTypeLoadingState } from '@store/actions/loading-data.action';
import { LocalisationConstants as LC } from '@constants/localisation-constants';
import { TranslateService } from '@ngx-translate/core';
import { StorageFileType } from '@models/fabrication/files';
import { BinaryJobType } from '@models/binary-task/binary-job-type';
import { CookieService } from 'ngx-cookie-service';
import { FinaliseUpgradePart } from '@store/actions/part.action';
import { DataElementTypeUtils } from '@utils/data-element-type-utils';
import { SetBinaryTaskData } from '@store/actions/application.action';
import { ForgeContentService } from '@services/forge-content.service';
import { ForgeContentDataElement } from '@models/forge-content/forge-content-data-element';
import { BinaryTaskAction } from '@services/binary-task-manager.service';

// todo: clean console.logs for debug messages
// todo: make channel subscribtions update when new config is dynamically added
// todo: xlate notification text (when localisation branch is in master)

enum BroadcastChannelMessageType {
  DismissNotification = 'DismissNotification',
}

@Injectable({
  providedIn: 'root',
})
export class ActivityService {
  namespace: string = null;
  generator: string = null;
  baseUrl: string = null;
  // private userId: string = null;
  streamGroupBaseUrl: string = null;
  errorStreamBaseUrl: string = null;
  activityNotificationChannels: ConfigActivityEventChannels = {};
  // sync same browser tabs with updated activities
  private _browserTabSyncChannel = new BroadcastChannel<Pubnub.PublishParameters>(
    'browser-tab-sync-channel'
  );
  // private fabdmUserStreamType = 'user';
  supportsRealTimeNotifications = false;
  readonly fabdmConfigStreamType = 'config';
  readonly generatorPostfix = '-generator';
  readonly apiName = 'forge-activity';
  readonly activityLimit = 10;
  pubNub: Pubnub = null;

  constructor(
    private envService: EnvironmentService,
    private resilientHttpService: ResilientHttpService,
    private store$: Store<FDMState>,
    private authService: AuthService,
    private loggingService: LoggingService,
    private cacheService: CacheService,
    private notificationService: NotificationService,
    private translate: TranslateService,
    private cookieService: CookieService,
    private forgeContentService: ForgeContentService
  ) {
    this.namespace = `${this.envService.environment.activity.activityNamespace}`;
    this.baseUrl = this.envService.environment.activity.activityUrl;
    // this.userId = this.authService.currentUserData.userId;
    this.streamGroupBaseUrl = `${this.baseUrl}namespaces/${this.namespace}/streamgroups/`;
    this.errorStreamBaseUrl = `${this.streamGroupBaseUrl}binary-errors/streams/${this.fabdmConfigStreamType}/`;
    this.generator = `${this.namespace}${this.generatorPostfix}`;
    this.supportsRealTimeNotifications = this.cacheService.isCacheSupported();
    if (this.supportsRealTimeNotifications) {
      this.setupPubNub();
      this.setupBrowserTabSyncChannels();
    }
  }

  /**
   * Load the event channels for the application to listen for changes to available configs
   * @param {string[]} configIds
   * @returns {Observable<boolean>}
   * @memberof ActivityService
   */
  public listenToConfigActivityChannels(configs: Config[]): boolean {
    if (!this.supportsRealTimeNotifications) {
      return true;
    }
    if (!configs?.length) {
      return true;
    }

    let validChannelCount = 0;
    configs.forEach((x) => {
      if (x.notificationChannelId?.length) {
        this.activityNotificationChannels[x.id] = x.notificationChannelId;
        validChannelCount++;
      }
    });

    if (!validChannelCount) return false;

    this.subscribeToNotificationChannelMessages();

    return configs.length === validChannelCount;
  }

  /**
   * Load the error activity stream items for specified config from separate binary-errors stream
   * @param {string} configId
   * @returns {Observable<Activity<ConfigDataActivityResource>[]>}
   * @memberof ActivityService
   */
  public loadConfigErrorActivityStreamItems(
    configId: string
  ): Observable<StreamItem<ConfigDataActivityResource>[]> {
    const url = `${this.errorStreamBaseUrl}${configId}`;
    const userId = this.authService.currentUserData?.userId;

    return this.getActivityStream<ConfigDataActivityResource>(url).pipe(
      map((stream) => stream.streamItems.filter((x) => x.activity.actor.dataUserId === userId)),
      catchError((error: HttpErrorResponse) => {
        const apiError: ApiError = this.handleActivityError(error, null);
        console.error('Failed to get error activities for config', apiError);
        return of([]);
      })
    );
  }

  private dismissErrorActivity(activityId: string, configId: string): void {
    const url = `${this.errorStreamBaseUrl}${configId}/items/${encodeURIComponent(activityId)}`;

    this.deleteActivity(url).pipe(
      catchError((error: HttpErrorResponse) => {
        const apiError: ApiError = this.handleActivityError(error, null);
        console.error('Failed to dismiss error activity for config', apiError);
        return of(null);
      })
    );
  }

  /**
   * Get the initial activity id to use as a reference point to determine subsequent activities
   * to process.
   * Note - this should only be called at the point that the data has just been cached. Any failures
   * will result in the cache entries for the data type being removed.
   * @param {string} configUrn
   * @param {DataElementType} dataType
   * @returns {Observable<string>}
   * @memberof ActivityService
   */
  public loadDataActivitityStartingId(
    configUrn: string,
    dataType: DataElementType
  ): Observable<string> {
    if (!this.cacheService.isCacheSupported()) {
      this.cacheService.printCacheNotSupportedMessage();
      return of(null);
    }
    const url = `${this.streamGroupBaseUrl}${dataType.toLowerCase()}/streams/${
      this.fabdmConfigStreamType
    }/${configUrn}?limit=1`;

    // todo: clear data cache if fails
    return this.getActivityStream<ConfigDataActivityResource>(url).pipe(
      map((stream: ActivityStream<ConfigDataActivityResource>) => {
        // todo: make 'not-found' as app wide constant
        return stream.streamItems.length ? stream.streamItems[0].id : 'not-found';
      }),
      catchError((error: HttpErrorResponse) => {
        const apiError: ApiError = this.handleActivityError(error, dataType);
        console.log(
          `${this.getActivityLogPrefix(
            dataType
          )}Failed to get the starting activity id, forcing cache removal`,
          apiError
        );
        return of('not-found');
      })
    );
  }

  /**
   * Get fabdm config data activities
   * e.g. data added, updated or deleted
   * @param {string} configUrn
   * @returns {Observable<ActivityStream>}
   * @memberof ActivityService
   */
  public getDataActivities(
    configUrn: string,
    dataType: DataElementType,
    lastActivityIdProcessed: string,
    activitySubmissionId: string
  ): Observable<ActivityProcess> {
    // loop and get activities up until the specified date range is reached
    // then filter for activities that have not been submitted using the current
    // activitySubmissionId
    const activityStreamSubject: BehaviorSubject<void> = new BehaviorSubject<void>(null);
    let nextToken = '';
    const requiresNextToken = () => (nextToken ? `&token=${nextToken}` : '');
    const activityProcessEntries: ActivityProcessEntry[] = [];
    const lastActivityIdProcessedExists = lastActivityIdProcessed !== 'not-found';
    let nextActivityIdProcessed = '';

    console.log(
      `${this.getActivityLogPrefix(
        dataType
      )}Filtering activities on or after activity id: "${lastActivityIdProcessed}" with submission ids that are not ${activitySubmissionId}`
    );
    console.log(
      `${this.getActivityLogPrefix(
        dataType
      )}Last activity processed id exists: ${lastActivityIdProcessedExists}`
    );

    return activityStreamSubject.pipe(
      switchMap(() => {
        const url = `${this.streamGroupBaseUrl}${dataType.toLowerCase()}/streams/${
          this.fabdmConfigStreamType
        }/${configUrn}?limit=${this.activityLimit}${requiresNextToken()}`;
        console.log(`${this.getActivityLogPrefix(dataType)}Getting activities using: ${url}`);
        return this.getActivityStream<ConfigDataActivityResource>(url).pipe(
          map((stream: ActivityStream<ConfigDataActivityResource>) => {
            console.log(
              `${this.getActivityLogPrefix(dataType)}Activity stream for ${dataType.toLowerCase()}`,
              stream
            );
            let isComplete = true;
            if (stream.streamItems.length) {
              let streamItemsWithinRange = stream.streamItems;
              // set the id of first (most recent activity in the stream) as a marker point
              // for the next time activities are queried for the data type
              // set this regardless of how many times we need to call back to get activities
              nextActivityIdProcessed =
                (!nextActivityIdProcessed && streamItemsWithinRange[0].id) ||
                nextActivityIdProcessed;
              // when the data type record was created no activities existed
              // make sure we process all found in stream
              if (lastActivityIdProcessedExists) {
                const lastActivityProcessedIndex = stream.streamItems.findIndex(
                  (x) => x.id === lastActivityIdProcessed
                );

                // if the last processed id is found in the activity stream
                // then get the items that appear above it for processing
                if (lastActivityProcessedIndex !== -1) {
                  // find all activities above the previously processed id
                  streamItemsWithinRange =
                    lastActivityProcessedIndex === 0
                      ? []
                      : streamItemsWithinRange.slice(0, lastActivityProcessedIndex);

                  console.log(
                    `${this.getActivityLogPrefix(
                      dataType
                    )}Activities found after last processed id: ${lastActivityIdProcessed}`,
                    streamItemsWithinRange
                  );
                } else {
                  nextToken = stream.nextToken;
                  isComplete = false;
                }
              } else {
                // kepp getting activities until end of stream reached
                if (streamItemsWithinRange.length === this.activityLimit && stream.nextToken) {
                  nextToken = stream.nextToken;
                  isComplete = false;
                }
              }

              const processEntries: ActivityProcessEntry[] = streamItemsWithinRange
                .filter((x) => x.activity.object.submissionId !== activitySubmissionId)
                .map((x) => ({
                  configUrn,
                  dataType,
                  urn: x.activity.object.data.urn,
                  activityType: this.getDataActivityActionTypeFromVerb(
                    x.activity.verb as ConfigDataAction | ConfigBinaryAction
                  ),
                  isBinaryUpdateOperation: x.activity.verb === 'config-binary-updated',
                }));
              activityProcessEntries.push(...processEntries);
            } else {
              // no activities in stream, maintain last processed activity id (this could still be 'not-found')
              nextActivityIdProcessed = lastActivityIdProcessed;
            }

            return isComplete;
          })
        );
      }),
      tap((complete: boolean) => {
        // make sure we call for the next set of activities
        //
        if (!complete) {
          activityStreamSubject.next();
        }
      }),
      filter((complete: boolean) => complete),
      take(1),
      map(() => {
        console.log(
          `${this.getActivityLogPrefix(dataType)}Filtered activity count = ${
            activityProcessEntries.length
          }`
        );
        const activityProcess: ActivityProcess = {
          lastActivityIdProcessed: nextActivityIdProcessed,
          entries: activityProcessEntries.length
            ? this.filterActivityPrecedence(activityProcessEntries, dataType)
            : activityProcessEntries,
        };
        console.log(
          `${this.getActivityLogPrefix(dataType)}ActivityProcess created`,
          activityProcess
        );

        return activityProcess;
      }),
      catchError((err: any) => throwError(() => err))
    );
  }

  private getActivityLogPrefix = (dataType: DataElementType): string =>
    `Activities-Log (DataType:${dataType}): `;

  private filterActivityPrecedence(
    entries: ActivityProcessEntry[],
    dataType: DataElementType
  ): ActivityProcessEntry[] {
    // delete takes precedence
    // if entry for a urn contains add and delete, remove and do not process (we should have never ingested this entry so ignore)
    const groupedActivities = groupBy(entries, 'urn');
    const filteredEntries: ActivityProcessEntry[] = [];
    console.log(
      `${this.getActivityLogPrefix(dataType)}Applying precedence to activity entries`,
      entries
    );

    Object.keys(groupedActivities).forEach((key: string) => {
      const groupEntries = groupedActivities[key];

      if (groupEntries.length > 1) {
        // get unique entries, we could have multiples of update actions
        // we only need one
        const uniqueActivityActions: ActivityProcessEntry[] = uniq(groupEntries);
        const containsAdd = !!uniqueActivityActions.find(
          (x) => x.activityType === ActivityActionType.Add
        );
        const containsUpdate = !!uniqueActivityActions.find(
          (x) => x.activityType === ActivityActionType.Update
        );
        const containsDelete = !!uniqueActivityActions.find(
          (x) => x.activityType === ActivityActionType.Delete
        );

        // todo: cleaner way to determine Precedence
        // add and update, no delete only need add action
        if (containsAdd && containsUpdate && !containsDelete) {
          // only need add activity
          filteredEntries.push(
            ...groupEntries.filter((x) => x.activityType === ActivityActionType.Add)
          );
        } else if (!containsAdd && containsUpdate && !containsDelete) {
          // only need one update activity if multiples found
          filteredEntries.push(
            groupEntries.find((x) => x.activityType === ActivityActionType.Update)
          );
        } else if (!containsAdd && containsUpdate && containsDelete) {
          // only need one update activity if multiples found
          filteredEntries.push(
            groupEntries.find((x) => x.activityType === ActivityActionType.Delete)
          );
        } else if (containsAdd && containsDelete) {
          console.log(
            `${this.getActivityLogPrefix(dataType)}Ignoring activities, contains add and delete`
          );
        }
      } else {
        filteredEntries.push(...groupEntries);
      }
    });

    console.log(
      `${this.getActivityLogPrefix(dataType)}Applied precedence to activity entries`,
      filteredEntries
    );
    return filteredEntries;
  }

  private getDataActivityActionTypeFromVerb(
    verb: ConfigDataAction | ConfigBinaryAction
  ): ActivityActionType {
    let actionType: ActivityActionType;
    switch (verb) {
      case 'config-data-added':
      case 'config-binary-added':
        actionType = ActivityActionType.Add;
        break;
      case 'config-data-updated':
      case 'config-binary-updated':
        actionType = ActivityActionType.Update;
        break;
      case 'config-data-deleted':
      case 'config-binary-deleted':
        actionType = ActivityActionType.Delete;
        break;
      default:
        break;
    }

    return actionType;
  }

  /**
   * Submit batch of actitites for data type operations that have run on a config.
   *
   * @param {Config} config
   * @param {UserData} userData
   * @param {DataElementType} dataType
   * @param {string} activitySubmissionId
   * @param {{ activityType: ActivityActionType; urn: string; name: string }[]} data
   * @returns {Observable<boolean>}
   * @memberof ActivityService
   */
  public submitDataActivities(submission: ActivitySubmission): Observable<boolean> {
    if (!submission?.entries.length) {
      return of(false);
    }

    return this.store$.select(selectAllConfigs).pipe(
      take(1),
      mergeMap((configs: Config[]) => {
        const config = configs.find((x) => x.id === submission.configUrn);
        const userData = this.authService.currentUserData;
        const configName = config.name;
        const configId = config.id;
        const userId = userData.userId;
        const userName = `${userData.firstName} ${userData.lastName}`;
        const publishedDateTime = new Date().toISOString();
        const activitySubmissionId = submission.activitySubmissionId;
        const dataType = submission.dataType;
        // notification
        const notification: ActivityEventMessage<ConfigDataActivityEvent> = {
          messageType: 'data',
          configUrn: configId,
          dataType,
          activitySubmissionId,
          messages: [],
        };

        const batch: ActivityBatch<ConfigDataActivityResource> = {
          activities: [],
        };

        submission.entries.forEach((entry) => {
          let verb: ConfigDataAction;
          switch (entry.activityType) {
            case ActivityActionType.Add:
              verb = 'config-data-added';
              break;
            case ActivityActionType.Update:
              verb = 'config-data-updated';
              break;
            case ActivityActionType.Delete:
              verb = 'config-data-deleted';
              break;
            default:
              break;
          }
          const activity: Activity<ConfigDataActivityResource> = {
            published: publishedDateTime,
            generator: this.generator,
            actor: {
              id: userId,
              displayName: userName,
            },
            verb,
            object: {
              id: configId,
              displayName: configName,
              submissionId: activitySubmissionId,
              data: {
                id: dataType.toLowerCase(),
                displayName: entry.name,
                urn: entry.urn,
                nodeId: submission.nodeId,
              },
            },
          };

          const notificationMessage: ConfigDataActivityEvent = {
            activityType: entry.activityType,
            urn: entry.urn,
            name: entry.name,
          };

          batch.activities.push(activity);
          notification.messages.push(notificationMessage);
        });

        const url = `${this.streamGroupBaseUrl}${dataType.toLowerCase()}/streams/${
          this.fabdmConfigStreamType
        }/${config.id}/items`;
        return this.postActivities<ConfigDataActivityResource>(url, batch).pipe(
          tap(() => {
            if (
              this.supportsRealTimeNotifications &&
              !submission.disableNotification &&
              this.activityNotificationChannels[configId]
            ) {
              this.postNotification(
                this.activityNotificationChannels[configId],
                notification,
                activitySubmissionId
              );
            }
          }),
          catchError((error: HttpErrorResponse) => {
            const apiError: ApiError = this.handleActivityError(error, dataType);
            console.log(
              `${this.getActivityLogPrefix(dataType)}Failed to submit the activity`,
              apiError
            );
            return of(false);
          })
        );
      })
    );
  }

  // submit pubnub message
  private postNotification<T>(
    channelId: string,
    notification: ActivityEventMessage<T>,
    activitySubmissionId: string
  ): void {
    // add meta data to pubnub message to enable streamFiltering
    // see - https://www.pubnub.com/docs/web-javascript/stream-filtering-tutorial
    // enables clients to not recieve their own messages
    const meta = {
      activitySubmissionId,
    };
    const params: Pubnub.PublishParameters = {
      message: {
        message: [{ notification }],
      },
      channel: channelId,
      meta,
    };
    this.pubNub.publish(params, (status: Pubnub.PubnubStatus, response: Pubnub.PublishResponse) =>
      console.log('Pubnub Publish status: ', status, response)
    );
    // publish across browser tabs, browser has the same cache activity submissionId
    // and will not receive pubnub data messages orginating from another tab
    // hence the use of braodcast channels
    console.log('Sending browser cross tab data notification message', params);
    this._browserTabSyncChannel.postMessage(params);
  }

  private setupBrowserTabSyncChannels = (): void => {
    this._browserTabSyncChannel.onmessage = (ev: Pubnub.PublishParameters) => {
      console.log('Received browser cross tab data notification message', ev);

      switch (ev.message.type) {
        case BroadcastChannelMessageType.DismissNotification:
          this.notificationService.hideNotification(ev.message.notificationId);
          break;
        default:
          this.parseActivityNotification({ message: ev.message }, true);
      }
    };
  };

  // intial pubnub setup i.e. assign keys, ids etc
  private setupPubNub(): void {
    combineLatest([this.authService.hasKeyData, this.authService.hasUserData])
      .pipe(
        filter((hasAuthData: [boolean, boolean]) => {
          const [keyData, userData] = hasAuthData;
          return keyData && userData;
        }),
        take(1)
      )
      .subscribe(() => {
        const { notificationsSubscribeKey, notificationsPublishKey, notificationsCipherKey } =
          this.authService.currentKeyData;
        const config: Pubnub.PubnubConfig = {
          subscribeKey: notificationsSubscribeKey,
          publishKey: notificationsPublishKey,
          cipherKey: notificationsCipherKey,
          uuid: this.authService.currentUserData.userId,
          ssl: true,
          keepAlive: true,
          useRandomIVs: false,
        };
        this.pubNub = new Pubnub(config);
      });
  }

  // setup pubnub subscription to channels
  private subscribeToNotificationChannelMessages(): void {
    const channels = Object.values(this.activityNotificationChannels);
    if (!channels?.length) return;

    const handleNotification = (messageEvent: Pubnub.MessageEvent) => {
      console.log('Pubnub Message: ', messageEvent);
      this.parseActivityNotification(messageEvent);
    };

    // add listeners
    this.pubNub.addListener({
      message: handleNotification,
    });

    // allows us to send mock notifications in our cypress tests
    if (this.cookieService.check('fabdm-e2e')) {
      window['mockReceiveNotification'] = handleNotification;
    }

    // subscribe
    // set filter to only recieve messages published using other activitySubmissionIds
    const filterExpression = `activitySubmissionId != '${this.cacheService.currentCacheIdentityRecord.activitySubmissionId}'`;
    this.pubNub.setFilterExpression(filterExpression);
    this.pubNub.subscribe({ channels });
  }

  // dispatch to store the message details
  // details can then be actioned when the data type is re-loaded
  // see /src/app/feature-modules/data/guards/base/data-element-list.guard.ts
  // todo: xlate notifications
  private parseActivityNotification(
    messageEvent: Partial<Pubnub.MessageEvent>,
    fromBrowserTab = false
  ): void {
    const messages: ActivityEventMessage<ConfigDataActivityEvent>[] =
      messageEvent.message.message.map((x) => x as ActivityEventMessage<ConfigDataActivityEvent>);

    messages.forEach((message: ActivityEventMessage<ConfigDataActivityEvent>, index: number) => {
      const contentItemIds = [...new Set(message.messages.map((x) => x.urn))];

      if (message.jobType === BinaryJobType.DataUpgrade) {
        message.messages.forEach((activityEvent: ConfigDataActivityEvent) => {
          this.handleDataUpgrades(message.configUrn, activityEvent.nodeId, activityEvent.urn);
        });
      }

      this.handleRealTimeActivities(message, contentItemIds, fromBrowserTab);

      // show notifications for data updates (but only show the first notification)
      if (index === 0) this.showNotifications(message);
    });
  }

  private handleDataUpgrades(configId: string, nodeId: string, contentItemId: string): void {
    // we only have part upgrades right now, but we will need to change this to something more
    // generic if we add more data types
    this.store$
      .select(selectConfigByUrn(configId))
      .pipe(take(1))
      .subscribe((config) => {
        this.store$.dispatch(
          new FinaliseUpgradePart({
            config,
            nodeId,
            partId: contentItemId,
          })
        );
      });
  }

  private showNotifications(message: ActivityEventMessage<ConfigDataActivityEvent>): void {
    const getErrorActivity$ = (config) =>
      this.loadConfigErrorActivityStreamItems(config.id).pipe(
        switchMap((streamItems) => streamItems),
        filter((item) => item.activity.object.submissionId === message.activitySubmissionId)
      );

    // binary updated shows only for user who made changes
    this.store$
      .select(selectConfigByUrn(message.configUrn))
      .pipe(
        take(1),
        switchMap((config) => {
          const errorActivity =
            message.messageType === 'error' ? getErrorActivity$(config) : of(null);

          return combineLatest([of(config), errorActivity]).pipe(take(1));
        })
      )
      .subscribe(([config, errorActivity]) => {
        let notificationText = '';
        const configName = config.name;

        const dataTypeMessageStr = this.translate.instant(
          LC.DATATYPES.TYPES[
            DataElementTypeUtils.getLocalisationConstantRef(message.dataType, false)
          ]
        );

        const userId = this.authService.currentUserData.userId;

        if (errorActivity && userId === message.dataUserId) {
          this.showActivityErrorNotification(
            errorActivity,
            config.id,
            message.dataAction,
            dataTypeMessageStr,
            message.dataName
          );
        } else if (message.messageType === 'binary' && userId === message.dataUserId) {
          // only show 1 notification for binary updates if its the current user
          const completeOperation = this.actionTypeFriendlyName(
            message.dataAction,
            message.jobType
          );
          notificationText = this.translate.instant(LC.NOTIFICATIONS.MSG_DATA_ELEMENT_SUCCESS, {
            dataTypeMessageStr,
            completeOperation,
          });

          // post notification to top nav and toast
          this.notificationService.showFlyoutNotification({
            message: notificationText,
            type: NotificationType.Success,
            showToast: true,
          });
        } else if (message.messageType !== 'binary' && userId !== message.dataUserId) {
          // show a notification for each non-binarry update if not the current user
          message.messages.forEach((x) => {
            const completeOperation = this.actionTypeFriendlyName(x.activityType, message.jobType);
            notificationText = this.translate.instant(
              LC.NOTIFICATIONS.MSG_DATA_ELEMENT_SUCCESS_FLYOUT,
              {
                configName,
                dataTypeMessageStr,
                completeOperation,
                name: x.name,
              }
            );
            // post notification to top nav and toast
            this.notificationService.showFlyoutNotification({
              message: notificationText,
              type: NotificationType.Info,
              showToast: true,
            });
          });
        }
      });
  }

  private retryBinaryUpdateTask(
    streamItem: StreamItem<ConfigDataActivityResource>,
    contentItem: ForgeContentDataElement
  ): void {
    const activityData = streamItem.activity.object.data;
    const { action, oldPath, externalIds, updatedReferenceContentIds } = activityData;

    return this.store$.dispatch(
      new SetBinaryTaskData({
        action: action.toLowerCase() as BinaryTaskAction,
        entityId: contentItem.id,
        entityType: 'content',
        schemaId: contentItem.extensionDataType,
        jobType: BinaryJobType.BinaryUpdate,
        externalIds: externalIds ? externalIds.split(',') : [],
        updatedReferenceContentIds: updatedReferenceContentIds
          ? updatedReferenceContentIds.split(',')
          : [],
        oldPath,
      })
    );
  }

  public showActivityErrorNotification(
    activityStreamItem: StreamItem<ConfigDataActivityResource>,
    configId: string,
    action: string,
    dataTypeMessageStr: string,
    dataName: string
  ) {
    const { id, activity, submitted: timestamp } = activityStreamItem;
    const dataType = activity.object.data.dataType;
    const failedOperation = this.actionTypeFriendlyName(
      action.toLowerCase(),
      BinaryJobType.BinaryUpdate,
      'error'
    );
    const notificationText = this.translate.instant(LC.NOTIFICATIONS.MSG_DATA_ELEMENT_ERROR, {
      dataTypeMessageStr,
      failedOperation,
    });
    const description = this.translate.instant(LC.NOTIFICATIONS.MSG_DATA_ELEMENT_ERROR_DETAILS, {
      entity: dataName,
    });
    const onDismiss = () => {
      this.dismissErrorActivity(id, configId);
      this.notificationService.hideNotification(id);
      this._browserTabSyncChannel.postMessage({
        message: {
          type: BroadcastChannelMessageType.DismissNotification,
          notificationId: id,
        },
      });
    };
    const withLink = {
      text: this.translate.instant(LC.ERROR_HANDLING.GENERIC.RETRY),
      action: () => {
        combineLatest([
          this.store$.select(selectCurrentConfig),
          this.store$.select(getReferenceSelector(dataType)),
        ])
          .pipe(
            take(1),
            switchMap(([config, referenceList]) => {
              const entity = referenceList.find((ref) => ref.id === activity.object.data.id);

              // No need to request content item if it is already in store.
              // This happens when the user has already navigated to the search table for that data type.
              // If user opens FDM on Config page, for example, data type is missing in store.
              if (entity) {
                return of(entity);
              }

              return this.forgeContentService.getContentItem(
                config,
                activity.object.data.id,
                false,
                false
              );
            }),
            take(1)
          )
          .subscribe((contentItem) => {
            onDismiss();
            return this.retryBinaryUpdateTask(activityStreamItem, contentItem);
          });
      },
    };

    this.notificationService.hideNotification(id);
    this.notificationService.showFlyoutNotification({
      id,
      message: notificationText,
      description,
      type: NotificationType.Error,
      autoDismiss: false,
      showToast: true,
      timestamp,
      withLink,
      onDismiss,
    });
  }

  private handleRealTimeActivities(
    message: ActivityEventMessage<ConfigDataActivityEvent>,
    contentItemIds: string[],
    fromBrowserTab: boolean
  ): void {
    // add marker so when browsing to the data type in the specified config we know to re-check for activities
    const activityDataTypes = this.getActivityDataTypes(message);
    if (fromBrowserTab) {
      // if message from another browser tab
      // simply remove the data type's loaded status
      // to ensure new cache data is pulled into the store
      this.store$
        .select(selectConfigByUrn(message.configUrn))
        .pipe(
          take(1),
          tap((config: Config) => {
            activityDataTypes.forEach((x) => {
              if (x === DataElementType.Part) {
                // handle cross browser parts slightly differently, mark the nodeid that has been updated
                this.store$.dispatch(
                  new UpsertRealTimeActivityMarker({
                    configUrn: message.configUrn,
                    dataType: x,
                    partNodeId: message.messages[0]?.nodeId,
                    jobType: message.jobType,
                    contentItemIds,
                  })
                );
              } else {
                this.store$.dispatch(
                  new RemoveConfigDataTypeLoadingState({
                    configId: config.externalId,
                    dataType: x,
                  })
                );
              }
            });
          })
        )
        .subscribe();
    } else {
      activityDataTypes.forEach((x) => {
        let activityMarkerSent = false;
        if (x === DataElementType.Part) {
          const nodeIds = Array.from(
            new Set<string>(message.messages.map((y) => y.nodeId).filter((y) => !!y))
          );

          if (nodeIds.length) {
            nodeIds.forEach((nodeId) => {
              this.store$.dispatch(
                new UpsertRealTimeActivityMarker({
                  configUrn: message.configUrn,
                  dataType: x,
                  partNodeId: nodeId,
                  jobType: message.jobType,
                  contentItemIds,
                })
              );
            });
            activityMarkerSent = true;
          }
        }

        if (!activityMarkerSent) {
          this.store$.dispatch(
            new UpsertRealTimeActivityMarker({
              configUrn: message.configUrn,
              dataType: x,
              partNodeId: x === DataElementType.Part ? message.messages[0]?.nodeId : null,
              jobType: message.jobType,
              contentItemIds,
            })
          );
        }
      });
    }
  }

  private getActivityHeaders = () => ({
    'x-ads-region': 'US',
  });

  private getActivityStream<T extends ActivityResource>(
    url: string
  ): Observable<ActivityStream<T>> {
    const bypassSwQuery = 'ngsw-bypass=true';
    const queryParams = url.includes('?') ? `&${bypassSwQuery}` : `?${bypassSwQuery}`;
    return this.resilientHttpService.get<ActivityStream<T>>(`${url}${queryParams}`, {
      headers: this.getActivityHeaders(),
    });
  }

  private postActivities<T extends ActivityResource>(
    url: string,
    activities: ActivityBatch<T>
  ): Observable<any> {
    return this.resilientHttpService.post<ActivityBatch<T>>(url, {
      data: activities,
      headers: this.getActivityHeaders(),
    });
  }

  private deleteActivity(url: string): Observable<any> {
    return this.resilientHttpService.delete(url, {
      headers: this.getActivityHeaders(),
    });
  }

  private handleActivityError(httpError: HttpErrorResponse, dataType: DataElementType): ApiError {
    const error: ApiError = {
      correlationId: uuidv4(),
      status: httpError.status,
      statusText: httpError.statusText,
      errors: [httpError],
      failedAt: new Date().toISOString(),
      dataType,
      apiName: this.apiName,
    };

    this.loggingService.logError(error);
    return error;
  }

  private getActivityDataTypes(
    message: ActivityEventMessage<ConfigDataActivityEvent>
  ): DataElementType[] {
    const dataTypes: DataElementType[] = [];

    message.messages.forEach((activityEvent: ConfigDataActivityEvent) => {
      const dataType =
        activityEvent.binaryFileType === StorageFileType.DbFile
          ? DataElementType.DBFile
          : message.dataType;

      dataTypes.push(dataType);
    });

    return [...new Set(dataTypes)];
  }

  private actionTypeFriendlyName(
    actionType: ActivityActionType | string,
    jobType: BinaryJobType,
    messageType?: ActivityEventMessageType
  ): string {
    if (jobType === BinaryJobType.DataUpgrade) {
      this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.UPGRADED);
    }

    const isErrorMessage = messageType === 'error';
    let friendlyName = '';
    switch (actionType) {
      case ActivityActionType.Add:
      case 'add':
        friendlyName = isErrorMessage
          ? this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.ADD)
          : this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.ADDED);
        break;
      case ActivityActionType.Update:
      case 'update':
        friendlyName = isErrorMessage
          ? this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.UPDATE)
          : this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.UPDATED);
        break;
      case ActivityActionType.Delete:
      case 'delete':
        friendlyName = isErrorMessage
          ? this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.DELETE)
          : this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.DELETED);
        break;
      case 'copy':
        friendlyName = isErrorMessage
          ? this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.COPY)
          : this.translate.instant(LC.NOTIFICATIONS.COMPLETE_OPERATIONS.COPIED);
        break;
      default:
        break;
    }
    return friendlyName;
  }
}
