fix some data races

This commit is contained in:
Cyberes 2024-02-03 19:46:07 -07:00
parent 665a2e8c18
commit e1e6e2cbc2
11 changed files with 60 additions and 43 deletions

View File

@ -15,8 +15,8 @@ const maxFinishedCrawls = 100
var activeCrawls = make(map[string]*ActiveCrawl) var activeCrawls = make(map[string]*ActiveCrawl)
var finishedCrawls = make([]FinishedCrawl, 0, maxFinishedCrawls) var finishedCrawls = make([]FinishedCrawl, 0, maxFinishedCrawls)
var activeCrawlsMutex = &sync.Mutex{} var activeCrawlsMutex = &sync.RWMutex{}
var finishedCrawlsMutex = &sync.Mutex{} var finishedCrawlsMutex = &sync.RWMutex{}
type ActiveCrawl struct { type ActiveCrawl struct {
Path string `json:"path"` Path string `json:"path"`
@ -35,7 +35,6 @@ type FinishedCrawl struct {
type DirectoryCrawler struct { type DirectoryCrawler struct {
visited sync.Map visited sync.Map
wg sync.WaitGroup wg sync.WaitGroup
mu sync.Mutex // lock for the visted map
} }
func NewDirectoryCrawler() *DirectoryCrawler { func NewDirectoryCrawler() *DirectoryCrawler {
@ -104,26 +103,28 @@ func (dc *DirectoryCrawler) startCrawl(path string, function string) bool {
return true return true
} }
func (dc *DirectoryCrawler) endCrawl(path string) { func (dc *DirectoryCrawler) endCrawl(path string, function string) {
activeCrawlsMutex.Lock() activeCrawlsMutex.Lock()
finishedCrawlsMutex.Lock()
defer activeCrawlsMutex.Unlock() defer activeCrawlsMutex.Unlock()
finishedCrawlsMutex.Lock()
defer finishedCrawlsMutex.Unlock() defer finishedCrawlsMutex.Unlock()
if len(finishedCrawls) >= maxFinishedCrawls { if len(finishedCrawls) >= maxFinishedCrawls {
finishedCrawls = finishedCrawls[1:] finishedCrawls = finishedCrawls[1:]
} }
finishedCrawls = append(finishedCrawls, FinishedCrawl{ if activeCrawl, ok := activeCrawls[path]; ok && activeCrawl.Function == function {
Path: path, finishedCrawls = append(finishedCrawls, FinishedCrawl{
Start: activeCrawls[path].Start, Path: path,
Elapsed: int64(time.Since(time.Unix(activeCrawls[path].Start, 0)).Seconds()), Start: activeCrawl.Start,
Function: activeCrawls[path].Function, Elapsed: int64(time.Since(time.Unix(activeCrawl.Start, 0)).Seconds()),
}) Function: activeCrawl.Function,
delete(activeCrawls, path) })
delete(activeCrawls, path)
}
} }
func (dc *DirectoryCrawler) IsCrawlActive(path string, function *string) bool { func (dc *DirectoryCrawler) IsCrawlActive(path string, function *string) bool {
activeCrawlsMutex.Lock() activeCrawlsMutex.RLock()
defer activeCrawlsMutex.Unlock() defer activeCrawlsMutex.RUnlock()
if crawl, active := activeCrawls[path]; active { if crawl, active := activeCrawls[path]; active {
return crawl.Function == *function return crawl.Function == *function
} }
@ -131,23 +132,29 @@ func (dc *DirectoryCrawler) IsCrawlActive(path string, function *string) bool {
} }
func GetActiveCrawls() map[string]*ActiveCrawl { func GetActiveCrawls() map[string]*ActiveCrawl {
activeCrawlsMutex.Lock() activeCrawlsMutex.RLock()
defer activeCrawlsMutex.Unlock() defer activeCrawlsMutex.RUnlock()
activeCrawlsCopy := make(map[string]*ActiveCrawl)
for path := range activeCrawls { for path := range activeCrawls {
a := activeCrawls[path] a := activeCrawls[path]
a.Elapsed = int64(time.Since(time.Unix(a.Start, 0)).Seconds()) a.Elapsed = int64(time.Since(time.Unix(a.Start, 0)).Seconds())
activeCrawlsCopy[path] = a
} }
return activeCrawls return activeCrawlsCopy
} }
func GetFinishedCrawls() []FinishedCrawl { func GetFinishedCrawls() []FinishedCrawl {
finishedCrawlsMutex.Lock() finishedCrawlsMutex.RLock()
defer finishedCrawlsMutex.Unlock() defer finishedCrawlsMutex.RUnlock()
return finishedCrawls finishedCrawlsCopy := make([]FinishedCrawl, 0, maxFinishedCrawls)
for k, v := range finishedCrawls {
finishedCrawlsCopy[k] = v
}
return finishedCrawlsCopy
} }
func GetTotalActiveCrawls() int { func GetTotalActiveCrawls() int {
finishedCrawlsMutex.Lock() activeCrawlsMutex.RLock()
defer finishedCrawlsMutex.Unlock() defer activeCrawlsMutex.RUnlock()
return len(activeCrawls) return len(activeCrawls)
} }

View File

@ -54,13 +54,13 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
fullName := fn.Name() fullName := fn.Name()
parts := strings.Split(fullName, ".") parts := strings.Split(fullName, ".")
funcName := parts[len(parts)-1] funcName := parts[len(parts)-1]
cleanFuncName := strings.TrimSuffix(funcName, "-fm") cleanFuncName := strings.TrimSuffix(funcName, "Func-fm")
readyToStart := dc.startCrawl(fullPath, strings.TrimSuffix(cleanFuncName, "Func")) readyToStart := dc.startCrawl(fullPath, cleanFuncName)
if !readyToStart { if !readyToStart {
return errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath)) return errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath))
} }
defer dc.endCrawl(fullPath) defer dc.endCrawl(fullPath, cleanFuncName)
info, err := os.Lstat(fullPath) info, err := os.Lstat(fullPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -110,7 +110,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
if !readyToStart { if !readyToStart {
return nil, errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath)) return nil, errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath))
} }
defer dc.endCrawl(fullPath) defer dc.endCrawl(fullPath, "walkNonRecursive")
info, err := os.Lstat(fullPath) info, err := os.Lstat(fullPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {

View File

@ -33,10 +33,9 @@ type ResponseItem struct {
} }
func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem { func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
var debugChildItem *CacheItem.Item
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.Fatalf("Recovered from panic: %s - %s - %s - %s", r, cacheItem.Path, debugChildItem, debug.Stack()) log.Fatalf("Recovered from panic: %s - %+v - %s", r, cacheItem, debug.Stack())
} }
}() }()
@ -60,15 +59,11 @@ func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
for _, child := range cacheItem.Children { for _, child := range cacheItem.Children {
childItem, found := SharedCache.Cache.Get(child) childItem, found := SharedCache.Cache.Get(child)
// TODO: remove // Do a quick crawl since the path could have been modified, since the last crawl.
debugChildItem = childItem
// Do a quick crawl since the path could have been modfied since the last crawl.
// This also be triggered if we encounter a broken symlink. We don't check for broken symlinks when scanning // This also be triggered if we encounter a broken symlink. We don't check for broken symlinks when scanning
// because that would be an extra os.Lstat() call in processPath(). // because that would be an extra os.Lstat() call in processPath().
if !found { if !found {
log.Debugf("CRAWLER - %s not in cache, crawling", child) log.Debugf("CRAWLER - %s not in cache, crawling", child)
dc := DirectoryCrawler.NewDirectoryCrawler() dc := DirectoryCrawler.NewDirectoryCrawler()
item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child)) item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child))
if err != nil { if err != nil {

View File

@ -15,7 +15,7 @@ type WalkJob struct {
// WalkJobQueue is the Queue that workers pull jobs from. // WalkJobQueue is the Queue that workers pull jobs from.
type WalkJobQueue struct { type WalkJobQueue struct {
jobs []WalkJob jobs []WalkJob
mutex sync.Mutex mutex sync.RWMutex
cond *sync.Cond cond *sync.Cond
} }
@ -28,8 +28,8 @@ func NewJobQueue() *WalkJobQueue {
// AddJob adds a job to the queue and signals the workers so they know to pick it up. // AddJob adds a job to the queue and signals the workers so they know to pick it up.
func (q *WalkJobQueue) AddJob(job WalkJob) { func (q *WalkJobQueue) AddJob(job WalkJob) {
q.mutex.Lock() q.mutex.Lock()
defer q.mutex.Unlock()
q.jobs = append(q.jobs, job) q.jobs = append(q.jobs, job)
q.mutex.Unlock()
q.cond.Signal() q.cond.Signal()
} }
@ -50,5 +50,7 @@ func (q *WalkJobQueue) GetJob() WalkJob {
// GetQueueSize returns the size of the queue. // GetQueueSize returns the size of the queue.
func (q *WalkJobQueue) GetQueueSize() int { func (q *WalkJobQueue) GetQueueSize() int {
q.mutex.RLock()
defer q.mutex.RUnlock()
return len(q.jobs) return len(q.jobs)
} }

View File

@ -25,16 +25,15 @@ func InitializeWorkers() {
// worker processes jobs forever. // worker processes jobs forever.
func worker() { func worker() {
for { for {
// Get a job from the queue. This is thread-safe because `GetJob()` locks the queue while reading.
job := Queue.GetJob() job := Queue.GetJob()
atomic.AddInt32(&BusyWorkers, 1) // increment the number of active Workers atomic.AddInt32(&BusyWorkers, 1)
err := job.Walker.processPath(job.StartPath) err := job.Walker.processPath(job.StartPath)
if err != nil { if err != nil {
log.Warnf("Workers - %s - %s", job.StartPath, err) log.Warnf("Workers - %s - %s", job.StartPath, err)
} }
job.Walker.wg.Done() // decrement the WaitGroup counter job.Walker.wg.Done()
atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active Workers atomic.AddInt32(&BusyWorkers, -1)
} }
} }

View File

@ -9,6 +9,7 @@ import (
"crypto/sha256" "crypto/sha256"
"crypto/subtle" "crypto/subtle"
"net/http" "net/http"
"sync/atomic"
) )
func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) { func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
@ -41,7 +42,7 @@ func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
"finished": DirectoryCrawler.GetFinishedCrawls(), "finished": DirectoryCrawler.GetFinishedCrawls(),
}, },
"crawlWorkers": map[string]interface{}{ "crawlWorkers": map[string]interface{}{
"busy": Workers.BusyWorkers, "busy": atomic.LoadInt32(&Workers.BusyWorkers),
"alive": config.GetConfig().DirectoryCrawlers, "alive": config.GetConfig().DirectoryCrawlers,
}, },
"queue": map[string]interface{}{ "queue": map[string]interface{}{

4
src/build.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/bash
mkdir -p ../dist
go build -v -trimpath -ldflags "-s -w" -tags "$(date -u)" -o ../dist/crazyfs

View File

@ -7,6 +7,7 @@ import (
"crazyfs/config" "crazyfs/config"
"crazyfs/elastic" "crazyfs/elastic"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -55,11 +56,11 @@ func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, a
if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress { if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d." logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr, logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls()) msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&Workers.BusyWorkers), Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
} else { } else {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d" logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d"
logFn(logStr, logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), elastic.BusyWorkers, elastic.Queue.GetQueueSize()) msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&Workers.BusyWorkers), Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), elastic.BusyWorkers, elastic.Queue.GetQueueSize())
} }
} }
} }

