move elastic sync to workers instead of threads, parallel elastic delete sync, reimplement partial elastic sync

This commit is contained in:
Cyberes 2023-12-13 14:21:47 -07:00
parent d16eaf614e
commit e9db83f09b
11 changed files with 216 additions and 101 deletions

View File

@ -5,6 +5,7 @@ import (
"crazyfs/Workers"
"crazyfs/api/helpers"
"crazyfs/config"
"crazyfs/elastic"
"crypto/sha256"
"crypto/subtle"
"net/http"
@ -37,6 +38,13 @@ func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
"size": Workers.Queue.GetQueueSize(),
},
"initialCrawlElapsed": config.InitialCrawlElapsed,
"elastic": map[string]interface{}{
"busy": elastic.BusyWorkers,
"alive": config.GetConfig().ElasticsearchSyncThreads,
"queue": map[string]interface{}{
"size": elastic.Queue.GetQueueSize(),
},
},
}
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")

10
src/cache/crawler.go vendored
View File

@ -5,6 +5,7 @@ import (
"crazyfs/SharedCache"
"crazyfs/Workers"
"crazyfs/config"
"crazyfs/elastic"
"sync"
"time"
)
@ -51,7 +52,14 @@ func startCrawl(wg *sync.WaitGroup, crawlerChan chan struct{}) {
func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
logFn("%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d.",
if !config.GetConfig().ElasticsearchSyncEnable {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
} else {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic sync workers: %d. Elastic sync queued: %d"
logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), elastic.BusyWorkers, elastic.Queue.GetQueueSize())
}
}
}

View File

@ -143,7 +143,7 @@ func main() {
elastic.ElasticClient = es
if cfg.ElasticsearchSyncEnable && !cliArgs.disableElasticSync {
go elastic.ElasticsearchThread()
go elastic.SyncThread()
log.Info("Started the background Elasticsearch sync thread.")
} else {
log.Info("The background Elasticsearch sync thread is disabled.")

View File

@ -0,0 +1,49 @@
package elastic
import "sync"
// More or less like the other queue implementation.
type DeleteJob struct {
Key string
}
type DeleteJobQueue struct {
jobs []DeleteJob
mutex sync.Mutex
cond *sync.Cond
}
func NewJobQueue() *DeleteJobQueue {
q := &DeleteJobQueue{}
q.cond = sync.NewCond(&q.mutex)
return q
}
// AddJob adds a job to the queue and signals the workers so they know to pick it up.
func (q *DeleteJobQueue) AddJob(job DeleteJob) {
q.mutex.Lock()
q.jobs = append(q.jobs, job)
q.mutex.Unlock()
q.cond.Signal()
}
// GetJob is how a worker pulls a job from the queue.
func (q *DeleteJobQueue) GetJob() DeleteJob {
q.mutex.Lock()
defer q.mutex.Unlock()
for len(q.jobs) == 0 {
q.cond.Wait()
}
job := q.jobs[0]
q.jobs = q.jobs[1:]
return job
}
// GetQueueSize returns the size of the queue.
func (q *DeleteJobQueue) GetQueueSize() int {
return len(q.jobs)
}

View File

@ -0,0 +1,35 @@
package elastic
import (
"crazyfs/SharedCache"
"crazyfs/config"
"sync/atomic"
)
// BusyWorkers is an atomic counter for the number of active Workers
var BusyWorkers int32
// InitializeWorkers starts the number of Workers defined by the config.
func InitializeWorkers() {
Queue = NewJobQueue()
for n := 1; n <= config.GetConfig().ElasticsearchSyncThreads; n++ {
go worker()
}
log.Debugf("ELASTIC - Started %d sync workers.", config.GetConfig().ElasticsearchSyncThreads)
}
// worker processes jobs forever.
func worker() {
for {
job := Queue.GetJob()
atomic.AddInt32(&BusyWorkers, 1)
if _, ok := SharedCache.Cache.Get(job.Key); !ok {
// If a key does not exist in the LRU cache, delete it from Elasticsearch
deleteFromElasticsearch(job.Key)
log.Debugf(`ELASTIC - Removed key "%s"`, job.Key)
}
atomic.AddInt32(&BusyWorkers, -1)
}
}

View File

@ -0,0 +1,94 @@
package elastic
import (
"crazyfs/DirectoryCrawler"
"crazyfs/config"
"sync"
"time"
)
var Queue *DeleteJobQueue
var syncLock sync.Mutex
func SyncThread() {
Queue = NewJobQueue()
InitializeWorkers()
createCrazyfsIndex()
// Test connection to Elastic.
esSize, err := getElasticSize()
if err != nil {
logElasticConnError(err)
return
}
log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
// Run a partial sync at startup, unless configured to run a full one.
syncElasticsearch(false)
ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
fullSyncTicker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchFullSyncInterval) * time.Second)
for {
select {
case <-ticker.C:
syncElasticsearch(false)
case <-fullSyncTicker.C:
syncElasticsearch(true)
}
}
}
// TODO: make this use workers instead of starting a million threads
// TODO: have the workers exit when the sync job is finished
func syncElasticsearch(doFullSync bool) {
// Only one sync at a time. Also helps to prevent races with the global variables.
syncLock.Lock()
var syncType string
if fullSync {
ElasticRefreshSyncRunning = true
syncType = "full refresh"
} else {
ElasticNewSyncRunning = true
syncType = "refresh"
}
log.Infof("ELASTIC - started a %s sync.", syncType)
start := time.Now()
startRemoveStaleItemsFromElasticsearch()
// Set global variables for the workers to read.
fullSync = doFullSync
var err error
existingKeys, err = getPathsFromIndex()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
}
dc := DirectoryCrawler.NewDirectoryCrawler()
err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
if err != nil {
log.Errorf("ELASTIC - crawl failed: %s", err)
return
}
duration := time.Since(start)
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
syncLock.Unlock()
}
func logElasticConnError(err error) {
log.Errorf("ELASTIC - Failed to read the index: %s", err)
LogElasticQuit()
}
func LogElasticQuit() {
log.Errorln("ELASTIC - background thread exiting, Elastic indexing and search will not be available.")
}

