adding queue.

This commit is contained in:
Jason Kulatunga 2022-10-06 18:04:07 -07:00
parent 5442063c17
commit 054756c9b7
5 changed files with 113 additions and 20 deletions

View File

@ -2,6 +2,7 @@ import { Component, OnInit } from '@angular/core';
import {NavigationEnd, Router} from '@angular/router';
import {fromWorker} from 'observable-webworker';
import {Observable, of} from 'rxjs';
import {QueueService} from './workers/queue.service';
@Component({
selector: 'app-root',
@ -15,7 +16,7 @@ export class AppComponent implements OnInit {
showHeader:boolean = false;
showFooter:boolean = true;
constructor(private router: Router) {}
constructor(private router: Router, private queueService: QueueService) {}
ngOnInit() {
@ -26,8 +27,6 @@ export class AppComponent implements OnInit {
//determine if we should show the header
this.router.events.subscribe(event => this.modifyHeader(event));
this.runWorker()
}
modifyHeader(event) {
@ -40,23 +39,6 @@ export class AppComponent implements OnInit {
}
}
}
runWorker() {
if (typeof Worker !== 'undefined') {
const input$: Observable<string> = of('hello');
fromWorker<string, string>(() => new Worker(new URL('./workers/background.worker', import.meta.url), {type: 'module'}), input$)
.subscribe(message => {
console.log(`Got message`, message);
});
}else {
// Web workers are not supported in this environment.
// You should add a fallback so that your program still executes correctly.
console.error("WORKERS ARE NOT SUPPORTED")
}
}
}

View File

@ -0,0 +1,6 @@
import {Source} from '../../../lib/models/database/source';
export class SourceSyncMessage {
source: Source
database_name: string
}

View File

@ -0,0 +1,16 @@
import { TestBed } from '@angular/core/testing';
import { QueueService } from './queue.service';
describe('QueueService', () => {
let service: QueueService;
beforeEach(() => {
TestBed.configureTestingModule({});
service = TestBed.inject(QueueService);
});
it('should be created', () => {
expect(service).toBeTruthy();
});
});

View File

@ -0,0 +1,31 @@
import {Injectable} from '@angular/core';
import {Observable, of} from 'rxjs';
import {fromWorker} from 'observable-webworker';
import {Source} from '../../lib/models/database/source';
import {SourceSyncMessage} from '../models/queue/source-sync-message';
@Injectable({
providedIn: 'root'
})
export class QueueService {
constructor() { }
runSourceSyncWorker(source: Source):Observable<string> {
if (typeof Worker !== 'undefined') {
const sourceSync = new SourceSyncMessage()
sourceSync.source = source
sourceSync.database_name = "fasten"
const input$: Observable<string> = of(JSON.stringify(sourceSync));
return fromWorker<string, string>(() => new Worker(new URL('./source-sync.worker', import.meta.url), {type: 'module'}), input$)
// .subscribe(message => {
// console.log(`Got message`, message);
// });
}else {
// Web workers are not supported in this environment.
// You should add a fallback so that your program still executes correctly.
console.error("WORKERS ARE NOT SUPPORTED")
}
}
}

View File

@ -0,0 +1,58 @@
/// <reference lib="webworker" />
// addEventListener('message', ({ data }) => {
// const response = `worker response to ${data}`;
// postMessage(response);
// });
//
import {DoWork, runWorker} from 'observable-webworker';
import {from, Observable} from 'rxjs';
import {map, mergeMap} from 'rxjs/operators';
import {SourceSyncMessage} from '../models/queue/source-sync-message';
import {NewClient} from '../../lib/conduit/factory';
import {NewRepositiory} from '../../lib/database/pouchdb_repository';
export class SourceSyncWorker implements DoWork<string, string> {
public work(input$: Observable<string>): Observable<string> {
return input$.pipe(
//mergeMap allows us to convert a promise into an observable
// https://stackoverflow.com/questions/53649294/how-to-handle-for-promise-inside-a-piped-map
mergeMap(msg => {
console.log(msg); // outputs 'Hello from main thread'
const sourceSyncMessage = JSON.parse(msg) as SourceSyncMessage
// const fastenDB = new PouchDB('kittens');
// fastenDB.get("source_bluebutton_6966c695-1c15-46df-9247-e09f00688b0f")
// .then(console.log)
// .then(() => {console.log("PREVIOUS MESSAGE WAS FROM WORKER")})
const db = NewRepositiory(sourceSyncMessage.database_name)
const client = NewClient(sourceSyncMessage.source.source_type, sourceSyncMessage.source)
console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage)
return client.SyncAll(db)
.then((resp) => {
console.log("!!!!!!!!!!!!!COMPLETE WORKER SYNC!!!!!!!!!!", resp)
// response$.
return JSON.stringify(resp)
})
.catch((err) => {
console.error("!!!!!!!!!!!!!ERROR WORKER SYNC!!!!!!!!!!", err)
throw err
})
// return from(resp)
// return JSON.stringify(sourceSyncMessage)
// const sourceSyncUpdate = new SourceSyncMessage()
// sourceSyncUpdate.type = SourceSyncMessageType.StatusUpdate
// sourceSyncUpdate.message = `began processing source (${sourceSyncMessage.source.source_type}) in web-worker`
// return sourceSyncUpdate
// // return defer(() => {return promiseChain});
}),
);
}
}
runWorker(SourceSyncWorker);