package database

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/fastenhealth/fasten-onprem/backend/pkg"
	"github.com/fastenhealth/fasten-onprem/backend/pkg/config"
	"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
	"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
	databaseModel "github.com/fastenhealth/fasten-onprem/backend/pkg/models/database"
	"github.com/fastenhealth/fasten-onprem/backend/pkg/utils"
	sourceModel "github.com/fastenhealth/fasten-sources/clients/models"
	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"github.com/sirupsen/logrus"
	"gorm.io/datatypes"
	"gorm.io/gorm"
)

// GormRepository is the GORM-backed persistence layer. It bundles the
// application config, a logger, the GORM handle used for every query in this
// file, and the event bus that resource upserts publish to.
type GormRepository struct {
	AppConfig config.Interface
	Logger    logrus.FieldLogger

	GormClient *gorm.DB

	EventBus event_bus.Interface
}

// Close is currently a no-op and always returns nil.
func (gr *GormRepository) Close() error {
	return nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// User
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// CreateUser hashes the user's password in place, inserts the user row and
// then populates the default settings for the new user id.
//
// It returns the first error encountered: from hashing, from the insert, or
// from populating the default settings.
func (gr *GormRepository) CreateUser(ctx context.Context, user *models.User) error {
	if err := user.HashPassword(user.Password); err != nil {
		return err
	}

	// Bind the insert to ctx, like every other query in this repository, so
	// that cancellation/deadlines apply to it as well.
	if err := gr.GormClient.WithContext(ctx).Create(user).Error; err != nil {
		return err
	}

	//create user settings
	if err := gr.PopulateDefaultUserSettings(ctx, user.ID); err != nil {
		return err
	}
	return nil
}

// GetUserByUsername loads the first user whose username matches.
//
// The returned pointer is always non-nil; when the lookup fails it points at a
// zero-value user and the error is non-nil, so callers must check the error.
func (gr *GormRepository) GetUserByUsername(ctx context.Context, username string) (*models.User, error) {
	var foundUser models.User
	result := gr.GormClient.WithContext(ctx).
		Where(models.User{Username: username}).
		First(&foundUser)
	return &foundUser, result.Error
}

// TODO: check for error, right now we return a nil which may cause a panic.
// TODO: can we cache the current user?
//SECURITY: func (gr *GormRepository) GetCurrentUser(ctx context.Context) (*models.User, error) { username := ctx.Value(pkg.ContextKeyTypeAuthUsername) if username == nil { ginCtx, ginCtxOk := ctx.(*gin.Context) if !ginCtxOk { return nil, fmt.Errorf("could not convert context to gin context") } var exists bool username, exists = ginCtx.Get(pkg.ContextKeyTypeAuthUsername) if !exists { return nil, fmt.Errorf("could not extract username from context") } } var currentUser models.User usernameStr, usernameStrOk := username.(string) if !usernameStrOk { return nil, fmt.Errorf("could not convert username to string: %v", username) } result := gr.GormClient. WithContext(ctx). First(¤tUser, map[string]interface{}{"username": usernameStr}) if result.Error != nil { return nil, fmt.Errorf("could not retrieve current user: %v", result.Error) } return ¤tUser, nil } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Glossary //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func (gr *GormRepository) CreateGlossaryEntry(ctx context.Context, glossaryEntry *models.Glossary) error { record := gr.GormClient.WithContext(ctx).Create(glossaryEntry) if record.Error != nil { return record.Error } return nil } func (gr *GormRepository) GetGlossaryEntry(ctx context.Context, code string, codeSystem string) (*models.Glossary, error) { var foundGlossaryEntry models.Glossary result := gr.GormClient.WithContext(ctx). Where(models.Glossary{Code: code, CodeSystem: codeSystem}). 
First(&foundGlossaryEntry) return &foundGlossaryEntry, result.Error } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Summary //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func (gr *GormRepository) GetSummary(ctx context.Context) (*models.Summary, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } // we want a count of all resources for this user by type var resourceCountResults []map[string]interface{} resourceTypes := databaseModel.GetAllowedResourceTypes() for _, resourceType := range resourceTypes { tableName, err := databaseModel.GetTableNameByResourceType(resourceType) if err != nil { return nil, err } var count int64 result := gr.GormClient.WithContext(ctx). Table(tableName). Where(models.OriginBase{ UserID: currentUser.ID, }). Count(&count) if result.Error != nil { return nil, result.Error } if count == 0 { continue //don't add resource counts if the count is 0 } resourceCountResults = append(resourceCountResults, map[string]interface{}{ "resource_type": resourceType, "count": count, }) } // we want a list of all sources (when they were last updated) sources, err := gr.GetSources(ctx) if err != nil { return nil, err } // we want the main Patient for each source patients, err := gr.GetPatientForSources(ctx) if err != nil { return nil, err } if resourceCountResults == nil { resourceCountResults = []map[string]interface{}{} } summary := &models.Summary{ Sources: sources, ResourceTypeCounts: resourceCountResults, Patients: patients, } return summary, nil } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Resource //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // This function will create a new resource if it does 
not exist, or update an existing resource if it does exist. // It will also create associations between fhir resources // This function is called directly by fasten-sources func (gr *GormRepository) UpsertRawResource(ctx context.Context, sourceCredential sourceModel.SourceCredential, rawResource sourceModel.RawResourceFhir) (bool, error) { source := sourceCredential.(*models.SourceCredential) //convert from a raw resource (from fasten-sources) to a ResourceFhir (which matches the database models) wrappedResourceModel := &models.ResourceBase{ OriginBase: models.OriginBase{ ModelBase: models.ModelBase{}, UserID: source.UserID, SourceID: source.ID, SourceResourceID: rawResource.SourceResourceID, SourceResourceType: rawResource.SourceResourceType, }, SortTitle: rawResource.SortTitle, SortDate: rawResource.SortDate, ResourceRaw: datatypes.JSON(rawResource.ResourceRaw), RelatedResource: nil, } if len(rawResource.SourceUri) > 0 { wrappedResourceModel.SourceUri = &rawResource.SourceUri } //create associations //note: we create the association in the related_resources table **before** the model actually exists. //note: these associations are not reciprocal, (i.e. 
if Procedure references Location, Location may not reference Procedure) if rawResource.ReferencedResources != nil && len(rawResource.ReferencedResources) > 0 { for _, referencedResource := range rawResource.ReferencedResources { parts := strings.Split(referencedResource, "/") if len(parts) != 2 { continue } relatedResource := &models.ResourceBase{ OriginBase: models.OriginBase{ SourceID: source.ID, SourceResourceType: parts[0], SourceResourceID: parts[1], }, RelatedResource: nil, } err := gr.AddResourceAssociation( ctx, source, wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, source, relatedResource.SourceResourceType, relatedResource.SourceResourceID, ) if err != nil { return false, err } } } return gr.UpsertResource(ctx, wrappedResourceModel) } // UpsertResource // this method will upsert a resource, however it will not create associations. // UPSERT operation // - call FindOrCreate // - check if the resource exists // - if it does not exist, insert it // // - if no error during FindOrCreate && no rows affected (nothing was created) // - update the resource using Updates operation func (gr *GormRepository) UpsertResource(ctx context.Context, wrappedResourceModel *models.ResourceBase) (bool, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return false, currentUserErr } wrappedResourceModel.UserID = currentUser.ID cachedResourceRaw := wrappedResourceModel.ResourceRaw gr.Logger.Infof("insert/update FHIRResource (%v) %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID) wrappedFhirResourceModel, err := databaseModel.NewFhirResourceModelByType(wrappedResourceModel.SourceResourceType) if err != nil { return false, err } wrappedFhirResourceModel.SetOriginBase(wrappedResourceModel.OriginBase) wrappedFhirResourceModel.SetSortTitle(wrappedResourceModel.SortTitle) wrappedFhirResourceModel.SetSortDate(wrappedResourceModel.SortDate) 
wrappedFhirResourceModel.SetSourceUri(wrappedResourceModel.SourceUri) //TODO: this takes too long, we need to find a way to do this processing faster or in the background async. err = wrappedFhirResourceModel.PopulateAndExtractSearchParameters(json.RawMessage(wrappedResourceModel.ResourceRaw)) if err != nil { gr.Logger.Warnf("ignoring: an error occurred while extracting SearchParameters using FHIRPath (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) //wrappedFhirResourceModel.SetResourceRaw(wrappedResourceModel.ResourceRaw) } eventSourceSync := models.NewEventSourceSync( currentUser.ID.String(), wrappedFhirResourceModel.GetSourceID().String(), wrappedFhirResourceModel.GetSourceResourceType(), wrappedFhirResourceModel.GetSourceResourceID(), ) err = gr.EventBus.PublishMessage(eventSourceSync) if err != nil { gr.Logger.Warnf("ignoring: an error occurred while publishing event to eventBus (%s/%s): %v", wrappedResourceModel.SourceResourceType, wrappedResourceModel.SourceResourceID, err) } createResult := gr.GormClient.WithContext(ctx).Where(models.OriginBase{ SourceID: wrappedFhirResourceModel.GetSourceID(), SourceResourceID: wrappedFhirResourceModel.GetSourceResourceID(), SourceResourceType: wrappedFhirResourceModel.GetSourceResourceType(), //TODO: and UpdatedAt > old UpdatedAt }).Omit("RelatedResource.*").FirstOrCreate(wrappedFhirResourceModel) if createResult.Error != nil { return false, createResult.Error } else if createResult.RowsAffected == 0 { //at this point, wrappedResourceModel contains the data found in the database. // check if the database resource matches the new resource. 
if wrappedResourceModel.ResourceRaw.String() != string(cachedResourceRaw) { updateResult := createResult.Omit("RelatedResource.*").Updates(wrappedResourceModel) return updateResult.RowsAffected > 0, updateResult.Error } else { return false, nil } } else { //resource was created return createResult.RowsAffected > 0, createResult.Error } } func (gr *GormRepository) ListResources(ctx context.Context, queryOptions models.ListResourceQueryOptions) ([]models.ResourceBase, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } queryParam := models.OriginBase{ UserID: currentUser.ID, } if len(queryOptions.SourceResourceType) > 0 { queryParam.SourceResourceType = queryOptions.SourceResourceType } if len(queryOptions.SourceID) > 0 { sourceUUID, err := uuid.Parse(queryOptions.SourceID) if err != nil { return nil, err } queryParam.SourceID = sourceUUID } if len(queryOptions.SourceResourceID) > 0 { queryParam.SourceResourceID = queryOptions.SourceResourceID } manifestJson, _ := json.MarshalIndent(queryParam, "", " ") gr.Logger.Debugf("THE QUERY OBJECT===========> %v", string(manifestJson)) var wrappedResourceModels []models.ResourceBase queryBuilder := gr.GormClient.WithContext(ctx) if len(queryOptions.SourceResourceType) > 0 { tableName, err := databaseModel.GetTableNameByResourceType(queryOptions.SourceResourceType) if err != nil { return nil, err } queryBuilder = queryBuilder. Where(queryParam). Table(tableName) if queryOptions.Limit > 0 { queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) } return wrappedResourceModels, queryBuilder.Find(&wrappedResourceModels).Error } else { if queryOptions.Limit > 0 { queryBuilder = queryBuilder.Limit(queryOptions.Limit).Offset(queryOptions.Offset) } //there is no FHIR Resource name specified, so we're querying across all FHIR resources return gr.getResourcesFromAllTables(queryBuilder, queryParam) } } // TODO: should this be deprecated? 
(replaced by ListResources) func (gr *GormRepository) GetResourceByResourceTypeAndId(ctx context.Context, sourceResourceType string, sourceResourceId string) (*models.ResourceBase, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } tableName, err := databaseModel.GetTableNameByResourceType(sourceResourceType) if err != nil { return nil, err } queryParam := models.OriginBase{ UserID: currentUser.ID, SourceResourceType: sourceResourceType, SourceResourceID: sourceResourceId, } var wrappedResourceModel models.ResourceBase results := gr.GormClient.WithContext(ctx). Where(queryParam). Table(tableName). First(&wrappedResourceModel) return &wrappedResourceModel, results.Error } // we need to figure out how to get the source resource type from the source resource id, or if we're searching across every table :( func (gr *GormRepository) GetResourceBySourceId(ctx context.Context, sourceId string, sourceResourceId string) (*models.ResourceBase, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } sourceIdUUID, err := uuid.Parse(sourceId) if err != nil { return nil, err } queryParam := models.OriginBase{ UserID: currentUser.ID, SourceID: sourceIdUUID, SourceResourceID: sourceResourceId, } //there is no FHIR Resource name specified, so we're querying across all FHIR resources wrappedResourceModels, err := gr.getResourcesFromAllTables(gr.GormClient.WithContext(ctx), queryParam) if len(wrappedResourceModels) > 0 { return &wrappedResourceModels[0], err } else { return nil, fmt.Errorf("no resource found with source id %s and source resource id %s", sourceId, sourceResourceId) } } // Get the patient for each source (for the current user) func (gr *GormRepository) GetPatientForSources(ctx context.Context) ([]models.ResourceBase, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } 
//SELECT * FROM resource_bases WHERE user_id = "" and source_resource_type = "Patient" GROUP BY source_id tableName, err := databaseModel.GetTableNameByResourceType("Patient") if err != nil { return nil, err } var wrappedResourceModels []models.ResourceBase results := gr.GormClient.WithContext(ctx). //Group("source_id"). //broken in Postgres. Where(models.OriginBase{ UserID: currentUser.ID, SourceResourceType: "Patient", }). Table(tableName). Find(&wrappedResourceModels) return wrappedResourceModels, results.Error } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Resource Associations //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // verifyAssociationPermission ensure that the sources are "owned" by the same user, and that the user is the current user func (gr *GormRepository) verifyAssociationPermission(ctx context.Context, sourceUserID uuid.UUID, relatedSourceUserID uuid.UUID) error { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return currentUserErr } if sourceUserID != relatedSourceUserID { return fmt.Errorf("user id's must match when adding associations") } else if sourceUserID != currentUser.ID { return fmt.Errorf("user id's must match current user") } return nil } func (gr *GormRepository) AddResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { //ensure that the sources are "owned" by the same user err := gr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) if err != nil { return err } err = gr.GormClient.WithContext(ctx).Table("related_resources").Create(map[string]interface{}{ "resource_base_user_id": source.UserID, "resource_base_source_id": source.ID, "resource_base_source_resource_type": 
resourceType, "resource_base_source_resource_id": resourceId, "related_resource_user_id": relatedSource.UserID, "related_resource_source_id": relatedSource.ID, "related_resource_source_resource_type": relatedResourceType, "related_resource_source_resource_id": relatedResourceId, }).Error uniqueConstraintError := errors.New("UNIQUE constraint failed") if err != nil { if strings.HasPrefix(err.Error(), uniqueConstraintError.Error()) { gr.Logger.Warnf("Ignoring an error when creating a related_resource association for %s/%s: %v", resourceType, resourceId, err) //we can safely ignore this error return nil } } return err } func (gr *GormRepository) RemoveResourceAssociation(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string, relatedSource *models.SourceCredential, relatedResourceType string, relatedResourceId string) error { //ensure that the sources are "owned" by the same user err := gr.verifyAssociationPermission(ctx, source.UserID, relatedSource.UserID) if err != nil { return err } //manually delete association results := gr.GormClient.WithContext(ctx). //Table("related_resources"). 
Delete(&models.RelatedResource{}, map[string]interface{}{ "resource_base_user_id": source.UserID, "resource_base_source_id": source.ID, "resource_base_source_resource_type": resourceType, "resource_base_source_resource_id": resourceId, "related_resource_user_id": relatedSource.UserID, "related_resource_source_id": relatedSource.ID, "related_resource_source_resource_type": relatedResourceType, "related_resource_source_resource_id": relatedResourceId, }) if results.Error != nil { return results.Error } else if results.RowsAffected == 0 { return fmt.Errorf("no association found for %s/%s and %s/%s", resourceType, resourceId, relatedResourceType, relatedResourceId) } return nil } func (gr *GormRepository) FindResourceAssociationsByTypeAndId(ctx context.Context, source *models.SourceCredential, resourceType string, resourceId string) ([]models.RelatedResource, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } if source.UserID != currentUser.ID { return nil, fmt.Errorf("source credential must match the current user id") } // SELECT * FROM related_resources WHERE user_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9"; var relatedResources []models.RelatedResource result := gr.GormClient.WithContext(ctx). Where(models.RelatedResource{ ResourceBaseUserID: currentUser.ID, ResourceBaseSourceID: source.ID, ResourceBaseSourceResourceType: resourceType, ResourceBaseSourceResourceID: resourceId, }). Find(&relatedResources) return relatedResources, result.Error } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Resource Composition (Grouping) //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // AddResourceComposition // this will group resources together into a "Composition" -- primarily to group related Encounters & Conditions into one semantic root. 
// algorithm: // - find source for each resource // - (SECURITY) ensure the current user and the source for each resource matches // - check if there is a Composition resource Type already. // - if Composition type already exists: // - update "relatesTo" field with additional data. // - else: // - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) // // - add AddResourceAssociation for all resources linked to the Composition resource // - store the Composition resource // TODO: determine if we should be using a List Resource instead of a Composition resource func (gr *GormRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return currentUserErr } //generate placeholder source placeholderSource := models.SourceCredential{UserID: currentUser.ID, SourceType: "manual", ModelBase: models.ModelBase{ID: uuid.Nil}} existingCompositionResources := []*models.ResourceBase{} rawResourceLookupTable := map[string]*models.ResourceBase{} //find the source for each resource we'd like to merge. (for ownership verification) sourceLookup := map[uuid.UUID]*models.SourceCredential{} for _, resource := range resources { if resource.SourceResourceType == pkg.FhirResourceTypeComposition { //skip, Composition resources don't have a valid SourceCredential existingCompositionResources = append(existingCompositionResources, resource) //compositions may include existing resources, make sure we handle these for _, related := range resource.RelatedResource { rawResourceLookupTable[fmt.Sprintf("%s/%s", related.SourceResourceType, related.SourceResourceID)] = related } continue } if _, sourceOk := sourceLookup[resource.SourceID]; !sourceOk { //source has not been added yet, lets query for it. 
sourceCred, err := gr.GetSource(ctx, resource.SourceID.String()) if err != nil { return fmt.Errorf("could not find source %s", resource.SourceID.String()) } sourceLookup[resource.SourceID] = sourceCred } rawResourceLookupTable[fmt.Sprintf("%s/%s", resource.SourceResourceType, resource.SourceResourceID)] = resource } // SECURITY: ensure the current user and the source for each resource matches for _, source := range sourceLookup { if source.UserID != currentUser.ID { return fmt.Errorf("source must be owned by the current user: %s vs %s", source.UserID, currentUser.ID) } } // - check if there is a Composition resource Type already. var compositionResource *models.ResourceBase if len(existingCompositionResources) > 0 { //- if Composition type already exists in this set // - update "relatesTo" field with additional data. compositionResource = existingCompositionResources[0] //disassociate all existing remaining composition resources. for _, existingCompositionResource := range existingCompositionResources[1:] { for _, relatedResource := range existingCompositionResource.RelatedResource { if err := gr.RemoveResourceAssociation( ctx, &placeholderSource, existingCompositionResource.SourceResourceType, existingCompositionResource.SourceResourceID, sourceLookup[relatedResource.SourceID], relatedResource.SourceResourceType, relatedResource.SourceResourceID, ); err != nil { //ignoring errors, could be due to duplicate edges return fmt.Errorf("an error occurred while removing resource association: %v", err) } } //remove this resource compositionTable, err := databaseModel.GetTableNameByResourceType("Composition") if err != nil { return fmt.Errorf("an error occurred while finding Composition resource table: %v", err) } //TODO: we may need to delete with using the FhirComposition struct type deleteResult := gr.GormClient.WithContext(ctx). Table(compositionTable). 
Delete(existingCompositionResource) if deleteResult.Error != nil { return fmt.Errorf("an error occurred while removing Composition resource(%s/%s): %v", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID, err) } else if deleteResult.RowsAffected != 1 { return fmt.Errorf("composition resource was not deleted %s/%s", existingCompositionResource.SourceResourceType, existingCompositionResource.SourceID) } } } else { //- else: // - Create a Composition resource type (populated with "relatesTo" references to all provided Resources) compositionResource = &models.ResourceBase{ OriginBase: models.OriginBase{ UserID: placeholderSource.UserID, // SourceID: placeholderSource.ID, //Empty SourceID expected ("0000-0000-0000-0000") SourceResourceType: pkg.FhirResourceTypeComposition, SourceResourceID: uuid.New().String(), }, } } // - Generate an "updated" RawResource json blob rawCompositionResource := models.ResourceComposition{ Title: compositionTitle, RelatesTo: []models.ResourceCompositionRelatesTo{}, } for relatedResourceKey, _ := range rawResourceLookupTable { rawCompositionResource.RelatesTo = append(rawCompositionResource.RelatesTo, models.ResourceCompositionRelatesTo{ Target: models.ResourceCompositionRelatesToTarget{ TargetReference: models.ResourceCompositionRelatesToTargetReference{ Reference: relatedResourceKey, }, }, }) } rawResourceJson, err := json.Marshal(rawCompositionResource) if err != nil { return err } compositionResource.ResourceRaw = rawResourceJson compositionResource.SortTitle = &compositionTitle compositionResource.RelatedResource = utils.SortResourcePtrListByDate(resources) compositionResource.SortDate = compositionResource.RelatedResource[0].SortDate //store the Composition resource _, err = gr.UpsertResource(ctx, compositionResource) if err != nil { return err } // - add AddResourceAssociation for all resources linked to the Composition resource for _, resource := range rawResourceLookupTable { if err := 
gr.AddResourceAssociation( ctx, &placeholderSource, compositionResource.SourceResourceType, compositionResource.SourceResourceID, sourceLookup[resource.SourceID], resource.SourceResourceType, resource.SourceResourceID, ); err != nil { return err } } return nil } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // SourceCredential //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func (gr *GormRepository) CreateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return currentUserErr } sourceCreds.UserID = currentUser.ID //Assign will **always** update the source credential in the DB with data passed into this function. return gr.GormClient.WithContext(ctx). Where(models.SourceCredential{ UserID: sourceCreds.UserID, SourceType: sourceCreds.SourceType, Patient: sourceCreds.Patient}). Assign(*sourceCreds).FirstOrCreate(sourceCreds).Error } func (gr *GormRepository) UpdateSource(ctx context.Context, sourceCreds *models.SourceCredential) error { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return currentUserErr } sourceCreds.UserID = currentUser.ID //Assign will **always** update the source credential in the DB with data passed into this function. return gr.GormClient.WithContext(ctx). 
Where(models.SourceCredential{ ModelBase: models.ModelBase{ID: sourceCreds.ID}, UserID: sourceCreds.UserID, SourceType: sourceCreds.SourceType, }).Updates(models.SourceCredential{ AccessToken: sourceCreds.AccessToken, RefreshToken: sourceCreds.RefreshToken, ExpiresAt: sourceCreds.ExpiresAt, DynamicClientId: sourceCreds.DynamicClientId, DynamicClientRegistrationMode: sourceCreds.DynamicClientRegistrationMode, DynamicClientJWKS: sourceCreds.DynamicClientJWKS, LatestBackgroundJobID: sourceCreds.LatestBackgroundJobID, }).Error } func (gr *GormRepository) GetSource(ctx context.Context, sourceId string) (*models.SourceCredential, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } sourceUUID, err := uuid.Parse(sourceId) if err != nil { return nil, err } var sourceCred models.SourceCredential results := gr.GormClient.WithContext(ctx). Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: sourceUUID}}). Preload("LatestBackgroundJob"). 
First(&sourceCred) return &sourceCred, results.Error } func (gr *GormRepository) GetSourceSummary(ctx context.Context, sourceId string) (*models.SourceSummary, error) { currentUser, currentUserErr := gr.GetCurrentUser(ctx) if currentUserErr != nil { return nil, currentUserErr } sourceUUID, err := uuid.Parse(sourceId) if err != nil { return nil, err } sourceSummary := &models.SourceSummary{} source, err := gr.GetSource(ctx, sourceId) if err != nil { return nil, err } sourceSummary.Source = source //group by resource type and return counts // SELECT source_resource_type as resource_type, COUNT(*) as count FROM resource_bases WHERE source_id = "53c1e930-63af-46c9-b760-8e83cbc1abd9" GROUP BY source_resource_type; var resourceTypeCounts []map[string]interface{} resourceTypes := databaseModel.GetAllowedResourceTypes() for _, resourceType := range resourceTypes { tableName, err := databaseModel.GetTableNameByResourceType(resourceType) if err != nil { return nil, err } var count int64 result := gr.GormClient.WithContext(ctx). Table(tableName). Where(models.OriginBase{ UserID: currentUser.ID, SourceID: sourceUUID, }). Count(&count) if result.Error != nil { return nil, result.Error } if count == 0 { continue //don't add resource counts if the count is 0 } resourceTypeCounts = append(resourceTypeCounts, map[string]interface{}{ "source_id": sourceId, "resource_type": resourceType, "count": count, }) } sourceSummary.ResourceTypeCounts = resourceTypeCounts //set patient patientTableName, err := databaseModel.GetTableNameByResourceType("Patient") if err != nil { return nil, err } var wrappedPatientResourceModel models.ResourceBase patientResults := gr.GormClient.WithContext(ctx). Where(models.OriginBase{ UserID: currentUser.ID, SourceResourceType: "Patient", SourceID: sourceUUID, }). Table(patientTableName). 
First(&wrappedPatientResourceModel)
	if patientResults.Error != nil {
		return nil, patientResults.Error
	}
	sourceSummary.Patient = &wrappedPatientResourceModel

	return sourceSummary, nil
}

