diff --git a/backend/pkg/config/config.go b/backend/pkg/config/config.go index 3534e75b..76b5e514 100644 --- a/backend/pkg/config/config.go +++ b/backend/pkg/config/config.go @@ -30,6 +30,7 @@ func (c *configuration) Init() error { c.SetDefault("web.allow_unsafe_endpoints", false) c.SetDefault("web.src.frontend.path", "/opt/fasten/web") + c.SetDefault("database.type", "sqlite") c.SetDefault("database.location", "/opt/fasten/db/fasten.db") c.SetDefault("cache.location", "/opt/fasten/cache/") diff --git a/backend/pkg/constants.go b/backend/pkg/constants.go index 5b77b9ed..6d056e26 100644 --- a/backend/pkg/constants.go +++ b/backend/pkg/constants.go @@ -5,6 +5,8 @@ type BackgroundJobStatus string type BackgroundJobType string type BackgroundJobSchedule string +type DatabaseRepositoryType string + const ( ResourceListPageSize int = 20 @@ -36,4 +38,7 @@ const ( BackgroundJobScheduleWeekly BackgroundJobSchedule = "WEEKLY" BackgroundJobScheduleBiWeekly BackgroundJobSchedule = "BIWEEKLY" BackgroundJobScheduleMonthly BackgroundJobSchedule = "MONTHLY" + + DatabaseRepositoryTypeSqlite DatabaseRepositoryType = "sqlite" + DatabaseRepositoryTypePostgres DatabaseRepositoryType = "postgres" ) diff --git a/backend/pkg/database/factory.go b/backend/pkg/database/factory.go new file mode 100644 index 00000000..7575d44b --- /dev/null +++ b/backend/pkg/database/factory.go @@ -0,0 +1,20 @@ +package database + +import ( + "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/errors" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" + "github.com/sirupsen/logrus" +) + +func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) { + switch pkg.DatabaseRepositoryType(appConfig.GetString("database.type")) { + case pkg.DatabaseRepositoryTypeSqlite: + return newSqliteRepository(appConfig, globalLogger, eventBus) + case pkg.DatabaseRepositoryTypePostgres: + return newPostgresRepository(appConfig, globalLogger, eventBus) + default: + return nil, errors.DatabaseTypeNotSupportedError(appConfig.GetString("database.type")) + } +} diff --git a/backend/pkg/database/gorm_common.go b/backend/pkg/database/gorm_common.go new file mode 100644 index 00000000..65d0acf2 --- /dev/null +++ b/backend/pkg/database/gorm_common.go @@ -0,0 +1,1173 @@ +package database + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "strings" + "time" + + "github.com/fastenhealth/fasten-onprem/backend/pkg" + "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" + databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" + "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" + sourceModel "github.com/fastenhealth/fasten-sources/clients/models" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type GormRepository struct { + AppConfig config.Interface + Logger logrus.FieldLogger + + GormClient *gorm.DB + + EventBus event_bus.Interface +} + +func (gr *GormRepository) Migrate() error { + err := gr.GormClient.AutoMigrate( + &models.User{}, + &models.SourceCredential{}, + &models.BackgroundJob{}, + &models.Glossary{}, + &models.UserSettingEntry{}, + ) + if err != nil { + return fmt.Errorf("Failed to automigrate! - %v", err) + } + return nil +} + +func (gr *GormRepository) Close() error { + return nil +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// User +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func (gr *GormRepository) CreateUser(ctx context.Context, user *models.User) error { + if err := user.HashPassword(user.Password); err != nil { + return err + } + record := gr.GormClient.Create(user) + if record.Error != nil { + return record.Error + } + + //create user settings + err := gr.PopulateDefaultUserSettings(ctx, user.ID) + if err != nil { + return err + } + return nil +} +func (gr *GormRepository) GetUserByUsername(ctx context.Context, username string) (*models.User, error) { + var foundUser models.User + result := gr.GormClient.WithContext(ctx).Where(models.User{Username: username}).First(&foundUser) + return &foundUser, result.Error +} + +// TODO: check for error, right now we return a nil which may cause a panic. +// TODO: can we cache the current user? //SECURITY: +func (gr *GormRepository) GetCurrentUser(ctx context.Context) (*models.User, error) { + username := ctx.Value(pkg.ContextKeyTypeAuthUsername) + if username == nil { + ginCtx, ginCtxOk := ctx.(*gin.Context) + if !ginCtxOk { + return nil, fmt.Errorf("could not convert context to gin context") + } + var exists bool + username, exists = ginCtx.Get(pkg.ContextKeyTypeAuthUsername) + if !exists { + return nil, fmt.Errorf("could not extract username from context") + } + } + + var currentUser models.User + usernameStr, usernameStrOk := username.(string) + if !usernameStrOk { + return nil, fmt.Errorf("could not convert username to string: %v", username) + } + + result := gr.GormClient. + WithContext(ctx). + First(¤tUser, map[string]interface{}{"username": usernameStr}) + + if result.Error != nil { + return nil, fmt.Errorf("could not retrieve current user: %v", result.Error) + } + + return ¤tUser, nil +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Glossary +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func (gr *GormRepository) CreateGlossaryEntry(ctx context.Context, glossaryEntry *models.Glossary) error { + record := gr.GormClient.WithContext(ctx).Create(glossaryEntry) + if record.Error != nil { + return record.Error + } + return nil +} + +func (gr *GormRepository) GetGlossaryEntry(ctx context.Context, code string, codeSystem string) (*models.Glossary, error) { + var foundGlossaryEntry models.Glossary + result := gr.GormClient.WithContext(ctx). + Where(models.Glossary{Code: code, CodeSystem: codeSystem}). + First(&foundGlossaryEntry) + return &foundGlossaryEntry, result.Error +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Summary +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func (gr *GormRepository) GetSummary(ctx context.Context) (*models.Summary, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + // we want a count of all resources for this user by type + var resourceCountResults []map[string]interface{} + + resourceTypes := databaseModel.GetAllowedResourceTypes() + for _, resourceType := range resourceTypes { + tableName, err := databaseModel.GetTableNameByResourceType(resourceType) + if err != nil { + return nil, err + } + var count int64 + result := gr.GormClient.WithContext(ctx). + Table(tableName). + Where(models.OriginBase{ + UserID: currentUser.ID, + }). + Count(&count) + if result.Error != nil { + return nil, result.Error + } + if count == 0 { + continue //don't add resource counts if the count is 0 + } + resourceCountResults = append(resourceCountResults, map[string]interface{}{ + "resource_type": resourceType, + "count": count, + }) + } + + // we want a list of all sources (when they were last updated) + sources, err := gr.GetSources(ctx) + if err != nil { + return nil, err + } + + // we want the main Patient for each source + patients, err := gr.GetPatientForSources(ctx) + if err != nil { + return nil, err + } + + if resourceCountResults == nil { + resourceCountResults = []map[string]interface{}{} + } + summary := &models.Summary{ + Sources: sources, + ResourceTypeCounts: resourceCountResults, + Patients: patients, + } + + return summary, nil +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Resource +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// This function will create a new resource if it does not exist, or update an existing resource if it does exist. +// It will also create associations between fhir resources +// This function is called directly by fasten-sources +func (gr *GormRepository) UpsertRawResource(ctx context.Context, sourceCredential sourceModel.SourceCredential, rawResource sourceModel.RawResourceFhir) (bool, error) { + + source := sourceCredential.(*models.SourceCredential) + + //convert from a raw resource (from fasten-sources) to a ResourceFhir (which matches the database models) + wrappedResourceModel := &models.ResourceBase{ + OriginBase: models.OriginBase{ + ModelBase: models.ModelBase{}, + UserID: source.UserID, + SourceID: source.ID, + SourceResourceID: rawResource.SourceResourceID, + SourceResourceType: rawResource.SourceResourceType, + }, + SortTitle: rawResource.SortTitle, + SortDate: rawResource.SortDate, + ResourceRaw: datatypes.JSON(rawResource.ResourceRaw), + RelatedResource: nil, + } + if len(rawResource.SourceUri) > 0 { + wrappedResourceModel.SourceUri = &rawResource.SourceUri + } + + //create associations + //note: we create the association in the related_resources table **before** the model actually exists. + //note: these associations are not reciprocal, (i.e. if Procedure references Location, Location may not reference Procedure) + if rawResource.ReferencedResources != nil && len(rawResource.ReferencedResources) > 0 { + for _, referencedResource := range rawResource.ReferencedResources { + parts := strings.Split(referencedResource, "/") + if len(parts) != 2 { + continue + } + + relatedResource := &models.ResourceBase{ + OriginBase: models.OriginBase{ + SourceID: source.ID, + SourceResourceType: parts[0], + SourceResourceID: parts[1], + }, + RelatedResource: nil, + } + err := gr.AddResourceAssociation( + ctx, + source, + wrappedResourceModel.SourceResourceType, + wrappedResourceModel.SourceResourceID, + source, + relatedResource.SourceResourceType, + relatedResource.SourceResourceID, + ) + if err != nil { + return false, err + } + } + } + + return gr.UpsertResource(ctx, wrappedResourceModel) + +} + +// UpsertResource +// this method will upsert a resource, however it will not create associations. +// UPSERT operation +// - call FindOrCreate +// - check if the resource exists +// - if it does not exist, insert it +// +// - if no error during FindOrCreate && no rows affected (nothing was created) +// - update the resource using Updates operation +func (gr *GormRepository) UpsertResource(ctx context.Context, wrappedResourceModel *models.ResourceBase) (bool, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return false, currentUserErr + } + + wrappedResourceModel.UserID = currentUser.ID + cachedResourceRaw := wrappedResourceModel.ResourceRaw + + gr.Logger.Infof("insert/update FHIRResource (%v) %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID) + wrappedFhirResourceModel, err := databaseModel.NewFhirResourceModelByType(wrappedResourceModel.SourceResourceType) + if err != nil { + return false, err + } + + wrappedFhirResourceModel.SetOriginBase(wrappedResourceModel.OriginBase) + wrappedFhirResourceModel.SetSortTitle(wrappedResourceModel.SortTitle) + wrappedFhirResourceModel.SetSortDate(wrappedResourceModel.SortDate) + wrappedFhirResourceModel.SetSourceUri(wrappedResourceModel.SourceUri) + + //TODO: this takes too long, we need to find a way to do this processing faster or in the background async. + err = wrappedFhirResourceModel.PopulateAndExtractSearchParameters(json.RawMessage(wrappedResourceModel.ResourceRaw)) + if err != nil { + gr.Logger.Warnf("ignoring: an error occurred while extracting SearchParameters using FHIRPath (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) + //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) + } + + eventSourceSync := models.NewEventSourceSync( + currentUser.ID.String(), + wrappedFhirResourceModel.GetSourceID().String(), + wrappedFhirResourceModel.GetSourceResourceType(), + wrappedFhirResourceModel.GetSourceResourceID(), + ) + + err = gr.EventBus.PublishMessage(eventSourceSync) + if err != nil { + gr.Logger.Warnf("ignoring: an error occurred while publishing event to eventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) + } + + createResult := gr.GormClient.WithContext(ctx).Where(models.OriginBase{ + SourceID: wrappedFhirResourceModel.GetSourceID(), + SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), + SourceResourceType: wrappedFhirResourceModel.GetSourceResourceType(), //TODO: and UpdatedAt > old UpdatedAt + }).Omit("RelatedResource.*").FirstOrCreate(wrappedFhirResourceModel) + + if createResult.Error != nil { + return false, createResult.Error + } else if createResult.RowsAffected == 0 { + //at this point, wrappedResourceModel contains the data found in the database. + // check if the database resource matches the new resource. + if wrappedResourceModel.ResourceRaw.String() != string(cachedResourceRaw) { + updateResult := createResult.Omit("RelatedResource.*").Updates(wrappedResourceModel) + return updateResult.RowsAffected > 0, updateResult.Error + } else { + return false, nil + } + + } else { + //resource was created + return createResult.RowsAffected > 0, createResult.Error + } +} + +func (gr *GormRepository) ListResources(ctx context.Context, queryOptions models.ListResourceQueryOptions) ([]models.ResourceBase, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + queryParam := models.OriginBase{ + UserID: currentUser.ID, + } + + if len(queryOptions.SourceResourceType) > 0 { + queryParam.SourceResourceType = queryOptions.SourceResourceType + } + + if len(queryOptions.SourceID) > 0 { + sourceUUID, err := uuid.Parse(queryOptions.SourceID) + if err != nil { + return nil, err + } + + queryParam.SourceID = sourceUUID + } + if len(queryOptions.SourceResourceID) > 0 { + queryParam.SourceResourceID = queryOptions.SourceResourceID + } + + manifestJson, _ := json.MarshalIndent(queryParam, "", " ") + gr.Logger.Debugf("THE QUERY OBJECT===========> %v", string(manifestJson)) + + var wrappedResourceModels []models.ResourceBase + queryBuilder := gr.GormClient.WithContext(ctx) + if len(queryOptions.SourceResourceType) > 0 { + tableName, err := databaseModel.GetTableNameByResourceType(queryOptions.SourceResourceType) + if err != nil { + return nil, err + } + queryBuilder = queryBuilder. + Where(queryParam). + Table(tableName) + + if queryOptions.Limit > 0 { + queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) + } + return wrappedResourceModels, queryBuilder.Find(&wrappedResourceModels).Error + } else { + if queryOptions.Limit > 0 { + queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) + } + //there is no FHIR Resource name specified, so we're querying across all FHIR resources + return gr.getResourcesFromAllTables(queryBuilder, queryParam) + } +} + +// TODO: should this be deprecated? (replaced by ListResources) +func (gr *GormRepository) GetResourceByResourceTypeAndId(ctx context.Context, sourceResourceType string, sourceResourceId string) (*models.ResourceBase, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + tableName, err := databaseModel.GetTableNameByResourceType(sourceResourceType) + if err != nil { + return nil, err + } + + queryParam := models.OriginBase{ + UserID: currentUser.ID, + SourceResourceType: sourceResourceType, + SourceResourceID: sourceResourceId, + } + + var wrappedResourceModel models.ResourceBase + results := gr.GormClient.WithContext(ctx). + Where(queryParam). + Table(tableName). + First(&wrappedResourceModel) + + return &wrappedResourceModel, results.Error +} + +// we need to figure out how to get the source resource type from the source resource id, or if we're searching across every table :( +func (gr *GormRepository) GetResourceBySourceId(ctx context.Context, sourceId string, sourceResourceId string) (*models.ResourceBase, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + sourceIdUUID, err := uuid.Parse(sourceId) + if err != nil { + return nil, err + } + + queryParam := models.OriginBase{ + UserID: currentUser.ID, + SourceID: sourceIdUUID, + SourceResourceID: sourceResourceId, + } + + //there is no FHIR Resource name specified, so we're querying across all FHIR resources + wrappedResourceModels, err := gr.getResourcesFromAllTables(gr.GormClient.WithContext(ctx), queryParam) + if len(wrappedResourceModels) > 0 { + return &wrappedResourceModels[0], err + } else { + return nil, fmt.Errorf("no resource found with source id %s and source resource id %s", sourceId, sourceResourceId) + } +} + +// Get the patient for each source (for the current user) +func (gr *GormRepository) GetPatientForSources(ctx context.Context) ([]models.ResourceBase, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + //SELECT * FROM resource_bases WHERE user_id = "" and source_resource_type = "Patient" GROUP BY source_id + + tableName, err := databaseModel.GetTableNameByResourceType("Patient") + if err != nil { + return nil, err + } + + var wrappedResourceModels []models.ResourceBase + results := gr.GormClient.WithContext(ctx). + //Group("source_id"). //broken in Postgres. + Where(models.OriginBase{ + UserID: currentUser.ID, + SourceResourceType: "Patient", + }). + Table(tableName). + Find(&wrappedResourceModels) + + return wrappedResourceModels, results.Error +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Resource Associations +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// verifyAssociationPermission ensure that the sources are "owned" by the same user, and that the user is the current user +func (gr *GormRepository) verifyAssociationPermission(ctx context.Context, sourceUserID uuid.UUID, relatedSourceUserID uuid.UUID) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + if sourceUserID != relatedSourceUserID { + return fmt.Errorf("user id's must match when adding associations") + } else if sourceUserID != currentUser.ID { + return fmt.Errorf("user id's must match current user") + } + + return nil +} + +func (gr *GormRepository) AddResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { + //ensure that the sources are "owned" by the same user + err := gr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) + if err != nil { + return err + } + + err = gr.GormClient.WithContext(ctx).Table("related_resources").Create(map[string]interface{}{ + "resource_base_user_id": source.UserID, + "resource_base_source_id": source.ID, + "resource_base_source_resource_type": resourceType, + "resource_base_source_resource_id": resourceId, + "related_resource_user_id": relatedSource.UserID, + "related_resource_source_id": relatedSource.ID, + "related_resource_source_resource_type": relatedResourceType, + "related_resource_source_resource_id": relatedResourceId, + }).Error + uniqueConstraintError := errors.New("constraint failed: UNIQUE constraint failed") + if err != nil { + if strings.HasPrefix(err.Error(), uniqueConstraintError.Error()) { + gr.Logger.Warnf("Ignoring an error when creating a related_resource association for %s/%s: %v", resourceType, resourceId, err) + //we can safely ignore this error + return nil + } + } + return err +} + +func (gr *GormRepository) RemoveResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { + //ensure that the sources are "owned" by the same user + err := gr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) + if err != nil { + return err + } + + //manually delete association + results := gr.GormClient.WithContext(ctx). + //Table("related_resources"). + Delete(&models.RelatedResource{}, map[string]interface{}{ + "resource_base_user_id": source.UserID, + "resource_base_source_id": source.ID, + "resource_base_source_resource_type": resourceType, + "resource_base_source_resource_id": resourceId, + "related_resource_user_id": relatedSource.UserID, + "related_resource_source_id": relatedSource.ID, + "related_resource_source_resource_type": relatedResourceType, + "related_resource_source_resource_id": relatedResourceId, + }) + + if results.Error != nil { + return results.Error + } else if results.RowsAffected == 0 { + return fmt.Errorf("no association found for %s/%s and %s/%s", resourceType, resourceId, relatedResourceType, relatedResourceId) + } + return nil +} + +func (gr *GormRepository) FindResourceAssociationsByTypeAndId(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string) ([]models.RelatedResource, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + if source.UserID != currentUser.ID { + return nil, fmt.Errorf("source credential must match the current user id") + } + + // SELECT * FROM related_resources WHERE user_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9"; + var relatedResources []models.RelatedResource + result := gr.GormClient.WithContext(ctx). + Where(models.RelatedResource{ + ResourceBaseUserID: currentUser.ID, + ResourceBaseSourceID: source.ID, + ResourceBaseSourceResourceType: resourceType, + ResourceBaseSourceResourceID: resourceId, + }). + Find(&relatedResources) + return relatedResources, result.Error +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Resource Composition (Grouping) +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// AddResourceComposition +// this will group resources together into a "Composition" -- primarily to group related Encounters & Conditions into one semantic root. +// algorithm: +// - find source for each resource +// - (SECURITY) ensure the current user and the source for each resource matches +// - check if there is a Composition resource Type already. +// - if Composition type already exists: +// - update "relatesTo" field with additional data. +// - else: +// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) +// +// - add AddResourceAssociation for all resources linked to the Composition resource +// - store the Composition resource +// TODO: determine if we should be using a List Resource instead of a Composition resource +func (gr *GormRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + + //generate placeholder source + placeholderSource := models.SourceCredential{UserID: currentUser.ID, SourceType: "manual", ModelBase: models.ModelBase{ID: uuid.MustParse("00000000-0000-0000-0000-000000000000")}} + + existingCompositionResources := []*models.ResourceBase{} + rawResourceLookupTable := map[string]*models.ResourceBase{} + + //find the source for each resource we'd like to merge. (for ownership verification) + sourceLookup := map[uuid.UUID]*models.SourceCredential{} + for _, resource := range resources { + if resource.SourceResourceType == pkg.FhirResourceTypeComposition { + //skip, Composition resources don't have a valid SourceCredential + existingCompositionResources = append(existingCompositionResources, resource) + + //compositions may include existing resources, make sure we handle these + for _, related := range resource.RelatedResource { + rawResourceLookupTable[fmt.Sprintf("%s/%s", related.SourceResourceType, related.SourceResourceID)] = related + } + continue + } + + if _, sourceOk := sourceLookup[resource.SourceID]; !sourceOk { + //source has not been added yet, lets query for it. + sourceCred, err := gr.GetSource(ctx, resource.SourceID.String()) + if err != nil { + return fmt.Errorf("could not find source %s", resource.SourceID.String()) + } + sourceLookup[resource.SourceID] = sourceCred + } + + rawResourceLookupTable[fmt.Sprintf("%s/%s", resource.SourceResourceType, resource.SourceResourceID)] = resource + } + + // SECURITY: ensure the current user and the source for each resource matches + for _, source := range sourceLookup { + if source.UserID != currentUser.ID { + return fmt.Errorf("source must be owned by the current user: %s vs %s", source.UserID, currentUser.ID) + } + } + + // - check if there is a Composition resource Type already. + var compositionResource *models.ResourceBase + + if len(existingCompositionResources) > 0 { + //- if Composition type already exists in this set + // - update "relatesTo" field with additional data. + compositionResource = existingCompositionResources[0] + + //disassociate all existing remaining composition resources. + for _, existingCompositionResource := range existingCompositionResources[1:] { + for _, relatedResource := range existingCompositionResource.RelatedResource { + if err := gr.RemoveResourceAssociation( + ctx, + &placeholderSource, + existingCompositionResource.SourceResourceType, + existingCompositionResource.SourceResourceID, + sourceLookup[relatedResource.SourceID], + relatedResource.SourceResourceType, + relatedResource.SourceResourceID, + ); err != nil { + //ignoring errors, could be due to duplicate edges + return fmt.Errorf("an error occurred while removing resource association: %v", err) + } + } + + //remove this resource + compositionTable, err := databaseModel.GetTableNameByResourceType("Composition") + if err != nil { + return fmt.Errorf("an error occurred while finding Composition resource table: %v", err) + } + //TODO: we may need to delete with using the FhirComposition struct type + deleteResult := gr.GormClient.WithContext(ctx). + Table(compositionTable). + Delete(existingCompositionResource) + if deleteResult.Error != nil { + return fmt.Errorf("an error occurred while removing Composition resource(%s/%s): %v", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID, err) + } else if deleteResult.RowsAffected != 1 { + return fmt.Errorf("composition resource was not deleted %s/%s", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID) + } + } + + } else { + //- else: + // - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) + compositionResource = &models.ResourceBase{ + OriginBase: models.OriginBase{ + UserID: placeholderSource.UserID, // + SourceID: placeholderSource.ID, //Empty SourceID expected ("0000-0000-0000-0000") + SourceResourceType: pkg.FhirResourceTypeComposition, + SourceResourceID: uuid.New().String(), + }, + } + } + + // - Generate an "updated" RawResource json blob + rawCompositionResource := models.ResourceComposition{ + Title: compositionTitle, + RelatesTo: []models.ResourceCompositionRelatesTo{}, + } + + for relatedResourceKey, _ := range rawResourceLookupTable { + rawCompositionResource.RelatesTo = append(rawCompositionResource.RelatesTo, models.ResourceCompositionRelatesTo{ + Target: models.ResourceCompositionRelatesToTarget{ + TargetReference: models.ResourceCompositionRelatesToTargetReference{ + Reference: relatedResourceKey, + }, + }, + }) + } + + rawResourceJson, err := json.Marshal(rawCompositionResource) + if err != nil { + return err + } + compositionResource.ResourceRaw = rawResourceJson + + compositionResource.SortTitle = &compositionTitle + compositionResource.RelatedResource = utils.SortResourcePtrListByDate(resources) + compositionResource.SortDate = compositionResource.RelatedResource[0].SortDate + + //store the Composition resource + _, err = gr.UpsertResource(ctx, compositionResource) + if err != nil { + return err + } + + // - add AddResourceAssociation for all resources linked to the Composition resource + for _, resource := range rawResourceLookupTable { + if err := gr.AddResourceAssociation( + ctx, + &placeholderSource, + compositionResource.SourceResourceType, + compositionResource.SourceResourceID, + sourceLookup[resource.SourceID], + resource.SourceResourceType, + resource.SourceResourceID, + ); err != nil { + return err + } + } + + return nil +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// SourceCredential +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func (gr *GormRepository) CreateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + sourceCreds.UserID = currentUser.ID + + //Assign will **always** update the source credential in the DB with data passed into this function. + return gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{ + UserID: sourceCreds.UserID, + SourceType: sourceCreds.SourceType, + Patient: sourceCreds.Patient}). + Assign(*sourceCreds).FirstOrCreate(sourceCreds).Error +} + +func (gr *GormRepository) UpdateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + sourceCreds.UserID = currentUser.ID + + //Assign will **always** update the source credential in the DB with data passed into this function. + return gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{ + ModelBase: models.ModelBase{ID: sourceCreds.ID}, + UserID: sourceCreds.UserID, + SourceType: sourceCreds.SourceType, + }).Updates(models.SourceCredential{ + AccessToken: sourceCreds.AccessToken, + RefreshToken: sourceCreds.RefreshToken, + ExpiresAt: sourceCreds.ExpiresAt, + DynamicClientId: sourceCreds.DynamicClientId, + DynamicClientRegistrationMode: sourceCreds.DynamicClientRegistrationMode, + DynamicClientJWKS: sourceCreds.DynamicClientJWKS, + LatestBackgroundJobID: sourceCreds.LatestBackgroundJobID, + }).Error +} + +func (gr *GormRepository) GetSource(ctx context.Context, sourceId string) (*models.SourceCredential, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + sourceUUID, err := uuid.Parse(sourceId) + if err != nil { + return nil, err + } + + var sourceCred models.SourceCredential + results := gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: sourceUUID}}). + Preload("LatestBackgroundJob"). + First(&sourceCred) + + return &sourceCred, results.Error +} + +func (gr *GormRepository) GetSourceSummary(ctx context.Context, sourceId string) (*models.SourceSummary, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + sourceUUID, err := uuid.Parse(sourceId) + if err != nil { + return nil, err + } + + sourceSummary := &models.SourceSummary{} + + source, err := gr.GetSource(ctx, sourceId) + if err != nil { + return nil, err + } + sourceSummary.Source = source + + //group by resource type and return counts + // SELECT source_resource_type as resource_type, COUNT(*) as count FROM resource_bases WHERE source_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9" GROUP BY source_resource_type; + + var resourceTypeCounts []map[string]interface{} + + resourceTypes := databaseModel.GetAllowedResourceTypes() + for _, resourceType := range resourceTypes { + tableName, err := databaseModel.GetTableNameByResourceType(resourceType) + if err != nil { + return nil, err + } + var count int64 + result := gr.GormClient.WithContext(ctx). + Table(tableName). + Where(models.OriginBase{ + UserID: currentUser.ID, + SourceID: sourceUUID, + }). + Count(&count) + if result.Error != nil { + return nil, result.Error + } + if count == 0 { + continue //don't add resource counts if the count is 0 + } + resourceTypeCounts = append(resourceTypeCounts, map[string]interface{}{ + "source_id": sourceId, + "resource_type": resourceType, + "count": count, + }) + } + + sourceSummary.ResourceTypeCounts = resourceTypeCounts + + //set patient + patientTableName, err := databaseModel.GetTableNameByResourceType("Patient") + if err != nil { + return nil, err + } + var wrappedPatientResourceModel models.ResourceBase + patientResults := gr.GormClient.WithContext(ctx). + Where(models.OriginBase{ + UserID: currentUser.ID, + SourceResourceType: "Patient", + SourceID: sourceUUID, + }). + Table(patientTableName). + First(&wrappedPatientResourceModel) + + if patientResults.Error != nil { + return nil, patientResults.Error + } + sourceSummary.Patient = &wrappedPatientResourceModel + + return sourceSummary, nil +} + +func (gr *GormRepository) GetSources(ctx context.Context) ([]models.SourceCredential, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + var sourceCreds []models.SourceCredential + results := gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{UserID: currentUser.ID}). + Preload("LatestBackgroundJob"). + Find(&sourceCreds) + + return sourceCreds, results.Error +} + +func (gr *GormRepository) DeleteSource(ctx context.Context, sourceId string) (int64, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return 0, currentUserErr + } + + if strings.TrimSpace(sourceId) == "" { + return 0, fmt.Errorf("sourceId cannot be blank") + } + //delete all resources for this source + sourceUUID, err := uuid.Parse(sourceId) + if err != nil { + return 0, err + } + + rowsEffected := int64(0) + resourceTypes := databaseModel.GetAllowedResourceTypes() + for _, resourceType := range resourceTypes { + tableName, err := databaseModel.GetTableNameByResourceType(resourceType) + if err != nil { + return 0, err + } + results := gr.GormClient.WithContext(ctx). + Where(models.OriginBase{ + UserID: currentUser.ID, + SourceID: sourceUUID, + }). + Table(tableName). + Delete(&models.ResourceBase{}) + rowsEffected += results.RowsAffected + if results.Error != nil { + return rowsEffected, results.Error + } + } + + //delete relatedResources entries + results := gr.GormClient.WithContext(ctx). + Where(models.RelatedResource{ResourceBaseUserID: currentUser.ID, ResourceBaseSourceID: sourceUUID}). + Delete(&models.RelatedResource{}) + if results.Error != nil { + return rowsEffected, results.Error + } + + //soft delete the source credential + results = gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{ + ModelBase: models.ModelBase{ + ID: sourceUUID, + }, + UserID: currentUser.ID, + }). + Delete(&models.SourceCredential{}) + rowsEffected += results.RowsAffected + return rowsEffected, results.Error +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Background Job +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func (gr *GormRepository) CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + + backgroundJob.UserID = currentUser.ID + + record := gr.GormClient.Create(backgroundJob) + return record.Error +} + +func (gr *GormRepository) GetBackgroundJob(ctx context.Context, backgroundJobId string) (*models.BackgroundJob, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + backgroundJobUUID, err := uuid.Parse(backgroundJobId) + if err != nil { + return nil, err + } + + var backgroundJob models.BackgroundJob + results := gr.GormClient.WithContext(ctx). + Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: backgroundJobUUID}}). + First(&backgroundJob) + + return &backgroundJob, results.Error +} + +func (gr *GormRepository) UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return currentUserErr + } + backgroundJob.UserID = currentUser.ID + + return gr.GormClient.WithContext(ctx). + Where(models.BackgroundJob{ + ModelBase: models.ModelBase{ID: backgroundJob.ID}, + UserID: backgroundJob.UserID, + }).Updates(models.BackgroundJob{ + JobStatus: backgroundJob.JobStatus, + Data: backgroundJob.Data, + LockedTime: backgroundJob.LockedTime, + DoneTime: backgroundJob.DoneTime, + Retries: backgroundJob.Retries, + Schedule: backgroundJob.Schedule, + }).Error +} + +func (gr *GormRepository) ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + return nil, currentUserErr + } + + queryParam := models.BackgroundJob{ + UserID: currentUser.ID, + } + + if queryOptions.JobType != nil { + queryParam.JobType = *queryOptions.JobType + } + if queryOptions.Status != nil { + queryParam.JobStatus = *queryOptions.Status + } + + var backgroundJobs []models.BackgroundJob + query := gr.GormClient.WithContext(ctx). + //Group("source_id"). //broken in Postgres. + Where(queryParam).Limit(queryOptions.Limit).Order("locked_time DESC") + + if queryOptions.Offset > 0 { + query = query.Offset(queryOptions.Offset) + } + + return backgroundJobs, query.Find(&backgroundJobs).Error +} + +func (gr *GormRepository) BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{}) { + gr.Logger.Info("begin checkpointing background job...") + if len(checkpointData) == 0 && len(errorData) == 0 { + gr.Logger.Info("no changes detected. Skipping checkpoint") + return //nothing to do + } + defer gr.Logger.Info("end checkpointing background job") + + currentUser, currentUserErr := gr.GetCurrentUser(ctx) + if currentUserErr != nil { + gr.Logger.Warning("could not find current user info context. Ignoring checkpoint", currentUserErr) + return + } + + //make sure we do an atomic update + backgroundJobId, ok := ctx.Value(pkg.ContextKeyTypeBackgroundJobID).(string) + if !ok { + gr.Logger.Warning("could not find background job id in context. Ignoring checkpoint") + return + } + backgroundJobUUID, err := uuid.Parse(backgroundJobId) + if err != nil { + gr.Logger.Warning("could not parse background job id. Ignoring checkpoint", err) + return + } + //https://gorm.io/docs/advanced_query.html#Locking-FOR-UPDATE + //TODO: if using another database type (not SQLITE) we need to make sure we use the correct locking strategy + //This is not a problem in SQLITE because it does database (or table) level locking by default + //var backgroundJob models.BackgroundJob + //gr.GormClient.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&backgroundJob) + + txErr := gr.GormClient.Transaction(func(tx *gorm.DB) error { + //retrieve the background job by id + var backgroundJob models.BackgroundJob + backgroundJobFindResults := tx.WithContext(ctx). + Where(models.BackgroundJob{ + ModelBase: models.ModelBase{ID: backgroundJobUUID}, + UserID: currentUser.ID, + }). + First(&backgroundJob) + if backgroundJobFindResults.Error != nil { + return backgroundJobFindResults.Error + } + + //deserialize the job data + var backgroundJobSyncData models.BackgroundJobSyncData + if backgroundJob.Data != nil { + err := json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData) + if err != nil { + return err + } + } + + //update the job data with new data provided by the calling functiion + changed := false + if len(checkpointData) > 0 { + backgroundJobSyncData.CheckpointData = checkpointData + changed = true + } + if len(errorData) > 0 { + backgroundJobSyncData.ErrorData = errorData + changed = true + } + + //define a background job with the fields we're going to update + now := time.Now() + updatedBackgroundJob := models.BackgroundJob{ + LockedTime: &now, + } + if changed { + serializedData, err := json.Marshal(backgroundJobSyncData) + if err != nil { + return err + } + updatedBackgroundJob.Data = serializedData + + } + + return tx.WithContext(ctx). + Where(models.BackgroundJob{ + ModelBase: models.ModelBase{ID: backgroundJobUUID}, + UserID: currentUser.ID, + }).Updates(updatedBackgroundJob).Error + }) + + if txErr != nil { + gr.Logger.Warning("could not find or update background job. Ignoring checkpoint", txErr) + } + +} + +// when server restarts, we should unlock all locked jobs, and set their status to failed +// SECURITY: this is global, and effects all users. +func (gr *GormRepository) CancelAllLockedBackgroundJobsAndFail() error { + now := time.Now() + return gr.GormClient. + Where(models.BackgroundJob{JobStatus: pkg.BackgroundJobStatusLocked}). + Updates(models.BackgroundJob{ + JobStatus: pkg.BackgroundJobStatusFailed, + DoneTime: &now, + }).Error + +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Utilities +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +func sqlitePragmaString(pragmas map[string]string) string { + q := url.Values{} + for key, val := range pragmas { + q.Add("_pragma", fmt.Sprintf("%s=%s", key, val)) + } + + queryStr := q.Encode() + if len(queryStr) > 0 { + return "?" + queryStr + } + return "" +} + +// Internal function +// This function will return a list of resources from all FHIR tables in the database +// The query allows us to set the source id, source resource id, source resource type +// SECURITY: this function assumes the user has already been authenticated +// TODO: theres probably a more efficient way of doing this with GORM +func (gr *GormRepository) getResourcesFromAllTables(queryBuilder *gorm.DB, queryParam models.OriginBase) ([]models.ResourceBase, error) { + wrappedResourceModels := []models.ResourceBase{} + resourceTypes := databaseModel.GetAllowedResourceTypes() + for _, resourceType := range resourceTypes { + tableName, err := databaseModel.GetTableNameByResourceType(resourceType) + if err != nil { + return nil, err + } + var tempWrappedResourceModels []models.ResourceBase + results := queryBuilder. + Where(queryParam). + Table(tableName). + Find(&tempWrappedResourceModels) + if results.Error != nil { + return nil, results.Error + } + wrappedResourceModels = append(wrappedResourceModels, tempWrappedResourceModels...) + } + return wrappedResourceModels, nil +} diff --git a/backend/pkg/database/sqlite_repository_graph.go b/backend/pkg/database/gorm_repository_graph.go similarity index 91% rename from backend/pkg/database/sqlite_repository_graph.go rename to backend/pkg/database/gorm_repository_graph.go index befe30ac..1ee372d3 100644 --- a/backend/pkg/database/sqlite_repository_graph.go +++ b/backend/pkg/database/gorm_repository_graph.go @@ -3,14 +3,15 @@ package database import ( "context" "fmt" + "log" + "strings" + "github.com/dominikbraun/graph" "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" - "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" + "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" "golang.org/x/exp/slices" - "log" - "strings" ) type VertexResourcePlaceholder struct { @@ -28,8 +29,8 @@ func (rp *VertexResourcePlaceholder) ID() string { // Retrieve a list of all fhir resources (vertex), and a list of all associations (edge) // Generate a graph // return list of root nodes, and their flattened related resources. -func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graphType pkg.ResourceGraphType, options models.ResourceGraphOptions) (map[string][]*models.ResourceBase, *models.ResourceGraphMetadata, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) +func (gr *GormRepository) GetFlattenedResourceGraph(ctx context.Context, graphType pkg.ResourceGraphType, options models.ResourceGraphOptions) (map[string][]*models.ResourceBase, *models.ResourceGraphMetadata, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, nil, currentUserErr } @@ -45,7 +46,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph var relatedResourceRelationships []models.RelatedResource // SELECT * FROM related_resources WHERE user_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9"; - result := sr.GormClient.WithContext(ctx). + result := gr.GormClient.WithContext(ctx). Where(models.RelatedResource{ ResourceBaseUserID: currentUser.ID, }). @@ -59,7 +60,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph g := graph.New(resourceVertexId, graph.Directed(), graph.Acyclic(), graph.Rooted()) //// Get list of all resources TODO - REPLACED THIS - //wrappedResourceModels, err := sr.ListResources(ctx, models.ListResourceQueryOptions{}) + //wrappedResourceModels, err := gr.ListResources(ctx, models.ListResourceQueryOptions{}) //if err != nil { // return nil, err //} @@ -109,7 +110,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph } //add recriprocial relationships (depending on the graph type) - relatedResourceRelationships = sr.PopulateGraphTypeReciprocalRelationships(graphType, relatedResourceRelationships) + relatedResourceRelationships = gr.PopulateGraphTypeReciprocalRelationships(graphType, relatedResourceRelationships) //add edges to graph for _, relationship := range relatedResourceRelationships { @@ -121,7 +122,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph if err != nil { //this may occur because vertices may not exist - sr.Logger.Warnf("ignoring, an error occurred while adding edge: %v", err) + gr.Logger.Warnf("ignoring, an error occurred while adding edge: %v", err) } } @@ -200,7 +201,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph // Step 2: now that we've created a relationship graph using placeholders, we need to determine which page of resources to return // and look up the actual resources from the database. - resourceListDictionary, totalElements, err := sr.InflateResourceGraphAtPage(resourcePlaceholderListDictionary, options.Page) + resourceListDictionary, totalElements, err := gr.InflateResourceGraphAtPage(resourcePlaceholderListDictionary, options.Page) if err != nil { return nil, nil, fmt.Errorf("error while paginating & inflating resource graph: %v", err) } @@ -215,7 +216,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph SourceID: resource.SourceID.String(), UserID: resource.UserID.String(), }) - sr.Logger.Debugf("populating resourcePlaceholder: %s", vertexId) + gr.Logger.Debugf("populating resourcePlaceholder: %s", vertexId) resource.RelatedResource = []*models.ResourceBase{} @@ -226,9 +227,9 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph //skip the current resourcePlaceholder if it's referenced in this list. //also skip the current resourcePlaceholder if its a Binary resourcePlaceholder (which is a special case) if vertexId != resourceVertexId(relatedResourcePlaceholder) && relatedResourcePlaceholder.ResourceType != "Binary" { - relatedResource, err := sr.GetResourceByResourceTypeAndId(ctx, relatedResourcePlaceholder.ResourceType, relatedResourcePlaceholder.ResourceID) + relatedResource, err := gr.GetResourceByResourceTypeAndId(ctx, relatedResourcePlaceholder.ResourceType, relatedResourcePlaceholder.ResourceID) if err != nil { - sr.Logger.Warnf("ignoring, cannot safely handle error which occurred while getting related resource: %v", err) + gr.Logger.Warnf("ignoring, cannot safely handle error which occurred while getting related resource: %v", err) return true } resource.RelatedResource = append( @@ -268,9 +269,9 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph vertexId := resourceKeysVertexId(currentResource.SourceID.String(), currentResource.SourceResourceType, currentResource.SourceResourceID) for relatedVertexId, _ := range adjacencyMap[vertexId] { relatedResourcePlaceholder, _ := g.Vertex(relatedVertexId) - relatedResourceFhir, err := sr.GetResourceByResourceTypeAndId(ctx, relatedResourcePlaceholder.ResourceType, relatedResourcePlaceholder.ResourceID) + relatedResourceFhir, err := gr.GetResourceByResourceTypeAndId(ctx, relatedResourcePlaceholder.ResourceType, relatedResourcePlaceholder.ResourceID) if err != nil { - sr.Logger.Warnf("ignoring, cannot safely handle error which occurred while getting related resource (flatten=false): %v", err) + gr.Logger.Warnf("ignoring, cannot safely handle error which occurred while getting related resource (flatten=false): %v", err) continue } flattenRelatedResourcesFn(relatedResourceFhir) @@ -295,7 +296,7 @@ func (sr *SqliteRepository) GetFlattenedResourceGraph(ctx context.Context, graph // - sort the root resources by date, desc // - use the page number + page size to determine which root resources to return // - return a dictionary of "source" resource lists -func (sr *SqliteRepository) InflateResourceGraphAtPage(resourcePlaceholderListDictionary map[string][]*VertexResourcePlaceholder, page int) (map[string][]*models.ResourceBase, int, error) { +func (gr *GormRepository) InflateResourceGraphAtPage(resourcePlaceholderListDictionary map[string][]*VertexResourcePlaceholder, page int) (map[string][]*models.ResourceBase, int, error) { totalElements := 0 // Step 3a: since we cant calulate the sort order until the resources are loaded, we need to load all the root resources first. @@ -319,7 +320,7 @@ func (sr *SqliteRepository) InflateResourceGraphAtPage(resourcePlaceholderListDi return nil, totalElements, err } var tableWrappedResourceModels []models.ResourceBase - sr.GormClient. + gr.GormClient. Where("(user_id, source_id, source_resource_type, source_resource_id) IN ?", selectList). Table(tableName). Find(&tableWrappedResourceModels) @@ -351,15 +352,15 @@ func (sr *SqliteRepository) InflateResourceGraphAtPage(resourcePlaceholderListDi return resourceListDictionary, totalElements, nil } -//We need to support the following types of graphs: +// We need to support the following types of graphs: // - Medical History // - AddressBook (contacts) // - Medications // - Billing Report -//edges are always "strongly connected", however "source" nodes (roots, like Condition or Encounter -- depending on ) are only one way. -//add an edge from every resource to its related resource. Keep in mind that FHIR resources may not contain reciprocal edges, so we ensure the graph is rooted by flipping any -//related resources that are "Condition" or "Encounter" -func (sr *SqliteRepository) PopulateGraphTypeReciprocalRelationships(graphType pkg.ResourceGraphType, relationships []models.RelatedResource) []models.RelatedResource { +// edges are always "strongly connected", however "source" nodes (roots, like Condition or Encounter -- depending on ) are only one way. +// add an edge from every resource to its related resource. Keep in mind that FHIR resources may not contain reciprocal edges, so we ensure the graph is rooted by flipping any +// related resources that are "Condition" or "Encounter" +func (gr *GormRepository) PopulateGraphTypeReciprocalRelationships(graphType pkg.ResourceGraphType, relationships []models.RelatedResource) []models.RelatedResource { reciprocalRelationships := []models.RelatedResource{} //prioritized lists of sources and sinks for the graph. We will use these to determine which resources are "root" nodes. @@ -469,7 +470,7 @@ func getSourcesAndSinksForGraphType(graphType pkg.ResourceGraphType) ([][]string return sources, sinks, sourceFlattenRelated } -//source resource types are resources that are at the root of the graph, nothing may reference them directly +// source resource types are resources that are at the root of the graph, nothing may reference them directly // loop though the list of source resource types, and see if the checkResourceType is one of them func foundResourceGraphSource(checkResourceType string, sourceResourceTypes [][]string) int { found := -1 @@ -482,7 +483,7 @@ func foundResourceGraphSource(checkResourceType string, sourceResourceTypes [][] return found } -//sink resource types are the leaves of the graph, they must not reference anything else. (only be referenced) +// sink resource types are the leaves of the graph, they must not reference anything else. (only be referenced) func foundResourceGraphSink(checkResourceType string, sinkResourceTypes [][]string) int { found := -1 for i, sinkResourceType := range sinkResourceTypes { diff --git a/backend/pkg/database/sqlite_repository_query.go b/backend/pkg/database/gorm_repository_query.go similarity index 98% rename from backend/pkg/database/sqlite_repository_query.go rename to backend/pkg/database/gorm_repository_query.go index e3a6e234..caa846e9 100644 --- a/backend/pkg/database/sqlite_repository_query.go +++ b/backend/pkg/database/gorm_repository_query.go @@ -3,6 +3,10 @@ package database import ( "context" "fmt" + "strconv" + "strings" + "time" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" "github.com/iancoleman/strcase" @@ -10,9 +14,6 @@ import ( "golang.org/x/exp/maps" "golang.org/x/exp/slices" "gorm.io/gorm" - "strconv" - "strings" - "time" ) type SearchParameterType string @@ -52,9 +53,9 @@ const TABLE_ALIAS = "fhir" // ) // AND (user_id = "6efcd7c5-3f29-4f0d-926d-a66ff68bbfc2") // GROUP BY `fhir`.`id` -func (sr *SqliteRepository) QueryResources(ctx context.Context, query models.QueryResource) (interface{}, error) { +func (gr *GormRepository) QueryResources(ctx context.Context, query models.QueryResource) (interface{}, error) { - sqlQuery, err := sr.sqlQueryResources(ctx, query) + sqlQuery, err := gr.sqlQueryResources(ctx, query) if err != nil { return nil, err } @@ -74,7 +75,7 @@ func (sr *SqliteRepository) QueryResources(ctx context.Context, query models.Que // see QueryResources // this function has all the logic, but should only be called directly for testing -func (sr *SqliteRepository) sqlQueryResources(ctx context.Context, query models.QueryResource) (*gorm.DB, error) { +func (gr *GormRepository) sqlQueryResources(ctx context.Context, query models.QueryResource) (*gorm.DB, error) { //todo, until we actually parse the select statement, we will just return all resources based on "from" //SECURITY: this is required to ensure that only valid resource types are queried (since it's controlled by the user) @@ -134,7 +135,7 @@ func (sr *SqliteRepository) sqlQueryResources(ctx context.Context, query models. } //SECURITY: for safety, we will always add/override the current user_id to the where clause. This is to ensure that the user doesnt attempt to override this value in their own where clause - currentUser, currentUserErr := sr.GetCurrentUser(ctx) + currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } @@ -234,7 +235,7 @@ func (sr *SqliteRepository) sqlQueryResources(ctx context.Context, query models. fromClauses = lo.Uniq(fromClauses) fromClauses = lo.Compact(fromClauses) - sqlQuery := sr.GormClient.WithContext(ctx). + sqlQuery := gr.GormClient.WithContext(ctx). Select(strings.Join(selectClauses, ", ")). Where(strings.Join(whereClauses, " AND "), whereNamedParameters). Group(groupClause). @@ -253,7 +254,7 @@ func (sr *SqliteRepository) sqlQueryResources(ctx context.Context, query models. } /// INTERNAL functionality. These functions are exported for testing, but are not available in the Interface -//TODO: dont export these, instead use casting to convert the interface to the SqliteRepository struct, then call ehese functions directly +//TODO: dont export these, instead use casting to convert the interface to the GormRepository struct, then call ehese functions directly type SearchParameter struct { Name string diff --git a/backend/pkg/database/sqlite_repository_query_sql_test.go b/backend/pkg/database/gorm_repository_query_sql_test.go similarity index 96% rename from backend/pkg/database/sqlite_repository_query_sql_test.go rename to backend/pkg/database/gorm_repository_query_sql_test.go index c872f97d..85ae9dac 100644 --- a/backend/pkg/database/sqlite_repository_query_sql_test.go +++ b/backend/pkg/database/gorm_repository_query_sql_test.go @@ -3,6 +3,12 @@ package database import ( "context" "fmt" + "io/ioutil" + "log" + "os" + "strings" + "testing" + "github.com/fastenhealth/fasten-onprem/backend/pkg" mock_config "github.com/fastenhealth/fasten-onprem/backend/pkg/config/mock" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" @@ -12,11 +18,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "gorm.io/gorm" - "io/ioutil" - "log" - "os" - "strings" - "testing" ) // Define the suite, and absorb the built-in basic suite @@ -42,6 +43,7 @@ func (suite *RepositorySqlTestSuite) BeforeTest(suiteName, testName string) { fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -71,7 +73,7 @@ func TestRepositorySqlTestSuite(t *testing.T) { func (suite *RepositorySqlTestSuite) TestQueryResources_SQL() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -108,7 +110,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL() { func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithMultipleWhereConditions() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -146,7 +148,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithMultipleWhereCon func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithPrimitiveOrderByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -183,7 +185,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithPrimitiveOrderBy func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithKeywordOrderByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -218,7 +220,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithKeywordOrderByAg func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexOrderByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -255,7 +257,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexOrderByAg func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithPrimitiveCountByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -292,7 +294,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithPrimitiveCountBy func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithKeywordCountByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -329,7 +331,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithKeywordCountByAg func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithWildcardCountByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -364,7 +366,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithWildcardCountByA func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexCountByAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -401,7 +403,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexCountByAg func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexGroupByWithOrderByMaxFnAggregation() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -441,7 +443,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithComplexGroupByWi func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithTokenGroupByNoModifier() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test @@ -478,7 +480,7 @@ func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithTokenGroupByNoMo func (suite *RepositorySqlTestSuite) TestQueryResources_SQL_WithTokenGroupByNoModifierWithLimit() { //setup - sqliteRepo := suite.TestRepository.(*SqliteRepository) + sqliteRepo := suite.TestRepository.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test diff --git a/backend/pkg/database/sqlite_repository_query_test.go b/backend/pkg/database/gorm_repository_query_test.go similarity index 99% rename from backend/pkg/database/sqlite_repository_query_test.go rename to backend/pkg/database/gorm_repository_query_test.go index b6a9168c..1495606b 100644 --- a/backend/pkg/database/sqlite_repository_query_test.go +++ b/backend/pkg/database/gorm_repository_query_test.go @@ -2,6 +2,10 @@ package database import ( "context" + "strings" + "testing" + "time" + "github.com/fastenhealth/fasten-onprem/backend/pkg" mock_config "github.com/fastenhealth/fasten-onprem/backend/pkg/config/mock" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" @@ -9,9 +13,6 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gorm.io/gorm" - "strings" - "testing" - "time" ) // mimic tests from https://hl7.org/fhir/r4/search.html#token @@ -259,6 +260,7 @@ func (suite *RepositoryTestSuite) TestQueryResources_SQL() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -270,7 +272,7 @@ func (suite *RepositoryTestSuite) TestQueryResources_SQL() { err = dbRepo.CreateUser(context.Background(), userModel) require.NoError(suite.T(), err) - sqliteRepo := dbRepo.(*SqliteRepository) + sqliteRepo := dbRepo.(*GormRepository) sqliteRepo.GormClient = sqliteRepo.GormClient.Session(&gorm.Session{DryRun: true}) //test diff --git a/backend/pkg/database/sqlite_repository_settings.go b/backend/pkg/database/gorm_repository_settings.go similarity index 79% rename from backend/pkg/database/sqlite_repository_settings.go rename to backend/pkg/database/gorm_repository_settings.go index a33cadc3..e9f3eb8a 100644 --- a/backend/pkg/database/sqlite_repository_settings.go +++ b/backend/pkg/database/gorm_repository_settings.go @@ -3,19 +3,20 @@ package database import ( "context" "fmt" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" "github.com/google/uuid" ) // LoadSettings will retrieve settings from the database, store them in the AppConfig object, and return a Settings struct -func (sr *SqliteRepository) LoadUserSettings(ctx context.Context) (*models.UserSettings, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) +func (gr *GormRepository) LoadUserSettings(ctx context.Context) (*models.UserSettings, error) { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } settingsEntries := []models.UserSettingEntry{} - if err := sr.GormClient. + if err := gr.GormClient. WithContext(ctx). Where(models.UserSettingEntry{ UserID: currentUser.ID, @@ -38,8 +39,8 @@ func (sr *SqliteRepository) LoadUserSettings(ctx context.Context) (*models.UserS // testing // curl -d '{"metrics": { "notify_level": 5, "status_filter_attributes": 5, "status_threshold": 5 }}' -H "Content-Type: application/json" -X POST http://localhost:9090/api/settings // SaveSettings will update settings in AppConfig object, then save the settings to the database. -func (sr *SqliteRepository) SaveUserSettings(ctx context.Context, newSettings *models.UserSettings) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) +func (gr *GormRepository) SaveUserSettings(ctx context.Context, newSettings *models.UserSettings) error { + currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return currentUserErr } @@ -47,7 +48,7 @@ func (sr *SqliteRepository) SaveUserSettings(ctx context.Context, newSettings *m //retrieve current settings from the database currentSettingsEntries := []models.UserSettingEntry{} - if err := sr.GormClient. + if err := gr.GormClient. WithContext(ctx). Where(models.UserSettingEntry{ UserID: currentUser.ID, @@ -66,8 +67,8 @@ func (sr *SqliteRepository) SaveUserSettings(ctx context.Context, newSettings *m for ndx, settingsEntry := range newSettingsEntries { // store in database. - //TODO: this should be `sr.gormClient.Updates(&settingsEntries).Error` - err := sr.GormClient. + //TODO: this should be `gr.gormClient.Updates(&settingsEntries).Error` + err := gr.GormClient. WithContext(ctx). Model(&models.UserSettingEntry{}). Where([]uuid.UUID{settingsEntry.ID}). @@ -80,7 +81,7 @@ func (sr *SqliteRepository) SaveUserSettings(ctx context.Context, newSettings *m return nil } -func (sr *SqliteRepository) PopulateDefaultUserSettings(ctx context.Context, userId uuid.UUID) error { +func (gr *GormRepository) PopulateDefaultUserSettings(ctx context.Context, userId uuid.UUID) error { //retrieve current settings from the database settingsEntries := []models.UserSettingEntry{} @@ -92,6 +93,6 @@ func (sr *SqliteRepository) PopulateDefaultUserSettings(ctx context.Context, use SettingValueArray: []string{}, }) - return sr.GormClient.WithContext(ctx).Create(settingsEntries).Error + return gr.GormClient.WithContext(ctx).Create(settingsEntries).Error } diff --git a/backend/pkg/database/sqlite_repository_settings_test.go b/backend/pkg/database/gorm_repository_settings_test.go similarity index 100% rename from backend/pkg/database/sqlite_repository_settings_test.go rename to backend/pkg/database/gorm_repository_settings_test.go diff --git a/backend/pkg/database/sqlite_repository_test.go b/backend/pkg/database/gorm_repository_test.go similarity index 96% rename from backend/pkg/database/sqlite_repository_test.go rename to backend/pkg/database/gorm_repository_test.go index 466d8495..c67ffeff 100644 --- a/backend/pkg/database/sqlite_repository_test.go +++ b/backend/pkg/database/gorm_repository_test.go @@ -3,6 +3,13 @@ package database import ( "encoding/json" "fmt" + "io/ioutil" + "log" + "net/http/httptest" + "os" + "testing" + "time" + "github.com/fastenhealth/fasten-onprem/backend/pkg" mock_config "github.com/fastenhealth/fasten-onprem/backend/pkg/config/mock" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" @@ -18,18 +25,12 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "golang.org/x/net/context" - "io/ioutil" - "log" - "net/http/httptest" - "os" - "testing" - "time" ) func TestSourceCredentialInterface(t *testing.T) { t.Parallel() - repo := new(SqliteRepository) + repo := new(GormRepository) //assert require.Implements(t, (*sourceModels.DatabaseRepository)(nil), repo, "should implement the DatabaseRepository interface from fasten-sources") @@ -74,6 +75,7 @@ func (suite *RepositoryTestSuite) TestNewRepository() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() //test @@ -87,6 +89,7 @@ func (suite *RepositoryTestSuite) TestCreateUser() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -107,6 +110,7 @@ func (suite *RepositoryTestSuite) TestCreateUser_WithExitingUser_ShouldFail() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -135,6 +139,7 @@ func (suite *RepositoryTestSuite) TestCreateUser_WithUserProvidedId_ShouldBeRepl //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -160,6 +165,7 @@ func (suite *RepositoryTestSuite) TestGetUserByUsername() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -183,6 +189,7 @@ func (suite *RepositoryTestSuite) TestGetUserByUsername_WithInvalidUsername() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -205,6 +212,7 @@ func (suite *RepositoryTestSuite) TestGetCurrentUser_WithContextBackgroundAuthUs //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -229,6 +237,7 @@ func (suite *RepositoryTestSuite) TestGetCurrentUser_WithGinContextBackgroundAut //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -257,6 +266,7 @@ func (suite *RepositoryTestSuite) TestGetCurrentUser_WithContextBackgroundAuthUs //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -273,6 +283,7 @@ func (suite *RepositoryTestSuite) TestCreateGlossaryEntry() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -308,6 +319,7 @@ func (suite *RepositoryTestSuite) TestUpsertRawResource() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -364,6 +376,7 @@ func (suite *RepositoryTestSuite) TestUpsertRawResource_WithRelatedResourceAndDu //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -411,6 +424,7 @@ func (suite *RepositoryTestSuite) TestListResources() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -513,6 +527,7 @@ func (suite *RepositoryTestSuite) TestGetResourceByResourceTypeAndId() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -566,6 +581,7 @@ func (suite *RepositoryTestSuite) TestGetResourceBySourceId() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -619,6 +635,7 @@ func (suite *RepositoryTestSuite) TestGetPatientForSources() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -675,6 +692,7 @@ func (suite *RepositoryTestSuite) TestAddResourceAssociation() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -711,6 +729,7 @@ func (suite *RepositoryTestSuite) TestAddResourceAssociation_WithMismatchingSour //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -760,6 +779,7 @@ func (suite *RepositoryTestSuite) TestRemoveResourceAssociation() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -801,6 +821,7 @@ func (suite *RepositoryTestSuite) TestGetSourceSummary() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -881,6 +902,7 @@ func (suite *RepositoryTestSuite) TestGetSummary() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -984,6 +1006,7 @@ func (suite *RepositoryTestSuite) TestAddResourceComposition() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -1069,6 +1092,7 @@ func (suite *RepositoryTestSuite) TestAddResourceComposition_WithExistingComposi //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -1229,6 +1253,7 @@ func (suite *RepositoryTestSuite) TestCreateBackgroundJob_Sync() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -1263,6 +1288,7 @@ func (suite *RepositoryTestSuite) TestListBackgroundJobs() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) @@ -1339,6 +1365,7 @@ func (suite *RepositoryTestSuite) TestUpdateBackgroundJob() { //setup fakeConfig := mock_config.NewMockInterface(suite.MockCtrl) fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + fakeConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer()) require.NoError(suite.T(), err) diff --git a/backend/pkg/database/postgres_repository.go b/backend/pkg/database/postgres_repository.go new file mode 100644 index 00000000..e440850e --- /dev/null +++ b/backend/pkg/database/postgres_repository.go @@ -0,0 +1,75 @@ +package database + +import ( + "fmt" + "strings" + + "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" + "github.com/fastenhealth/fasten-onprem/backend/pkg/models" + databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" + "github.com/sirupsen/logrus" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func newPostgresRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) { + //backgroundContext := context.Background() + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Gorm/PostgreSQL setup + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + globalLogger.Infof("Trying to connect to postgres db: %s\n", appConfig.GetString("database.location")) + dsn := appConfig.GetString("database.location") + + database, err := gorm.Open(postgres.Open(dsn), &gorm.Config{ + //TODO: figure out how to log database queries again. + //logger: logger + DisableForeignKeyConstraintWhenMigrating: true, + }) + + if strings.ToUpper(appConfig.GetString("log.level")) == "DEBUG" { + database = database.Debug() //set debug globally + } + + if err != nil { + return nil, fmt.Errorf("Failed to connect to database! - %v", err) + } + globalLogger.Infof("Successfully connected to fasten postgres db: %s\n", dsn) + + fastenRepo := GormRepository{ + AppConfig: appConfig, + Logger: globalLogger, + GormClient: database, + EventBus: eventBus, + } + + //TODO: automigrate for now, this should be replaced with a migration tool once the DB has stabilized. + err = fastenRepo.Migrate() + if err != nil { + return nil, err + } + + //automigrate Fhir Resource Tables + err = databaseModel.Migrate(fastenRepo.GormClient) + if err != nil { + return nil, err + } + + // create/update admin user + //TODO: determine if this admin user is ncessary + //SECURITY: validate this user is necessary + adminUser := models.User{} + err = database.FirstOrCreate(&adminUser, models.User{Username: "admin"}).Error + if err != nil { + return nil, fmt.Errorf("Failed to create admin user! - %v", err) + } + + //fail any Locked jobs. This is necessary because the job may have been locked by a process that was killed. + err = fastenRepo.CancelAllLockedBackgroundJobsAndFail() + if err != nil { + return nil, err + } + + return &fastenRepo, nil +} diff --git a/backend/pkg/database/sqlite_repository.go b/backend/pkg/database/sqlite_repository.go index e04aafaf..45949bc7 100644 --- a/backend/pkg/database/sqlite_repository.go +++ b/backend/pkg/database/sqlite_repository.go @@ -1,29 +1,19 @@ package database import ( - "context" - "encoding/json" - "errors" "fmt" - "github.com/fastenhealth/fasten-onprem/backend/pkg" + "strings" + "github.com/fastenhealth/fasten-onprem/backend/pkg/config" "github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database" - "github.com/fastenhealth/fasten-onprem/backend/pkg/utils" - sourceModel "github.com/fastenhealth/fasten-sources/clients/models" - "github.com/gin-gonic/gin" "github.com/glebarez/sqlite" - "github.com/google/uuid" "github.com/sirupsen/logrus" - "gorm.io/datatypes" "gorm.io/gorm" - "net/url" - "strings" - "time" ) -func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) { +func newSqliteRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) { //backgroundContext := context.Background() //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -78,7 +68,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, // globalLogger.Infof("Journal mode: %v", journalMode) //} - fastenRepo := SqliteRepository{ + fastenRepo := GormRepository{ AppConfig: appConfig, Logger: globalLogger, GormClient: database, @@ -114,1152 +104,3 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, return &fastenRepo, nil } - -type SqliteRepository struct { - AppConfig config.Interface - Logger logrus.FieldLogger - - GormClient *gorm.DB - - EventBus event_bus.Interface -} - -func (sr *SqliteRepository) Migrate() error { - err := sr.GormClient.AutoMigrate( - &models.User{}, - &models.SourceCredential{}, - &models.BackgroundJob{}, - &models.Glossary{}, - &models.UserSettingEntry{}, - ) - if err != nil { - return fmt.Errorf("Failed to automigrate! - %v", err) - } - return nil -} - -func (sr *SqliteRepository) Close() error { - return nil -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// User -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func (sr *SqliteRepository) CreateUser(ctx context.Context, user *models.User) error { - if err := user.HashPassword(user.Password); err != nil { - return err - } - record := sr.GormClient.Create(user) - if record.Error != nil { - return record.Error - } - - //create user settings - err := sr.PopulateDefaultUserSettings(ctx, user.ID) - if err != nil { - return err - } - return nil -} -func (sr *SqliteRepository) GetUserByUsername(ctx context.Context, username string) (*models.User, error) { - var foundUser models.User - result := sr.GormClient.WithContext(ctx).Where(models.User{Username: username}).First(&foundUser) - return &foundUser, result.Error -} - -// TODO: check for error, right now we return a nil which may cause a panic. -// TODO: can we cache the current user? //SECURITY: -func (sr *SqliteRepository) GetCurrentUser(ctx context.Context) (*models.User, error) { - username := ctx.Value(pkg.ContextKeyTypeAuthUsername) - if username == nil { - ginCtx, ginCtxOk := ctx.(*gin.Context) - if !ginCtxOk { - return nil, fmt.Errorf("could not convert context to gin context") - } - var exists bool - username, exists = ginCtx.Get(pkg.ContextKeyTypeAuthUsername) - if !exists { - return nil, fmt.Errorf("could not extract username from context") - } - } - - var currentUser models.User - usernameStr, usernameStrOk := username.(string) - if !usernameStrOk { - return nil, fmt.Errorf("could not convert username to string: %v", username) - } - - result := sr.GormClient. - WithContext(ctx). - First(¤tUser, map[string]interface{}{"username": usernameStr}) - - if result.Error != nil { - return nil, fmt.Errorf("could not retrieve current user: %v", result.Error) - } - - return ¤tUser, nil -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Glossary -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func (sr *SqliteRepository) CreateGlossaryEntry(ctx context.Context, glossaryEntry *models.Glossary) error { - record := sr.GormClient.WithContext(ctx).Create(glossaryEntry) - if record.Error != nil { - return record.Error - } - return nil -} - -func (sr *SqliteRepository) GetGlossaryEntry(ctx context.Context, code string, codeSystem string) (*models.Glossary, error) { - var foundGlossaryEntry models.Glossary - result := sr.GormClient.WithContext(ctx). - Where(models.Glossary{Code: code, CodeSystem: codeSystem}). - First(&foundGlossaryEntry) - return &foundGlossaryEntry, result.Error -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Summary -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func (sr *SqliteRepository) GetSummary(ctx context.Context) (*models.Summary, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - // we want a count of all resources for this user by type - var resourceCountResults []map[string]interface{} - - resourceTypes := databaseModel.GetAllowedResourceTypes() - for _, resourceType := range resourceTypes { - tableName, err := databaseModel.GetTableNameByResourceType(resourceType) - if err != nil { - return nil, err - } - var count int64 - result := sr.GormClient.WithContext(ctx). - Table(tableName). - Where(models.OriginBase{ - UserID: currentUser.ID, - }). - Count(&count) - if result.Error != nil { - return nil, result.Error - } - if count == 0 { - continue //don't add resource counts if the count is 0 - } - resourceCountResults = append(resourceCountResults, map[string]interface{}{ - "resource_type": resourceType, - "count": count, - }) - } - - // we want a list of all sources (when they were last updated) - sources, err := sr.GetSources(ctx) - if err != nil { - return nil, err - } - - // we want the main Patient for each source - patients, err := sr.GetPatientForSources(ctx) - if err != nil { - return nil, err - } - - if resourceCountResults == nil { - resourceCountResults = []map[string]interface{}{} - } - summary := &models.Summary{ - Sources: sources, - ResourceTypeCounts: resourceCountResults, - Patients: patients, - } - - return summary, nil -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Resource -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// This function will create a new resource if it does not exist, or update an existing resource if it does exist. -// It will also create associations between fhir resources -// This function is called directly by fasten-sources -func (sr *SqliteRepository) UpsertRawResource(ctx context.Context, sourceCredential sourceModel.SourceCredential, rawResource sourceModel.RawResourceFhir) (bool, error) { - - source := sourceCredential.(*models.SourceCredential) - - //convert from a raw resource (from fasten-sources) to a ResourceFhir (which matches the database models) - wrappedResourceModel := &models.ResourceBase{ - OriginBase: models.OriginBase{ - ModelBase: models.ModelBase{}, - UserID: source.UserID, - SourceID: source.ID, - SourceResourceID: rawResource.SourceResourceID, - SourceResourceType: rawResource.SourceResourceType, - }, - SortTitle: rawResource.SortTitle, - SortDate: rawResource.SortDate, - ResourceRaw: datatypes.JSON(rawResource.ResourceRaw), - RelatedResource: nil, - } - if len(rawResource.SourceUri) > 0 { - wrappedResourceModel.SourceUri = &rawResource.SourceUri - } - - //create associations - //note: we create the association in the related_resources table **before** the model actually exists. - //note: these associations are not reciprocal, (i.e. if Procedure references Location, Location may not reference Procedure) - if rawResource.ReferencedResources != nil && len(rawResource.ReferencedResources) > 0 { - for _, referencedResource := range rawResource.ReferencedResources { - parts := strings.Split(referencedResource, "/") - if len(parts) != 2 { - continue - } - - relatedResource := &models.ResourceBase{ - OriginBase: models.OriginBase{ - SourceID: source.ID, - SourceResourceType: parts[0], - SourceResourceID: parts[1], - }, - RelatedResource: nil, - } - err := sr.AddResourceAssociation( - ctx, - source, - wrappedResourceModel.SourceResourceType, - wrappedResourceModel.SourceResourceID, - source, - relatedResource.SourceResourceType, - relatedResource.SourceResourceID, - ) - if err != nil { - return false, err - } - } - } - - return sr.UpsertResource(ctx, wrappedResourceModel) - -} - -// UpsertResource -// this method will upsert a resource, however it will not create associations. -// UPSERT operation -// - call FindOrCreate -// - check if the resource exists -// - if it does not exist, insert it -// -// - if no error during FindOrCreate && no rows affected (nothing was created) -// - update the resource using Updates operation -func (sr *SqliteRepository) UpsertResource(ctx context.Context, wrappedResourceModel *models.ResourceBase) (bool, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return false, currentUserErr - } - - wrappedResourceModel.UserID = currentUser.ID - cachedResourceRaw := wrappedResourceModel.ResourceRaw - - sr.Logger.Infof("insert/update FHIRResource (%v) %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID) - wrappedFhirResourceModel, err := databaseModel.NewFhirResourceModelByType(wrappedResourceModel.SourceResourceType) - if err != nil { - return false, err - } - - wrappedFhirResourceModel.SetOriginBase(wrappedResourceModel.OriginBase) - wrappedFhirResourceModel.SetSortTitle(wrappedResourceModel.SortTitle) - wrappedFhirResourceModel.SetSortDate(wrappedResourceModel.SortDate) - wrappedFhirResourceModel.SetSourceUri(wrappedResourceModel.SourceUri) - - //TODO: this takes too long, we need to find a way to do this processing faster or in the background async. - err = wrappedFhirResourceModel.PopulateAndExtractSearchParameters(json.RawMessage(wrappedResourceModel.ResourceRaw)) - if err != nil { - sr.Logger.Warnf("ignoring: an error occurred while extracting SearchParameters using FHIRPath (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) - //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) - } - - eventSourceSync := models.NewEventSourceSync( - currentUser.ID.String(), - wrappedFhirResourceModel.GetSourceID().String(), - wrappedFhirResourceModel.GetSourceResourceType(), - wrappedFhirResourceModel.GetSourceResourceID(), - ) - - err = sr.EventBus.PublishMessage(eventSourceSync) - if err != nil { - sr.Logger.Warnf("ignoring: an error occurred while publishing event to eventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) - } - - createResult := sr.GormClient.WithContext(ctx).Where(models.OriginBase{ - SourceID: wrappedFhirResourceModel.GetSourceID(), - SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), - SourceResourceType: wrappedFhirResourceModel.GetSourceResourceType(), //TODO: and UpdatedAt > old UpdatedAt - }).Omit("RelatedResource.*").FirstOrCreate(wrappedFhirResourceModel) - - if createResult.Error != nil { - return false, createResult.Error - } else if createResult.RowsAffected == 0 { - //at this point, wrappedResourceModel contains the data found in the database. - // check if the database resource matches the new resource. - if wrappedResourceModel.ResourceRaw.String() != string(cachedResourceRaw) { - updateResult := createResult.Omit("RelatedResource.*").Updates(wrappedResourceModel) - return updateResult.RowsAffected > 0, updateResult.Error - } else { - return false, nil - } - - } else { - //resource was created - return createResult.RowsAffected > 0, createResult.Error - } -} - -func (sr *SqliteRepository) ListResources(ctx context.Context, queryOptions models.ListResourceQueryOptions) ([]models.ResourceBase, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - queryParam := models.OriginBase{ - UserID: currentUser.ID, - } - - if len(queryOptions.SourceResourceType) > 0 { - queryParam.SourceResourceType = queryOptions.SourceResourceType - } - - if len(queryOptions.SourceID) > 0 { - sourceUUID, err := uuid.Parse(queryOptions.SourceID) - if err != nil { - return nil, err - } - - queryParam.SourceID = sourceUUID - } - if len(queryOptions.SourceResourceID) > 0 { - queryParam.SourceResourceID = queryOptions.SourceResourceID - } - - manifestJson, _ := json.MarshalIndent(queryParam, "", " ") - sr.Logger.Debugf("THE QUERY OBJECT===========> %v", string(manifestJson)) - - var wrappedResourceModels []models.ResourceBase - queryBuilder := sr.GormClient.WithContext(ctx) - if len(queryOptions.SourceResourceType) > 0 { - tableName, err := databaseModel.GetTableNameByResourceType(queryOptions.SourceResourceType) - if err != nil { - return nil, err - } - queryBuilder = queryBuilder. - Where(queryParam). - Table(tableName) - - if queryOptions.Limit > 0 { - queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) - } - return wrappedResourceModels, queryBuilder.Find(&wrappedResourceModels).Error - } else { - if queryOptions.Limit > 0 { - queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) - } - //there is no FHIR Resource name specified, so we're querying across all FHIR resources - return sr.getResourcesFromAllTables(queryBuilder, queryParam) - } -} - -// TODO: should this be deprecated? (replaced by ListResources) -func (sr *SqliteRepository) GetResourceByResourceTypeAndId(ctx context.Context, sourceResourceType string, sourceResourceId string) (*models.ResourceBase, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - tableName, err := databaseModel.GetTableNameByResourceType(sourceResourceType) - if err != nil { - return nil, err - } - - queryParam := models.OriginBase{ - UserID: currentUser.ID, - SourceResourceType: sourceResourceType, - SourceResourceID: sourceResourceId, - } - - var wrappedResourceModel models.ResourceBase - results := sr.GormClient.WithContext(ctx). - Where(queryParam). - Table(tableName). - First(&wrappedResourceModel) - - return &wrappedResourceModel, results.Error -} - -// we need to figure out how to get the source resource type from the source resource id, or if we're searching across every table :( -func (sr *SqliteRepository) GetResourceBySourceId(ctx context.Context, sourceId string, sourceResourceId string) (*models.ResourceBase, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - sourceIdUUID, err := uuid.Parse(sourceId) - if err != nil { - return nil, err - } - - queryParam := models.OriginBase{ - UserID: currentUser.ID, - SourceID: sourceIdUUID, - SourceResourceID: sourceResourceId, - } - - //there is no FHIR Resource name specified, so we're querying across all FHIR resources - wrappedResourceModels, err := sr.getResourcesFromAllTables(sr.GormClient.WithContext(ctx), queryParam) - if len(wrappedResourceModels) > 0 { - return &wrappedResourceModels[0], err - } else { - return nil, fmt.Errorf("no resource found with source id %s and source resource id %s", sourceId, sourceResourceId) - } -} - -// Get the patient for each source (for the current user) -func (sr *SqliteRepository) GetPatientForSources(ctx context.Context) ([]models.ResourceBase, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - //SELECT * FROM resource_bases WHERE user_id = "" and source_resource_type = "Patient" GROUP BY source_id - - tableName, err := databaseModel.GetTableNameByResourceType("Patient") - if err != nil { - return nil, err - } - - var wrappedResourceModels []models.ResourceBase - results := sr.GormClient.WithContext(ctx). - //Group("source_id"). //broken in Postgres. - Where(models.OriginBase{ - UserID: currentUser.ID, - SourceResourceType: "Patient", - }). - Table(tableName). - Find(&wrappedResourceModels) - - return wrappedResourceModels, results.Error -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Resource Associations -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// verifyAssociationPermission ensure that the sources are "owned" by the same user, and that the user is the current user -func (sr *SqliteRepository) verifyAssociationPermission(ctx context.Context, sourceUserID uuid.UUID, relatedSourceUserID uuid.UUID) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - if sourceUserID != relatedSourceUserID { - return fmt.Errorf("user id's must match when adding associations") - } else if sourceUserID != currentUser.ID { - return fmt.Errorf("user id's must match current user") - } - - return nil -} - -func (sr *SqliteRepository) AddResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { - //ensure that the sources are "owned" by the same user - err := sr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) - if err != nil { - return err - } - - err = sr.GormClient.WithContext(ctx).Table("related_resources").Create(map[string]interface{}{ - "resource_base_user_id": source.UserID, - "resource_base_source_id": source.ID, - "resource_base_source_resource_type": resourceType, - "resource_base_source_resource_id": resourceId, - "related_resource_user_id": relatedSource.UserID, - "related_resource_source_id": relatedSource.ID, - "related_resource_source_resource_type": relatedResourceType, - "related_resource_source_resource_id": relatedResourceId, - }).Error - uniqueConstraintError := errors.New("constraint failed: UNIQUE constraint failed") - if err != nil { - if strings.HasPrefix(err.Error(), uniqueConstraintError.Error()) { - sr.Logger.Warnf("Ignoring an error when creating a related_resource association for %s/%s: %v", resourceType, resourceId, err) - //we can safely ignore this error - return nil - } - } - return err -} - -func (sr *SqliteRepository) RemoveResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { - //ensure that the sources are "owned" by the same user - err := sr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) - if err != nil { - return err - } - - //manually delete association - results := sr.GormClient.WithContext(ctx). - //Table("related_resources"). - Delete(&models.RelatedResource{}, map[string]interface{}{ - "resource_base_user_id": source.UserID, - "resource_base_source_id": source.ID, - "resource_base_source_resource_type": resourceType, - "resource_base_source_resource_id": resourceId, - "related_resource_user_id": relatedSource.UserID, - "related_resource_source_id": relatedSource.ID, - "related_resource_source_resource_type": relatedResourceType, - "related_resource_source_resource_id": relatedResourceId, - }) - - if results.Error != nil { - return results.Error - } else if results.RowsAffected == 0 { - return fmt.Errorf("no association found for %s/%s and %s/%s", resourceType, resourceId, relatedResourceType, relatedResourceId) - } - return nil -} - -func (sr *SqliteRepository) FindResourceAssociationsByTypeAndId(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string) ([]models.RelatedResource, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - if source.UserID != currentUser.ID { - return nil, fmt.Errorf("source credential must match the current user id") - } - - // SELECT * FROM related_resources WHERE user_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9"; - var relatedResources []models.RelatedResource - result := sr.GormClient.WithContext(ctx). - Where(models.RelatedResource{ - ResourceBaseUserID: currentUser.ID, - ResourceBaseSourceID: source.ID, - ResourceBaseSourceResourceType: resourceType, - ResourceBaseSourceResourceID: resourceId, - }). - Find(&relatedResources) - return relatedResources, result.Error -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Resource Composition (Grouping) -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// AddResourceComposition -// this will group resources together into a "Composition" -- primarily to group related Encounters & Conditions into one semantic root. -// algorithm: -// - find source for each resource -// - (SECURITY) ensure the current user and the source for each resource matches -// - check if there is a Composition resource Type already. -// - if Composition type already exists: -// - update "relatesTo" field with additional data. -// - else: -// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) -// -// - add AddResourceAssociation for all resources linked to the Composition resource -// - store the Composition resource -// TODO: determine if we should be using a List Resource instead of a Composition resource -func (sr *SqliteRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - - //generate placeholder source - placeholderSource := models.SourceCredential{UserID: currentUser.ID, SourceType: "manual", ModelBase: models.ModelBase{ID: uuid.MustParse("00000000-0000-0000-0000-000000000000")}} - - existingCompositionResources := []*models.ResourceBase{} - rawResourceLookupTable := map[string]*models.ResourceBase{} - - //find the source for each resource we'd like to merge. (for ownership verification) - sourceLookup := map[uuid.UUID]*models.SourceCredential{} - for _, resource := range resources { - if resource.SourceResourceType == pkg.FhirResourceTypeComposition { - //skip, Composition resources don't have a valid SourceCredential - existingCompositionResources = append(existingCompositionResources, resource) - - //compositions may include existing resources, make sure we handle these - for _, related := range resource.RelatedResource { - rawResourceLookupTable[fmt.Sprintf("%s/%s", related.SourceResourceType, related.SourceResourceID)] = related - } - continue - } - - if _, sourceOk := sourceLookup[resource.SourceID]; !sourceOk { - //source has not been added yet, lets query for it. - sourceCred, err := sr.GetSource(ctx, resource.SourceID.String()) - if err != nil { - return fmt.Errorf("could not find source %s", resource.SourceID.String()) - } - sourceLookup[resource.SourceID] = sourceCred - } - - rawResourceLookupTable[fmt.Sprintf("%s/%s", resource.SourceResourceType, resource.SourceResourceID)] = resource - } - - // SECURITY: ensure the current user and the source for each resource matches - for _, source := range sourceLookup { - if source.UserID != currentUser.ID { - return fmt.Errorf("source must be owned by the current user: %s vs %s", source.UserID, currentUser.ID) - } - } - - // - check if there is a Composition resource Type already. - var compositionResource *models.ResourceBase - - if len(existingCompositionResources) > 0 { - //- if Composition type already exists in this set - // - update "relatesTo" field with additional data. - compositionResource = existingCompositionResources[0] - - //disassociate all existing remaining composition resources. - for _, existingCompositionResource := range existingCompositionResources[1:] { - for _, relatedResource := range existingCompositionResource.RelatedResource { - if err := sr.RemoveResourceAssociation( - ctx, - &placeholderSource, - existingCompositionResource.SourceResourceType, - existingCompositionResource.SourceResourceID, - sourceLookup[relatedResource.SourceID], - relatedResource.SourceResourceType, - relatedResource.SourceResourceID, - ); err != nil { - //ignoring errors, could be due to duplicate edges - return fmt.Errorf("an error occurred while removing resource association: %v", err) - } - } - - //remove this resource - compositionTable, err := databaseModel.GetTableNameByResourceType("Composition") - if err != nil { - return fmt.Errorf("an error occurred while finding Composition resource table: %v", err) - } - //TODO: we may need to delete with using the FhirComposition struct type - deleteResult := sr.GormClient.WithContext(ctx). - Table(compositionTable). - Delete(existingCompositionResource) - if deleteResult.Error != nil { - return fmt.Errorf("an error occurred while removing Composition resource(%s/%s): %v", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID, err) - } else if deleteResult.RowsAffected != 1 { - return fmt.Errorf("composition resource was not deleted %s/%s", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID) - } - } - - } else { - //- else: - // - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) - compositionResource = &models.ResourceBase{ - OriginBase: models.OriginBase{ - UserID: placeholderSource.UserID, // - SourceID: placeholderSource.ID, //Empty SourceID expected ("0000-0000-0000-0000") - SourceResourceType: pkg.FhirResourceTypeComposition, - SourceResourceID: uuid.New().String(), - }, - } - } - - // - Generate an "updated" RawResource json blob - rawCompositionResource := models.ResourceComposition{ - Title: compositionTitle, - RelatesTo: []models.ResourceCompositionRelatesTo{}, - } - - for relatedResourceKey, _ := range rawResourceLookupTable { - rawCompositionResource.RelatesTo = append(rawCompositionResource.RelatesTo, models.ResourceCompositionRelatesTo{ - Target: models.ResourceCompositionRelatesToTarget{ - TargetReference: models.ResourceCompositionRelatesToTargetReference{ - Reference: relatedResourceKey, - }, - }, - }) - } - - rawResourceJson, err := json.Marshal(rawCompositionResource) - if err != nil { - return err - } - compositionResource.ResourceRaw = rawResourceJson - - compositionResource.SortTitle = &compositionTitle - compositionResource.RelatedResource = utils.SortResourcePtrListByDate(resources) - compositionResource.SortDate = compositionResource.RelatedResource[0].SortDate - - //store the Composition resource - _, err = sr.UpsertResource(ctx, compositionResource) - if err != nil { - return err - } - - // - add AddResourceAssociation for all resources linked to the Composition resource - for _, resource := range rawResourceLookupTable { - if err := sr.AddResourceAssociation( - ctx, - &placeholderSource, - compositionResource.SourceResourceType, - compositionResource.SourceResourceID, - sourceLookup[resource.SourceID], - resource.SourceResourceType, - resource.SourceResourceID, - ); err != nil { - return err - } - } - - return nil -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// SourceCredential -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func (sr *SqliteRepository) CreateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - sourceCreds.UserID = currentUser.ID - - //Assign will **always** update the source credential in the DB with data passed into this function. - return sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{ - UserID: sourceCreds.UserID, - SourceType: sourceCreds.SourceType, - Patient: sourceCreds.Patient}). - Assign(*sourceCreds).FirstOrCreate(sourceCreds).Error -} - -func (sr *SqliteRepository) UpdateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - sourceCreds.UserID = currentUser.ID - - //Assign will **always** update the source credential in the DB with data passed into this function. - return sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{ - ModelBase: models.ModelBase{ID: sourceCreds.ID}, - UserID: sourceCreds.UserID, - SourceType: sourceCreds.SourceType, - }).Updates(models.SourceCredential{ - AccessToken: sourceCreds.AccessToken, - RefreshToken: sourceCreds.RefreshToken, - ExpiresAt: sourceCreds.ExpiresAt, - DynamicClientId: sourceCreds.DynamicClientId, - DynamicClientRegistrationMode: sourceCreds.DynamicClientRegistrationMode, - DynamicClientJWKS: sourceCreds.DynamicClientJWKS, - LatestBackgroundJobID: sourceCreds.LatestBackgroundJobID, - }).Error -} - -func (sr *SqliteRepository) GetSource(ctx context.Context, sourceId string) (*models.SourceCredential, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - sourceUUID, err := uuid.Parse(sourceId) - if err != nil { - return nil, err - } - - var sourceCred models.SourceCredential - results := sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: sourceUUID}}). - Preload("LatestBackgroundJob"). - First(&sourceCred) - - return &sourceCred, results.Error -} - -func (sr *SqliteRepository) GetSourceSummary(ctx context.Context, sourceId string) (*models.SourceSummary, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - sourceUUID, err := uuid.Parse(sourceId) - if err != nil { - return nil, err - } - - sourceSummary := &models.SourceSummary{} - - source, err := sr.GetSource(ctx, sourceId) - if err != nil { - return nil, err - } - sourceSummary.Source = source - - //group by resource type and return counts - // SELECT source_resource_type as resource_type, COUNT(*) as count FROM resource_bases WHERE source_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9" GROUP BY source_resource_type; - - var resourceTypeCounts []map[string]interface{} - - resourceTypes := databaseModel.GetAllowedResourceTypes() - for _, resourceType := range resourceTypes { - tableName, err := databaseModel.GetTableNameByResourceType(resourceType) - if err != nil { - return nil, err - } - var count int64 - result := sr.GormClient.WithContext(ctx). - Table(tableName). - Where(models.OriginBase{ - UserID: currentUser.ID, - SourceID: sourceUUID, - }). - Count(&count) - if result.Error != nil { - return nil, result.Error - } - if count == 0 { - continue //don't add resource counts if the count is 0 - } - resourceTypeCounts = append(resourceTypeCounts, map[string]interface{}{ - "source_id": sourceId, - "resource_type": resourceType, - "count": count, - }) - } - - sourceSummary.ResourceTypeCounts = resourceTypeCounts - - //set patient - patientTableName, err := databaseModel.GetTableNameByResourceType("Patient") - if err != nil { - return nil, err - } - var wrappedPatientResourceModel models.ResourceBase - patientResults := sr.GormClient.WithContext(ctx). - Where(models.OriginBase{ - UserID: currentUser.ID, - SourceResourceType: "Patient", - SourceID: sourceUUID, - }). - Table(patientTableName). - First(&wrappedPatientResourceModel) - - if patientResults.Error != nil { - return nil, patientResults.Error - } - sourceSummary.Patient = &wrappedPatientResourceModel - - return sourceSummary, nil -} - -func (sr *SqliteRepository) GetSources(ctx context.Context) ([]models.SourceCredential, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - var sourceCreds []models.SourceCredential - results := sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{UserID: currentUser.ID}). - Preload("LatestBackgroundJob"). - Find(&sourceCreds) - - return sourceCreds, results.Error -} - -func (sr *SqliteRepository) DeleteSource(ctx context.Context, sourceId string) (int64, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return 0, currentUserErr - } - - if strings.TrimSpace(sourceId) == "" { - return 0, fmt.Errorf("sourceId cannot be blank") - } - //delete all resources for this source - sourceUUID, err := uuid.Parse(sourceId) - if err != nil { - return 0, err - } - - rowsEffected := int64(0) - resourceTypes := databaseModel.GetAllowedResourceTypes() - for _, resourceType := range resourceTypes { - tableName, err := databaseModel.GetTableNameByResourceType(resourceType) - if err != nil { - return 0, err - } - results := sr.GormClient.WithContext(ctx). - Where(models.OriginBase{ - UserID: currentUser.ID, - SourceID: sourceUUID, - }). - Table(tableName). - Delete(&models.ResourceBase{}) - rowsEffected += results.RowsAffected - if results.Error != nil { - return rowsEffected, results.Error - } - } - - //delete relatedResources entries - results := sr.GormClient.WithContext(ctx). - Where(models.RelatedResource{ResourceBaseUserID: currentUser.ID, ResourceBaseSourceID: sourceUUID}). - Delete(&models.RelatedResource{}) - if results.Error != nil { - return rowsEffected, results.Error - } - - //soft delete the source credential - results = sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{ - ModelBase: models.ModelBase{ - ID: sourceUUID, - }, - UserID: currentUser.ID, - }). - Delete(&models.SourceCredential{}) - rowsEffected += results.RowsAffected - return rowsEffected, results.Error -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Background Job -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func (sr *SqliteRepository) CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - - backgroundJob.UserID = currentUser.ID - - record := sr.GormClient.Create(backgroundJob) - return record.Error -} - -func (sr *SqliteRepository) GetBackgroundJob(ctx context.Context, backgroundJobId string) (*models.BackgroundJob, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - backgroundJobUUID, err := uuid.Parse(backgroundJobId) - if err != nil { - return nil, err - } - - var backgroundJob models.BackgroundJob - results := sr.GormClient.WithContext(ctx). - Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: backgroundJobUUID}}). - First(&backgroundJob) - - return &backgroundJob, results.Error -} - -func (sr *SqliteRepository) UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return currentUserErr - } - backgroundJob.UserID = currentUser.ID - - return sr.GormClient.WithContext(ctx). - Where(models.BackgroundJob{ - ModelBase: models.ModelBase{ID: backgroundJob.ID}, - UserID: backgroundJob.UserID, - }).Updates(models.BackgroundJob{ - JobStatus: backgroundJob.JobStatus, - Data: backgroundJob.Data, - LockedTime: backgroundJob.LockedTime, - DoneTime: backgroundJob.DoneTime, - Retries: backgroundJob.Retries, - Schedule: backgroundJob.Schedule, - }).Error -} - -func (sr *SqliteRepository) ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error) { - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - return nil, currentUserErr - } - - queryParam := models.BackgroundJob{ - UserID: currentUser.ID, - } - - if queryOptions.JobType != nil { - queryParam.JobType = *queryOptions.JobType - } - if queryOptions.Status != nil { - queryParam.JobStatus = *queryOptions.Status - } - - var backgroundJobs []models.BackgroundJob - query := sr.GormClient.WithContext(ctx). - //Group("source_id"). //broken in Postgres. - Where(queryParam).Limit(queryOptions.Limit).Order("locked_time DESC") - - if queryOptions.Offset > 0 { - query = query.Offset(queryOptions.Offset) - } - - return backgroundJobs, query.Find(&backgroundJobs).Error -} - -func (sr *SqliteRepository) BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{}) { - sr.Logger.Info("begin checkpointing background job...") - if len(checkpointData) == 0 && len(errorData) == 0 { - sr.Logger.Info("no changes detected. Skipping checkpoint") - return //nothing to do - } - defer sr.Logger.Info("end checkpointing background job") - - currentUser, currentUserErr := sr.GetCurrentUser(ctx) - if currentUserErr != nil { - sr.Logger.Warning("could not find current user info context. Ignoring checkpoint", currentUserErr) - return - } - - //make sure we do an atomic update - backgroundJobId, ok := ctx.Value(pkg.ContextKeyTypeBackgroundJobID).(string) - if !ok { - sr.Logger.Warning("could not find background job id in context. Ignoring checkpoint") - return - } - backgroundJobUUID, err := uuid.Parse(backgroundJobId) - if err != nil { - sr.Logger.Warning("could not parse background job id. Ignoring checkpoint", err) - return - } - //https://gorm.io/docs/advanced_query.html#Locking-FOR-UPDATE - //TODO: if using another database type (not SQLITE) we need to make sure we use the correct locking strategy - //This is not a problem in SQLITE because it does database (or table) level locking by default - //var backgroundJob models.BackgroundJob - //sr.GormClient.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&backgroundJob) - - txErr := sr.GormClient.Transaction(func(tx *gorm.DB) error { - //retrieve the background job by id - var backgroundJob models.BackgroundJob - backgroundJobFindResults := tx.WithContext(ctx). - Where(models.BackgroundJob{ - ModelBase: models.ModelBase{ID: backgroundJobUUID}, - UserID: currentUser.ID, - }). - First(&backgroundJob) - if backgroundJobFindResults.Error != nil { - return backgroundJobFindResults.Error - } - - //deserialize the job data - var backgroundJobSyncData models.BackgroundJobSyncData - if backgroundJob.Data != nil { - err := json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData) - if err != nil { - return err - } - } - - //update the job data with new data provided by the calling functiion - changed := false - if len(checkpointData) > 0 { - backgroundJobSyncData.CheckpointData = checkpointData - changed = true - } - if len(errorData) > 0 { - backgroundJobSyncData.ErrorData = errorData - changed = true - } - - //define a background job with the fields we're going to update - now := time.Now() - updatedBackgroundJob := models.BackgroundJob{ - LockedTime: &now, - } - if changed { - serializedData, err := json.Marshal(backgroundJobSyncData) - if err != nil { - return err - } - updatedBackgroundJob.Data = serializedData - - } - - return tx.WithContext(ctx). - Where(models.BackgroundJob{ - ModelBase: models.ModelBase{ID: backgroundJobUUID}, - UserID: currentUser.ID, - }).Updates(updatedBackgroundJob).Error - }) - - if txErr != nil { - sr.Logger.Warning("could not find or update background job. Ignoring checkpoint", txErr) - } - -} - -// when server restarts, we should unlock all locked jobs, and set their status to failed -// SECURITY: this is global, and effects all users. -func (sr *SqliteRepository) CancelAllLockedBackgroundJobsAndFail() error { - now := time.Now() - return sr.GormClient. - Where(models.BackgroundJob{JobStatus: pkg.BackgroundJobStatusLocked}). - Updates(models.BackgroundJob{ - JobStatus: pkg.BackgroundJobStatusFailed, - DoneTime: &now, - }).Error - -} - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Utilities -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -func sqlitePragmaString(pragmas map[string]string) string { - q := url.Values{} - for key, val := range pragmas { - q.Add("_pragma", fmt.Sprintf("%s=%s", key, val)) - } - - queryStr := q.Encode() - if len(queryStr) > 0 { - return "?" + queryStr - } - return "" -} - -// Internal function -// This function will return a list of resources from all FHIR tables in the database -// The query allows us to set the source id, source resource id, source resource type -// SECURITY: this function assumes the user has already been authenticated -// TODO: theres probably a more efficient way of doing this with GORM -func (sr *SqliteRepository) getResourcesFromAllTables(queryBuilder *gorm.DB, queryParam models.OriginBase) ([]models.ResourceBase, error) { - wrappedResourceModels := []models.ResourceBase{} - resourceTypes := databaseModel.GetAllowedResourceTypes() - for _, resourceType := range resourceTypes { - tableName, err := databaseModel.GetTableNameByResourceType(resourceType) - if err != nil { - return nil, err - } - var tempWrappedResourceModels []models.ResourceBase - results := queryBuilder. - Where(queryParam). - Table(tableName). - Find(&tempWrappedResourceModels) - if results.Error != nil { - return nil, results.Error - } - wrappedResourceModels = append(wrappedResourceModels, tempWrappedResourceModels...) - } - return wrappedResourceModels, nil -} diff --git a/backend/pkg/errors/errors.go b/backend/pkg/errors/errors.go index 41b3f815..8262ba6e 100644 --- a/backend/pkg/errors/errors.go +++ b/backend/pkg/errors/errors.go @@ -17,3 +17,10 @@ type ConfigValidationError string func (str ConfigValidationError) Error() string { return fmt.Sprintf("ConfigValidationError: %q", string(str)) } + +// Raised when the database type is unsupported +type DatabaseTypeNotSupportedError string + +func (str DatabaseTypeNotSupportedError) Error() string { + return fmt.Sprintf("DatabaseTypeNotSupportedError: %q", string(str)) +} diff --git a/backend/pkg/models/database/generate.go b/backend/pkg/models/database/generate.go index 54b553c3..af0d657a 100644 --- a/backend/pkg/models/database/generate.go +++ b/backend/pkg/models/database/generate.go @@ -6,13 +6,17 @@ package main import ( "encoding/json" "fmt" - "github.com/dave/jennifer/jen" - "github.com/iancoleman/strcase" - "golang.org/x/exp/slices" "io/ioutil" "log" + "os" "sort" "strings" + + "github.com/dave/jennifer/jen" + "github.com/fastenhealth/fasten-onprem/backend/pkg/config" + "github.com/fastenhealth/fasten-onprem/backend/pkg/errors" + "github.com/iancoleman/strcase" + "golang.org/x/exp/slices" ) type SearchParameter struct { @@ -47,6 +51,22 @@ PLEASE DO NOT EDIT BY HAND `, "\n"), "\n") func main() { + // Read config file for database type + appconfig, err := config.Create() + if err != nil { + fmt.Printf("FATAL: %+v\n", err) + os.Exit(1) + } + + // Find and read the config file + err = appconfig.ReadConfig("../../../../config.yaml") + if _, ok := err.(errors.ConfigFileMissingError); ok { // Handle errors reading the config file + //ignore "could not find config file" + } else if err != nil { + os.Exit(1) + } + databaseType := appconfig.GetString("database.type") + // Read the search-parameters.json file searchParamsData, err := ioutil.ReadFile("search-parameters.json") if err != nil { @@ -188,10 +208,18 @@ func main() { golangFieldStatement = g.Id(fieldName).Id(golangFieldType) } } - golangFieldStatement.Tag(map[string]string{ - "json": fmt.Sprintf("%s,omitempty", strcase.ToLowerCamel(fieldName)), - "gorm": fmt.Sprintf("column:%s;%s", strcase.ToLowerCamel(fieldName), mapGormType(fieldInfo.FieldType)), - }) + + if databaseType == "sqlite" { + golangFieldStatement.Tag(map[string]string{ + "json": fmt.Sprintf("%s,omitempty", strcase.ToLowerCamel(fieldName)), + "gorm": fmt.Sprintf("column:%s;%s", strcase.ToLowerCamel(fieldName), mapGormTypeSqlite(fieldInfo.FieldType)), + }) + } else { + golangFieldStatement.Tag(map[string]string{ + "json": fmt.Sprintf("%s,omitempty", strcase.ToLowerCamel(fieldName)), + "gorm": fmt.Sprintf("column:%s;%s", strcase.ToLowerCamel(fieldName), mapGormTypePostgres(fieldInfo.FieldType)), + }) + } } }) @@ -663,7 +691,7 @@ func mapFieldType(fieldType string) string { } // https://www.sqlite.org/datatype3.html -func mapGormType(fieldType string) string { +func mapGormTypeSqlite(fieldType string) string { // gorm:"type:text;serializer:json" switch fieldType { @@ -687,3 +715,26 @@ func mapGormType(fieldType string) string { return "type:text" } } + +func mapGormTypePostgres(fieldType string) string { + switch fieldType { + case "number": + return "type:real" + case "token": + return "type:text;serializer:json" + case "reference": + return "type:text;serializer:json" + case "date": + return "type:timestamptz" + case "string": + return "type:text;serializer:json" + case "uri": + return "type:text" + case "special": + return "type:text;serializer:json" + case "quantity": + return "type:text;serializer:json" + default: + return "type:text" + } +} diff --git a/backend/pkg/web/handler/source_test.go b/backend/pkg/web/handler/source_test.go index b49e903c..d7a201da 100644 --- a/backend/pkg/web/handler/source_test.go +++ b/backend/pkg/web/handler/source_test.go @@ -51,6 +51,7 @@ func (suite *SourceHandlerTestSuite) BeforeTest(suiteName, testName string) { appConfig := mock_config.NewMockInterface(suite.MockCtrl) appConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes() + appConfig.EXPECT().GetString("database.type").Return("sqlite").AnyTimes() appConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes() suite.AppConfig = appConfig diff --git a/config.yaml b/config.yaml index 47783c50..25fb57a7 100644 --- a/config.yaml +++ b/config.yaml @@ -20,9 +20,10 @@ web: frontend: path: /opt/fasten/web database: - location: '/opt/fasten/db/fasten.db' + type: 'sqlite' # postgres also supported, but still experimental changes + location: '/opt/fasten/db/fasten.db' # if postgres use a DSN, eg. `host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=required TimeZone=Asia/Shanghai` log: - file: '' #absolute or relative paths allowed, eg. web.log + file: '' # absolute or relative paths allowed, eg. web.log level: INFO jwt: issuer: diff --git a/go.mod b/go.mod index 8e8b37ba..240ad0c9 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,13 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 golang.org/x/net v0.14.0 gorm.io/datatypes v1.0.7 - gorm.io/gorm v1.24.1 + gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.4.3 // indirect ) require ( @@ -104,6 +110,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/mysql v1.3.2 // indirect + gorm.io/driver/postgres v1.5.3 modernc.org/libc v1.19.0 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.4.0 // indirect diff --git a/go.sum b/go.sum index c2621375..aa6c707c 100644 --- a/go.sum +++ b/go.sum @@ -470,6 +470,8 @@ github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5 github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= @@ -482,6 +484,8 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w= github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw= +github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= +github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= @@ -1286,6 +1290,8 @@ gorm.io/driver/mysql v1.3.2 h1:QJryWiqQ91EvZ0jZL48NOpdlPdMjdip1hQ8bTgo4H7I= gorm.io/driver/mysql v1.3.2/go.mod h1:ChK6AHbHgDCFZyJp0F+BmVGb06PSIoh9uVYKAlRbb2U= gorm.io/driver/postgres v1.3.4 h1:evZ7plF+Bp+Lr1mO5NdPvd6M/N98XtwHixGB+y7fdEQ= gorm.io/driver/postgres v1.3.4/go.mod h1:y0vEuInFKJtijuSGu9e5bs5hzzSzPK+LancpKpvbRBw= +gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU= +gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk= gorm.io/driver/sqlite v1.3.1 h1:bwfE+zTEWklBYoEodIOIBwuWHpnx52Z9zJFW5F33WLk= gorm.io/driver/sqlite v1.3.1/go.mod h1:wJx0hJspfycZ6myN38x1O/AqLtNS6c5o9TndewFbELg= gorm.io/driver/sqlserver v1.3.1 h1:F5t6ScMzOgy1zukRTIZgLZwKahgt3q1woAILVolKpOI= @@ -1295,6 +1301,8 @@ gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= gorm.io/gorm v1.24.0/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= gorm.io/gorm v1.24.1 h1:CgvzRniUdG67hBAzsxDGOAuq4Te1osVMYsa1eQbd4fs= gorm.io/gorm v1.24.1/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA= +gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55 h1:sC1Xj4TYrLqg1n3AN10w871An7wJM0gzgcm8jkIkECQ= +gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=