From 862e3d6ea7ea50a15ee60e05b0ecc7337d3f5646 Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Fri, 8 Sep 2023 09:27:37 -0700 Subject: [PATCH] pass in the event bus to the DatabaseRepository for notifications. --- backend/pkg/database/sqlite_repository.go | 37 ++++++++++++++--------- backend/pkg/web/sse/event_bus.go | 1 + 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/backend/pkg/database/sqlite_repository.go b/backend/pkg/database/sqlite_repository.go index 918c5526..cd0bfc71 100644 --- a/backend/pkg/database/sqlite_repository.go +++ b/backend/pkg/database/sqlite_repository.go @@ -10,6 +10,7 @@ import ( "github.com/fastenhealth/fasten-onprem/backend/pkg/models" databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" + "github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse" sourceModel "github.com/fastenhealth/fasten-sources/clients/models" "github.com/gin-gonic/gin" "github.com/glebarez/sqlite" @@ -61,6 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) AppConfig: appConfig, Logger: globalLogger, GormClient: database, + EventBus: sse.GetEventBusServer(), } //TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized. @@ -92,6 +94,8 @@ type SqliteRepository struct { Logger logrus.FieldLogger GormClient *gorm.DB + + EventBus *sse.EventBus } func (sr *SqliteRepository) Migrate() error { @@ -137,7 +141,7 @@ func (sr *SqliteRepository) GetUserByUsername(ctx context.Context, username stri return &foundUser, result.Error } -//TODO: check for error, right now we return a nil which may cause a panic. +// TODO: check for error, right now we return a nil which may cause a panic. func (sr *SqliteRepository) GetCurrentUser(ctx context.Context) (*models.User, error) { username := ctx.Value(pkg.ContextKeyTypeAuthUsername) if username == nil { @@ -255,9 +259,9 @@ func (sr *SqliteRepository) GetSummary(ctx context.Context) (*models.Summary, er // Resource //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -//This function will create a new resource if it does not exist, or update an existing resource if it does exist. -//It will also create associations between fhir resources -//This function is called directly by fasten-sources +// This function will create a new resource if it does not exist, or update an existing resource if it does exist. +// It will also create associations between fhir resources +// This function is called directly by fasten-sources func (sr *SqliteRepository) UpsertRawResource(ctx context.Context, sourceCredential sourceModel.SourceCredential, rawResource sourceModel.RawResourceFhir) (bool, error) { source := sourceCredential.(*models.SourceCredential) @@ -321,10 +325,11 @@ func (sr *SqliteRepository) UpsertRawResource(ctx context.Context, sourceCredent // this method will upsert a resource, however it will not create associations. // UPSERT operation // - call FindOrCreate -// - check if the resource exists -// - if it does not exist, insert it +// - check if the resource exists +// - if it does not exist, insert it +// // - if no error during FindOrCreate && no rows affected (nothing was created) -// - update the resource using Updates operation +// - update the resource using Updates operation func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceModel *models.ResourceBase) (bool, error) { currentUser, currentUserErr := sr.GetCurrentUser(ctx) if currentUserErr != nil { @@ -352,6 +357,7 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) } + sr.EventBus.Message <- fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID) createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{ SourceID: wrappedFhirResourceModel.GetSourceID(), SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), @@ -618,13 +624,14 @@ func (sr *SqliteRepository) FindResourceAssociationsByTypeAndId(ctx context.Cont // - find source for each resource // - (SECURITY) ensure the current user and the source for each resource matches // - check if there is a Composition resource Type already. -// - if Composition type already exists: -// - update "relatesTo" field with additional data. -// - else: -// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) +// - if Composition type already exists: +// - update "relatesTo" field with additional data. +// - else: +// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) +// // - add AddResourceAssociation for all resources linked to the Composition resource // - store the Composition resource -//TODO: determine if we should be using a List Resource instead of a Composition resource +// TODO: determine if we should be using a List Resource instead of a Composition resource func (sr *SqliteRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error { currentUser, currentUserErr := sr.GetCurrentUser(ctx) if currentUserErr != nil { @@ -943,11 +950,11 @@ func sqlitePragmaString(pragmas map[string]string) string { return "" } -//Internal function +// Internal function // This function will return a list of resources from all FHIR tables in the database // The query allows us to set the source id, source resource id, source resource type -//SECURITY: this function assumes the user has already been authenticated -//TODO: theres probably a more efficient way of doing this with GORM +// SECURITY: this function assumes the user has already been authenticated +// TODO: theres probably a more efficient way of doing this with GORM func (sr *SqliteRepository) getResourcesFromAllTables(queryBuilder *gorm.DB, queryParam models.OriginBase) ([]models.ResourceBase, error) { wrappedResourceModels := []models.ResourceBase{} resourceTypes := databaseModel.GetAllowedResourceTypes() diff --git a/backend/pkg/web/sse/event_bus.go b/backend/pkg/web/sse/event_bus.go index c99ffc8e..79f8dfcf 100644 --- a/backend/pkg/web/sse/event_bus.go +++ b/backend/pkg/web/sse/event_bus.go @@ -60,6 +60,7 @@ type EventBus struct { // It Listens all incoming requests from clients. // Handles addition and removal of clients and broadcast messages to clients. +// TODO: determine how to route messages based on authenticated client func (bus *EventBus) listen() { for { select {