// GetSources returns every source credential owned by the current user, with
// the "LatestBackgroundJob" association preloaded on each credential.
//
// The current user is resolved from ctx via GetCurrentUser; if that fails the
// error is returned and no query is issued.
func (gr *GormRepository) GetSources(ctx context.Context) ([]models.SourceCredential, error) {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return nil, currentUserErr
	}

	var sourceCreds []models.SourceCredential
	results := gr.GormClient.WithContext(ctx).
		Where(models.SourceCredential{UserID: currentUser.ID}).
		Preload("LatestBackgroundJob").
		Find(&sourceCreds)
	return sourceCreds, results.Error
}

// DeleteSource deletes the source credential identified by sourceId together
// with the resources and related-resource entries stored for it, scoped to the
// current user.
//
// All deletes run inside a single transaction, so a failure part-way through
// rolls everything back instead of leaving the source half-deleted.
//
// It returns the number of rows removed from the resource tables plus the
// source credential table (related-resource rows are not counted). On any
// error the count is 0, because nothing has been persisted.
func (gr *GormRepository) DeleteSource(ctx context.Context, sourceId string) (int64, error) {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return 0, currentUserErr
	}

	if strings.TrimSpace(sourceId) == "" {
		return 0, fmt.Errorf("sourceId cannot be blank")
	}
	sourceUUID, err := uuid.Parse(sourceId)
	if err != nil {
		return 0, err
	}

	var rowsEffected int64
	txErr := gr.GormClient.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		//delete all resources for this source
		for _, resourceType := range databaseModel.GetAllowedResourceTypes() {
			tableName, err := databaseModel.GetTableNameByResourceType(resourceType)
			if err != nil {
				return err
			}
			results := tx.
				Where(models.OriginBase{
					UserID:   currentUser.ID,
					SourceID: sourceUUID,
				}).
				Table(tableName).
				Delete(&models.ResourceBase{})
			if results.Error != nil {
				return results.Error
			}
			rowsEffected += results.RowsAffected
		}

		//delete relatedResources entries
		results := tx.
			Where(models.RelatedResource{ResourceBaseUserID: currentUser.ID, ResourceBaseSourceID: sourceUUID}).
			Delete(&models.RelatedResource{})
		if results.Error != nil {
			return results.Error
		}

		//soft delete the source credential
		results = tx.
			Where(models.SourceCredential{
				ModelBase: models.ModelBase{
					ID: sourceUUID,
				},
				UserID: currentUser.ID,
			}).
			Delete(&models.SourceCredential{})
		if results.Error != nil {
			return results.Error
		}
		rowsEffected += results.RowsAffected
		return nil
	})
	if txErr != nil {
		return 0, txErr
	}
	return rowsEffected, nil
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Background Job
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// CreateBackgroundJob stores backgroundJob for the current user.
//
// backgroundJob.UserID is overwritten with the current user's ID before the
// insert, so the caller-provided value is ignored.
func (gr *GormRepository) CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return currentUserErr
	}

	backgroundJob.UserID = currentUser.ID

	// Bind the request context so the insert honours it like every other query here.
	return gr.GormClient.WithContext(ctx).Create(backgroundJob).Error
}

