clean up directory crawler, fix active workers number, minor bugfixes

This commit is contained in:
Cyberes 2024-03-17 18:03:18 -06:00
parent 3f305b104b
commit c0faaab3f8
8 changed files with 18 additions and 30 deletions

View File

@ -165,7 +165,7 @@ func main() {
log.Infof("Server started on port %s", cfg.HTTPPort) log.Infof("Server started on port %s", cfg.HTTPPort)
if cliArgs.initialCrawl || cfg.InitialCrawl { if cliArgs.initialCrawl || cfg.InitialCrawl {
log.Infof(`Preforming initial crawl for "%s"`, config.GetConfig().RootDir) log.Infof(`Performing initial crawl for "%s"`, config.GetConfig().RootDir)
start := time.Now() start := time.Now()
err := cache.InitialCrawl() err := cache.InitialCrawl()
if err != nil { if err != nil {

View File

@ -34,28 +34,16 @@ type FinishedCrawl struct {
} }
type DirectoryCrawler struct { type DirectoryCrawler struct {
visited sync.Map wg sync.WaitGroup
wg sync.WaitGroup queue *queuedwalk.JobQueue
queue *queuedwalk.JobQueue
} }
func NewDirectoryCrawler(queue *queuedwalk.JobQueue) *DirectoryCrawler { func NewDirectoryCrawler(queue *queuedwalk.JobQueue) *DirectoryCrawler {
return &DirectoryCrawler{ return &DirectoryCrawler{
visited: sync.Map{}, queue: queue,
queue: queue,
} }
} }
func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
dc.visited.Range(func(key, value interface{}) bool {
keyStr := key.(string)
if isSubpath(file.StripRootDir(path), keyStr) && value.(bool) {
sharedcache.Cache.Remove(keyStr)
}
return true
})
}
func (dc *DirectoryCrawler) addCacheItem(fullPath string, info os.FileInfo) error { func (dc *DirectoryCrawler) addCacheItem(fullPath string, info os.FileInfo) error {
strippedPath := file.StripRootDir(fullPath) strippedPath := file.StripRootDir(fullPath)
item, err := cacheitem.NewItem(fullPath, info) item, err := cacheitem.NewItem(fullPath, info)

View File

@ -11,11 +11,14 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
if globals.DirectoryCrawlers != nil { if globals.DirectoryCrawlers != nil {
panic("DirectoryCrawlers has already been defined!") panic("DirectoryCrawlers has already been defined!")
} }
dcWorkers := workers.InitializeWorkers(directoryCrawlerWorker) dcWorkers := workers.InitializeWorkers(directoryCrawlerWorker) // *workers.CrawlWorkers
d := &globals.DcWorkers{} d := &globals.DcWorkers{}
// Copy the fields given to us by InitializeWorkers() to the global object.
d.Queue = dcWorkers.Queue d.Queue = dcWorkers.Queue
d.BusyWorkers = dcWorkers.BusyWorkers dcWorkers.BusyWorkers = &d.BusyWorkers
globals.DirectoryCrawlers = d globals.DirectoryCrawlers = d
log.Debugf("CRAWLERS - Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers) log.Debugf("CRAWLERS - Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
return d return d
} }
@ -24,7 +27,7 @@ func directoryCrawlerWorker(w *workers.CrawlWorkers) {
// Reminder that this worker type does not support shutdown // Reminder that this worker type does not support shutdown
for { for {
job := w.Queue.GetJob() job := w.Queue.GetJob()
atomic.AddInt32(&w.BusyWorkers, 1) atomic.AddInt32(w.BusyWorkers, 1)
err := job.Walker.ReadPathAndQueue(job.StartPath) err := job.Walker.ReadPathAndQueue(job.StartPath)
if err != nil { if err != nil {
@ -32,6 +35,6 @@ func directoryCrawlerWorker(w *workers.CrawlWorkers) {
} }
job.Walker.Wg.Done() job.Walker.Wg.Done()
atomic.AddInt32(&w.BusyWorkers, -1) atomic.AddInt32(w.BusyWorkers, -1)
} }
} }

View File

@ -11,9 +11,6 @@ import (
func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error { func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error {
relPath := file.StripRootDir(fullPath) relPath := file.StripRootDir(fullPath)
dc.visited.Store(relPath, true)
if info.Mode().IsDir() { if info.Mode().IsDir() {
dirItem, err := cacheitem.NewItem(fullPath, info) dirItem, err := cacheitem.NewItem(fullPath, info)
if err != nil { if err != nil {

View File

@ -20,7 +20,7 @@ func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
deleteWorkers := workers.InitializeWorkers(elasticDeleteWorker) deleteWorkers := workers.InitializeWorkers(elasticDeleteWorker)
d := &globals.DcWorkers{} d := &globals.DcWorkers{}
d.Queue = deleteWorkers.Queue d.Queue = deleteWorkers.Queue
d.BusyWorkers = deleteWorkers.BusyWorkers deleteWorkers.BusyWorkers = &d.BusyWorkers
globals.ElasticCrawlers = d globals.ElasticCrawlers = d
log.Debugf("CRAWLERS - Started %d Elasticsearch sync workers.", config.GetConfig().ElasticsearchSyncThreads) log.Debugf("CRAWLERS - Started %d Elasticsearch sync workers.", config.GetConfig().ElasticsearchSyncThreads)
return d return d
@ -37,7 +37,7 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
return return
} }
atomic.AddInt32(&w.BusyWorkers, 1) atomic.AddInt32(w.BusyWorkers, 1)
if job.Extra == nil { if job.Extra == nil {
// Jobs without any extras are the standard Walk jobs that add items to Elastic. // Jobs without any extras are the standard Walk jobs that add items to Elastic.
@ -64,6 +64,6 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
panic(task) panic(task)
} }
} }
atomic.AddInt32(&w.BusyWorkers, -1) atomic.AddInt32(w.BusyWorkers, -1)
} }
} }

View File

@ -85,7 +85,7 @@ func syncElasticsearch(doFullSync bool) {
return return
case <-ticker.C: case <-ticker.C:
elapsed := time.Since(start) elapsed := time.Since(start)
logStr := "ELASTIC - Sync in progress. Elapsed: %s. Busy Elastic delete workers: %d. Elastic deletes queued: %d" logStr := "ELASTIC - Sync in progress. Elapsed: %s. Busy workers: %d. Jobs queued: %d"
log.Debugf(logStr, elapsed, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize()) log.Debugf(logStr, elapsed, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
} }
} }
@ -128,7 +128,7 @@ func syncElasticsearch(doFullSync bool) {
} }
func logElasticConnError(err error) { func logElasticConnError(err error) {
log.Errorf("ELASTIC - Failed to read the index: %s", err.Error()) log.Errorf("ELASTIC - Failed to read the index: %s", err)
} }
// EnableElasticsearchConnection tests the connection to Elastic and enables the backend if it's successful. // EnableElasticsearchConnection tests the connection to Elastic and enables the backend if it's successful.

View File

@ -13,7 +13,7 @@ import (
func getElasticSize() (int, error) { func getElasticSize() (int, error) {
keysByPath, _, err := getPathsFromIndex() keysByPath, _, err := getPathsFromIndex()
if err != nil { if err != nil {
return -1, nil return -1, err
} }
return len(keysByPath), nil return len(keysByPath), nil
} }

View File

@ -11,7 +11,7 @@ type CrawlWorkerFunc func(workerData *CrawlWorkers)
type CrawlWorkers struct { type CrawlWorkers struct {
Queue *queuedwalk.JobQueue Queue *queuedwalk.JobQueue
BusyWorkers int32 BusyWorkers *int32
WorkerFunc CrawlWorkerFunc WorkerFunc CrawlWorkerFunc
} }