pass in the event bus to the DatabaseRepository for notifications.

This commit is contained in:
Jason Kulatunga 2023-09-08 09:27:37 -07:00
parent 8ff42142fb
commit 862e3d6ea7
2 changed files with 23 additions and 15 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/utils"
"github.com/fastenhealth/fasten-onprem/backend/pkg/web/sse"
sourceModel "github.com/fastenhealth/fasten-sources/clients/models"
"github.com/gin-gonic/gin"
"github.com/glebarez/sqlite"
@ -61,6 +62,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger)
AppConfig: appConfig,
Logger: globalLogger,
GormClient: database,
EventBus: sse.GetEventBusServer(),
}
//TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized.
@ -92,6 +94,8 @@ type SqliteRepository struct {
Logger logrus.FieldLogger
GormClient *gorm.DB
EventBus *sse.EventBus
}
func (sr *SqliteRepository) Migrate() error {
@ -137,7 +141,7 @@ func (sr *SqliteRepository) GetUserByUsername(ctx context.Context, username stri
return &foundUser, result.Error
}
//TODO: check for error, right now we return a nil which may cause a panic.
// TODO: check for error, right now we return a nil which may cause a panic.
func (sr *SqliteRepository) GetCurrentUser(ctx context.Context) (*models.User, error) {
username := ctx.Value(pkg.ContextKeyTypeAuthUsername)
if username == nil {
@ -255,9 +259,9 @@ func (sr *SqliteRepository) GetSummary(ctx context.Context) (*models.Summary, er
// Resource
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//This function will create a new resource if it does not exist, or update an existing resource if it does exist.
//It will also create associations between fhir resources
//This function is called directly by fasten-sources
// This function will create a new resource if it does not exist, or update an existing resource if it does exist.
// It will also create associations between fhir resources
// This function is called directly by fasten-sources
func (sr *SqliteRepository) UpsertRawResource(ctx context.Context, sourceCredential sourceModel.SourceCredential, rawResource sourceModel.RawResourceFhir) (bool, error) {
source := sourceCredential.(*models.SourceCredential)
@ -321,10 +325,11 @@ func (sr *SqliteRepository) UpsertRawResource(ctx context.Context, sourceCredent
// this method will upsert a resource, however it will not create associations.
// UPSERT operation
// - call FindOrCreate
// - check if the resource exists
// - if it does not exist, insert it
// - check if the resource exists
// - if it does not exist, insert it
//
// - if no error during FindOrCreate && no rows affected (nothing was created)
// - update the resource using Updates operation
// - update the resource using Updates operation
func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceModel *models.ResourceBase) (bool, error) {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
@ -352,6 +357,7 @@ func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceM
//wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw)
}
sr.EventBus.Message <- fmt.Sprintf("resource.upsert %s/%s", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID)
createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{
SourceID: wrappedFhirResourceModel.GetSourceID(),
SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(),
@ -618,13 +624,14 @@ func (sr *SqliteRepository) FindResourceAssociationsByTypeAndId(ctx context.Cont
// - find source for each resource
// - (SECURITY) ensure the current user and the source for each resource matches
// - check if there is a Composition resource Type already.
// - if Composition type already exists:
// - update "relatesTo" field with additional data.
// - else:
// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources)
// - if Composition type already exists:
// - update "relatesTo" field with additional data.
// - else:
// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources)
//
// - add AddResourceAssociation for all resources linked to the Composition resource
// - store the Composition resource
//TODO: determine if we should be using a List Resource instead of a Composition resource
// TODO: determine if we should be using a List Resource instead of a Composition resource
func (sr *SqliteRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
@ -943,11 +950,11 @@ func sqlitePragmaString(pragmas map[string]string) string {
return ""
}
//Internal function
// Internal function
// This function will return a list of resources from all FHIR tables in the database
// The query allows us to set the source id, source resource id, source resource type
//SECURITY: this function assumes the user has already been authenticated
//TODO: theres probably a more efficient way of doing this with GORM
// SECURITY: this function assumes the user has already been authenticated
// TODO: theres probably a more efficient way of doing this with GORM
func (sr *SqliteRepository) getResourcesFromAllTables(queryBuilder *gorm.DB, queryParam models.OriginBase) ([]models.ResourceBase, error) {
wrappedResourceModels := []models.ResourceBase{}
resourceTypes := databaseModel.GetAllowedResourceTypes()

View File

@ -60,6 +60,7 @@ type EventBus struct {
// It Listens all incoming requests from clients.
// Handles addition and removal of clients and broadcast messages to clients.
// TODO: determine how to route messages based on authenticated client
func (bus *EventBus) listen() {
for {
select {