consolidate event bus methods in EventBusService.

This commit is contained in:
Jason Kulatunga 2023-09-10 09:19:59 -07:00
parent 2b0a1e0d6d
commit 8b6c321e8e
3 changed files with 87 additions and 33 deletions

View File

@ -66,7 +66,7 @@ export class MedicalSourcesConnectedComponent implements OnInit {
.then(console.log)
}
this.eventBusService.eventBusSourceSyncMessages.subscribe((event) => {
this.eventBusService.SourceSyncMessages.subscribe((event) => {
this.status[event.source_id] = "token"
})

View File

@ -7,19 +7,28 @@ import {ToastService} from './toast.service';
import {Event} from '../models/events/event';
import {EventSourceComplete} from '../models/events/event_source_complete';
import {EventSourceSync} from '../models/events/event_source_sync';
import {GetEndpointAbsolutePath} from '../../lib/utils/endpoint_absolute_path';
import {environment} from '../../environments/environment';
import {fetchEventSource} from '@microsoft/fetch-event-source';
@Injectable({
providedIn: 'root'
})
export class EventBusService {
eventBusSubscription: Subscription | undefined;
eventBusSourceSyncMessages: Subject<EventSourceSync> = new Subject<EventSourceSync>();
eventBusSourceCompleteMessages: Subject<EventSourceComplete> = new Subject<EventSourceComplete>();
//stores a reference to the event stream observable, which we can listen to for events
private eventBus: Observable<Event> | undefined;
//stores a reference to the event bus abort controller, which we can use to abort the event bus connection
private eventBusAbortController: AbortController | undefined;
//stores a reference to the event bus observable subscription, which we can use to unsubscribe from the event bus
private eventBusSubscription: Subscription | undefined;
public SourceSyncMessages: Subject<EventSourceSync> = new Subject<EventSourceSync>();
public SourceCompleteMessages: Subject<EventSourceComplete> = new Subject<EventSourceComplete>();
constructor(
public router: Router,
public authService: AuthService,
public fastenApiService: FastenApiService,
public toastService: ToastService
) {
@ -28,21 +37,83 @@ export class EventBusService {
this.authService.IsAuthenticatedSubject.subscribe((isAuthenticated) => {
console.log("isAuthenticated changed:", isAuthenticated)
if(isAuthenticated){
this.eventBusSubscription = this.fastenApiService.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{
this.eventBusSubscription = this.listenEventBus().subscribe((event: Event | EventSourceSync | EventSourceComplete)=>{
console.log("eventbus event:", event)
//TODO: start toasts.
if(event.event_type == "source_sync"){
this.eventBusSourceSyncMessages.next(event as EventSourceSync)
this.SourceSyncMessages.next(event as EventSourceSync)
} else if(event.event_type == "source_complete"){
this.eventBusSourceCompleteMessages.next(event as EventSourceComplete)
this.SourceCompleteMessages.next(event as EventSourceComplete)
}
})
} else {
//no longer authenticated, unsubscribe from eventbus
if(this.eventBusSubscription){
this.eventBusSubscription.unsubscribe()
}
//no longer authenticated, unsubscribe from eventbus and abort/terminate connection
this.abortEventBus()
}
});
}
//Make sure we an cancel the event bus connection & subscription, resetting to a clean state.
abortEventBus() {
if(this.eventBusAbortController){
try {
this.eventBusAbortController.abort()
} catch (e) {
console.log("ignoring, error aborting event bus:", e)
}
}
if(this.eventBusSubscription){
try {
this.eventBusSubscription.unsubscribe()
} catch (e) {
console.log("ignoring, error unsubscribing from event bus:", e)
}
}
this.eventBus = null
this.eventBusAbortController = null
this.eventBusSubscription = null
}
//Listen to the event bus, and return an observable that we can subscribe to.
//this method uses the fetch-event-source library, which is a polyfill for the EventSource API (which does not support Authorization Headers)
//
listenEventBus(): Observable<any> {
//this is a singleton, so if we already have an event bus, return it.
if(this.eventBus){
return this.eventBus
}
let serviceThis = this;
let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream`
this.eventBusAbortController = new AbortController();
this.eventBus = new Observable(observer => {
fetchEventSource(eventStreamUrl, {
method: 'GET',
headers: {
'Authorization': `Bearer ${this.authService.GetAuthToken()}`
},
onmessage(ev) {
observer.next(JSON.parse(ev.data));
},
onerror(event) {
observer.error(event)
//don't retry, just close the stream
observer.complete()
throw new Error('EventBus error: ' + event);
},
onclose(){
// if the server closes the connection unexpectedly, retry:
serviceThis.abortEventBus()
},
signal: this.eventBusAbortController.signal,
}).then(
() => observer.complete(),
error => observer.error(error)
)
});
return this.eventBus
}
}

View File

@ -29,6 +29,9 @@ import { fetchEventSource } from '@microsoft/fetch-event-source';
})
export class FastenApiService {
private _eventBus: Observable<Event>
private _eventBusAbortController: AbortController
constructor(@Inject(HTTP_CLIENT_TOKEN) private _httpClient: HttpClient, private router: Router, private authService: AuthService) {
}
@ -55,27 +58,7 @@ export class FastenApiService {
SECURE ENDPOINTS
*/
listenEventBus(): Observable<any> {
let eventStreamUrl = `${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/events/stream`
return new Observable(observer => {
fetchEventSource(eventStreamUrl, {
method: 'GET',
headers: {
'Authorization': `Bearer ${this.authService.GetAuthToken()}`
},
onmessage(ev) {
observer.next(JSON.parse(ev.data));
},
onerror(event) {
observer.error(event)
},
}).then(
() => observer.complete(),
error => observer.error(error)
)
});
}
//TODO: Any significant API changes here should also be reflected in EventBusService
getDashboards(): Observable<DashboardConfig[]> {
return this._httpClient.get<any>(`${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/dashboards`, )