// GetBackgroundJob looks up the background job with the given ID that belongs
// to the current user.
//
// It returns an error when the current user cannot be resolved, when
// backgroundJobId is not a valid UUID, or when the query fails. On a query
// failure the returned pointer is non-nil but refers to a zero-valued job;
// callers must check the error first.
func (gr *GormRepository) GetBackgroundJob(ctx context.Context, backgroundJobId string) (*models.BackgroundJob, error) {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return nil, currentUserErr
	}

	backgroundJobUUID, err := uuid.Parse(backgroundJobId)
	if err != nil {
		return nil, err
	}

	var backgroundJob models.BackgroundJob
	results := gr.GormClient.WithContext(ctx).
		// Filter with the BackgroundJob model itself, matching the other
		// background job queries in this file.
		Where(models.BackgroundJob{
			ModelBase: models.ModelBase{ID: backgroundJobUUID},
			UserID:    currentUser.ID,
		}).
		First(&backgroundJob)
	return &backgroundJob, results.Error
}

// UpdateBackgroundJob persists the mutable fields of backgroundJob (status,
// data, locked/done times, retries and schedule) for the row matching its ID
// and the current user.
//
// backgroundJob.UserID is overwritten with the current user's ID. The update
// is passed to GORM as a struct, which per the GORM documentation only writes
// non-zero fields.
func (gr *GormRepository) UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return currentUserErr
	}
	backgroundJob.UserID = currentUser.ID

	return gr.GormClient.WithContext(ctx).
		Where(models.BackgroundJob{
			ModelBase: models.ModelBase{ID: backgroundJob.ID},
			UserID:    backgroundJob.UserID,
		}).
		Updates(models.BackgroundJob{
			JobStatus:  backgroundJob.JobStatus,
			Data:       backgroundJob.Data,
			LockedTime: backgroundJob.LockedTime,
			DoneTime:   backgroundJob.DoneTime,
			Retries:    backgroundJob.Retries,
			Schedule:   backgroundJob.Schedule,
		}).Error
}