View File

@ -4,11 +4,15 @@ import (
"crazyfs/DirectoryCrawler" "crazyfs/DirectoryCrawler"
"crazyfs/config" "crazyfs/config"
"crazyfs/logging" "crazyfs/logging"
"sync"
"time" "time"
) )
var InitialCrawlInProgress bool var InitialCrawlInProgress bool
// InitialCrawlLock is used only when the initial crawl is in progress to pause the program execution until it finishes.
var InitialCrawlLock sync.RWMutex
func init() { func init() {
InitialCrawlInProgress = false InitialCrawlInProgress = false
} }
@ -21,6 +25,7 @@ func InitialCrawl() {
ticker := time.NewTicker(3 * time.Second) ticker := time.NewTicker(3 * time.Second)
go logCacheStatus("INITIAL CRAWL", ticker, log.Infof) go logCacheStatus("INITIAL CRAWL", ticker, log.Infof)
InitialCrawlLock.Lock()
InitialCrawlInProgress = true InitialCrawlInProgress = true
dc := DirectoryCrawler.NewDirectoryCrawler() dc := DirectoryCrawler.NewDirectoryCrawler()
//start := time.Now() //start := time.Now()
@ -29,6 +34,7 @@ func InitialCrawl() {
log.Errorf("LIST - background recursive crawl failed: %s", err) log.Errorf("LIST - background recursive crawl failed: %s", err)
} }
InitialCrawlInProgress = false InitialCrawlInProgress = false
InitialCrawlLock.Unlock()
ticker.Stop() ticker.Stop()
//log.Infof("INITIAL CRAWL - finished the initial crawl in %s", time.Since(start).Round(time.Second)) //log.Infof("INITIAL CRAWL - finished the initial crawl in %s", time.Since(start).Round(time.Second))
} }

