Background Jobs (#266)

This commit is contained in:
Jason Kulatunga 2023-10-08 16:29:26 -07:00 committed by GitHub
parent b8cf1c23ad
commit dcabfc8cd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1146 additions and 95 deletions

View File

@ -1,6 +1,9 @@
package pkg
type ResourceGraphType string
type BackgroundJobStatus string
type BackgroundJobType string
type BackgroundJobSchedule string
const (
ResourceListPageSize int = 20
@ -10,8 +13,9 @@ const (
ContextKeyTypeLogger string = "LOGGER"
ContextKeyTypeEventBusServer string = "EVENT_BUS_SERVER"
ContextKeyTypeAuthUsername string = "AUTH_USERNAME"
ContextKeyTypeAuthToken string = "AUTH_TOKEN"
ContextKeyTypeAuthUsername string = "AUTH_USERNAME"
ContextKeyTypeAuthToken string = "AUTH_TOKEN"
ContextKeyTypeBackgroundJobID string = "BACKGROUND_JOB_ID"
FhirResourceTypeComposition string = "Composition"
@ -19,4 +23,17 @@ const (
ResourceGraphTypeAddressBook ResourceGraphType = "AddressBook"
ResourceGraphTypeMedications ResourceGraphType = "Medications"
ResourceGraphTypeBillingReport ResourceGraphType = "BillingReport"
BackgroundJobStatusReady BackgroundJobStatus = "STATUS_READY"
BackgroundJobStatusLocked BackgroundJobStatus = "STATUS_LOCKED"
BackgroundJobStatusFailed BackgroundJobStatus = "STATUS_FAILED"
BackgroundJobStatusDone BackgroundJobStatus = "STATUS_DONE"
BackgroundJobTypeSync BackgroundJobType = "SYNC"
BackgroundJobTypeScheduledSync BackgroundJobType = "SCHEDULED_SYNC"
BackgroundJobScheduleDaily BackgroundJobSchedule = "DAILY"
BackgroundJobScheduleWeekly BackgroundJobSchedule = "WEEKLY"
BackgroundJobScheduleBiWeekly BackgroundJobSchedule = "BIWEEKLY"
BackgroundJobScheduleMonthly BackgroundJobSchedule = "MONTHLY"
)

View File

@ -42,6 +42,12 @@ type DatabaseRepository interface {
CreateGlossaryEntry(ctx context.Context, glossaryEntry *models.Glossary) error
GetGlossaryEntry(ctx context.Context, code string, codeSystem string) (*models.Glossary, error)
//background jobs
CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error
GetBackgroundJob(ctx context.Context, backgroundJobId string) (*models.BackgroundJob, error)
UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error
ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error)
//settings
LoadUserSettings(ctx context.Context) (*models.UserSettings, error)
SaveUserSettings(context.Context, *models.UserSettings) error
@ -49,4 +55,5 @@ type DatabaseRepository interface {
//used by fasten-sources Clients
UpsertRawResource(ctx context.Context, sourceCredentials sourcePkg.SourceCredential, rawResource sourcePkg.RawResourceFhir) (bool, error)
BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{})
}

View File

@ -66,6 +66,18 @@ func (mr *MockDatabaseRepositoryMockRecorder) AddResourceComposition(ctx, compos
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddResourceComposition", reflect.TypeOf((*MockDatabaseRepository)(nil).AddResourceComposition), ctx, compositionTitle, resources)
}
// BackgroundJobCheckpoint mocks base method.
func (m *MockDatabaseRepository) BackgroundJobCheckpoint(ctx context.Context, checkpointData, errorData map[string]interface{}) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BackgroundJobCheckpoint", ctx, checkpointData, errorData)
}
// BackgroundJobCheckpoint indicates an expected call of BackgroundJobCheckpoint.
func (mr *MockDatabaseRepositoryMockRecorder) BackgroundJobCheckpoint(ctx, checkpointData, errorData interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BackgroundJobCheckpoint", reflect.TypeOf((*MockDatabaseRepository)(nil).BackgroundJobCheckpoint), ctx, checkpointData, errorData)
}
// Close mocks base method.
func (m *MockDatabaseRepository) Close() error {
m.ctrl.T.Helper()
@ -80,6 +92,20 @@ func (mr *MockDatabaseRepositoryMockRecorder) Close() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDatabaseRepository)(nil).Close))
}
// CreateBackgroundJob mocks base method.
func (m *MockDatabaseRepository) CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateBackgroundJob", ctx, backgroundJob)
ret0, _ := ret[0].(error)
return ret0
}
// CreateBackgroundJob indicates an expected call of CreateBackgroundJob.
func (mr *MockDatabaseRepositoryMockRecorder) CreateBackgroundJob(ctx, backgroundJob interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBackgroundJob", reflect.TypeOf((*MockDatabaseRepository)(nil).CreateBackgroundJob), ctx, backgroundJob)
}
// CreateGlossaryEntry mocks base method.
func (m *MockDatabaseRepository) CreateGlossaryEntry(ctx context.Context, glossaryEntry *models.Glossary) error {
m.ctrl.T.Helper()
@ -153,18 +179,19 @@ func (mr *MockDatabaseRepositoryMockRecorder) GetCurrentUser(ctx interface{}) *g
}
// GetFlattenedResourceGraph mocks base method.
func (m *MockDatabaseRepository) GetFlattenedResourceGraph(ctx context.Context, graphType pkg.ResourceGraphType) (map[string][]*models.ResourceBase, error) {
func (m *MockDatabaseRepository) GetFlattenedResourceGraph(ctx context.Context, graphType pkg.ResourceGraphType, options models.ResourceGraphOptions) (map[string][]*models.ResourceBase, *models.ResourceGraphMetadata, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetFlattenedResourceGraph", ctx, graphType)
ret := m.ctrl.Call(m, "GetFlattenedResourceGraph", ctx, graphType, options)
ret0, _ := ret[0].(map[string][]*models.ResourceBase)
ret1, _ := ret[1].(error)
return ret0, ret1
ret1, _ := ret[1].(*models.ResourceGraphMetadata)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetFlattenedResourceGraph indicates an expected call of GetFlattenedResourceGraph.
func (mr *MockDatabaseRepositoryMockRecorder) GetFlattenedResourceGraph(ctx, graphType interface{}) *gomock.Call {
func (mr *MockDatabaseRepositoryMockRecorder) GetFlattenedResourceGraph(ctx, graphType, options interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFlattenedResourceGraph", reflect.TypeOf((*MockDatabaseRepository)(nil).GetFlattenedResourceGraph), ctx, graphType)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFlattenedResourceGraph", reflect.TypeOf((*MockDatabaseRepository)(nil).GetFlattenedResourceGraph), ctx, graphType, options)
}
// GetGlossaryEntry mocks base method.
@ -302,6 +329,21 @@ func (mr *MockDatabaseRepositoryMockRecorder) GetUserByUsername(arg0, arg1 inter
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUserByUsername", reflect.TypeOf((*MockDatabaseRepository)(nil).GetUserByUsername), arg0, arg1)
}
// ListBackgroundJobs mocks base method.
func (m *MockDatabaseRepository) ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListBackgroundJobs", ctx, queryOptions)
ret0, _ := ret[0].([]models.BackgroundJob)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListBackgroundJobs indicates an expected call of ListBackgroundJobs.
func (mr *MockDatabaseRepositoryMockRecorder) ListBackgroundJobs(ctx, queryOptions interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListBackgroundJobs", reflect.TypeOf((*MockDatabaseRepository)(nil).ListBackgroundJobs), ctx, queryOptions)
}
// ListResources mocks base method.
func (m *MockDatabaseRepository) ListResources(arg0 context.Context, arg1 models.ListResourceQueryOptions) ([]models.ResourceBase, error) {
m.ctrl.T.Helper()
@ -403,6 +445,20 @@ func (mr *MockDatabaseRepositoryMockRecorder) SaveUserSettings(arg0, arg1 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveUserSettings", reflect.TypeOf((*MockDatabaseRepository)(nil).SaveUserSettings), arg0, arg1)
}
// UpdateBackgroundJob mocks base method.
func (m *MockDatabaseRepository) UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateBackgroundJob", ctx, backgroundJob)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateBackgroundJob indicates an expected call of UpdateBackgroundJob.
func (mr *MockDatabaseRepositoryMockRecorder) UpdateBackgroundJob(ctx, backgroundJob interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateBackgroundJob", reflect.TypeOf((*MockDatabaseRepository)(nil).UpdateBackgroundJob), ctx, backgroundJob)
}
// UpdateSource mocks base method.
func (m *MockDatabaseRepository) UpdateSource(ctx context.Context, sourceCreds *models.SourceCredential) error {
m.ctrl.T.Helper()

View File

@ -20,6 +20,7 @@ import (
"gorm.io/gorm"
"net/url"
"strings"
"time"
)
func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger, eventBus event_bus.Interface) (DatabaseRepository, error) {
@ -30,6 +31,7 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
globalLogger.Infof("Trying to connect to sqlite db: %s\n", appConfig.GetString("database.location"))
// BUSY TIMEOUT SETTING DOCS ---
// When a transaction cannot lock the database, because it is already locked by another one,
// SQLite by default throws an error: database is locked. This behavior is usually not appropriate when
// concurrent access is needed, typically when multiple processes write to the same database.
@ -39,11 +41,20 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger,
// https://rsqlite.r-dbi.org/reference/sqlitesetbusyhandler
// retrying for 30000 milliseconds, 30seconds - this would be unreasonable for a distributed multi-tenant application,
// but should be fine for local usage.
//
// JOURNAL MODE WAL DOCS ---
//
// Write-Ahead Logging or WAL (New Way)
// In this case all writes are appended to a temporary file (write-ahead log) and this file is periodically merged with the original database. When SQLite is searching for something it would first check this temporary file and if nothing is found proceed with the main database file.
// As a result, readers dont compete with writers and performance is much better compared to the Old Way.
// https://stackoverflow.com/questions/4060772/sqlite-concurrent-access
pragmaStr := sqlitePragmaString(map[string]string{
"busy_timeout": "30000",
"busy_timeout": "5000",
"foreign_keys": "ON",
"journal_mode": "wal",
})
database, err := gorm.Open(sqlite.Open(appConfig.GetString("database.location")+pragmaStr), &gorm.Config{
dsn := "file:" + appConfig.GetString("database.location") + pragmaStr
database, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{
//TODO: figure out how to log database queries again.
//logger: logger
DisableForeignKeyConstraintWhenMigrating: true,
@ -56,7 +67,16 @@ func NewRepository(appConfig config.Interface, globalLogger logrus.FieldLogger,
if err != nil {
return nil, fmt.Errorf("Failed to connect to database! - %v", err)
}
globalLogger.Infof("Successfully connected to fasten sqlite db: %s\n", appConfig.GetString("database.location"))
globalLogger.Infof("Successfully connected to fasten sqlite db: %s\n", dsn)
////verify journal mode
//var journalMode []map[string]interface{}
//resp := database.Raw("PRAGMA journal_mode;").Scan(&journalMode)
//if resp.Error != nil {
// return nil, fmt.Errorf("Failed to verify journal mode! - %v", resp.Error)
//} else {
// globalLogger.Infof("Journal mode: %v", journalMode)
//}
fastenRepo := SqliteRepository{
AppConfig: appConfig,
@ -102,6 +122,7 @@ func (sr *SqliteRepository) Migrate() error {
err := sr.GormClient.AutoMigrate(
&models.User{},
&models.SourceCredential{},
&models.BackgroundJob{},
&models.Glossary{},
&models.UserSettingEntry{},
)
@ -141,8 +162,8 @@ func (sr *SqliteRepository) GetUserByUsername(ctx context.Context, username stri
return &foundUser, result.Error
}
//TODO: check for error, right now we return a nil which may cause a panic.
//TODO: can we cache the current user? //SECURITY:
// TODO: check for error, right now we return a nil which may cause a panic.
// TODO: can we cache the current user? //SECURITY:
func (sr *SqliteRepository) GetCurrentUser(ctx context.Context) (*models.User, error) {
username := ctx.Value(pkg.ContextKeyTypeAuthUsername)
if username == nil {
@ -447,7 +468,7 @@ func (sr *SqliteRepository) ListResources(ctx context.Context, queryOptions mode
}
}
//TODO: should this be deprecated? (replaced by ListResources)
// TODO: should this be deprecated? (replaced by ListResources)
func (sr *SqliteRepository) GetResourceByResourceTypeAndId(ctx context.Context, sourceResourceType string, sourceResourceId string) (*models.ResourceBase, error) {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
@ -637,13 +658,14 @@ func (sr *SqliteRepository) FindResourceAssociationsByTypeAndId(ctx context.Cont
// - find source for each resource
// - (SECURITY) ensure the current user and the source for each resource matches
// - check if there is a Composition resource Type already.
// - if Composition type already exists:
// - update "relatesTo" field with additional data.
// - else:
// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources)
// - if Composition type already exists:
// - update "relatesTo" field with additional data.
// - else:
// - Create a Composition resource type (populated with "relatesTo" references to all provided Resources)
//
// - add AddResourceAssociation for all resources linked to the Composition resource
// - store the Composition resource
//TODO: determine if we should be using a List Resource instead of a Composition resource
// TODO: determine if we should be using a List Resource instead of a Composition resource
func (sr *SqliteRepository) AddResourceComposition(ctx context.Context, compositionTitle string, resources []*models.ResourceBase) error {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
@ -833,6 +855,7 @@ func (sr *SqliteRepository) UpdateSource(ctx context.Context, sourceCreds *model
DynamicClientId: sourceCreds.DynamicClientId,
DynamicClientRegistrationMode: sourceCreds.DynamicClientRegistrationMode,
DynamicClientJWKS: sourceCreds.DynamicClientJWKS,
LatestBackgroundJobID: sourceCreds.LatestBackgroundJobID,
}).Error
}
@ -850,6 +873,7 @@ func (sr *SqliteRepository) GetSource(ctx context.Context, sourceId string) (*mo
var sourceCred models.SourceCredential
results := sr.GormClient.WithContext(ctx).
Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: sourceUUID}}).
Preload("LatestBackgroundJob").
First(&sourceCred)
return &sourceCred, results.Error
@ -940,11 +964,188 @@ func (sr *SqliteRepository) GetSources(ctx context.Context) ([]models.SourceCred
var sourceCreds []models.SourceCredential
results := sr.GormClient.WithContext(ctx).
Where(models.SourceCredential{UserID: currentUser.ID}).
Preload("LatestBackgroundJob").
Find(&sourceCreds)
return sourceCreds, results.Error
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Background Job
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *SqliteRepository) CreateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
return currentUserErr
}
backgroundJob.UserID = currentUser.ID
record := sr.GormClient.Create(backgroundJob)
return record.Error
}
func (sr *SqliteRepository) GetBackgroundJob(ctx context.Context, backgroundJobId string) (*models.BackgroundJob, error) {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
return nil, currentUserErr
}
backgroundJobUUID, err := uuid.Parse(backgroundJobId)
if err != nil {
return nil, err
}
var backgroundJob models.BackgroundJob
results := sr.GormClient.WithContext(ctx).
Where(models.SourceCredential{UserID: currentUser.ID, ModelBase: models.ModelBase{ID: backgroundJobUUID}}).
First(&backgroundJob)
return &backgroundJob, results.Error
}
func (sr *SqliteRepository) UpdateBackgroundJob(ctx context.Context, backgroundJob *models.BackgroundJob) error {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
return currentUserErr
}
backgroundJob.UserID = currentUser.ID
return sr.GormClient.WithContext(ctx).
Where(models.BackgroundJob{
ModelBase: models.ModelBase{ID: backgroundJob.ID},
UserID: backgroundJob.UserID,
}).Updates(models.BackgroundJob{
JobStatus: backgroundJob.JobStatus,
Data: backgroundJob.Data,
LockedTime: backgroundJob.LockedTime,
DoneTime: backgroundJob.DoneTime,
Retries: backgroundJob.Retries,
Schedule: backgroundJob.Schedule,
}).Error
}
func (sr *SqliteRepository) ListBackgroundJobs(ctx context.Context, queryOptions models.BackgroundJobQueryOptions) ([]models.BackgroundJob, error) {
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
return nil, currentUserErr
}
queryParam := models.BackgroundJob{
UserID: currentUser.ID,
}
if queryOptions.JobType != nil {
queryParam.JobType = *queryOptions.JobType
}
if queryOptions.Status != nil {
queryParam.JobStatus = *queryOptions.Status
}
var backgroundJobs []models.BackgroundJob
query := sr.GormClient.WithContext(ctx).
//Group("source_id"). //broken in Postgres.
Where(queryParam).Limit(queryOptions.Limit).Order("locked_time DESC")
if queryOptions.Offset > 0 {
query = query.Offset(queryOptions.Offset)
}
return backgroundJobs, query.Find(&backgroundJobs).Error
}
func (sr *SqliteRepository) BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{}) {
sr.Logger.Info("begin checkpointing background job...")
if len(checkpointData) == 0 && len(errorData) == 0 {
sr.Logger.Info("no changes detected. Skipping checkpoint")
return //nothing to do
}
defer sr.Logger.Info("end checkpointing background job")
currentUser, currentUserErr := sr.GetCurrentUser(ctx)
if currentUserErr != nil {
sr.Logger.Warning("could not find current user info context. Ignoring checkpoint", currentUserErr)
return
}
//make sure we do an atomic update
backgroundJobId, ok := ctx.Value(pkg.ContextKeyTypeBackgroundJobID).(string)
if !ok {
sr.Logger.Warning("could not find background job id in context. Ignoring checkpoint")
return
}
backgroundJobUUID, err := uuid.Parse(backgroundJobId)
if err != nil {
sr.Logger.Warning("could not parse background job id. Ignoring checkpoint", err)
return
}
//https://gorm.io/docs/advanced_query.html#Locking-FOR-UPDATE
//TODO: if using another database type (not SQLITE) we need to make sure we use the correct locking strategy
//This is not a problem in SQLITE because it does database (or table) level locking by default
//var backgroundJob models.BackgroundJob
//sr.GormClient.Clauses(clause.Locking{Strength: "UPDATE"}).Find(&backgroundJob)
txErr := sr.GormClient.Transaction(func(tx *gorm.DB) error {
//retrieve the background job by id
var backgroundJob models.BackgroundJob
backgroundJobFindResults := tx.WithContext(ctx).
Where(models.BackgroundJob{
ModelBase: models.ModelBase{ID: backgroundJobUUID},
UserID: currentUser.ID,
}).
First(&backgroundJob)
if backgroundJobFindResults.Error != nil {
return backgroundJobFindResults.Error
}
//deserialize the job data
var backgroundJobSyncData models.BackgroundJobSyncData
if backgroundJob.Data != nil {
err := json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData)
if err != nil {
return err
}
}
//update the job data with new data provided by the calling functiion
changed := false
if len(checkpointData) > 0 {
backgroundJobSyncData.CheckpointData = checkpointData
changed = true
}
if len(errorData) > 0 {
backgroundJobSyncData.ErrorData = errorData
changed = true
}
//define a background job with the fields we're going to update
now := time.Now()
updatedBackgroundJob := models.BackgroundJob{
LockedTime: &now,
}
if changed {
serializedData, err := json.Marshal(backgroundJobSyncData)
if err != nil {
return err
}
updatedBackgroundJob.Data = serializedData
}
return tx.WithContext(ctx).
Where(models.BackgroundJob{
ModelBase: models.ModelBase{ID: backgroundJobUUID},
UserID: currentUser.ID,
}).Updates(updatedBackgroundJob).Error
})
if txErr != nil {
sr.Logger.Warning("could not find or update background job. Ignoring checkpoint", txErr)
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Utilities
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -952,7 +1153,7 @@ func (sr *SqliteRepository) GetSources(ctx context.Context) ([]models.SourceCred
func sqlitePragmaString(pragmas map[string]string) string {
q := url.Values{}
for key, val := range pragmas {
q.Add("_pragma", key+"="+val)
q.Add("_pragma", fmt.Sprintf("%s=%s", key, val))
}
queryStr := q.Encode()

View File

@ -8,6 +8,7 @@ import (
"github.com/fastenhealth/fasten-onprem/backend/pkg/event_bus"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
sourceModels "github.com/fastenhealth/fasten-sources/clients/models"
sourcePkg "github.com/fastenhealth/fasten-sources/pkg"
"github.com/fastenhealth/gofhir-models/fhir401"
fhirutils "github.com/fastenhealth/gofhir-models/fhir401/utils"
"github.com/gin-gonic/gin"
@ -22,6 +23,7 @@ import (
"net/http/httptest"
"os"
"testing"
"time"
)
func TestSourceCredentialInterface(t *testing.T) {
@ -1222,3 +1224,159 @@ func (suite *RepositoryTestSuite) TestAddResourceComposition_WithExistingComposi
}, associations)
}
func (suite *RepositoryTestSuite) TestCreateBackgroundJob_Sync() {
//setup
fakeConfig := mock_config.NewMockInterface(suite.MockCtrl)
fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes()
fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes()
dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer())
require.NoError(suite.T(), err)
userModel := &models.User{
Username: "test_username",
Password: "testpassword",
Email: "test@test.com",
}
err = dbRepo.CreateUser(context.Background(), userModel)
require.NoError(suite.T(), err)
authContext := context.WithValue(context.Background(), pkg.ContextKeyTypeAuthUsername, "test_username")
//test
sourceCredential := models.SourceCredential{ModelBase: models.ModelBase{ID: uuid.New()}, SourceType: sourcePkg.SourceType("bluebutton")}
backgroundJob := models.NewSyncBackgroundJob(sourceCredential)
err = dbRepo.CreateBackgroundJob(
context.WithValue(authContext, pkg.ContextKeyTypeAuthUsername, "test_username"),
backgroundJob,
)
//assert
require.NoError(suite.T(), err)
require.NotEqual(suite.T(), uuid.Nil, backgroundJob.ID)
require.Equal(suite.T(), pkg.BackgroundJobTypeSync, backgroundJob.JobType)
require.Equal(suite.T(), pkg.BackgroundJobStatusLocked, backgroundJob.JobStatus)
require.NotNil(suite.T(), backgroundJob.LockedTime)
require.Nil(suite.T(), backgroundJob.DoneTime)
require.Equal(suite.T(), userModel.ID, backgroundJob.UserID)
}
func (suite *RepositoryTestSuite) TestListBackgroundJobs() {
//setup
fakeConfig := mock_config.NewMockInterface(suite.MockCtrl)
fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes()
fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes()
dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer())
require.NoError(suite.T(), err)
userModel := &models.User{
Username: "test_username",
Password: "testpassword",
Email: "test@test.com",
}
err = dbRepo.CreateUser(context.Background(), userModel)
require.NoError(suite.T(), err)
authContext := context.WithValue(context.Background(), pkg.ContextKeyTypeAuthUsername, "test_username")
otherUserModel := &models.User{
Username: "test_other_username",
Password: "testpassword",
Email: "testother@test.com",
}
err = dbRepo.CreateUser(context.Background(), otherUserModel)
require.NoError(suite.T(), err)
testSourceCredential := models.SourceCredential{
ModelBase: models.ModelBase{
ID: uuid.New(),
},
UserID: userModel.ID,
}
backgroundJob := models.NewSyncBackgroundJob(testSourceCredential)
err = dbRepo.CreateBackgroundJob(
context.WithValue(authContext, pkg.ContextKeyTypeAuthUsername, "test_username"),
backgroundJob,
)
backgroundJob2 := models.NewSyncBackgroundJob(testSourceCredential)
backgroundJob2.JobType = pkg.BackgroundJobTypeScheduledSync
err = dbRepo.CreateBackgroundJob(
context.WithValue(authContext, pkg.ContextKeyTypeAuthUsername, "test_username"),
backgroundJob2,
)
backgroundJob3 := models.NewSyncBackgroundJob(testSourceCredential)
backgroundJob3.JobStatus = pkg.BackgroundJobStatusFailed
err = dbRepo.CreateBackgroundJob(
context.WithValue(authContext, pkg.ContextKeyTypeAuthUsername, "test_username"),
backgroundJob3,
)
require.NoError(suite.T(), err)
//test
foundAllBackgroundJobs, err := dbRepo.ListBackgroundJobs(authContext, models.BackgroundJobQueryOptions{})
require.NoError(suite.T(), err)
syncJobType := pkg.BackgroundJobTypeSync
foundBackgroundJobsByType, err := dbRepo.ListBackgroundJobs(authContext, models.BackgroundJobQueryOptions{
JobType: &syncJobType,
})
require.NoError(suite.T(), err)
syncFailedStatus := pkg.BackgroundJobStatusFailed
foundBackgroundJobsByStatus, err := dbRepo.ListBackgroundJobs(authContext, models.BackgroundJobQueryOptions{
Status: &syncFailedStatus,
})
require.NoError(suite.T(), err)
//assert
require.Equal(suite.T(), len(foundAllBackgroundJobs), 3)
require.Equal(suite.T(), len(foundBackgroundJobsByType), 2)
require.Equal(suite.T(), len(foundBackgroundJobsByStatus), 1)
}
func (suite *RepositoryTestSuite) TestUpdateBackgroundJob() {
//setup
fakeConfig := mock_config.NewMockInterface(suite.MockCtrl)
fakeConfig.EXPECT().GetString("database.location").Return(suite.TestDatabase.Name()).AnyTimes()
fakeConfig.EXPECT().GetString("log.level").Return("INFO").AnyTimes()
dbRepo, err := NewRepository(fakeConfig, logrus.WithField("test", suite.T().Name()), event_bus.NewNoopEventBusServer())
require.NoError(suite.T(), err)
userModel := &models.User{
Username: "test_username",
Password: "testpassword",
Email: "test@test.com",
}
err = dbRepo.CreateUser(context.Background(), userModel)
require.NoError(suite.T(), err)
authContext := context.WithValue(context.Background(), pkg.ContextKeyTypeAuthUsername, "test_username")
sourceCredential := models.SourceCredential{ModelBase: models.ModelBase{ID: uuid.New()}, SourceType: sourcePkg.SourceType("bluebutton")}
backgroundJob := models.NewSyncBackgroundJob(sourceCredential)
err = dbRepo.CreateBackgroundJob(
context.WithValue(authContext, pkg.ContextKeyTypeAuthUsername, "test_username"),
backgroundJob,
)
//test
now := time.Now()
backgroundJob.JobStatus = pkg.BackgroundJobStatusFailed
backgroundJob.DoneTime = &now
err = dbRepo.UpdateBackgroundJob(
authContext,
backgroundJob,
)
require.NoError(suite.T(), err)
//list all records and ensure that the updated record is the same
foundAllBackgroundJobs, err := dbRepo.ListBackgroundJobs(authContext, models.BackgroundJobQueryOptions{})
require.NoError(suite.T(), err)
//assert
require.Equal(suite.T(), 1, len(foundAllBackgroundJobs))
require.Equal(suite.T(), backgroundJob.ID, foundAllBackgroundJobs[0].ID)
require.Equal(suite.T(), pkg.BackgroundJobStatusFailed, foundAllBackgroundJobs[0].JobStatus)
require.NotNil(suite.T(), foundAllBackgroundJobs[0].DoneTime)
}

View File

@ -0,0 +1,33 @@
package models
import (
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/google/uuid"
"gorm.io/datatypes"
"gorm.io/gorm"
"time"
)
type BackgroundJob struct {
ModelBase
User User `json:"user,omitempty"` //SECURITY: user and user.id will be set by the repository service
UserID uuid.UUID `json:"user_id"`
JobType pkg.BackgroundJobType `json:"job_type"`
Data datatypes.JSON `gorm:"column:data;type:text;serializer:json" json:"data,omitempty"`
JobStatus pkg.BackgroundJobStatus `json:"job_status"`
LockedTime *time.Time `json:"locked_time"`
DoneTime *time.Time `json:"done_time"`
Retries int `json:"retries"`
Schedule *pkg.BackgroundJobSchedule `json:"schedule,omitempty"`
}
func (b *BackgroundJob) BeforeCreate(tx *gorm.DB) (err error) {
if err := b.ModelBase.BeforeCreate(tx); err != nil {
return err
}
if b.JobStatus == "" {
b.JobStatus = pkg.BackgroundJobStatusReady
}
return
}

View File

@ -0,0 +1,12 @@
package models
import "github.com/fastenhealth/fasten-onprem/backend/pkg"
type BackgroundJobQueryOptions struct {
JobType *pkg.BackgroundJobType
Status *pkg.BackgroundJobStatus
//pagination
Limit int
Offset int
}

View File

@ -0,0 +1,12 @@
package models
import "github.com/fastenhealth/fasten-onprem/backend/pkg"
// TODO: this is a WIP.
func NewScheduledSyncBackgroundJob(schedule pkg.BackgroundJobSchedule) *BackgroundJob {
return &BackgroundJob{
JobType: pkg.BackgroundJobTypeScheduledSync,
JobStatus: pkg.BackgroundJobStatusReady, //scheduled jobs will not be processed immediately, so their status is set to READY
Schedule: &schedule,
}
}

View File

@ -0,0 +1,34 @@
package models
import (
"encoding/json"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/google/uuid"
"time"
)
func NewSyncBackgroundJob(source SourceCredential) *BackgroundJob {
now := time.Now()
data := BackgroundJobSyncData{
SourceID: source.ID,
SourceType: string(source.SourceType),
CheckpointData: nil,
ErrorData: nil,
}
dataJson, _ := json.Marshal(data)
return &BackgroundJob{
JobType: pkg.BackgroundJobTypeSync,
JobStatus: pkg.BackgroundJobStatusLocked, //we lock the job immediately so that it doesn't get picked up by another worker
LockedTime: &now,
Data: dataJson,
}
}
type BackgroundJobSyncData struct {
SourceID uuid.UUID `json:"source_id"`
SourceType string `json:"source_type"`
CheckpointData map[string]interface{} `json:"checkpoint_data,omitempty"`
ErrorData map[string]interface{} `json:"error_data,omitempty"`
}

View File

@ -4,7 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg/jwk"
"github.com/fastenhealth/fasten-sources/pkg"
sourcesPkg "github.com/fastenhealth/fasten-sources/pkg"
"github.com/google/uuid"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwt"
@ -19,10 +19,13 @@ import (
// similar to LighthouseSourceDefinition from fasten-source
type SourceCredential struct {
ModelBase
User User `json:"user,omitempty"`
UserID uuid.UUID `json:"user_id" gorm:"uniqueIndex:idx_user_source_patient"`
SourceType pkg.SourceType `json:"source_type" gorm:"uniqueIndex:idx_user_source_patient"`
Patient string `json:"patient" gorm:"uniqueIndex:idx_user_source_patient"`
User *User `json:"user,omitempty"`
UserID uuid.UUID `json:"user_id" gorm:"uniqueIndex:idx_user_source_patient"`
SourceType sourcesPkg.SourceType `json:"source_type" gorm:"uniqueIndex:idx_user_source_patient"`
Patient string `json:"patient" gorm:"uniqueIndex:idx_user_source_patient"`
LatestBackgroundJob *BackgroundJob `json:"latest_background_job,omitempty"`
LatestBackgroundJobID *uuid.UUID `json:"-"`
//oauth endpoints
AuthorizationEndpoint string `json:"authorization_endpoint"`
@ -62,7 +65,7 @@ type SourceCredential struct {
DynamicClientId string `json:"dynamic_client_id"`
}
func (s *SourceCredential) GetSourceType() pkg.SourceType {
func (s *SourceCredential) GetSourceType() sourcesPkg.SourceType {
return s.SourceType
}
@ -131,7 +134,7 @@ func (s *SourceCredential) IsDynamicClient() bool {
return len(s.DynamicClientRegistrationMode) > 0
}
//this will set/update the AccessToken and Expiry using the dynamic client credentials
// this will set/update the AccessToken and Expiry using the dynamic client credentials
func (s *SourceCredential) RefreshDynamicClientAccessToken() error {
if len(s.DynamicClientRegistrationMode) == 0 {
return fmt.Errorf("dynamic client registration mode not set")

View File

@ -0,0 +1,194 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
"github.com/fastenhealth/fasten-onprem/backend/pkg/database"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/fastenhealth/fasten-sources/clients/factory"
sourceModels "github.com/fastenhealth/fasten-sources/clients/models"
sourcePkg "github.com/fastenhealth/fasten-sources/pkg"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"net/http"
"strconv"
"time"
)
// BackgroundJobSyncResources is a background job that syncs all FHIR resource for a given source
// It is a blocking function that will return only when the sync is complete or has failed
// It will create a background job and associate it with the source
// It will also update the access token and refresh token if they have been updated
// It will return the sync summary and error if any
// TODO: run in background thread, or use https://gobyexample.com/tickers
// TODO: use goroutine to truely run in the background (how will that work with DatabaseRepository, is that thread safe?) Mutex needed?
func BackgroundJobSyncResources(parentContext context.Context, logger *logrus.Entry, databaseRepo database.DatabaseRepository, sourceCred *models.SourceCredential) (sourceModels.UpsertSummary, error) {
var resultErr error
var backgroundJob *models.BackgroundJob
//Begin Sync JobStatus update process
//1. Check if the source is already syncing
if sourceCred.LatestBackgroundJob != nil && sourceCred.LatestBackgroundJob.JobStatus == pkg.BackgroundJobStatusLocked {
logger.Errorln("Sync operation already in progress, cannot continue.")
return sourceModels.UpsertSummary{}, fmt.Errorf("sync operation already in progress, cannot continue")
}
//since there's no sync in progress, lets create a new background job
//2. Create a new background job
backgroundJob = models.NewSyncBackgroundJob(*sourceCred)
err := databaseRepo.CreateBackgroundJob(parentContext, backgroundJob)
if err != nil {
resultErr = fmt.Errorf("an error occurred while creating background job: %w", err)
logger.Errorln(resultErr)
return sourceModels.UpsertSummary{}, resultErr
}
backgroundJobContext := CreateBackgroundJobContext(parentContext, backgroundJob.ID.String())
//3. Update the source with the background job id
sourceCred.LatestBackgroundJobID = &backgroundJob.ID
err = databaseRepo.UpdateSource(backgroundJobContext, sourceCred)
if err != nil {
logger.Warn("An error occurred while registering background job id with source, ignoring", err)
//we can safely ignore this error, because we'll be updating the status of the background job again later
}
// after creating the client, we should do a bulk import
sourceClient, err := factory.GetSourceClient(sourcePkg.GetFastenLighthouseEnv(), sourceCred.SourceType, backgroundJobContext, logger, sourceCred)
if err != nil {
resultErr = fmt.Errorf("an error occurred while initializing hub client using source credential: %w", err)
logger.Errorln(resultErr)
return sourceModels.UpsertSummary{}, resultErr
}
// BEGIN FINALIZER
defer func() {
//finalizer function - update the sync status to completed (or failed depending on the error status)
if sourceCred == nil {
logger.Errorln("sync status finalizer unable to complete, SourceCredential is null, ignoring", err)
return
} else {
//since we're finished with the sync (no matter the final status), we can clear the active background job id
sourceCred.LatestBackgroundJobID = nil
//this will also update the AccessToken & RefreshToken if they have been updated
err := databaseRepo.UpdateSource(backgroundJobContext, sourceCred)
if err != nil {
logger.Errorln("sync status finalizer failed updating source, ignoring", err)
}
}
//update the backgroundJob status to completed or failed
if backgroundJob == nil {
logger.Errorln("sync status finalizer unable to complete, BackgroundJob is null, ignoring", err)
return
} else {
//first, try to update the background job with the latest data
updatedBackgroundJob, err := databaseRepo.GetBackgroundJob(backgroundJobContext, backgroundJob.ID.String())
if err == nil {
//replace the current background job, with the updated one.
backgroundJob = updatedBackgroundJob
}
if resultErr == nil {
backgroundJob.JobStatus = pkg.BackgroundJobStatusDone
} else {
//if there's an error that we need to store, lets unmarshal the data from the backgroundjob
var backgroundJobSyncData models.BackgroundJobSyncData
if backgroundJob.Data != nil {
err = json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData)
}
//ensure there's a map to store the error data
if backgroundJobSyncData.ErrorData == nil {
backgroundJobSyncData.ErrorData = map[string]interface{}{}
}
backgroundJobSyncData.ErrorData["final"] = resultErr.Error()
//marshal the new background job data
backgroundJob.Data, err = json.Marshal(backgroundJobSyncData)
backgroundJob.JobStatus = pkg.BackgroundJobStatusFailed
}
now := time.Now()
backgroundJob.DoneTime = &now
backgroundJob.LockedTime = nil
err = databaseRepo.UpdateBackgroundJob(backgroundJobContext, backgroundJob)
if err != nil {
logger.Errorln("sync status finalizer failed updating background job, ignoring", err)
}
}
}()
// END FINALIZER
summary, err := sourceClient.SyncAll(databaseRepo)
if err != nil {
resultErr = fmt.Errorf("an error occurred while bulk importing resources from source: %w", err)
logger.Errorln(resultErr)
return summary, resultErr
}
//update source incase the access token/refresh token has been updated
sourceCredential := sourceClient.GetSourceCredential()
sourceCredentialConcrete, ok := sourceCredential.(*models.SourceCredential)
if !ok {
resultErr = fmt.Errorf("an error occurred while updating source credential, source credential is not of type *models.SourceCredential")
logger.Errorln(resultErr)
return summary, resultErr
}
sourceCred = sourceCredentialConcrete
//updated credentials will be saved by the finalizer
return summary, resultErr
}
// Handlers
func ListBackgroundJobs(c *gin.Context) {
logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry)
databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository)
backgroundJobQueryOptions := models.BackgroundJobQueryOptions{
Limit: pkg.ResourceListPageSize,
}
if len(c.Query("jobType")) > 0 {
jobType := pkg.BackgroundJobType(c.Query("jobType"))
backgroundJobQueryOptions.JobType = &jobType
}
if len(c.Query("status")) > 0 {
status := pkg.BackgroundJobStatus(c.Query("status"))
backgroundJobQueryOptions.Status = &status
}
if len(c.Query("page")) > 0 {
pageNumb, err := strconv.Atoi(c.Query("page"))
if err != nil {
logger.Errorln("An error occurred while calculating page number", err)
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
}
backgroundJobQueryOptions.Offset = pageNumb * backgroundJobQueryOptions.Limit
}
backgroundJobs, err := databaseRepo.ListBackgroundJobs(c, backgroundJobQueryOptions)
if err != nil {
logger.Errorln("An error occurred while retrieving resources", err)
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
}
c.JSON(http.StatusOK, gin.H{"success": true, "data": backgroundJobs})
}
// Utilities
func GetBackgroundContext(ginContext *gin.Context) context.Context {
return context.WithValue(context.Background(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string))
}
func CreateBackgroundJobContext(parentContext context.Context, backgroundJobId string) context.Context {
return context.WithValue(parentContext, pkg.ContextKeyTypeBackgroundJobID, backgroundJobId)
}

View File

@ -2,7 +2,6 @@ package handler
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/fastenhealth/fasten-onprem/backend/pkg"
@ -11,7 +10,6 @@ import (
"github.com/fastenhealth/fasten-onprem/backend/pkg/jwk"
"github.com/fastenhealth/fasten-onprem/backend/pkg/models"
"github.com/fastenhealth/fasten-sources/clients/factory"
sourceModels "github.com/fastenhealth/fasten-sources/clients/models"
sourcePkg "github.com/fastenhealth/fasten-sources/pkg"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
@ -144,7 +142,7 @@ func CreateSource(c *gin.Context) {
// after creating the source, we should do a bulk import (in the background)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, &sourceCred)
summary, err := BackgroundJobSyncResources(GetBackgroundContext(c), logger, databaseRepo, &sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
@ -168,7 +166,7 @@ func SourceSync(c *gin.Context) {
}
// after creating the source, we should do a bulk import (in the background)
summary, err := SyncSourceResources(GetBackgroundContext(c), logger, databaseRepo, sourceCred)
summary, err := BackgroundJobSyncResources(GetBackgroundContext(c), logger, databaseRepo, sourceCred)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"success": false})
return
@ -314,40 +312,3 @@ func ListSource(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{"success": true, "data": sourceCreds})
}
func SyncSourceResources(c context.Context, logger *logrus.Entry, databaseRepo database.DatabaseRepository, sourceCred *models.SourceCredential) (sourceModels.UpsertSummary, error) {
// after creating the source, we should do a bulk import
sourceClient, err := factory.GetSourceClient(sourcePkg.GetFastenLighthouseEnv(), sourceCred.SourceType, c, logger, sourceCred)
if err != nil {
logger.Errorln("An error occurred while initializing hub client using source credential", err)
return sourceModels.UpsertSummary{}, err
}
summary, err := sourceClient.SyncAll(databaseRepo)
if err != nil {
logger.Errorln("An error occurred while bulk import of resources from source", err)
return summary, err
}
//update source incase the access token/refresh token has been updated
sourceCredential := sourceClient.GetSourceCredential()
sourceCredentialConcrete, ok := sourceCredential.(*models.SourceCredential)
if !ok {
logger.Errorln("An error occurred while updating source credential, source credential is not of type *models.SourceCredential")
return summary, fmt.Errorf("source credential is not of type *models.SourceCredential")
}
err = databaseRepo.UpdateSource(c, sourceCredentialConcrete)
if err != nil {
logger.Errorf("An error occurred while updating source credential: %v", err)
return summary, err
}
return summary, nil
}
//
func GetBackgroundContext(ginContext *gin.Context) context.Context {
//TODO: this should be a background context
return context.WithValue(ginContext.Request.Context(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string))
}

View File

@ -77,6 +77,8 @@ func (ae *AppEngine) Setup() (*gin.RouterGroup, *gin.Engine) {
secure.POST("/dashboards", handler.AddDashboardLocation)
//secure.GET("/dashboard/:dashboardId", handler.GetDashboard)
secure.GET("/jobs", handler.ListBackgroundJobs)
secure.POST("/query", handler.QueryResourceFhir)
//server-side-events handler (only supported on mac/linux)

View File

@ -16,6 +16,7 @@ import {ResourceCreatorComponent} from './pages/resource-creator/resource-creato
import {ExploreComponent} from './pages/explore/explore.component';
import {environment} from '../environments/environment';
import {DesktopCallbackComponent} from './pages/desktop-callback/desktop-callback.component';
import {BackgroundJobsComponent} from './pages/background-jobs/background-jobs.component';
const routes: Routes = [
@ -39,6 +40,7 @@ const routes: Routes = [
{ path: 'desktop/callback/:source_id', component: DesktopCallbackComponent, canActivate: [ IsAuthenticatedAuthGuard] },
{ path: 'background-jobs', component: BackgroundJobsComponent, canActivate: [ IsAuthenticatedAuthGuard] },
{ path: 'patient-profile', component: PatientProfileComponent, canActivate: [ IsAuthenticatedAuthGuard] },
{ path: 'medical-history', component: MedicalHistoryComponent, canActivate: [ IsAuthenticatedAuthGuard] },
{ path: 'labs', component: ReportLabsComponent, canActivate: [ IsAuthenticatedAuthGuard] },

View File

@ -37,6 +37,7 @@ import {WidgetsModule} from './widgets/widgets.module';
import { ExploreComponent } from './pages/explore/explore.component';
import {DirectivesModule} from './directives/directives.module';
import { DesktopCallbackComponent } from './pages/desktop-callback/desktop-callback.component';
import { BackgroundJobsComponent } from './pages/background-jobs/background-jobs.component';
@NgModule({
declarations: [
@ -55,6 +56,7 @@ import { DesktopCallbackComponent } from './pages/desktop-callback/desktop-callb
ResourceCreatorComponent,
ExploreComponent,
DesktopCallbackComponent,
BackgroundJobsComponent,
],
imports: [
FormsModule,

View File

@ -29,7 +29,7 @@
</ul>
</div><!-- az-header-menu -->
<div class="az-header-right">
<a ngbTooltip="not yet implemented" class="az-header-search-link"><i class="fas fa-search"></i></a>
<a style="display:none;" ngbTooltip="not yet implemented" class="az-header-search-link"><i class="fas fa-search"></i></a>
<div class="az-header-message">
<a routerLink="/"><i class="typcn typcn-messages"></i></a>
</div><!-- az-header-message -->
@ -40,25 +40,17 @@
<a class="az-header-arrow" (click)="closeMenu($event)"><i class="icon ion-md-arrow-back"></i></a>
</div>
<h6 class="az-notification-title">Notifications</h6>
<p class="az-notification-text">You have 2 unread notification</p>
<p class="az-notification-text">Background Jobs & Updates</p>
<div class="az-notification-list">
<div class="media new">
<div class="az-img-user"><img src="assets/sources/aetna.png" alt=""></div>
<a *ngFor="let backgroundJob of backgroundJobs" class="media new" routerLink="/background-jobs">
<div><img style="max-width:50px;" src="assets/sources/{{backgroundJob.data?.source_type}}.png" alt=""></div>
<div class="media-body">
<p><strong>AETNA</strong> added 3 new records</p>
<span>Mar 15 12:32pm</span>
<p><strong>{{backgroundJob.data?.source_type}}</strong> added {{backgroundJob.data?.checkpoint_data?.summary?.UpdatedResources?.length}} new records</p>
<span>{{backgroundJob.done_time | amDateFormat:'MMM DD hh:mma'}}</span>
</div><!-- media-body -->
</div><!-- media -->
<div class="media new">
<div class="az-img-user online"><img src="assets/sources/cigna.png" alt=""></div>
<div class="media-body">
<p><strong>CIGNA</strong> added 34 new records</p>
<span>Mar 13 04:16am</span>
</div><!-- media-body -->
</div><!-- media -->
</a><!-- media -->
</div><!-- az-notification-list -->
<div class="dropdown-footer"><a ngbTooltip="not yet implemented">View All Notifications</a></div>
<div class="dropdown-footer"><a routerLink="/background-jobs">View History</a></div>
</div><!-- dropdown-menu -->
</div><!-- az-header-notification -->
<div class="dropdown az-profile-menu" ngbDropdown>

View File

@ -2,6 +2,8 @@ import { Component, OnInit } from '@angular/core';
import { Router } from '@angular/router';
import {AuthService} from '../../services/auth.service';
import {UserRegisteredClaims} from '../../models/fasten/user-registered-claims';
import {FastenApiService} from '../../services/fasten-api.service';
import {BackgroundJob} from '../../models/fasten/background-job';
@Component({
selector: 'app-header',
templateUrl: './header.component.html',
@ -9,7 +11,8 @@ import {UserRegisteredClaims} from '../../models/fasten/user-registered-claims';
})
export class HeaderComponent implements OnInit {
current_user_claims: UserRegisteredClaims
constructor(private authService: AuthService, private router: Router) { }
backgroundJobs: BackgroundJob[] = []
constructor(private authService: AuthService, private router: Router, private fastenApi: FastenApiService) { }
ngOnInit() {
try {
@ -18,6 +21,11 @@ export class HeaderComponent implements OnInit {
this.current_user_claims = new UserRegisteredClaims()
}
this.fastenApi.getBackgroundJobs().subscribe((data) => {
this.backgroundJobs = data.filter((job) => {
return job.data?.checkpoint_data?.summary?.UpdatedResources?.length > 0
})
})
}
closeMenu(e) {

View File

@ -1,10 +1,13 @@
<div class="card h-100 d-flex align-items-center justify-content-center mt-3 mt-3 rounded-0 cursor-pointer" [ngClass]="{'card-disable': sourceInfo?.metadata?.hidden}">
<div (click)="onCardClick()" class="card-body">
<div (click)="onCardClick()" class="card-body" [class.border-left-danger]="status == 'failed'">
<div class="h-100 d-flex align-items-center">
<img [src]="'assets/sources/'+(sourceInfo?.metadata.brand_logo ? sourceInfo?.metadata?.brand_logo : sourceInfo?.metadata?.source_type+'.png')" [alt]="sourceInfo?.metadata?.display" class="img-fluid">
<div *ngIf="status == 'failed'" class="card-img-overlay">
<span class="badge badge-danger">failed</span>
</div>
</div>
<div *ngIf="status" class="progress">
<div *ngIf="status == 'authorize' || status == 'token'" class="progress">
<div [style.width]="status == 'authorize' ? '33%' : '66%'" class="bg-indigo progress-bar progress-bar-striped progress-bar-animated" role="progressbar"></div>
</div>
</div>

View File

@ -1,3 +1,7 @@
.img-fluid {
min-height:50px;
}
.border-left-danger {
border-left: 5px solid #dc3545 !important;
}

View File

@ -9,7 +9,7 @@ import {SourceListItem} from '../../pages/medical-sources/medical-sources.compon
export class MedicalSourcesCardComponent implements OnInit {
@Input() sourceInfo: SourceListItem;
@Input() status: undefined | "token" | "authorize";
@Input() status: undefined | "token" | "authorize" | "failed";
@Output() onClick = new EventEmitter<SourceListItem>()

View File

@ -93,6 +93,26 @@ export const LoadingToken: Story = {
},
};
export const Failed: Story = {
args: {
sourceInfo: {
metadata: {
// aliases?: string[]
// brand_logo?: string
category: [],
display: "Aetna",
hidden: false,
// identifiers?: {[name:string]: string}
// patient_access_description?: string
// patient_access_url?: string
platform_type: "aetna",
source_type: "aetna"
}
},
status: 'failed'
},
};
export const Hidden: Story = {
args: {
@ -112,3 +132,22 @@ export const Hidden: Story = {
}
},
};
export const CustomBrandLogo: Story = {
args: {
sourceInfo: {
metadata: {
// aliases?: string[]
brand_logo: 'bluebutton.png',
category: [],
display: "Aetna",
hidden: false,
// identifiers?: {[name:string]: string}
// patient_access_description?: string
// patient_access_url?: string
platform_type: "aetna",
source_type: "aetna"
}
}
},
};

View File

@ -1,11 +1,10 @@
<h2 class="az-content-title">Connected Sources</h2>
<div *ngIf="!loading else isLoadingTemplate" class="row">
<app-medical-sources-card class="col-sm-3 mg-b-20 px-3"
*ngFor="let sourceData of connectedSourceList"
[sourceInfo]="sourceData"
[status]="status[sourceData.source.id] || status[sourceData.metadata.source_type]"
[status]="status[sourceData.source?.id] || status[sourceData.metadata?.source_type]"
(onClick)="openModal(contentModalRef, $event)"
></app-medical-sources-card>
</div>

View File

@ -19,7 +19,7 @@ import {EventBusService} from '../../services/event-bus.service';
})
export class MedicalSourcesConnectedComponent implements OnInit {
loading: boolean = false
status: { [name: string]: undefined | "token" | "authorize" } = {}
status: { [name: string]: undefined | "token" | "authorize" | "failed" } = {}
modalSelectedSourceListItem:SourceListItem = null;
modalCloseResult = '';
@ -47,6 +47,11 @@ export class MedicalSourcesConnectedComponent implements OnInit {
forkJoin(connectedSources.map((source) => this.lighthouseApi.getLighthouseSource(source.source_type))).subscribe((connectedMetadata) => {
for(const ndx in connectedSources){
this.connectedSourceList.push({source: connectedSources[ndx], metadata: connectedMetadata[ndx]})
if(connectedSources[ndx].latest_background_job?.job_status == "STATUS_LOCKED"){
this.status[connectedSources[ndx].source_type] = "token"
} else if (connectedSources[ndx].latest_background_job?.job_status === "STATUS_FAILED") {
this.status[connectedSources[ndx].source_type] = "failed"
}
}
})
})
@ -207,6 +212,10 @@ export class MedicalSourcesConnectedComponent implements OnInit {
toastNotification.type = ToastType.Error
toastNotification.message = `An error occurred while accessing ${sourceType}: '${JSON.stringify(err)}'`
toastNotification.autohide = false
toastNotification.link = {
text: "View Details",
url: `/background-jobs`
}
this.toastService.show(toastNotification)
console.error(err)
});
@ -275,7 +284,11 @@ export class MedicalSourcesConnectedComponent implements OnInit {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
public openModal(contentModalRef, sourceListItem: SourceListItem) {
if(this.status[sourceListItem.metadata.source_type] || !sourceListItem.source || this.status[sourceListItem.source.id]){
if(
(this.status[sourceListItem.metadata.source_type] && this.status[sourceListItem.metadata.source_type] != 'failed') //if this source type is currently "loading" dont open the modal window
|| !sourceListItem.source //if there's no connected source, dont open the modal window
|| (this.status[sourceListItem.source.id] && this.status[sourceListItem.source.id] != 'failed') //if this source type is currently "loading" dont open the modal window
){
//if this source is currently "loading" dont open the modal window
return
}

View File

@ -12,6 +12,10 @@
</ng-template>
{{ toast.message }}
<ng-container *ngIf="toast.link">
<br/>
<a [routerLink]="toast.link.url" class="toast-link text-light tx-bold cursor-pointer">{{toast.link.text}}</a>
</ng-container>
</ngb-toast>
</div>

View File

@ -0,0 +1,7 @@
toast-link{
font-weight: bold;
text-decoration: underline;
:hover{
text-decoration: none;
}
}

View File

@ -0,0 +1,100 @@
import type { Meta, StoryObj } from '@storybook/angular';
import {ToastComponent} from './toast.component';
import {ToastService} from '../../services/toast.service';
import {moduleMetadata} from '@storybook/angular';
import {ToastNotification, ToastType} from '../../models/fasten/toast';
import {NgbModule} from '@ng-bootstrap/ng-bootstrap';
// More on how to set up stories at: https://storybook.js.org/docs/angular/writing-stories/introduction
const meta: Meta<ToastComponent> = {
title: 'Components/ToastComponent',
component: ToastComponent,
decorators: [
moduleMetadata({
imports: [NgbModule]
})
// applicationConfig({
// providers: [importProvidersFrom(AppModule)],
// }),
],
tags: ['autodocs'],
render: (args: ToastComponent) => ({
props: {
backgroundColor: null,
...args,
},
}),
argTypes: {
toastService: {
description: 'ToastService',
control: {
type: 'object',
}
}
},
};
export default meta;
type Story = StoryObj<ToastComponent>;
// More on writing stories with args: https://storybook.js.org/docs/angular/writing-stories/args
export const Success: Story = {
args: {
toastService: {
toasts: [{
event_date: new Date(),
autohide: false,
type: ToastType.Success,
title: 'Success',
displayClass: 'demo-static-toast bg-indigo text-light',
message: 'This is a success message'
}],
show: (toastNotification: ToastNotification) => {},
remove: (toastNotification: ToastNotification) => {},
clear: () => {}
}
}
};
export const Error: Story = {
args: {
toastService: {
toasts: [{
event_date: new Date(),
autohide: false,
type: ToastType.Error,
title: 'Error',
displayClass: 'demo-static-toast bg-danger text-light',
message: 'This is an error message'
}],
show: (toastNotification: ToastNotification) => {},
remove: (toastNotification: ToastNotification) => {},
clear: () => {}
}
}
};
export const Link: Story = {
args: {
toastService: {
toasts: [{
event_date: new Date(),
autohide: false,
type: ToastType.Error,
title: 'Error',
displayClass: 'demo-static-toast bg-danger text-light',
message: 'This is an error message',
link: {
text: 'Click here',
url: '/background-jobs'
}
}],
show: (toastNotification: ToastNotification) => {},
remove: (toastNotification: ToastNotification) => {},
clear: () => {}
}
}
};

View File

@ -0,0 +1,11 @@
export class BackgroundJob {
created_at: string
user_id: string
job_type?: 'SYNC' | 'SCHEDULED_SYNC'
data?: any
job_status?: 'STATUS_READY' | 'STATUS_LOCKED' | 'STATUS_FAILED' | 'STATUS_DONE'
locked_time?: Date
done_time?: Date
retries: number
schedule?: string
}

View File

@ -1,9 +1,11 @@
import {LighthouseSourceMetadata} from '../lighthouse/lighthouse-source-metadata';
import {BackgroundJob} from './background-job';
export class Source extends LighthouseSourceMetadata{
id?: string
user_id?: number
source_type: string
latest_background_job?: BackgroundJob
patient: string
access_token: string

View File

@ -11,4 +11,8 @@ export class ToastNotification {
type: ToastType = ToastType.Info
displayClass: string = 'demo-static-toast'
autohide: boolean = true
link?: {
text: string,
url: string
}
}

View File

@ -0,0 +1,91 @@
<div class="az-content az-content-profile">
<div class="container mn-ht-100p">
<div class="content-wrapper w-100">
<div class="row">
<div class="col-12">
<div class="card">
<div class="card-body">
<h4 class="card-title">Background Job History</h4>
<div class="row grid-margin">
<div class="col-12">
<div class="alert alert-warning" role="alert">
<strong>10 Results</strong> Only the most recent jobs are shown.
</div>
</div>
</div>
<div class="row overflow-auto">
<div class="col-12">
<table id="order-listing" class="table" cellspacing="0" width="100%">
<thead>
<tr class="text-white">
<th></th>
<th>Job Type</th>
<th>Created At</th>
<th>Status</th>
<th>Last Updated</th>
<th>Completed At</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
<tr *ngFor="let backgroundJob of backgroundJobs" [class.border-left-danger]="backgroundJob.job_status == 'STATUS_FAILED'">
<td>
<img style="max-height:30px" [src]="'assets/sources/'+(backgroundJob?.data?.source_type + '.png')" [alt]="backgroundJob?.data?.source_type" class="img-fluid">
</td>
<td>{{backgroundJob.job_type}}</td>
<td container="body" [ngbTooltip]="backgroundJob.created_at | amDateFormat:'YYYY-MM-DD HH:mm'">{{backgroundJob.created_at | amDateFormat:'LL'}}</td>
<td><label class="badge badge-pill" [ngClass]="{
'badge-secondary': backgroundJob.job_status == 'STATUS_READY',
'badge-success': backgroundJob.job_status == 'STATUS_DONE',
'badge-warning': backgroundJob.job_status == 'STATUS_LOCKED',
'badge-danger': backgroundJob.job_status == 'STATUS_FAILED'
}">{{backgroundJob.job_status}}</label></td>
<td container="body" [ngbTooltip]="backgroundJob.locked_time | amDateFormat:'YYYY-MM-DD HH:mm'">{{backgroundJob.locked_time | amTimeAgo}}</td>
<td container="body" [ngbTooltip]="backgroundJob.done_time | amDateFormat:'YYYY-MM-DD HH:mm'">{{backgroundJob.done_time | amTimeAgo}}</td>
<td>
<button (click)="openModal(content, backgroundJob)" class="btn btn-outline-indigo btn-with-icon btn-rounded"><i class="fa fa-info-circle"></i> Details</button>
</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div><!-- container -->
</div><!-- az-content -->
<ng-template #content let-modal>
<div class="modal-header">
<h4 class="modal-title">[{{selectedBackgroundJob.job_type}}] {{selectedBackgroundJob.created_at | amDateFormat:'YYYY-MM-DD HH:mm'}}</h4>
<button type="button" class="btn close" aria-label="Close" (click)="modal.dismiss('Cross click')"><span aria-hidden="true">×</span></button>
</div>
<div class="modal-body">
<div class="row">
<div class="col-6"><strong>Status</strong></div>
<div class="col-6"><label class="badge badge-pill" [ngClass]="{
'badge-secondary': selectedBackgroundJob.job_status == 'STATUS_READY',
'badge-success': selectedBackgroundJob.job_status == 'STATUS_DONE',
'badge-warning': selectedBackgroundJob.job_status == 'STATUS_LOCKED',
'badge-danger': selectedBackgroundJob.job_status == 'STATUS_FAILED'
}">{{selectedBackgroundJob.job_status}}</label></div>
<div class="col-6"><strong>Last Updated</strong></div>
<div class="col-6">{{selectedBackgroundJob.locked_time | amDateFormat:'YYYY-MM-DD HH:mm'}}</div>
<div class="col-6"><strong>Completed At</strong></div>
<div class="col-6">{{selectedBackgroundJob.done_time | amDateFormat:'YYYY-MM-DD HH:mm'}}</div>
</div>
<pre><code [highlight]="selectedBackgroundJob.data | json"></code></pre>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-light" (click)="modal.close('Close click')">Close</button>
</div>
</ng-template>

View File

@ -0,0 +1,23 @@
import { ComponentFixture, TestBed } from '@angular/core/testing';
import { BackgroundJobsComponent } from './background-jobs.component';
describe('BackgroundJobsComponent', () => {
let component: BackgroundJobsComponent;
let fixture: ComponentFixture<BackgroundJobsComponent>;
beforeEach(async () => {
await TestBed.configureTestingModule({
declarations: [ BackgroundJobsComponent ]
})
.compileComponents();
fixture = TestBed.createComponent(BackgroundJobsComponent);
component = fixture.componentInstance;
fixture.detectChanges();
});
it('should create', () => {
expect(component).toBeTruthy();
});
});

View File

@ -0,0 +1,28 @@
import { Component, OnInit } from '@angular/core';
import {FastenApiService} from '../../services/fasten-api.service';
import {BackgroundJob} from '../../models/fasten/background-job';
import { NgbModal } from '@ng-bootstrap/ng-bootstrap';
@Component({
selector: 'app-background-jobs',
templateUrl: './background-jobs.component.html',
styleUrls: ['./background-jobs.component.scss']
})
export class BackgroundJobsComponent implements OnInit {
backgroundJobs: BackgroundJob[] = []
selectedBackgroundJob: BackgroundJob = null
constructor(public fastenApi: FastenApiService, private modalService: NgbModal) { }
ngOnInit(): void {
this.fastenApi.getBackgroundJobs().subscribe((jobs) => {
this.backgroundJobs = jobs
})
}
openModal(content, backgroundJob: BackgroundJob) {
this.selectedBackgroundJob = backgroundJob
this.modalService.open(content, { size: 'lg', scrollable: true });
}
}

View File

@ -24,6 +24,7 @@ import {DashboardConfig} from '../models/widget/dashboard-config';
import {DashboardWidgetQuery} from '../models/widget/dashboard-widget-query';
import {ResourceGraphResponse} from '../models/fasten/resource-graph-response';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import {BackgroundJob} from '../models/fasten/background-job';
@Injectable({
providedIn: 'root'
@ -268,4 +269,27 @@ export class FastenApiService {
return of(new BinaryModel(attachmentModel));
}
}
getBackgroundJobs(jobType?: string, status?: string, page?: number): Observable<BackgroundJob[]> {
let queryParams = {}
if(jobType){
queryParams["jobType"] = jobType
}
if(status){
queryParams["status"] = status
}
if(page !== undefined){
queryParams["page"] = page
}
return this._httpClient.get<any>(`${GetEndpointAbsolutePath(globalThis.location, environment.fasten_api_endpoint_base)}/secure/jobs`, {params: queryParams})
.pipe(
map((response: ResponseWrapper) => {
console.log("RESPONSE", response)
return response.data as BackgroundJob[]
})
);
}
}

View File

@ -20,6 +20,10 @@ select > optgroup > .divider {
cursor: auto !important;
}
.border-left-danger {
border-left: 5px solid #dc3545 !important;
}
// if text is too long, we can truncate
.truncate {
white-space: nowrap;

3
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/dave/jennifer v1.6.1
github.com/dominikbraun/graph v0.15.0
github.com/dop251/goja v0.0.0-20230605162241-28ee0ee714f3
github.com/fastenhealth/fasten-sources v0.3.3
github.com/fastenhealth/fasten-sources v0.4.0
github.com/fastenhealth/gofhir-models v0.0.6
github.com/gin-gonic/gin v1.9.0
github.com/glebarez/sqlite v1.5.0
@ -111,5 +111,6 @@ require (
)
//replace github.com/fastenhealth/fasten-sources => ../fasten-sources
//
//replace github.com/fastenhealth/gofhir-models => ../gofhir-models

4
go.sum
View File

@ -197,8 +197,8 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.5.0+incompatible h1:ouOWdg56aJriqS0huScTkVXPC5IcNrDCXZ6OoTAWu7M=
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww=
github.com/fastenhealth/fasten-sources v0.3.3 h1:JI8fuXd6Cuq21T4aATLDS26Ubdy3foALFrQ/wa1Ylc8=
github.com/fastenhealth/fasten-sources v0.3.3/go.mod h1:YxgwR6jSEU+edYqEDkm238n7HlvZdW3i04nIh7gSDaM=
github.com/fastenhealth/fasten-sources v0.4.0 h1:vkfBpjZXyvJ1+jUAaxsZlgpkSZRh7oWaR9nx9PacLrc=
github.com/fastenhealth/fasten-sources v0.4.0/go.mod h1:YxgwR6jSEU+edYqEDkm238n7HlvZdW3i04nIh7gSDaM=
github.com/fastenhealth/gofhir-models v0.0.6 h1:yJYYaV1eJtHiGEfA1rXLsyOm/9hIi6s2cGoZzGfW1tM=
github.com/fastenhealth/gofhir-models v0.0.6/go.mod h1:xB8ikGxu3bUq2b1JYV+CZpHqBaLXpOizFR0eFBCunis=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=