// ListBackgroundJobs returns the current user's background jobs ordered by
// locked_time, newest first.
//
// queryOptions.JobType and queryOptions.Status narrow the result when set;
// queryOptions.Limit and queryOptions.Offset are applied only when positive.
func (gr *GormRepository) ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error) {
	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		return nil, currentUserErr
	}

	queryParam := models.BackgroundJob{
		UserID: currentUser.ID,
	}
	if queryOptions.JobType != nil {
		queryParam.JobType = *queryOptions.JobType
	}
	if queryOptions.Status != nil {
		queryParam.JobStatus = *queryOptions.Status
	}

	query := gr.GormClient.WithContext(ctx).
		//Group("source_id"). //broken in Postgres.
		Where(queryParam).
		Order("locked_time DESC")
	if queryOptions.Limit > 0 {
		query = query.Limit(queryOptions.Limit)
	}
	if queryOptions.Offset > 0 {
		query = query.Offset(queryOptions.Offset)
	}

	// Run the query in its own statement: reading the slice and calling Find in
	// the same return expression relies on an evaluation order the Go spec
	// leaves unspecified.
	var backgroundJobs []models.BackgroundJob
	err := query.Find(&backgroundJobs).Error
	return backgroundJobs, err
}

// BackgroundJobCheckpoint records progress for the background job whose ID is
// stored in ctx under pkg.ContextKeyTypeBackgroundJobID.
//
// Non-empty checkpointData and errorData replace the corresponding sections of
// the job's serialized sync data, and LockedTime is refreshed to the current
// time. The read-modify-write happens inside one transaction.
//
// This is best effort: every failure is logged as a warning and swallowed, and
// nothing is returned to the caller. When both maps are empty it does nothing.
func (gr *GormRepository) BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{}) {
	gr.Logger.Info("begin checkpointing background job...")
	if len(checkpointData) == 0 && len(errorData) == 0 {
		gr.Logger.Info("no changes detected. Skipping checkpoint")
		return //nothing to do
	}
	defer gr.Logger.Info("end checkpointing background job")

	currentUser, currentUserErr := gr.GetCurrentUser(ctx)
	if currentUserErr != nil {
		gr.Logger.Warningf("could not find current user info context. Ignoring checkpoint: %v", currentUserErr)
		return
	}

	//make sure we do an atomic update
	backgroundJobId, ok := ctx.Value(pkg.ContextKeyTypeBackgroundJobID).(string)
	if !ok {
		gr.Logger.Warning("could not find background job id in context. Ignoring checkpoint")
		return
	}
	backgroundJobUUID, err := uuid.Parse(backgroundJobId)
	if err != nil {
		gr.Logger.Warningf("could not parse background job id. Ignoring checkpoint: %v", err)
		return
	}

	//https://gorm.io/docs/advanced_query.html#Locking-FOR-UPDATE
	//TODO: if using another database type (not SQLITE) we need to make sure we use the correct locking strategy
	//This is not a problem in SQLITE because it does database (or table) level locking by default
	//var backgroundJob models.BackgroundJob
	//gr.GormClient.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&backgroundJob)

	txErr := gr.GormClient.Transaction(func(tx *gorm.DB) error {
		//retrieve the background job by id
		var backgroundJob models.BackgroundJob
		backgroundJobFindResults := tx.WithContext(ctx).
			Where(models.BackgroundJob{
				ModelBase: models.ModelBase{ID: backgroundJobUUID},
				UserID:    currentUser.ID,
			}).
			First(&backgroundJob)
		if backgroundJobFindResults.Error != nil {
			return backgroundJobFindResults.Error
		}

		//deserialize the job data
		var backgroundJobSyncData models.BackgroundJobSyncData
		if backgroundJob.Data != nil {
			if err := json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData); err != nil {
				return err
			}
		}

		//update the job data with new data provided by the calling function
		changed := false
		if len(checkpointData) > 0 {
			backgroundJobSyncData.CheckpointData = checkpointData
			changed = true
		}
		if len(errorData) > 0 {
			backgroundJobSyncData.ErrorData = errorData
			changed = true
		}

		//define a background job with the fields we're going to update
		now := time.Now()
		updatedBackgroundJob := models.BackgroundJob{
			LockedTime: &now,
		}
		if changed {
			serializedData, err := json.Marshal(backgroundJobSyncData)
			if err != nil {
				return err
			}
			updatedBackgroundJob.Data = serializedData
		}

		return tx.WithContext(ctx).
			Where(models.BackgroundJob{
				ModelBase: models.ModelBase{ID: backgroundJobUUID},
				UserID:    currentUser.ID,
			}).
			Updates(updatedBackgroundJob).Error
	})
	if txErr != nil {
		gr.Logger.Warningf("could not find or update background job. Ignoring checkpoint: %v", txErr)
	}
}

