From 8b6c321e8ea01872d1049ccf7879e7c2573fbbae Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Sun, 10 Sep 2023 09:19:59 -0700 Subject: [PATCH] consolidate event bus methods in EventBusService. --- .../medical-sources-connected.component.ts | 2 +- .../src/app/services/event-bus.service.ts | 93 ++++++++++++++++--- .../src/app/services/fasten-api.service.ts | 25 +---- 3 files changed, 87 insertions(+), 33 deletions(-) diff --git a/frontend/src/app/components/medical-sources-connected/medical-sources-connected.component.ts b/frontend/src/app/components/medical-sources-connected/medical-sources-connected.component.ts index ab735bab..2334e1c2 100644 --- a/frontend/src/app/components/medical-sources-connected/medical-sources-connected.component.ts +++ b/frontend/src/app/components/medical-sources-connected/medical-sources-connected.component.ts @@ -66,7 +66,7 @@ export class MedicalSourcesConnectedComponent implements OnInit { .then(console.log) } - this.eventBusService.eventBusSourceSyncMessages.subscribe((event) => { + this.eventBusService.SourceSyncMessages.subscribe((event) => { this.status[event.source_id] = "token" }) diff --git a/frontend/src/app/services/event-bus.service.ts b/frontend/src/app/services/event-bus.service.ts index 14c0f01e..707b6991 100644 --- a/frontend/src/app/services/event-bus.service.ts +++ b/frontend/src/app/services/event-bus.service.ts @@ -7,19 +7,28 @@ import {ToastService} from './toast.service'; import {Event} from '../models/events/event'; import {EventSourceComplete} from '../models/events/event_source_complete'; import {EventSourceSync} from '../models/events/event_source_sync'; +import {GetEndpointAbsolutePath} from '../../lib/utils/endpoint_absolute_path'; +import {environment} from '../../environments/environment'; +import {fetchEventSource} from '@microsoft/fetch-event-source'; @Injectable({ providedIn: 'root' }) export class EventBusService { - eventBusSubscription: Subscription | undefined; - eventBusSourceSyncMessages: Subject = new Subject(); - eventBusSourceCompleteMessages: Subject = new Subject(); + + //stores a reference to the event stream observable, which we can listen to for events + private eventBus: Observable | undefined; + //stores a reference to the event bus abort controller, which we can use to abort the event bus connection + private eventBusAbortController: AbortController | undefined; + //stores a reference to the event bus observable subscription, which we can use to unsubscribe from the event bus + private eventBusSubscription: Subscription | undefined; + + public SourceSyncMessages: Subject = new Subject(); + public SourceCompleteMessages: Subject = new Subject(); constructor( public router: Router, public authService: AuthService, - public fastenApiService: FastenApiService, public toastService: ToastService ) { @@ -28,21 +37,83 @@ export class EventBusService { this.authService.IsAuthenticatedSubject.subscribe((isAuthenticated) => { console.log("isAuthenticated changed:", isAuthenticated) if(isAuthenticated){ - this.eventBusSubscription = this.fastenApiService.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{ + this.eventBusSubscription = this.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{ console.log("eventbus event:", event) //TODO: start toasts. if(event.event_type == "source_sync"){ - this.eventBusSourceSyncMessages.next(event as EventSourceSync) + this.SourceSyncMessages.next(event as EventSourceSync) } else if(event.event_type == "source_complete"){ - this.eventBusSourceCompleteMessages.next(event as EventSourceComplete) + this.SourceCompleteMessages.next(event as EventSourceComplete) } }) } else { - //no longer authenticated, unsubscribe from eventbus - if(this.eventBusSubscription){ - this.eventBusSubscription.unsubscribe() - } + //no longer authenticated, unsubscribe from eventbus and abort/terminate connection + this.abortEventBus() } }); } + + //Make sure we an cancel the event bus connection & subscription, resetting to a clean state. + abortEventBus() { + if(this.eventBusAbortController){ + try { + this.eventBusAbortController.abort() + } catch (e) { + console.log("ignoring, error aborting event bus:", e) + } + } + if(this.eventBusSubscription){ + try { + this.eventBusSubscription.unsubscribe() + } catch (e) { + console.log("ignoring, error unsubscribing from event bus:", e) + } + } + + this.eventBus = null + this.eventBusAbortController = null + this.eventBusSubscription = null + } + + //Listen to the event bus, and return an observable that we can subscribe to. + //this method uses the fetch-event-source library, which is a polyfill for the EventSource API (which does not support Authorization Headers) + // + listenEventBus(): Observable { + //this is a singleton, so if we already have an event bus, return it. + + if(this.eventBus){ + return this.eventBus + } + + let serviceThis = this; + let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream` + this.eventBusAbortController = new AbortController(); + this.eventBus = new Observable(observer => { + fetchEventSource(eventStreamUrl, { + method: 'GET', + headers: { + 'Authorization': `Bearer ${this.authService.GetAuthToken()}` + }, + onmessage(ev) { + observer.next(JSON.parse(ev.data)); + }, + onerror(event) { + observer.error(event) + //don't retry, just close the stream + observer.complete() + throw new Error('EventBus error: ' + event); + }, + onclose(){ + // if the server closes the connection unexpectedly, retry: + serviceThis.abortEventBus() + }, + signal: this.eventBusAbortController.signal, + }).then( + () => observer.complete(), + error => observer.error(error) + ) + }); + return this.eventBus + } + } diff --git a/frontend/src/app/services/fasten-api.service.ts b/frontend/src/app/services/fasten-api.service.ts index 5866b914..ad6d911b 100644 --- a/frontend/src/app/services/fasten-api.service.ts +++ b/frontend/src/app/services/fasten-api.service.ts @@ -29,6 +29,9 @@ import { fetchEventSource } from '@microsoft/fetch-event-source'; }) export class FastenApiService { + private _eventBus: Observable + private _eventBusAbortController: AbortController + constructor(@Inject(HTTP_CLIENT_TOKEN) private _httpClient: HttpClient, private router: Router, private authService: AuthService) { } @@ -55,27 +58,7 @@ export class FastenApiService { SECURE ENDPOINTS */ - listenEventBus(): Observable { - let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream` - - return new Observable(observer => { - fetchEventSource(eventStreamUrl, { - method: 'GET', - headers: { - 'Authorization': `Bearer ${this.authService.GetAuthToken()}` - }, - onmessage(ev) { - observer.next(JSON.parse(ev.data)); - }, - onerror(event) { - observer.error(event) - }, - }).then( - () => observer.complete(), - error => observer.error(error) - ) - }); - } + //TODO: Any significant API changes here should also be reflected in EventBusService getDashboards(): Observable { return this._httpClient.get(`${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/dashboards`, )