View File

@ -1,62 +0,0 @@
package elastic
import (
"crazyfs/DirectoryCrawler"
"crazyfs/config"
"time"
)
func ElasticsearchThread() {
createCrazyfsIndex()
// Test connection to Elastic.
esSize, err := getElasticSize()
if err != nil {
logElasticConnError(err)
return
}
log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
// Run a partial sync at startup, unless configured to run a full one.
syncElasticsearch()
ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
for {
select {
case <-ticker.C:
syncElasticsearch()
}
}
}
// TODO: make this use workers instead of starting a million threads
// TODO: have the workers exit when the sync job is finished
func syncElasticsearch() {
log.Infof("ELASTIC - started syncing.")
start := time.Now()
dc := DirectoryCrawler.NewDirectoryCrawler()
err := dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
if err != nil {
log.Errorf("ELASTIC - crawl failed: %s", err)
return
}
// TODO: use workers for this
log.Debugln("ELASTIC - Checking for removed items...")
removeStaleItemsFromElasticsearch()
duration := time.Since(start)
log.Infof("ELASTIC - sync finished in %s", duration)
}
func logElasticConnError(err error) {
log.Errorf("ELASTIC - Failed to read the index: %s", err)
LogElasticQuit()
}
func LogElasticQuit() {
log.Errorln("ELASTIC - background thread exiting, Elastic indexing and search will not be available.")
}

View File

@ -10,8 +10,17 @@ import (
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
"os"
"slices"
)
// existingKeys is a global variable called by the Walker callback: addToElasticsearch().
// It is set only by syncElasticsearch() when a sync is started. Only one sync can run at a time.
// A global is needed since there is no way to pass variables like this to the workers.
var existingKeys []string
// fullSync is another global variable accessed by the workers and set by syncElasticsearch()
var fullSync bool
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
key := file.StripRootDir(fullPath)
cacheItem, found := SharedCache.Cache.Get(key)
@ -19,9 +28,11 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
} else {
if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
if fullSync {
preformAddToElasticsearch(cacheItem)
} else {
deleteFromElasticsearch(key) // clean up
} else if !slices.Contains(existingKeys, key) {
preformAddToElasticsearch(cacheItem)
}
}
}
return nil

View File

@ -2,14 +2,12 @@ package elastic
import (
"context"
"crazyfs/SharedCache"
"crazyfs/config"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
"sync"
)
func removeStaleItemsFromElasticsearch() {
func startRemoveStaleItemsFromElasticsearch() {
// Retrieve all keys from Elasticsearch
keys, err := getPathsFromIndex()
if err != nil {
@ -17,37 +15,12 @@ func removeStaleItemsFromElasticsearch() {
return
}
// Create a buffered channel as a semaphore
sem := make(chan struct{}, config.GetConfig().ElasticsearchSyncThreads)
log.Debugln("ELASTIC - Checking for removed items...")
// Create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup
// For each key in Elasticsearch, check if it exists in the LRU cache
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
for _, key := range keys {
// Increment the wait group counter
wg.Add(1)
// Acquire a semaphore
sem <- struct{}{}
go func(key string) {
// Ensure the semaphore is released and the wait group counter is decremented when the goroutine finishes
defer func() {
<-sem
wg.Done()
}()
if _, ok := SharedCache.Cache.Get(key); !ok {
// If a key does not exist in the LRU cache, delete it from Elasticsearch
deleteFromElasticsearch(key)
log.Debugf(`ELASTIC - Removed key "%s"`, key)
go Queue.AddJob(DeleteJob{Key: key})
}
}(key)
}
// Wait for all goroutines to finish
wg.Wait()
}
func deleteFromElasticsearch(key string) {

View File

@ -22,7 +22,7 @@ func getPathsFromIndex() ([]string, error) {
// This may take a bit if the index is very large, so avoid calling this.
// Print a debug message so the user doesn't think we're frozen.
log.Debugln("Fetching indexed paths from Elasticsearch...")
log.Debugln("ELASTIC - Fetching indexed paths from Elasticsearch...")
var paths []string
var r map[string]interface{}

View File

@ -1,4 +1,3 @@
- Some way for the add to elastic callback to skip existing keys if not doing a full search
Later:
- Add a wildcard option to restricted_download_paths to block all sub-directories