// CancelAllLockedBackgroundJobsAndFail marks every background job whose status
// is pkg.BackgroundJobStatusLocked as pkg.BackgroundJobStatusFailed and sets
// its DoneTime to the current time.
//
// When the server restarts, we should unlock all locked jobs and set their
// status to failed.
// SECURITY: this is global, and affects all users.
func (gr *GormRepository) CancelAllLockedBackgroundJobsAndFail() error {
	now := time.Now()
	return gr.GormClient.
		Where(models.BackgroundJob{JobStatus: pkg.BackgroundJobStatusLocked}).
		Updates(models.BackgroundJob{
			JobStatus: pkg.BackgroundJobStatusFailed,
			DoneTime:  &now,
		}).Error
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Utilities
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Internal function
// This function will return a list of resources from all FHIR tables in the database.
// The query allows us to set the source id, source resource id, source resource type.
// It runs one query per allowed resource type and concatenates the results; the
// first lookup or query error aborts and is returned with a nil slice.
// SECURITY: this function assumes the user has already been authenticated
// NOTE(review): queryBuilder is reused on every loop iteration; this presumably
// expects a reusable GORM session (e.g. one obtained from WithContext) so that
// conditions do not accumulate between tables - confirm against callers.
// TODO: there's probably a more efficient way of doing this with GORM
func (gr *GormRepository) getResourcesFromAllTables(queryBuilder *gorm.DB, queryParam models.OriginBase) ([]models.ResourceBase, error) {
	wrappedResourceModels := []models.ResourceBase{}
	resourceTypes := databaseModel.GetAllowedResourceTypes()
	for _, resourceType := range resourceTypes {
		tableName, err := databaseModel.GetTableNameByResourceType(resourceType)
		if err != nil {
			return nil, err
		}
		var tempWrappedResourceModels []models.ResourceBase
		results := queryBuilder.
			Where(queryParam).
			Table(tableName).
			Find(&tempWrappedResourceModels)
		if results.Error != nil {
			return nil, results.Error
		}
		wrappedResourceModels = append(wrappedResourceModels, tempWrappedResourceModels...)
	}
	return wrappedResourceModels, nil
}