package handler import ( "context" "encoding/json" "fmt" "github.com/fastenhealth/fasten-onprem/backend/pkg" "github.com/fastenhealth/fasten-onprem/backend/pkg/database" "github.com/fastenhealth/fasten-onprem/backend/pkg/models" "github.com/fastenhealth/fasten-sources/clients/factory" sourceModels "github.com/fastenhealth/fasten-sources/clients/models" sourcePkg "github.com/fastenhealth/fasten-sources/pkg" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" "net/http" "strconv" "time" ) // BackgroundJobSyncResources is a background job that syncs all FHIR resource for a given source // It is a blocking function that will return only when the sync is complete or has failed // It will create a background job and associate it with the source // It will also update the access token and refresh token if they have been updated // It will return the sync summary and error if any // TODO: run in background thread, or use https://gobyexample.com/tickers // TODO: use goroutine to truely run in the background (how will that work with DatabaseRepository, is that thread safe?) Mutex needed? func BackgroundJobSyncResources(parentContext context.Context, logger *logrus.Entry, databaseRepo database.DatabaseRepository, sourceCred *models.SourceCredential) (sourceModels.UpsertSummary, error) { var resultErr error var backgroundJob *models.BackgroundJob //Begin Sync JobStatus update process //1. Check if the source is already syncing if sourceCred.LatestBackgroundJob != nil && sourceCred.LatestBackgroundJob.JobStatus == pkg.BackgroundJobStatusLocked { logger.Errorln("Sync operation already in progress, cannot continue.") return sourceModels.UpsertSummary{}, fmt.Errorf("sync operation already in progress, cannot continue") } //since there's no sync in progress, lets create a new background job //2. Create a new background job backgroundJob = models.NewSyncBackgroundJob(*sourceCred) err := databaseRepo.CreateBackgroundJob(parentContext, backgroundJob) if err != nil { resultErr = fmt.Errorf("an error occurred while creating background job: %w", err) logger.Errorln(resultErr) return sourceModels.UpsertSummary{}, resultErr } backgroundJobContext := CreateBackgroundJobContext(parentContext, backgroundJob.ID.String()) //3. Update the source with the background job id sourceCred.LatestBackgroundJobID = &backgroundJob.ID err = databaseRepo.UpdateSource(backgroundJobContext, sourceCred) if err != nil { logger.Warn("An error occurred while registering background job id with source, ignoring", err) //we can safely ignore this error, because we'll be updating the status of the background job again later } // after creating the client, we should do a bulk import sourceClient, err := factory.GetSourceClient(sourcePkg.GetFastenLighthouseEnv(), sourceCred.SourceType, backgroundJobContext, logger, sourceCred) if err != nil { resultErr = fmt.Errorf("an error occurred while initializing hub client using source credential: %w", err) logger.Errorln(resultErr) return sourceModels.UpsertSummary{}, resultErr } // BEGIN FINALIZER defer func() { //finalizer function - update the sync status to completed (or failed depending on the error status) if sourceCred == nil { logger.Errorln("sync status finalizer unable to complete, SourceCredential is null, ignoring", err) return } else { //since we're finished with the sync (no matter the final status), we can clear the active background job id sourceCred.LatestBackgroundJobID = nil //this will also update the AccessToken & RefreshToken if they have been updated err := databaseRepo.UpdateSource(backgroundJobContext, sourceCred) if err != nil { logger.Errorln("sync status finalizer failed updating source, ignoring", err) } } //update the backgroundJob status to completed or failed if backgroundJob == nil { logger.Errorln("sync status finalizer unable to complete, BackgroundJob is null, ignoring", err) return } else { //first, try to update the background job with the latest data updatedBackgroundJob, err := databaseRepo.GetBackgroundJob(backgroundJobContext, backgroundJob.ID.String()) if err == nil { //replace the current background job, with the updated one. backgroundJob = updatedBackgroundJob } if resultErr == nil { backgroundJob.JobStatus = pkg.BackgroundJobStatusDone } else { //if there's an error that we need to store, lets unmarshal the data from the backgroundjob var backgroundJobSyncData models.BackgroundJobSyncData if backgroundJob.Data != nil { err = json.Unmarshal(backgroundJob.Data, &backgroundJobSyncData) } //ensure there's a map to store the error data if backgroundJobSyncData.ErrorData == nil { backgroundJobSyncData.ErrorData = map[string]interface{}{} } backgroundJobSyncData.ErrorData["final"] = resultErr.Error() //marshal the new background job data backgroundJob.Data, err = json.Marshal(backgroundJobSyncData) backgroundJob.JobStatus = pkg.BackgroundJobStatusFailed } now := time.Now() backgroundJob.DoneTime = &now backgroundJob.LockedTime = nil err = databaseRepo.UpdateBackgroundJob(backgroundJobContext, backgroundJob) if err != nil { logger.Errorln("sync status finalizer failed updating background job, ignoring", err) } } }() // END FINALIZER summary, err := sourceClient.SyncAll(databaseRepo) if err != nil { resultErr = fmt.Errorf("an error occurred while bulk importing resources from source: %w", err) logger.Errorln(resultErr) return summary, resultErr } //update source incase the access token/refresh token has been updated sourceCredential := sourceClient.GetSourceCredential() sourceCredentialConcrete, ok := sourceCredential.(*models.SourceCredential) if !ok { resultErr = fmt.Errorf("an error occurred while updating source credential, source credential is not of type *models.SourceCredential") logger.Errorln(resultErr) return summary, resultErr } sourceCred = sourceCredentialConcrete //updated credentials will be saved by the finalizer return summary, resultErr } // Handlers func ListBackgroundJobs(c *gin.Context) { logger := c.MustGet(pkg.ContextKeyTypeLogger).(*logrus.Entry) databaseRepo := c.MustGet(pkg.ContextKeyTypeDatabase).(database.DatabaseRepository) backgroundJobQueryOptions := models.BackgroundJobQueryOptions{ Limit: pkg.ResourceListPageSize, } if len(c.Query("jobType")) > 0 { jobType := pkg.BackgroundJobType(c.Query("jobType")) backgroundJobQueryOptions.JobType = &jobType } if len(c.Query("status")) > 0 { status := pkg.BackgroundJobStatus(c.Query("status")) backgroundJobQueryOptions.Status = &status } if len(c.Query("page")) > 0 { pageNumb, err := strconv.Atoi(c.Query("page")) if err != nil { logger.Errorln("An error occurred while calculating page number", err) c.JSON(http.StatusInternalServerError, gin.H{"success": false}) return } backgroundJobQueryOptions.Offset = pageNumb * backgroundJobQueryOptions.Limit } backgroundJobs, err := databaseRepo.ListBackgroundJobs(c, backgroundJobQueryOptions) if err != nil { logger.Errorln("An error occurred while retrieving resources", err) c.JSON(http.StatusInternalServerError, gin.H{"success": false}) return } c.JSON(http.StatusOK, gin.H{"success": true, "data": backgroundJobs}) } // Utilities func GetBackgroundContext(ginContext *gin.Context) context.Context { return context.WithValue(context.Background(), pkg.ContextKeyTypeAuthUsername, ginContext.Value(pkg.ContextKeyTypeAuthUsername).(string)) } func CreateBackgroundJobContext(parentContext context.Context, backgroundJobId string) context.Context { return context.WithValue(parentContext, pkg.ContextKeyTypeBackgroundJobID, backgroundJobId) }