working upsert logic based on upsert plugin.

Refresh token is broken again.
This commit is contained in:
Jason Kulatunga 2022-10-10 21:56:01 -07:00
parent 2401ab66c7
commit ae9697331a
9 changed files with 208 additions and 56 deletions

View File

@ -1,7 +0,0 @@
export class ResourceFhir {
user_id?: string
source_id: string
source_resource_type: string
source_resource_id: string
payload: any
}

View File

@ -1,7 +0,0 @@
import { Source } from './source';
describe('Source', () => {
it('should create an instance', () => {
expect(new Source()).toBeTruthy();
});
});

View File

@ -1,26 +0,0 @@
export class Source {
id?: string
user_id?: number
source_type: string
patient_id: string
oauth_authorization_endpoint: string
oauth_token_endpoint: string
oauth_registration_endpoint: string
oauth_introspection_endpoint: string
oauth_userinfo_endpoint: string
oauth_token_endpoint_auth_methods_supported: string
api_endpoint_base_url: string
client_id: string
redirect_uri: string
scopes: string //space seperated string
access_token: string
refresh_token: string
id_token: string
expires_at: number
code_challenge: string
code_verifier: string
confidential: boolean
}

View File

@ -19,6 +19,10 @@ export class SourceSyncWorker implements DoWork<string, string> {
const db = NewRepositiory(sourceSyncMessage.userIdentifier, sourceSyncMessage.encryptionKey) const db = NewRepositiory(sourceSyncMessage.userIdentifier, sourceSyncMessage.encryptionKey)
const client = NewClient(sourceSyncMessage.source.source_type, sourceSyncMessage.source) const client = NewClient(sourceSyncMessage.source.source_type, sourceSyncMessage.source)
//TODO: validate the FHIR version from the datasource matches the client
// if the source token has been refreshed, we need to store it in the DB.
// await db.CreateSource()
console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage) console.log("!!!!!!!!!!!!!!STARTING WORKER SYNC!!!!!!!!!", sourceSyncMessage)
return client.SyncAll(db) return client.SyncAll(db)
.then((resp) => { .then((resp) => {

View File

@ -9,6 +9,8 @@ export interface IDatabaseDocument {
_id?: string _id?: string
_rev?: string _rev?: string
doc_type: string doc_type: string
updated_at?: string
populateId(): void populateId(): void
base64Id(): string base64Id(): string
} }

View File

@ -0,0 +1,86 @@
export class PouchdbUpsert {
public static upsert(db, docId, diffFun, cb?) {
var promise = PouchdbUpsert.upsertInner(db, docId, diffFun);
if (typeof cb !== 'function') {
return promise;
}
promise.then(function(resp) {
cb(null, resp);
}, cb);
};
public static putIfNotExists(db, docId, doc, cb?) {
if (typeof docId !== 'string') {
cb = doc;
doc = docId;
docId = doc._id;
}
var diffFun = function(existingDoc) {
if (existingDoc._rev) {
return false; // do nothing
}
return doc;
};
var promise = PouchdbUpsert.upsertInner(db, docId, diffFun);
if (typeof cb !== 'function') {
return promise;
}
promise.then(function(resp) {
cb(null, resp);
}, cb);
};
///////////////////////////////////////////////////////////////////////////////////////
// private methods
///////////////////////////////////////////////////////////////////////////////////////
// this is essentially the "update sugar" function from daleharvey/pouchdb#1388
// the diffFun tells us what delta to apply to the doc. it either returns
// the doc, or false if it doesn't need to do an update after all
private static upsertInner(db, docId, diffFun) {
if (typeof docId !== 'string') {
return Promise.reject(new Error('doc id is required'));
}
return db.get(docId).catch(function (err) {
/* istanbul ignore next */
if (err.status !== 404) {
throw err;
}
return {};
}).then(function (doc) {
// the user might change the _rev, so save it for posterity
var docRev = doc._rev;
var newDoc = diffFun(doc);
if (!newDoc) {
// if the diffFun returns falsy, we short-circuit as
// an optimization
return { updated: false, rev: docRev, id: docId };
}
// users aren't allowed to modify these values,
// so reset them here
newDoc._id = docId;
newDoc._rev = docRev;
return PouchdbUpsert.tryAndPut(db, newDoc, diffFun);
});
}
private static tryAndPut(db, doc, diffFun) {
return db.put(doc).then((res) => {
return {
updated: true,
rev: res.rev,
id: doc._id
};
}, (err) => {
/* istanbul ignore next */
if (err.status !== 409) {
throw err;
}
return this.upsertInner(db, doc._id, diffFun);
});
}
}

View File

@ -8,8 +8,11 @@ import {Base64} from '../utils/base64';
// PouchDB & plugins // PouchDB & plugins
import * as PouchDB from 'pouchdb/dist/pouchdb'; import * as PouchDB from 'pouchdb/dist/pouchdb';
import * as PouchCrypto from 'crypto-pouch'; import * as PouchCrypto from 'crypto-pouch';
import {PouchdbUpsert} from './plugins/upsert';
PouchDB.plugin(PouchCrypto); PouchDB.plugin(PouchCrypto);
// !!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!! // !!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!
// most pouchdb plugins seem to fail when used in a webworker. // most pouchdb plugins seem to fail when used in a webworker.
// !!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!! // !!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!
@ -19,6 +22,15 @@ PouchDB.plugin(PouchCrypto);
// PouchDB.plugin(find); // PouchDB.plugin(find);
// PouchDB.debug.enable('pouchdb:find') // PouchDB.debug.enable('pouchdb:find')
// import * as rawUpsert from 'pouchdb-upsert';
// const upsert: PouchDB.Plugin = (rawUpsert as any);
// PouchDB.plugin(upsert);
// import {PouchdbUpsert} from './plugins/upsert';
// const upsert = new PouchdbUpsert()
// console.log("typeof PouchdbUpsert",typeof upsert, upsert)
// PouchDB.plugin(upsert.default)
// YOU MUST USE globalThis not window or self. // YOU MUST USE globalThis not window or self.
// YOU MUST NOT USE console.* as its not available in a webworker context // YOU MUST NOT USE console.* as its not available in a webworker context
@ -85,7 +97,7 @@ export class PouchdbRepository implements IDatabaseRepository {
// Source // Source
public async CreateSource(source: Source): Promise<string> { public async CreateSource(source: Source): Promise<string> {
return this.createDocument(source); return this.upsertDocument(source);
} }
public async GetSource(source_id: string): Promise<Source> { public async GetSource(source_id: string): Promise<Source> {
@ -145,11 +157,11 @@ export class PouchdbRepository implements IDatabaseRepository {
// Resource // Resource
public async CreateResource(resource: ResourceFhir): Promise<string> { public async CreateResource(resource: ResourceFhir): Promise<string> {
return this.createDocument(resource); return this.upsertDocument(resource);
} }
public async CreateResources(resources: ResourceFhir[]): Promise<string[]> { public async CreateResources(resources: ResourceFhir[]): Promise<string[]> {
return this.createBulk(resources); return this.upsertBulk(resources);
} }
public async GetResource(resource_id: string): Promise<ResourceFhir> { public async GetResource(resource_id: string): Promise<ResourceFhir> {
@ -214,30 +226,80 @@ export class PouchdbRepository implements IDatabaseRepository {
// } // }
} }
// create a new document. Returns a promise of the generated id.
protected createDocument(doc: IDatabaseDocument) : Promise<string> { // update/insert a new document. Returns a promise of the generated id.
protected upsertDocument(newDoc: IDatabaseDocument) : Promise<string> {
// make sure we always "populate" the ID for every document before submitting // make sure we always "populate" the ID for every document before submitting
doc.populateId() newDoc.populateId()
// NOTE: All friends are given the key-prefix of "friend:". This way, when we go // NOTE: All friends are given the key-prefix of "friend:". This way, when we go
// to query for friends, we can limit the scope to keys with in this key-space. // to query for friends, we can limit the scope to keys with in this key-space.
return this.GetDB() return this.GetDB()
.then((db) => db.put(doc)) .then((db) => {
return PouchdbUpsert.upsert(db, newDoc._id, (existingDoc: IDatabaseDocument) => {
//diffFunc - function that takes the existing doc as input and returns an updated doc.
// If this diffFunc returns falsey, then the update won't be performed (as an optimization).
// If the document does not already exist, then {} will be the input to diffFunc.
const isExistingEmpty = Object.keys(existingDoc).length === 0
if(isExistingEmpty){
//always return new doc (and set update_at if not already set)
//if this is a ResourceFhir doc, see if theres a updatedDate already
if(newDoc.doc_type == DocType.ResourceFhir){
newDoc.updated_at = newDoc.updated_at || (newDoc as any).meta?.updated_at
}
newDoc.updated_at = newDoc.updated_at || (new Date().toISOString())
return newDoc
}
if(newDoc.doc_type == DocType.ResourceFhir){
//for resourceFhir docs, we only care about comparing the resource_raw content
const existingContent = JSON.stringify((existingDoc as ResourceFhir).resource_raw)
const newContent = JSON.stringify((newDoc as ResourceFhir).resource_raw)
if(existingContent == newContent){
return false //do not update
} else {
//theres a difference. Set the updated_at date if possible, otherwise use the current date
(newDoc as ResourceFhir).updated_at = (newDoc as any).meta?.updated_at || (new Date().toISOString())
return newDoc
}
} else if(newDoc.doc_type == DocType.Source){
delete existingDoc._rev
const existingContent = JSON.stringify(existingDoc)
const newContent = JSON.stringify(newDoc)
if(existingContent == newContent){
return false //do not update, content is the same for source object
} else {
//theres a difference. Set the updated_at date
(newDoc as Source).updated_at = (new Date().toISOString())
return { ...existingDoc, ...newDoc };
}
} else {
throw new Error("unknown doc_type, cannot diff for upsert: " + newDoc.doc_type)
}
})
})
.then(( result ): string => { .then(( result ): string => {
return( result.id ); return( result.id );
} }
); );
} }
// create multiple documents, returns a list of generated ids protected upsertBulk(docs: IDatabaseDocument[]): Promise<string[]> {
protected createBulk(docs: IDatabaseDocument[]): Promise<string[]> {
return this.GetDB() return this.GetDB()
.then((db) => { .then((db) => {
return db.bulkDocs(docs.map((doc) => { doc.populateId(); return doc }))
}) return Promise.all(docs.map((doc) => {
.then((results): string[] => { doc.populateId();
return results.map((result) => result.id) return this.upsertDocument(doc)
}))
}) })
} }
@ -271,6 +333,44 @@ export class PouchdbRepository implements IDatabaseRepository {
}) })
} }
//DEPRECATED
/**
* create multiple documents, returns a list of generated ids
* @deprecated
* @param docs
* @protected
*/
protected createBulk(docs: IDatabaseDocument[]): Promise<string[]> {
return this.GetDB()
.then((db) => {
return db.bulkDocs(docs.map((doc) => { doc.populateId(); return doc }))
})
.then((results): string[] => {
return results.map((result) => result.id)
})
}
/**
* create a new document. Returns a promise of the generated id.
* @deprecated
* @param doc
* @protected
*/
protected createDocument(doc: IDatabaseDocument) : Promise<string> {
// make sure we always "populate" the ID for every document before submitting
doc.populateId()
// NOTE: All friends are given the key-prefix of "friend:". This way, when we go
// to query for friends, we can limit the scope to keys with in this key-space.
return this.GetDB()
.then((db) => db.put(doc))
.then(( result ): string => {
return( result.id );
}
);
}
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////
// Sync private/protected methods // Sync private/protected methods
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////

View File

@ -6,9 +6,8 @@ export class ResourceFhir {
_id?: string _id?: string
_rev?: string _rev?: string
doc_type: DocType = DocType.ResourceFhir doc_type: DocType = DocType.ResourceFhir
updated_at?: string
created_at?: Date
updated_at?: Date
source_id: string = "" source_id: string = ""
source_resource_type: string = "" source_resource_type: string = ""
source_resource_id: string = "" source_resource_id: string = ""

View File

@ -7,8 +7,9 @@ export class Source extends LighthouseSourceMetadata{
_id?: string _id?: string
_rev?: string _rev?: string
doc_type: string doc_type: string
source_type: SourceType updated_at?: string
source_type: SourceType
patient: string patient: string
access_token: string access_token: string
refresh_token?: string refresh_token?: string