From 054756c9b79c0fd984d6b9881312e735bb63267b Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Thu, 6 Oct 2022 18:04:07 -0700 Subject: [PATCH] adding queue. --- frontend/src/app/app.component.ts | 22 +------ .../app/models/queue/source-sync-message.ts | 6 ++ .../src/app/workers/queue.service.spec.ts | 16 +++++ frontend/src/app/workers/queue.service.ts | 31 ++++++++++ .../src/app/workers/source-sync.worker.ts | 58 +++++++++++++++++++ 5 files changed, 113 insertions(+), 20 deletions(-) create mode 100644 frontend/src/app/models/queue/source-sync-message.ts create mode 100644 frontend/src/app/workers/queue.service.spec.ts create mode 100644 frontend/src/app/workers/queue.service.ts create mode 100644 frontend/src/app/workers/source-sync.worker.ts diff --git a/frontend/src/app/app.component.ts b/frontend/src/app/app.component.ts index 5f1b4d68..5cda0a86 100644 --- a/frontend/src/app/app.component.ts +++ b/frontend/src/app/app.component.ts @@ -2,6 +2,7 @@ import { Component, OnInit } from '@angular/core'; import {NavigationEnd, Router} from '@angular/router'; import {fromWorker} from 'observable-webworker'; import {Observable, of} from 'rxjs'; +import {QueueService} from './workers/queue.service'; @Component({ selector: 'app-root', @@ -15,7 +16,7 @@ export class AppComponent implements OnInit { showHeader:boolean = false; showFooter:boolean = true; - constructor(private router: Router) {} + constructor(private router: Router, private queueService: QueueService) {} ngOnInit() { @@ -26,8 +27,6 @@ export class AppComponent implements OnInit { //determine if we should show the header this.router.events.subscribe(event => this.modifyHeader(event)); - - this.runWorker() } modifyHeader(event) { @@ -40,23 +39,6 @@ export class AppComponent implements OnInit { } } } - - runWorker() { - if (typeof Worker !== 'undefined') { - const input$: Observable = of('hello'); - fromWorker(() => new Worker(new URL('./workers/background.worker', import.meta.url), {type: 'module'}), input$) - .subscribe(message => { - console.log(`Got message`, message); - }); - }else { - // Web workers are not supported in this environment. - // You should add a fallback so that your program still executes correctly. - console.error("WORKERS ARE NOT SUPPORTED") - } - - } - - } diff --git a/frontend/src/app/models/queue/source-sync-message.ts b/frontend/src/app/models/queue/source-sync-message.ts new file mode 100644 index 00000000..fab099b3 --- /dev/null +++ b/frontend/src/app/models/queue/source-sync-message.ts @@ -0,0 +1,6 @@ +import {Source} from '../../../lib/models/database/source'; + +export class SourceSyncMessage { + source: Source + database_name: string +} diff --git a/frontend/src/app/workers/queue.service.spec.ts b/frontend/src/app/workers/queue.service.spec.ts new file mode 100644 index 00000000..b3af28c5 --- /dev/null +++ b/frontend/src/app/workers/queue.service.spec.ts @@ -0,0 +1,16 @@ +import { TestBed } from '@angular/core/testing'; + +import { QueueService } from './queue.service'; + +describe('QueueService', () => { + let service: QueueService; + + beforeEach(() => { + TestBed.configureTestingModule({}); + service = TestBed.inject(QueueService); + }); + + it('should be created', () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/frontend/src/app/workers/queue.service.ts b/frontend/src/app/workers/queue.service.ts new file mode 100644 index 00000000..8f9a0068 --- /dev/null +++ b/frontend/src/app/workers/queue.service.ts @@ -0,0 +1,31 @@ +import {Injectable} from '@angular/core'; +import {Observable, of} from 'rxjs'; +import {fromWorker} from 'observable-webworker'; +import {Source} from '../../lib/models/database/source'; +import {SourceSyncMessage} from '../models/queue/source-sync-message'; + +@Injectable({ + providedIn: 'root' +}) +export class QueueService { + + constructor() { } + + runSourceSyncWorker(source: Source):Observable { + if (typeof Worker !== 'undefined') { + const sourceSync = new SourceSyncMessage() + sourceSync.source = source + sourceSync.database_name = "fasten" + const input$: Observable = of(JSON.stringify(sourceSync)); + return fromWorker(() => new Worker(new URL('./source-sync.worker', import.meta.url), {type: 'module'}), input$) + // .subscribe(message => { + // console.log(`Got message`, message); + // }); + }else { + // Web workers are not supported in this environment. + // You should add a fallback so that your program still executes correctly. + console.error("WORKERS ARE NOT SUPPORTED") + } + + } +} diff --git a/frontend/src/app/workers/source-sync.worker.ts b/frontend/src/app/workers/source-sync.worker.ts new file mode 100644 index 00000000..8f90a27e --- /dev/null +++ b/frontend/src/app/workers/source-sync.worker.ts @@ -0,0 +1,58 @@ +/// + + + +// addEventListener('message', ({ data }) => { +// const response = `worker response to ${data}`; +// postMessage(response); +// }); +// + +import {DoWork, runWorker} from 'observable-webworker'; +import {from, Observable} from 'rxjs'; +import {map, mergeMap} from 'rxjs/operators'; +import {SourceSyncMessage} from '../models/queue/source-sync-message'; +import {NewClient} from '../../lib/conduit/factory'; +import {NewRepositiory} from '../../lib/database/pouchdb_repository'; + +export class SourceSyncWorker implements DoWork { + public work(input$: Observable): Observable { + return input$.pipe( + //mergeMap allows us to convert a promise into an observable + // https://stackoverflow.com/questions/53649294/how-to-handle-for-promise-inside-a-piped-map + mergeMap(msg => { + console.log(msg); // outputs 'Hello from main thread' + const sourceSyncMessage = JSON.parse(msg) as SourceSyncMessage + + // const fastenDB = new PouchDB('kittens'); + // fastenDB.get("source_bluebutton_6966c695-1c15-46df-9247-e09f00688b0f") + // .then(console.log) + // .then(() => {console.log("PREVIOUS MESSAGE WAS FROM WORKER")}) + const db = NewRepositiory(sourceSyncMessage.database_name) + const client = NewClient(sourceSyncMessage.source.source_type, sourceSyncMessage.source) + console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage) + return client.SyncAll(db) + .then((resp) => { + console.log("!!!!!!!!!!!!!COMPLETE WORKER SYNC!!!!!!!!!!", resp) + // response$. + return JSON.stringify(resp) + + }) + .catch((err) => { + console.error("!!!!!!!!!!!!!ERROR WORKER SYNC!!!!!!!!!!", err) + throw err + }) + // return from(resp) + + // return JSON.stringify(sourceSyncMessage) + // const sourceSyncUpdate = new SourceSyncMessage() + // sourceSyncUpdate.type = SourceSyncMessageType.StatusUpdate + // sourceSyncUpdate.message = `began processing source (${sourceSyncMessage.source.source_type}) in web-worker` + // return sourceSyncUpdate + // // return defer(() => {return promiseChain}); + }), + ); + } +} + +runWorker(SourceSyncWorker);