fixed error where the source ID was not populated whne source is first created, causing next SyncAll command to have invalid source_id values.

Created default SyncAll function (using $everything) and SyncAllByResourceName which can be used as a fallback.

cleaned up DB layer , dont use Model unless Gorm cannot figure out the table. Use pointers consistently.
When syncing resources, store errors in a map, so that all resources get synced before returing errors
This commit is contained in:
Jason Kulatunga 2022-09-24 22:18:07 -07:00
parent 8531a028c9
commit 61d16cfb55
11 changed files with 154 additions and 68 deletions

View File

@ -11,11 +11,11 @@ type DatabaseRepository interface {
CreateUser(context.Context, *models.User) error CreateUser(context.Context, *models.User) error
GetUserByEmail(context.Context, string) (*models.User, error) GetUserByEmail(context.Context, string) (*models.User, error)
GetCurrentUser(context.Context) models.User GetCurrentUser(context.Context) *models.User
GetSummary(ctx context.Context) (*models.Summary, error) GetSummary(ctx context.Context) (*models.Summary, error)
UpsertResource(context.Context, models.ResourceFhir) error UpsertResource(context.Context, *models.ResourceFhir) error
GetResource(context.Context, string) (*models.ResourceFhir, error) GetResource(context.Context, string) (*models.ResourceFhir, error)
GetResourceBySourceId(context.Context, string, string) (*models.ResourceFhir, error) GetResourceBySourceId(context.Context, string, string) (*models.ResourceFhir, error)
ListResources(context.Context, models.ListResourceQueryOptions) ([]models.ResourceFhir, error) ListResources(context.Context, models.ListResourceQueryOptions) ([]models.ResourceFhir, error)

View File

@ -89,7 +89,7 @@ func (sr *sqliteRepository) CreateUser(ctx context.Context, user *models.User) e
if err := user.HashPassword(user.Password); err != nil { if err := user.HashPassword(user.Password); err != nil {
return err return err
} }
record := sr.gormClient.Create(&user) record := sr.gormClient.Create(user)
if record.Error != nil { if record.Error != nil {
return record.Error return record.Error
} }
@ -97,16 +97,16 @@ func (sr *sqliteRepository) CreateUser(ctx context.Context, user *models.User) e
} }
func (sr *sqliteRepository) GetUserByEmail(ctx context.Context, username string) (*models.User, error) { func (sr *sqliteRepository) GetUserByEmail(ctx context.Context, username string) (*models.User, error) {
var foundUser models.User var foundUser models.User
result := sr.gormClient.Model(models.User{}).Where(models.User{Username: username}).First(&foundUser) result := sr.gormClient.Where(models.User{Username: username}).First(&foundUser)
return &foundUser, result.Error return &foundUser, result.Error
} }
func (sr *sqliteRepository) GetCurrentUser(ctx context.Context) models.User { func (sr *sqliteRepository) GetCurrentUser(ctx context.Context) *models.User {
ginCtx := ctx.(*gin.Context) ginCtx := ctx.(*gin.Context)
var currentUser models.User var currentUser models.User
sr.gormClient.Model(models.User{}).First(&currentUser, models.User{Username: ginCtx.MustGet("AUTH_USERNAME").(string)}) sr.gormClient.First(&currentUser, models.User{Username: ginCtx.MustGet("AUTH_USERNAME").(string)})
return currentUser return &currentUser
} }
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -157,17 +157,17 @@ func (sr *sqliteRepository) GetSummary(ctx context.Context) (*models.Summary, er
// Resource // Resource
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *sqliteRepository) UpsertResource(ctx context.Context, resourceModel models.ResourceFhir) error { func (sr *sqliteRepository) UpsertResource(ctx context.Context, resourceModel *models.ResourceFhir) error {
sr.logger.Infof("insert/update (%T) %v", resourceModel, resourceModel) sr.logger.Infof("insert/update (%T) %v", resourceModel, resourceModel)
if sr.gormClient.Debug().WithContext(ctx).Model(&resourceModel). if sr.gormClient.Debug().WithContext(ctx).
Where(models.OriginBase{ Where(models.OriginBase{
SourceID: resourceModel.GetSourceID(), SourceID: resourceModel.GetSourceID(),
SourceResourceID: resourceModel.GetSourceResourceID(), SourceResourceID: resourceModel.GetSourceResourceID(),
SourceResourceType: resourceModel.GetSourceResourceType(), //TODO: and UpdatedAt > old UpdatedAt SourceResourceType: resourceModel.GetSourceResourceType(), //TODO: and UpdatedAt > old UpdatedAt
}).Updates(&resourceModel).RowsAffected == 0 { }).Updates(resourceModel).RowsAffected == 0 {
sr.logger.Infof("resource does not exist, creating: %s %s %s", resourceModel.GetSourceID(), resourceModel.GetSourceResourceID(), resourceModel.GetSourceResourceType()) sr.logger.Infof("resource does not exist, creating: %s %s %s", resourceModel.GetSourceID(), resourceModel.GetSourceResourceID(), resourceModel.GetSourceResourceType())
return sr.gormClient.Debug().Model(&resourceModel).Create(&resourceModel).Error return sr.gormClient.Debug().Create(resourceModel).Error
} }
return nil return nil
} }
@ -274,12 +274,12 @@ func (sr *sqliteRepository) GetPatientForSources(ctx context.Context) ([]models.
func (sr *sqliteRepository) CreateSource(ctx context.Context, sourceCreds *models.Source) error { func (sr *sqliteRepository) CreateSource(ctx context.Context, sourceCreds *models.Source) error {
sourceCreds.UserID = sr.GetCurrentUser(ctx).ID sourceCreds.UserID = sr.GetCurrentUser(ctx).ID
if sr.gormClient.WithContext(ctx).Model(&sourceCreds). if sr.gormClient.WithContext(ctx).
Where(models.Source{ Where(models.Source{
UserID: sourceCreds.UserID, UserID: sourceCreds.UserID,
SourceType: sourceCreds.SourceType, SourceType: sourceCreds.SourceType,
PatientId: sourceCreds.PatientId}).Updates(&sourceCreds).RowsAffected == 0 { PatientId: sourceCreds.PatientId}).Updates(sourceCreds).RowsAffected == 0 {
return sr.gormClient.WithContext(ctx).Create(&sourceCreds).Error return sr.gormClient.WithContext(ctx).Create(sourceCreds).Error
} }
return nil return nil
} }

View File

@ -39,7 +39,7 @@ func (c AetnaClient) SyncAll(db database.DatabaseRepository) error {
//todo, create the resources in dependency order //todo, create the resources in dependency order
for _, apiModel := range wrappedResourceModels { for _, apiModel := range wrappedResourceModels {
err = db.UpsertResource(context.Background(), apiModel) err = db.UpsertResource(context.Background(), &apiModel)
if err != nil { if err != nil {
c.Logger.Info("An error occurred while upserting resource") c.Logger.Info("An error occurred while upserting resource")
return err return err

View File

@ -2,7 +2,6 @@ package athena
import ( import (
"context" "context"
"fmt"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base"
@ -23,7 +22,6 @@ func NewClient(ctx context.Context, appConfig config.Interface, globalLogger log
} }
func (c AthenaClient) SyncAll(db database.DatabaseRepository) error { func (c AthenaClient) SyncAll(db database.DatabaseRepository) error {
supportedResources := []string{ supportedResources := []string{
"AllergyIntolerance", "AllergyIntolerance",
//"Binary", //"Binary",
@ -46,25 +44,6 @@ func (c AthenaClient) SyncAll(db database.DatabaseRepository) error {
"Procedure", "Procedure",
//"Provenance", //"Provenance",
} }
for _, resourceType := range supportedResources {
bundle, err := c.GetResourceBundle(fmt.Sprintf("%s?patient=%s", resourceType, c.Source.PatientId))
if err != nil {
return err
}
wrappedResourceModels, err := c.ProcessBundle(bundle)
if err != nil {
c.Logger.Infof("An error occurred while processing %s bundle %s", resourceType, c.Source.PatientId)
return err
}
//todo, create the resources in dependency order
for _, apiModel := range wrappedResourceModels {
err = db.UpsertResource(context.Background(), apiModel)
if err != nil {
return err
}
}
}
return nil
return c.SyncAllByResourceName(db, supportedResources)
} }

View File

@ -43,7 +43,7 @@ func (c *FHIR401Client) SyncAll(db database.DatabaseRepository) error {
//todo, create the resources in dependency order //todo, create the resources in dependency order
for _, apiModel := range wrappedResourceModels { for _, apiModel := range wrappedResourceModels {
err = db.UpsertResource(context.Background(), apiModel) err = db.UpsertResource(context.Background(), &apiModel)
if err != nil { if err != nil {
return err return err
} }
@ -51,6 +51,62 @@ func (c *FHIR401Client) SyncAll(db database.DatabaseRepository) error {
return nil return nil
} }
//TODO, find a way to sync references that cannot be searched by patient ID.
func (c *FHIR401Client) SyncAllByResourceName(db database.DatabaseRepository, resourceNames []string) error {
//Store the Patient
patientResource, err := c.GetPatient(c.Source.PatientId)
if err != nil {
return err
}
patientJson, err := patientResource.MarshalJSON()
if err != nil {
return err
}
patientResourceType, patientResourceId := patientResource.ResourceRef()
patientResourceFhir := models.ResourceFhir{
OriginBase: models.OriginBase{
UserID: c.Source.UserID,
SourceID: c.Source.ID,
SourceResourceType: patientResourceType,
SourceResourceID: *patientResourceId,
},
Payload: datatypes.JSON(patientJson),
}
db.UpsertResource(context.Background(), &patientResourceFhir)
//error map storage.
syncErrors := map[string]error{}
//Store all other resources.
for _, resourceType := range resourceNames {
bundle, err := c.GetResourceBundle(fmt.Sprintf("%s?patient=%s", resourceType, c.Source.PatientId))
if err != nil {
syncErrors[resourceType] = err
continue
}
wrappedResourceModels, err := c.ProcessBundle(bundle)
if err != nil {
c.Logger.Infof("An error occurred while processing %s bundle %s", resourceType, c.Source.PatientId)
syncErrors[resourceType] = err
continue
}
for _, apiModel := range wrappedResourceModels {
err = db.UpsertResource(context.Background(), &apiModel)
if err != nil {
syncErrors[resourceType] = err
continue
}
}
}
if len(syncErrors) > 0 {
return fmt.Errorf("%d error(s) occurred during sync: %v", len(syncErrors), syncErrors)
}
return nil
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// FHIR // FHIR
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -3,6 +3,7 @@ package bluebutton
import ( import (
"context" "context"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -19,3 +20,12 @@ func NewClient(ctx context.Context, appConfig config.Interface, globalLogger log
baseClient, baseClient,
}, updatedSource, err }, updatedSource, err
} }
func (c BlueButtonClient) SyncAll(db database.DatabaseRepository) error {
supportedResources := []string{
"ExplanationOfBenefit",
"Coverage",
}
return c.SyncAllByResourceName(db, supportedResources)
}

View File

@ -2,7 +2,6 @@ package cerner
import ( import (
"context" "context"
"fmt"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base"
@ -50,25 +49,5 @@ func (c CernerClient) SyncAll(db database.DatabaseRepository) error {
"ServiceRequest", "ServiceRequest",
"Slot", "Slot",
} }
for _, resourceType := range supportedResources { return c.SyncAllByResourceName(db, supportedResources)
bundle, err := c.GetResourceBundle(fmt.Sprintf("%s?patient=%s", resourceType, c.Source.PatientId))
if err != nil {
continue //TODO: skippping failures in the resource retrival
}
wrappedResourceModels, err := c.ProcessBundle(bundle)
if err != nil {
c.Logger.Infof("An error occurred while processing %s bundle %s", resourceType, c.Source.PatientId)
return err
}
//todo, create the resources in dependency order
for _, apiModel := range wrappedResourceModels {
err = db.UpsertResource(context.Background(), apiModel)
if err != nil {
return err
}
}
}
return nil
} }

View File

@ -3,6 +3,7 @@ package epic
import ( import (
"context" "context"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -21,3 +22,33 @@ func NewClient(ctx context.Context, appConfig config.Interface, globalLogger log
baseClient, baseClient,
}, updatedSource, err }, updatedSource, err
} }
func (c EpicClient) SyncAll(db database.DatabaseRepository) error {
supportedResources := []string{
"AllergyIntolerance",
"CarePlan",
"CareTeam",
"Condition",
"Consent",
"Device",
"Encounter",
"FamilyMemberHistory",
"Goal",
"Immunization",
"InsurancePlan",
"MedicationRequest",
"NutritionOrder",
"Observation",
"Person",
"Procedure",
"Provenance",
"Questionnaire",
"QuestionnaireResponse",
"RelatedPerson",
"Schedule",
"ServiceRequest",
"Slot",
}
return c.SyncAllByResourceName(db, supportedResources)
}

View File

@ -3,6 +3,7 @@ package healthit
import ( import (
"context" "context"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/config"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/database"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/hub/internal/fhir/base"
"github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models" "github.com/fastenhealth/fastenhealth-onprem/backend/pkg/models"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -21,3 +22,33 @@ func NewClient(ctx context.Context, appConfig config.Interface, globalLogger log
baseClient, baseClient,
}, updatedSource, err }, updatedSource, err
} }
func (c HealthItClient) SyncAll(db database.DatabaseRepository) error {
supportedResources := []string{
"AllergyIntolerance",
"CarePlan",
"CareTeam",
"Condition",
"Consent",
"Device",
"Encounter",
"FamilyMemberHistory",
"Goal",
"Immunization",
"InsurancePlan",
"MedicationRequest",
"NutritionOrder",
"Observation",
"Person",
"Procedure",
"Provenance",
"Questionnaire",
"QuestionnaireResponse",
"RelatedPerson",
"Schedule",
"ServiceRequest",
"Slot",
}
return c.SyncAllByResourceName(db, supportedResources)
}

View File

@ -85,7 +85,7 @@ func (m ManualClient) SyncAllBundle(db database.DatabaseRepository, bundleFile *
} }
// we need to upsert all resources (and make sure they are associated with new Source) // we need to upsert all resources (and make sure they are associated with new Source)
for _, apiModel := range resourceFhirList { for _, apiModel := range resourceFhirList {
err = db.UpsertResource(context.Background(), apiModel) err = db.UpsertResource(context.Background(), &apiModel)
if err != nil { if err != nil {
return fmt.Errorf("an error occurred while upserting resources: %w", err) return fmt.Errorf("an error occurred while upserting resources: %w", err)
} }

View File

@ -10,21 +10,21 @@ import (
func GetMetadataSource(c *gin.Context) { func GetMetadataSource(c *gin.Context) {
metadataSource := map[string]models.MetadataSource{ metadataSource := map[string]models.MetadataSource{
string(pkg.SourceTypeLogica): {Display: "Logica (Sandbox)", SourceType: pkg.SourceTypeLogica, Category: []string{"Sandbox"}, Supported: true},
string(pkg.SourceTypeAthena): {Display: "Athena (Sandbox)", SourceType: pkg.SourceTypeAthena, Category: []string{"Sandbox"}, Supported: true}, string(pkg.SourceTypeAthena): {Display: "Athena (Sandbox)", SourceType: pkg.SourceTypeAthena, Category: []string{"Sandbox"}, Supported: true},
string(pkg.SourceTypeEpic): {Display: "Epic (Sandbox)", SourceType: pkg.SourceTypeEpic, Category: []string{"Sandbox"}, Supported: true},
string(pkg.SourceTypeLogica): {Display: "Logica (Sandbox)", SourceType: pkg.SourceTypeLogica, Category: []string{"Sandbox"}, Supported: true},
string(pkg.SourceTypeHealthIT): {Display: "HealthIT (Sandbox)", SourceType: pkg.SourceTypeHealthIT, Category: []string{"Sandbox"}, Supported: true}, string(pkg.SourceTypeHealthIT): {Display: "HealthIT (Sandbox)", SourceType: pkg.SourceTypeHealthIT, Category: []string{"Sandbox"}, Supported: true},
// enabled // enabled
string(pkg.SourceTypeAetna): {Display: "Aetna", SourceType: pkg.SourceTypeAetna, Category: []string{"Insurance"}, Supported: true}, string(pkg.SourceTypeAetna): {Display: "Aetna", SourceType: pkg.SourceTypeAetna, Category: []string{"Insurance"}, Supported: true},
string(pkg.SourceTypeCigna): {Display: "Cigna", SourceType: pkg.SourceTypeCigna, Category: []string{"Insurance", "Hospital"}, Supported: true}, string(pkg.SourceTypeCigna): {Display: "Cigna", SourceType: pkg.SourceTypeCigna, Category: []string{"Insurance", "Hospital"}, Supported: true},
string(pkg.SourceTypeBlueButtonMedicare): {Display: "Medicare/VA Health (BlueButton)", SourceType: pkg.SourceTypeBlueButtonMedicare, Category: []string{"Hospital"}, Supported: true},
//TODO: infinite pagination for Encounters?? //TODO: infinite pagination for Encounters??
string(pkg.SourceTypeCerner): {Display: "Cerner (Sandbox)", SourceType: pkg.SourceTypeCerner, Category: []string{"Sandbox"}, Supported: true}, string(pkg.SourceTypeCerner): {Display: "Cerner (Sandbox)", SourceType: pkg.SourceTypeCerner, Category: []string{"Sandbox"}, Supported: true},
//TODO: does not support $everything endpoint. //TODO: fails with CORS error when swapping token. Should be confidential client.
string(pkg.SourceTypeBlueButtonMedicare): {Display: "Medicare/VA Health (BlueButton)", SourceType: pkg.SourceTypeBlueButtonMedicare, Category: []string{"Hospital"}, Supported: true}, string(pkg.SourceTypeCareEvolution): {Display: "CareEvolution (Sandbox)", SourceType: pkg.SourceTypeCareEvolution, Category: []string{"Sandbox"}, Supported: false},
string(pkg.SourceTypeEpic): {Display: "Epic (Sandbox)", SourceType: pkg.SourceTypeEpic, Category: []string{"Sandbox"}, Supported: true},
string(pkg.SourceTypeCareEvolution): {Display: "CareEvolution (Sandbox)", SourceType: pkg.SourceTypeCareEvolution, Category: []string{"Sandbox"}, Supported: true},
// pending // pending
string(pkg.SourceTypeAnthem): {Display: "Anthem", SourceType: pkg.SourceTypeAnthem, Category: []string{"Insurance"}}, string(pkg.SourceTypeAnthem): {Display: "Anthem", SourceType: pkg.SourceTypeAnthem, Category: []string{"Insurance"}},