View File

@ -110,10 +110,12 @@ func main() {
// This could take a minute, so we do this in the background while we crawl. // This could take a minute, so we do this in the background while we crawl.
go func() { go func() {
elastic.EnableElasticsearchConnection() elastic.EnableElasticsearchConnection()
cache.InitialCrawlLock.RLock()
for cache.InitialCrawlInProgress { for cache.InitialCrawlInProgress {
// Sleep while the initial crawl is running. // Sleep while the initial crawl is running.
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
cache.InitialCrawlLock.RUnlock()
if !cliArgs.disableElasticSync || !cfg.ElasticsearchSyncEnable { if !cliArgs.disableElasticSync || !cfg.ElasticsearchSyncEnable {
go elastic.SyncThread() go elastic.SyncThread()
log.Info("Started the background Elasticsearch sync thread.") log.Info("Started the background Elasticsearch sync thread.")

View File

@ -10,7 +10,7 @@ import (
) )
func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) { func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
log.Debugln("ELASTIC - Checking for removed items...") log.Debugln("ELASTIC - Checking for deleted items that need to be removed from Elastic...")
// TODO: use waitgroups here so we know when all the jobs are done and we can erase globalKeysByPath and globalPathsByKey // TODO: use waitgroups here so we know when all the jobs are done and we can erase globalKeysByPath and globalPathsByKey