added a mechanism to store updated source data in the db (after tokens have been refreshed)

refresh tokens are still broken/inconsistently refreshed.
This commit is contained in:
Jason Kulatunga 2022-10-12 20:04:59 -07:00
parent f851cad8f8
commit e5daf0a7e5
3 changed files with 50 additions and 12 deletions

View File

@ -6,6 +6,7 @@ import {mergeMap} from 'rxjs/operators';
import {SourceSyncMessage} from '../models/queue/source-sync-message'; import {SourceSyncMessage} from '../models/queue/source-sync-message';
import {NewRepositiory} from '../../lib/database/pouchdb_repository'; import {NewRepositiory} from '../../lib/database/pouchdb_repository';
import {NewClient} from '../../lib/conduit/factory'; import {NewClient} from '../../lib/conduit/factory';
import {Source} from '../../lib/models/database/source';
export class SourceSyncWorker implements DoWork<string, string> { export class SourceSyncWorker implements DoWork<string, string> {
public work(input$: Observable<string>): Observable<string> { public work(input$: Observable<string>): Observable<string> {
@ -18,13 +19,28 @@ export class SourceSyncWorker implements DoWork<string, string> {
const sourceSyncMessage = JSON.parse(msg) as SourceSyncMessage const sourceSyncMessage = JSON.parse(msg) as SourceSyncMessage
const db = NewRepositiory(sourceSyncMessage.userIdentifier, sourceSyncMessage.encryptionKey) const db = NewRepositiory(sourceSyncMessage.userIdentifier, sourceSyncMessage.encryptionKey)
const client = NewClient(sourceSyncMessage.source.source_type, sourceSyncMessage.source) const client = NewClient(sourceSyncMessage.source.source_type, new Source(sourceSyncMessage.source))
//TODO: validate the FHIR version from the datasource matches the client //TODO: validate the FHIR version from the datasource matches the client
// if the source token has been refreshed, we need to store it in the DB. // if the source token has been refreshed, we need to store it in the DB.
// await db.UpsertSource() // await db.UpsertSource()
//lets refresh the source information if required.
console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage) console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage)
return client.SyncAll(db) return client.RefreshSourceToken()
.then((wasSourceRefreshed)=>{
if(wasSourceRefreshed){
//the source was updated, we need to save the updated source information
return db.UpsertSource(client.source)
.then(() => {
return client
})
}
return client
})
.then((client)=> {
return client.SyncAll(db)
})
.then((resp) => { .then((resp) => {
console.log("!!!!!!!!!!!!!COMPLETE WORKER SYNC!!!!!!!!!!", resp) console.log("!!!!!!!!!!!!!COMPLETE WORKER SYNC!!!!!!!!!!", resp)
sourceSyncMessage.response = resp sourceSyncMessage.response = resp

View File

@ -2,6 +2,12 @@ import {Source} from '../../../models/database/source';
import * as Oauth from '@panva/oauth4webapi'; import * as Oauth from '@panva/oauth4webapi';
import {IResourceRaw} from '../../interface'; import {IResourceRaw} from '../../interface';
class SourceUpdateStatus {
is_updated: boolean = false
source: Source
}
// BaseClient is an abstract/partial class, its intended to be used by FHIR clients, and generically handle OAuth requests. // BaseClient is an abstract/partial class, its intended to be used by FHIR clients, and generically handle OAuth requests.
export abstract class BaseClient { export abstract class BaseClient {
@ -64,7 +70,7 @@ export abstract class BaseClient {
//refresh the source if required //refresh the source if required
this.source = await this.refreshExpiredTokenIfRequired(this.source) await this.RefreshSourceToken()
//make a request to the protected resource //make a request to the protected resource
const resp = await Oauth.protectedResourceRequest(this.source.access_token, 'GET', new URL(resourceUrl), this.headers, null) const resp = await Oauth.protectedResourceRequest(this.source.access_token, 'GET', new URL(resourceUrl), this.headers, null)
@ -78,6 +84,14 @@ export abstract class BaseClient {
// return err // return err
} }
public async RefreshSourceToken(): Promise<boolean>{
let sourceUpdateStatus = await this.refreshExpiredTokenIfRequired(this.source)
this.source = sourceUpdateStatus.source
return sourceUpdateStatus.is_updated
}
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Protected methods // Protected methods
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
@ -87,17 +101,13 @@ export abstract class BaseClient {
// Private methods // Private methods
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
private getCORSProxyPath(): string { private async refreshExpiredTokenIfRequired(source: Source): Promise<SourceUpdateStatus> {
const basePath = globalThis.location.pathname.split('/web').slice(0, 1)[0]; const sourceUpdateStatus = new SourceUpdateStatus()
return `${globalThis.location.origin}${basePath || '/'}cors/`
}
private async refreshExpiredTokenIfRequired(source: Source): Promise<Source> {
//check if token has expired, and a refreshtoken is available //check if token has expired, and a refreshtoken is available
// Note: source.expires_at is in seconds, Date.now() is in milliseconds. // Note: source.expires_at is in seconds, Date.now() is in milliseconds.
if(source.expires_at > Math.floor(Date.now() / 1000)) { //not expired return if(source.expires_at > Math.floor(Date.now() / 1000)) { //not expired return
return Promise.resolve(source) sourceUpdateStatus.source = source
return Promise.resolve(sourceUpdateStatus)
} }
if(!source.refresh_token){ if(!source.refresh_token){
return Promise.reject(new Error("access token is expired, but no refresh token available")) return Promise.reject(new Error("access token is expired, but no refresh token available"))
@ -109,7 +119,9 @@ export abstract class BaseClient {
return Oauth.processRefreshTokenResponse(this.oauthAuthorizationServer, this.oauthClient, refreshTokenResp) return Oauth.processRefreshTokenResponse(this.oauthAuthorizationServer, this.oauthClient, refreshTokenResp)
}) })
.then((newToken) => { .then((newToken) => {
if(newToken.access_token != source.access_token){ if(newToken.access_token != source.access_token){
sourceUpdateStatus.is_updated = true
// { // {
// access_token: 'token', // access_token: 'token',
// token_type: 'bearer', // token_type: 'bearer',
@ -128,7 +140,14 @@ export abstract class BaseClient {
source.refresh_token = newToken.refresh_token as string source.refresh_token = newToken.refresh_token as string
} }
} }
return source sourceUpdateStatus.source = source
return sourceUpdateStatus
}) })
} }
private getCORSProxyPath(): string {
const basePath = globalThis.location.pathname.split('/web').slice(0, 1)[0];
return `${globalThis.location.origin}${basePath || '/'}cors/`
}
} }

View File

@ -1,10 +1,13 @@
import {IDatabaseRepository} from '../database/interface'; import {IDatabaseRepository} from '../database/interface';
import {UpsertSummary} from '../models/fasten/upsert-summary'; import {UpsertSummary} from '../models/fasten/upsert-summary';
import {Source} from '../models/database/source';
export interface IClient { export interface IClient {
fhirVersion: string fhirVersion: string
source: Source
GetRequest(resourceSubpath: string): Promise<any> GetRequest(resourceSubpath: string): Promise<any>
GetFhirVersion(): Promise<any> GetFhirVersion(): Promise<any>
RefreshSourceToken(): Promise<boolean>
/** /**
* This function attempts to retrieve a Patient Bundle and sync all resources to the database * This function attempts to retrieve a Patient Bundle and sync all resources to the database