From 6377b8b6bcf306bab3292496039ff100a355822c Mon Sep 17 00:00:00 2001
From: Cyberes
Date: Tue, 12 Dec 2023 17:26:39 -0700
Subject: [PATCH] move elastic crawlers to workers

---
 .../DirectoryCrawler/DirectoryCrawler.go    | 14 +--
 src/{cache => }/DirectoryCrawler/crawl.go   | 34 +++----
 .../vars.go => DirectoryCrawler/init.go}    |  0
 src/{cache => }/DirectoryCrawler/process.go |  7 +-
 src/ResponseItem/ResponseItem.go            | 10 +-
 src/SharedCache/SharedCache.go              | 30 ++++++
 src/api/helpers/HandleFileNotFound.go       | 16 ++--
 src/api/helpers/zipstream.go                |  9 +-
 src/api/httpRoutes.go                       | 10 +-
 src/api/routes/Download.go                  | 15 ++-
 src/api/routes/List.go                      | 14 +--
 src/api/routes/Search.go                    |  5 +-
 src/api/routes/Thumbnail.go                 | 11 +--
 src/api/routes/admin/AdminCacheInfo.go      |  7 +-
 src/api/routes/admin/AdminCrawlsInfo.go     |  6 +-
 src/api/routes/admin/AdminRecache.go        |  6 +-
 src/api/routes/admin/AdminSysInfo.go        |  4 +-
 src/api/routes/client/Health.go             |  6 +-
 src/api/routes/client/Restricted.go         |  4 +-
 src/cache/crawler.go                        | 31 +++----
 src/cache/init.go                           | 12 +++
 src/cache/initial.go                        | 12 +--
 src/cache/recache.go                        | 24 ++---
 src/cache/search.go                         | 16 ++--
 src/config/config.go                        |  2 -
 src/crazyfs.go                              | 15 ++-
 src/elastic/ElasticsearchThread.go          | 92 ++++---------------
 src/elastic/add.go                          | 24 ++++-
 src/elastic/delete.go                       |  7 +-
 29 files changed, 201 insertions(+), 242 deletions(-)
 rename src/{cache => }/DirectoryCrawler/DirectoryCrawler.go (90%)
 rename src/{cache => }/DirectoryCrawler/crawl.go (82%)
 rename src/{cache/DirectoryCrawler/vars.go => DirectoryCrawler/init.go} (100%)
 rename src/{cache => }/DirectoryCrawler/process.go (90%)
 create mode 100644 src/SharedCache/SharedCache.go
 create mode 100644 src/cache/init.go

diff --git a/src/cache/DirectoryCrawler/DirectoryCrawler.go b/src/DirectoryCrawler/DirectoryCrawler.go
similarity index 90%
rename from src/cache/DirectoryCrawler/DirectoryCrawler.go
rename to src/DirectoryCrawler/DirectoryCrawler.go
index dc4330f..ff3f9bb 100644
--- a/src/cache/DirectoryCrawler/DirectoryCrawler.go
+++ b/src/DirectoryCrawler/DirectoryCrawler.go
@@ -2,8 +2,8 @@ package DirectoryCrawler
 
 import (
 	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/file"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"os"
 	"path/filepath"
 	"strings"
@@ -31,28 +31,22 @@ type FinishedCrawl struct {
 }
 
 type DirectoryCrawler struct {
-	cache   *lru.Cache[string, *CacheItem.Item]
 	visited sync.Map
 	wg      sync.WaitGroup
 	mu      sync.Mutex // lock for the visted map
 }
 
-func NewDirectoryCrawler(cache *lru.Cache[string, *CacheItem.Item]) *DirectoryCrawler {
+func NewDirectoryCrawler() *DirectoryCrawler {
 	return &DirectoryCrawler{
-		cache:   cache,
 		visited: sync.Map{},
 	}
 }
 
-func (dc *DirectoryCrawler) Get(path string) (*CacheItem.Item, bool) {
-	return dc.cache.Get(path)
-}
-
 func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
 	dc.visited.Range(func(key, value interface{}) bool {
 		keyStr := key.(string)
 		if isSubpath(file.StripRootDir(path), keyStr) && value.(bool) {
-			dc.cache.Remove(keyStr)
+			SharedCache.Cache.Remove(keyStr)
 		}
 		return true
 	})
@@ -63,7 +57,7 @@ func (dc *DirectoryCrawler) AddCacheItem(fullPath string, info os.FileInfo) {
 	item := CacheItem.NewItem(fullPath, info)
 	if item != nil {
 		// Sometimes CacheItem.NewItem will return nil if the path fails its checks
-		dc.cache.Add(strippedPath, item)
+		SharedCache.Cache.Add(strippedPath, item)
 	} else {
 		//log.Errorf("NewItem returned nil for %s", fullPath)
 	}
diff --git a/src/cache/DirectoryCrawler/crawl.go b/src/DirectoryCrawler/crawl.go
similarity index 82%
rename from src/cache/DirectoryCrawler/crawl.go
rename to src/DirectoryCrawler/crawl.go
index c456aff..8b4350a 100644
--- a/src/cache/DirectoryCrawler/crawl.go
+++ b/src/DirectoryCrawler/crawl.go
@@ -2,6 +2,7 @@ package DirectoryCrawler
 
 import (
 	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/Workers"
 	"crazyfs/config"
 	"crazyfs/file"
@@ -36,7 +37,7 @@ func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntr
 	return nil
 }
 
-func (dc *DirectoryCrawler) Crawl(fullPath string) error {
+func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.FileInfo, error) error) error {
 	CacheItem.RetardCheck(fullPath)
 	readyToStart := dc.startCrawl(fullPath)
 	if !readyToStart {
@@ -44,6 +45,10 @@
 	}
 	defer dc.endCrawl(fullPath)
 
+	if walkFunc == nil {
+		walkFunc = dc.walkRecursiveFunc
+	}
+
 	info, err := os.Lstat(fullPath)
 	if os.IsNotExist(err) {
 		// If the path doesn't exist, just silently exit
@@ -54,20 +59,14 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
 		return err
 	}
 
-	//if !config.FollowSymlinks && info.Mode()&os.ModeSymlink > 0 {
-	//	msg := fmt.Sprintf("CRAWL - tried to crawl a symlink (not allowed in config): %s", fullPath)
-	//	log.Warnf(msg)
-	//	return errors.New(msg)
-	//}
+	//relPath := file.StripRootDir(fullPath)
 
-	relPath := file.StripRootDir(fullPath)
-
-	dc.cache.Remove(relPath)
+	//SharedCache.Cache.Remove(relPath)
 
 	if info.IsDir() {
 		// Get a list of all keys in the cache that belong to this directory
 		keys := make([]string, 0)
-		for _, key := range dc.cache.Keys() {
+		for _, key := range SharedCache.Cache.Keys() {
 			if isSubpath(fullPath, key) {
 				keys = append(keys, key)
 			}
@@ -75,18 +74,15 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
 		// Remove all entries in the cache that belong to this directory, so we can start fresh.
 		for _, key := range keys {
-			dc.cache.Remove(key)
+			SharedCache.Cache.Remove(key)
 		}
 
 		// If the path is a directory, start a walk
-		err := Workers.Walk(fullPath, config.FollowSymlinks, dc.walkRecursiveFunc)
+		err := Workers.Walk(fullPath, config.FollowSymlinks, walkFunc)
 		if err != nil {
 			log.Errorf("CRAWLER - crawl for %s failed: %s", fullPath, err)
 		}
-
-		// TODO: don't think this is needed since we remove all the children of this item
-		// After crawling, remove any keys that are still in the list (these are items that were not found on the filesystem)
-		//dc.CleanupDeletedFiles(fullPath)
 	} else {
 		// If the path is a file, add it to the cache directly
 		dc.AddCacheItem(fullPath, info)
@@ -121,12 +117,12 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
 	var item *CacheItem.Item
 	relPath := file.StripRootDir(fullPath)
 
-	dc.cache.Remove(relPath)
+	SharedCache.Cache.Remove(relPath)
 
 	if info.IsDir() {
 		// Get a list of all keys in the cache that belong to this directory
 		keys := make([]string, 0)
-		for _, key := range dc.cache.Keys() {
+		for _, key := range SharedCache.Cache.Keys() {
 			if isSubpath(fullPath, key) {
 				keys = append(keys, key)
 			}
@@ -134,7 +130,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
 
 		// Remove all entries in the cache that belong to this directory so we can start fresh
 		for _, key := range keys {
-			dc.cache.Remove(key)
+			SharedCache.Cache.Remove(key)
 		}
 
 		err := filepath.WalkDir(fullPath, dc.walkNonRecursiveFunc)
@@ -142,7 +138,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
 			log.Errorf("CRAWLER - non-recursive crawl for %s failed: %s", fullPath, err)
 			return nil, err
 		}
-		item, _ = dc.cache.Get(relPath)
+		item, _ = SharedCache.Cache.Get(relPath)
 	} else {
 		item = CacheItem.NewItem(fullPath, info)
 		dc.AddCacheItem(fullPath, info)
diff --git a/src/cache/DirectoryCrawler/vars.go b/src/DirectoryCrawler/init.go
similarity index 100%
rename from src/cache/DirectoryCrawler/vars.go
rename to src/DirectoryCrawler/init.go
diff --git a/src/cache/DirectoryCrawler/process.go b/src/DirectoryCrawler/process.go
similarity index 90%
rename from src/cache/DirectoryCrawler/process.go
rename to src/DirectoryCrawler/process.go
index 2794ad0..52962cc 100644
--- a/src/cache/DirectoryCrawler/process.go
+++ b/src/DirectoryCrawler/process.go
@@ -2,6 +2,7 @@ package DirectoryCrawler
 
 import (
 	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
 	"crazyfs/file"
 	"os"
@@ -27,7 +28,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
 	}
 
 	// Add the directory to the cache after all of its children have been processed
-	dc.cache.Add(relPath, dirItem)
+	SharedCache.Cache.Add(relPath, dirItem)
 
 	// If the directory is not the root directory, update the parent directory's Children field
 	// This block of code ensures that the parent directory's Children field is always up-to-date with
@@ -36,7 +37,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
 	if fullPath != config.GetConfig().RootDir {
 		parentDir := filepath.Dir(fullPath)
 		strippedParentDir := file.StripRootDir(parentDir)
-		parentItem, found := dc.cache.Get(strippedParentDir)
+		parentItem, found := SharedCache.Cache.Get(strippedParentDir)
 		if found {
 			// Remove the old version of the directory from the parent's Children field
 			newChildren, foundOldDir := removeOldDir(parentItem.Children, relPath)
@@ -46,7 +47,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
 				parentItem.Children = append(newChildren, relPath)
 			}
 			// Update the parent directory in the cache
-			dc.cache.Add(strippedParentDir, parentItem)
+			SharedCache.Cache.Add(strippedParentDir, parentItem)
 		}
 	}
 } else {
diff --git a/src/ResponseItem/ResponseItem.go b/src/ResponseItem/ResponseItem.go
index 146c7bc..dc48744 100644
--- a/src/ResponseItem/ResponseItem.go
+++ b/src/ResponseItem/ResponseItem.go
@@ -2,10 +2,10 @@ package ResponseItem
 
 import (
 	"crazyfs/CacheItem"
-	"crazyfs/cache/DirectoryCrawler"
+	"crazyfs/DirectoryCrawler"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
 	"crazyfs/logging"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/sirupsen/logrus"
 	"path/filepath"
 )
@@ -31,7 +31,7 @@ type ResponseItem struct {
 	Cached int64 `json:"cached"`
 }
 
-func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item]) *ResponseItem {
+func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
 	item := &ResponseItem{
 		Path: cacheItem.Path,
 		Name: cacheItem.Name,
@@ -50,7 +50,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
 	if len(cacheItem.Children) > 0 { // avoid a null entry for the children key in the JSON
 		var children []*CacheItem.Item
 		for _, child := range cacheItem.Children {
-			childItem, found := sharedCache.Get(child)
+			childItem, found := SharedCache.Cache.Get(child)
 
 			// Do a quick crawl since the path could have been modfied since the last crawl.
 			// This also be triggered if we encounter a broken symlink. We don't check for broken symlinks when scanning
@@ -58,7 +58,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
 			if !found {
 				log.Debugf("CRAWLER - %s not in cache, crawling", child)
-				dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+				dc := DirectoryCrawler.NewDirectoryCrawler()
 				item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child))
 				if err != nil {
 					log.Errorf("NewResponseItem - CrawlNoRecursion - %s", err)
diff --git a/src/SharedCache/SharedCache.go b/src/SharedCache/SharedCache.go
new file mode 100644
index 0000000..f044c95
--- /dev/null
+++ b/src/SharedCache/SharedCache.go
@@ -0,0 +1,30 @@
+package SharedCache
+
+import (
+	"crazyfs/CacheItem"
+	"crazyfs/logging"
+	"errors"
+	lru "github.com/hashicorp/golang-lru/v2"
+	"github.com/sirupsen/logrus"
+)
+
+var Cache *lru.Cache[string, *CacheItem.Item]
+var cacheCreated bool
+var log *logrus.Logger
+
+func init() {
+	log = logging.GetLogger()
+}
+
+func NewCache(size int) error {
+	if cacheCreated {
+		return errors.New("cache has already been created")
+	}
+	cache, err := lru.New[string, *CacheItem.Item](size)
+	if err != nil {
+		return err
+	}
+	Cache = cache
+	cacheCreated = true
+	return nil
+}
diff --git a/src/api/helpers/HandleFileNotFound.go b/src/api/helpers/HandleFileNotFound.go
index 93eab6f..967d717 100644
--- a/src/api/helpers/HandleFileNotFound.go
+++ b/src/api/helpers/HandleFileNotFound.go
@@ -2,20 +2,20 @@ package helpers
 
 import (
 	"crazyfs/CacheItem"
+	"crazyfs/DirectoryCrawler"
+	"crazyfs/SharedCache"
 	"crazyfs/cache"
-	"crazyfs/cache/DirectoryCrawler"
-	"github.com/hashicorp/golang-lru/v2"
 	"net/http"
 	"os"
 	"time"
 )
 
 // HandleFileNotFound if the data is not in the cache, start a new crawler
-func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[string, *CacheItem.Item], w http.ResponseWriter) *CacheItem.Item {
+func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter) *CacheItem.Item {
 	// TODO: implement some sort of backoff or delay for repeated calls to recache the same path.
 	log.Debugf("CRAWLER - %s not in cache, crawling", fullPath)
-	dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+	dc := DirectoryCrawler.NewDirectoryCrawler()
 
 	// Check if this is a symlink. We do this before CrawlNoRecursion() because we want to tell the end user that
 	// we're not going to resolve this symlink.
@@ -34,9 +34,9 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
 	// a chance to kick of a recursive crawl.
 	go func() {
 		log.Debugf("Starting background recursive crawl for %s", fullPath)
-		dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+		dc := DirectoryCrawler.NewDirectoryCrawler()
 		start := time.Now()
-		err := dc.Crawl(fullPath)
+		err := dc.Crawl(fullPath, nil)
 		if err != nil {
 			log.Errorf("LIST - background recursive crawl failed: %s", err)
 		}
@@ -55,7 +55,7 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
 	}
 
 	// Try to get the data from the cache again.
-	item, found := sharedCache.Get(relPath)
+	item, found := SharedCache.Cache.Get(relPath)
 	if !found {
 		// TODO: let's not re-check the disk if the file is still not in the cache. Instead, let's just assume that it doesn't exist.
 		ReturnFake404Msg("path not found", w)
@@ -82,6 +82,6 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
 		Return500Msg(w)
 		return nil
 	}
-	cache.CheckAndRecache(fullPath, sharedCache)
+	cache.CheckAndRecache(fullPath)
 	return item
 }
diff --git a/src/api/helpers/zipstream.go b/src/api/helpers/zipstream.go
index 3c09d15..86a33d7 100644
--- a/src/api/helpers/zipstream.go
+++ b/src/api/helpers/zipstream.go
@@ -1,9 +1,8 @@
 package helpers
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/file"
-	lru "github.com/hashicorp/golang-lru/v2"
 	kzip "github.com/klauspost/compress/zip"
 	"io"
 	"net/http"
@@ -48,16 +47,16 @@ func ZipHandlerCompress(dirPath string, w http.ResponseWriter, r *http.Request)
 		log.Errorf("ZIPSTREM - failed to close zipwriter: %s", err)
 	}
 }
-func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request) {
 	zipWriter := kzip.NewWriter(w)
 
 	// Walk through each file and add it to the zip
 	for _, fullPath := range paths {
 		relPath := file.StripRootDir(fullPath)
 
 		// Try to get the data from the cache
-		item, found := sharedCache.Get(relPath)
+		item, found := SharedCache.Cache.Get(relPath)
 		if !found {
-			item = HandleFileNotFound(relPath, fullPath, sharedCache, w)
+			item = HandleFileNotFound(relPath, fullPath, w)
 		}
 		if item == nil {
 			// The errors have already been handled in handleFileNotFound() so we're good to just exit
diff --git a/src/api/httpRoutes.go b/src/api/httpRoutes.go
index 47fde29..e0d03a7 100644
--- a/src/api/httpRoutes.go
+++ b/src/api/httpRoutes.go
@@ -1,7 +1,6 @@
 package api
 
 import (
-	"crazyfs/CacheItem"
 	"crazyfs/api/routes"
 	"crazyfs/api/routes/admin"
 	"crazyfs/api/routes/client"
@@ -10,7 +9,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/gorilla/mux"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
@@ -23,7 +21,7 @@ type Route struct {
 
 type Routes []Route
 
-type AppHandler func(http.ResponseWriter, *http.Request, *lru.Cache[string, *CacheItem.Item])
+type AppHandler func(http.ResponseWriter, *http.Request)
 
 var httpRoutes = Routes{
 	Route{
@@ -102,7 +100,7 @@ func setHeaders(next http.Handler) http.Handler {
 	})
 }
 
-func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
+func NewRouter() *mux.Router {
 	r := mux.NewRouter().StrictSlash(true)
 	for _, route := range httpRoutes {
 		var handler http.Handler
@@ -111,7 +109,7 @@ func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
 
 		currentRoute := route
 		handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			currentRoute.HandlerFunc(w, r, sharedCache)
+			currentRoute.HandlerFunc(w, r)
 		})
 		handler = setHeaders(handler)
 		handler = logging.LogRequest(handler)
@@ -137,7 +135,7 @@ func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
 }
 
 func wrongMethod(expectedMethod string, next AppHandler) AppHandler {
-	return func(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+	return func(w http.ResponseWriter, r *http.Request) {
 		w.Header().Set("Content-Type", "application/json")
 		w.WriteHeader(http.StatusBadRequest)
 		json.NewEncoder(w).Encode(map[string]interface{}{
diff --git a/src/api/routes/Download.go b/src/api/routes/Download.go
index 5d649c9..56c3570 100644
--- a/src/api/routes/Download.go
+++ b/src/api/routes/Download.go
@@ -1,18 +1,17 @@
 package routes
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/api/helpers"
 	"crazyfs/config"
 	"crazyfs/file"
 	"fmt"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 	"os"
 	"strings"
 )
 
-func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func Download(w http.ResponseWriter, r *http.Request) {
 	if helpers.CheckInitialCrawl() {
 		helpers.HandleRejectDuringInitialCrawl(w)
 		return
@@ -46,7 +45,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
 		}
 
 		// Multiple files, zip them
-		helpers.ZipHandlerCompressMultiple(cleanPaths, w, r, sharedCache)
+		helpers.ZipHandlerCompressMultiple(cleanPaths, w, r)
 		return
 	}
 
@@ -67,9 +66,9 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
 	}
 
 	// Try to get the data from the cache
-	item, found := sharedCache.Get(relPath)
+	item, found := SharedCache.Cache.Get(relPath)
 	if !found {
-		item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
+		item = helpers.HandleFileNotFound(relPath, fullPath, w)
 	}
 	if item == nil {
 		// The errors have already been handled in handleFileNotFound() so we're good to just exit
@@ -105,7 +104,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
 			// GetMimeType() returns an empty string if it was a directory.
 			// Update the CacheItem's MIME in the sharedCache.
 			item.MimeType = &mimeType
-			sharedCache.Add(relPath, item)
+			SharedCache.Cache.Add(relPath, item)
 		} else {
 			log.Errorf("Download.go failed to match a condition when checking a file's MIME - %s", fullPath)
 			helpers.Return500Msg(w)
@@ -147,7 +146,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
 	// Open the file
 	openFile, err := os.Open(fullPath)
 	if err != nil {
-		sharedCache.Remove(relPath) // remove it from the cache
+		SharedCache.Cache.Remove(relPath) // remove it from the cache
 		helpers.ReturnFake404Msg("file missing from disk, cache out of date", w)
 		return
 	}
diff --git a/src/api/routes/List.go b/src/api/routes/List.go
index 9c41a07..bce820c 100644
--- a/src/api/routes/List.go
+++ b/src/api/routes/List.go
@@ -3,15 +3,15 @@ package routes
 import (
 	"crazyfs/CacheItem"
 	"crazyfs/ResponseItem"
+	"crazyfs/SharedCache"
 	"crazyfs/api/helpers"
 	"crazyfs/config"
 	"crazyfs/file"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 	"strconv"
 )
 
-func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func ListDir(w http.ResponseWriter, r *http.Request) {
 	if helpers.CheckInitialCrawl() {
 		helpers.HandleRejectDuringInitialCrawl(w)
 		return
@@ -46,16 +46,16 @@ func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[stri
 	relPath := file.StripRootDir(fullPath)
 
 	// Try to get the data from the cache
-	cacheItem, found := sharedCache.Get(relPath)
+	cacheItem, found := SharedCache.Cache.Get(relPath)
 	if !found {
-		cacheItem = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
+		cacheItem = helpers.HandleFileNotFound(relPath, fullPath, w)
 	}
 	if cacheItem == nil {
 		return // The errors have already been handled in handleFileNotFound() so we're good to just exit
 	}
 
-	// Create a copy of the cached Item so we don't modify the Item in the cache
-	item := ResponseItem.NewResponseItem(cacheItem, sharedCache)
+	// Create a copy of the cached Item, so we don't modify the Item in the cache
+	item := ResponseItem.NewResponseItem(cacheItem)
 
 	// Get the MIME type of the file if the 'mime' argument is present
 	mime := r.URL.Query().Get("mime")
@@ -79,7 +79,7 @@ func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[stri
 				// Update the original cached CacheItem's MIME in the sharedCache
 				cacheItem.MimeType = &mimeType
 				cacheItem.Extension = &ext
-				sharedCache.Add(relPath, cacheItem) // take the address of CacheItem
+				SharedCache.Cache.Add(relPath, cacheItem) // take the address of CacheItem
 			}
 		}
 	}
diff --git a/src/api/routes/Search.go b/src/api/routes/Search.go
index e718cdf..af47751 100644
--- a/src/api/routes/Search.go
+++ b/src/api/routes/Search.go
@@ -8,7 +8,6 @@ import (
 	"crazyfs/elastic"
 	"crazyfs/logging"
 	"encoding/json"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 	"sort"
 	"strconv"
@@ -16,7 +15,7 @@ import (
 	"time"
 )
 
-func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func SearchFile(w http.ResponseWriter, r *http.Request) {
 	if helpers.CheckInitialCrawl() {
 		helpers.HandleRejectDuringInitialCrawl(w)
 		return
@@ -143,7 +142,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[s
 				results = append(results, items...)
 			}
 		} else {
-			results = cache.SearchLRU(queryString, excludeElements, limitResults, sharedCache)
+			results = cache.SearchLRU(queryString, excludeElements, limitResults)
 		}
 
 		if folderSorting == "folders" {
diff --git a/src/api/routes/Thumbnail.go b/src/api/routes/Thumbnail.go
index d776245..4e07409 100644
--- a/src/api/routes/Thumbnail.go
+++ b/src/api/routes/Thumbnail.go
@@ -2,7 +2,7 @@ package routes
 
 import (
 	"bytes"
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/api/helpers"
 	"crazyfs/cache"
 	"crazyfs/config"
@@ -10,7 +10,6 @@ import (
 	"crazyfs/logging"
 	"fmt"
 	"github.com/disintegration/imaging"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/nfnt/resize"
 	"strconv"
 
@@ -22,7 +21,7 @@ import (
 	"strings"
 )
 
-func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func Thumbnail(w http.ResponseWriter, r *http.Request) {
 	if cache.InitialCrawlInProgress && !config.GetConfig().HttpAllowDuringInitialCrawl {
 		helpers.HandleRejectDuringInitialCrawl(w)
 		returnDummyPNG(w)
@@ -63,9 +62,9 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[st
 	}
 
 	// Try to get the data from the cache
-	item, found := sharedCache.Get(relPath)
+	item, found := SharedCache.Cache.Get(relPath)
 	if !found {
-		item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
+		item = helpers.HandleFileNotFound(relPath, fullPath, w)
 	}
 	if item == nil {
 		returnDummyPNG(w)
@@ -91,7 +90,7 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[st
 	// Update the CacheItem's MIME in the sharedCache
 	item.MimeType = &mimeType
 	item.Extension = &ext
-	sharedCache.Add(relPath, item)
+	SharedCache.Cache.Add(relPath, item)
 
 	// Check if the file is an image
 	if !strings.HasPrefix(mimeType, "image/") {
diff --git a/src/api/routes/admin/AdminCacheInfo.go b/src/api/routes/admin/AdminCacheInfo.go
index 5655cbf..12eda67 100644
--- a/src/api/routes/admin/AdminCacheInfo.go
+++ b/src/api/routes/admin/AdminCacheInfo.go
@@ -1,18 +1,17 @@
 package admin
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/api/helpers"
 	"crazyfs/config"
 	"crazyfs/elastic"
 	"crypto/sha256"
 	"crypto/subtle"
 	"encoding/json"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
-func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func AdminCacheInfo(w http.ResponseWriter, r *http.Request) {
 	username, password, ok := r.BasicAuth()
 	if ok {
 		usernameHash := sha256.Sum256([]byte(username))
@@ -26,7 +25,7 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cac
 			helpers.Return401Msg("unauthorized", w)
 			return
 		} else {
-			cacheLen := sharedCache.Len()
+			cacheLen := SharedCache.Cache.Len()
 
 			response := map[string]interface{}{
 				"cachedItems": cacheLen,
diff --git a/src/api/routes/admin/AdminCrawlsInfo.go b/src/api/routes/admin/AdminCrawlsInfo.go
index cfcc570..77d5a2e 100644
--- a/src/api/routes/admin/AdminCrawlsInfo.go
+++ b/src/api/routes/admin/AdminCrawlsInfo.go
@@ -1,19 +1,17 @@
 package admin
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/DirectoryCrawler"
 	"crazyfs/Workers"
 	"crazyfs/api/helpers"
-	"crazyfs/cache/DirectoryCrawler"
 	"crazyfs/config"
 	"crypto/sha256"
 	"crypto/subtle"
 	"encoding/json"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
-func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
 	username, password, ok := r.BasicAuth()
 	if ok {
 		usernameHash := sha256.Sum256([]byte(username))
diff --git a/src/api/routes/admin/AdminRecache.go b/src/api/routes/admin/AdminRecache.go
index 04a450f..e553fa1 100644
--- a/src/api/routes/admin/AdminRecache.go
+++ b/src/api/routes/admin/AdminRecache.go
@@ -1,18 +1,16 @@
 package admin
 
 import (
-	"crazyfs/CacheItem"
 	"crazyfs/api/helpers"
 	"crazyfs/cache"
 	"crazyfs/config"
 	"crazyfs/file"
 	"encoding/json"
 	"fmt"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
-func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func AdminReCache(w http.ResponseWriter, r *http.Request) {
 	if r.Method != http.MethodPost {
 		helpers.Return400Msg("this is a POST endpoint", w)
 		return
@@ -48,7 +46,7 @@ func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache
 		"error": nil,
 	}
 	// Check and re-cache the directory
-	err = cache.Recache(fullPath, sharedCache)
+	err = cache.Recache(fullPath)
 	if err != nil {
 		response["message"] = fmt.Sprintf("recache failed")
 		response["error"] = err.Error()
diff --git a/src/api/routes/admin/AdminSysInfo.go b/src/api/routes/admin/AdminSysInfo.go
index 1d04636..1446575 100644
--- a/src/api/routes/admin/AdminSysInfo.go
+++ b/src/api/routes/admin/AdminSysInfo.go
@@ -1,17 +1,15 @@
 package admin
 
 import (
-	"crazyfs/CacheItem"
 	"crazyfs/api/helpers"
 	"crazyfs/config"
 	"crazyfs/logging"
 	"crypto/sha256"
 	"crypto/subtle"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
-func AdminSysInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func AdminSysInfo(w http.ResponseWriter, r *http.Request) {
 	username, password, ok := r.BasicAuth()
 	if ok {
 		usernameHash := sha256.Sum256([]byte(username))
diff --git a/src/api/routes/client/Health.go b/src/api/routes/client/Health.go
index cd0a7af..cd44809 100644
--- a/src/api/routes/client/Health.go
+++ b/src/api/routes/client/Health.go
@@ -1,17 +1,15 @@
 package client
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/DirectoryCrawler"
 	"crazyfs/api/helpers"
 	"crazyfs/cache"
-	"crazyfs/cache/DirectoryCrawler"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
 // TODO: show the time the initial crawl started
 
-func HealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func HealthCheck(w http.ResponseWriter, r *http.Request) {
 	response := map[string]interface{}{}
 	response["scanRunning"] = DirectoryCrawler.GetTotalActiveCrawls() > 0
 	response["initialScanRunning"] = cache.InitialCrawlInProgress
diff --git a/src/api/routes/client/Restricted.go b/src/api/routes/client/Restricted.go
index 36823bd..f4ed9d9 100644
--- a/src/api/routes/client/Restricted.go
+++ b/src/api/routes/client/Restricted.go
@@ -1,14 +1,12 @@
 package client
 
 import (
-	"crazyfs/CacheItem"
 	"crazyfs/api/helpers"
 	"crazyfs/config"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"net/http"
 )
 
-func RestrictedDownloadPaths(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func RestrictedDownloadPaths(w http.ResponseWriter, r *http.Request) {
 	response := config.GetConfig().RestrictedDownloadPaths
 	helpers.WriteJsonResponse(response, false, w, r)
 }
diff --git a/src/cache/crawler.go b/src/cache/crawler.go
index 76e6222..0120ea6 100644
--- a/src/cache/crawler.go
+++ b/src/cache/crawler.go
@@ -1,36 +1,27 @@
 package cache
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/DirectoryCrawler"
+	"crazyfs/SharedCache"
 	"crazyfs/Workers"
-	"crazyfs/cache/DirectoryCrawler"
 	"crazyfs/config"
-	"crazyfs/logging"
-	lru "github.com/hashicorp/golang-lru/v2"
-	"github.com/sirupsen/logrus"
 	"sync"
 	"time"
 )
 
-var log *logrus.Logger
-
-func init() {
-	log = logging.GetLogger()
-}
-
-func StartCrawler(sharedCache *lru.Cache[string, *CacheItem.Item]) error {
+func StartCrawler() error {
 	var wg sync.WaitGroup
 	crawlerChan := make(chan struct{}, config.GetConfig().DirectoryCrawlers)
 
-	go startCrawl(sharedCache, &wg, crawlerChan)
+	go startCrawl(&wg, crawlerChan)
 
 	ticker := time.NewTicker(60 * time.Second)
-	go logCacheStatus("CACHE STATUS", ticker, sharedCache, log.Debugf)
+	go logCacheStatus("CACHE STATUS", ticker, log.Debugf)
 
 	return nil
 }
 
-func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, crawlerChan chan struct{}) {
+func startCrawl(wg *sync.WaitGroup, crawlerChan chan struct{}) {
 	ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
 	defer ticker.Stop()
 
@@ -41,26 +32,26 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+			dc := DirectoryCrawler.NewDirectoryCrawler()
 			log.Infoln("CRAWLER - Starting a crawl...")
 			start := time.Now()
-			err := dc.Crawl(config.GetConfig().RootDir)
+			err := dc.Crawl(config.GetConfig().RootDir, nil)
 			if err != nil {
 				log.Warnf("CRAWLER - Crawl failed: %s", err)
 			} else {
 				duration := time.Since(start).Round(time.Second)
 				log.Infof("CRAWLER - Crawl completed in %s", duration)
-				log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(sharedCache.Keys()))
+				log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(SharedCache.Cache.Keys()))
 			}
 			<-crawlerChan
 		}()
 	}
 }
 
-func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
+func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{})) {
 	defer ticker.Stop()
 	for range ticker.C {
 		logFn("%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d.",
-			msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
+			msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
 	}
 }
diff --git a/src/cache/init.go b/src/cache/init.go
new file mode 100644
index 0000000..d528517
--- /dev/null
+++ b/src/cache/init.go
@@ -0,0 +1,12 @@
+package cache
+
+import (
+	"crazyfs/logging"
+	"github.com/sirupsen/logrus"
+)
+
+var log *logrus.Logger
+
+func init() {
+	log = logging.GetLogger()
+}
diff --git a/src/cache/initial.go b/src/cache/initial.go
index 35ef9ba..e61626b 100644
--- a/src/cache/initial.go
+++ b/src/cache/initial.go
@@ -1,11 +1,9 @@
 package cache
 
 import (
-	"crazyfs/CacheItem"
-	"crazyfs/cache/DirectoryCrawler"
+	"crazyfs/DirectoryCrawler"
 	"crazyfs/config"
 	"crazyfs/logging"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"time"
 )
 
@@ -15,18 +13,18 @@ func init() {
 	InitialCrawlInProgress = false
 }
 
-func InitialCrawl(sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func InitialCrawl() {
 	log = logging.GetLogger()
 
 	log.Infof("INITIAL CRAWL - starting the crawl for %s", config.GetConfig().RootDir)
 
 	ticker := time.NewTicker(3 * time.Second)
-	go logCacheStatus("INITIAL CRAWL", ticker, sharedCache, log.Infof)
+	go logCacheStatus("INITIAL CRAWL", ticker, log.Infof)
 
 	InitialCrawlInProgress = true
-	dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+	dc := DirectoryCrawler.NewDirectoryCrawler()
 	//start := time.Now()
-	err := dc.Crawl(config.GetConfig().RootDir)
+	err := dc.Crawl(config.GetConfig().RootDir, nil)
 	if err != nil {
 		log.Errorf("LIST - background recursive crawl failed: %s", err)
 	}
diff --git a/src/cache/recache.go b/src/cache/recache.go
index aecf007..bdbf0a0 100644
--- a/src/cache/recache.go
+++ b/src/cache/recache.go
@@ -2,11 +2,11 @@ package cache
 
 import (
 	"crazyfs/CacheItem"
-	"crazyfs/cache/DirectoryCrawler"
+	"crazyfs/DirectoryCrawler"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
 	"crazyfs/file"
 	"errors"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"os"
 	"path/filepath"
 	"time"
@@ -18,15 +18,15 @@ func InitRecacheSemaphore(limit int) {
 	sem = make(chan struct{}, limit)
 }
 
-func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
-	item, found := sharedCache.Get(path)
+func CheckAndRecache(path string) {
+	item, found := SharedCache.Cache.Get(path)
 	if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
 		log.Debugf("Re-caching: %s", path)
 		sem <- struct{}{} // acquire a token
 		go func() {
 			defer func() { <-sem }() // release the token when done
-			dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
-			err := dc.Crawl(path)
+			dc := DirectoryCrawler.NewDirectoryCrawler()
+			err := dc.Crawl(path, nil)
 			if err != nil {
 				log.Errorf("RECACHE ERROR: %s", err.Error())
 				return
@@ -35,8 +35,8 @@ func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item
 	}
 }
 
-func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error {
-	dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
+func Recache(path string) error {
+	dc := DirectoryCrawler.NewDirectoryCrawler()
 	if dc.IsCrawlActive(path) {
 		return errors.New("rejecting crawl, already in progress for this path")
 	}
@@ -46,8 +46,8 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
 	sem <- struct{}{} // acquire a token
 	go func() {
 		defer func() { <-sem }() // release the token when done
-		dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
-		err := dc.Crawl(path)
+		dc := DirectoryCrawler.NewDirectoryCrawler()
+		err := dc.Crawl(path, nil)
 		if err != nil {
 			log.Errorf("RECACHE ERROR: %s", err.Error())
 			return
@@ -56,7 +56,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
 		// Get the parent directory from the cache
 		parentDir := filepath.Dir(path)
 		parentDirRel := file.StripRootDir(parentDir)
-		parentItem, found := sharedCache.Get(parentDirRel)
+		parentItem, found := SharedCache.Cache.Get(parentDirRel)
 		if found {
 			// Remove the old subdirectory from the parent directory's Children field
 			for i, child := range parentItem.Children {
@@ -85,7 +85,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
 			// Assign the newChildren slice to the Children field
 			parentItem.Children = newChildren
 			// Update the parent directory in the cache
-			sharedCache.Add(parentDir, parentItem)
+			SharedCache.Cache.Add(parentDir, parentItem)
 		}
 	} else if !CacheItem.PathOutsideRoot(parentDir) {
 		// If the parent directory isn't in the cache, crawl it
diff --git a/src/cache/search.go b/src/cache/search.go
index 28e1b03..afa25d7 100644
--- a/src/cache/search.go
+++ b/src/cache/search.go
@@ -3,13 +3,13 @@ package cache
 import (
 	"bytes"
 	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
 	"encoding/gob"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"strings"
 )
 
-func SearchLRU(queryString string, excludeElements []string, limitResults int, sharedCache *lru.Cache[string, *CacheItem.Item]) []*CacheItem.Item {
+func SearchLRU(queryString string, excludeElements []string, limitResults int) []*CacheItem.Item {
 	results := make([]*CacheItem.Item, 0)
 
 	const maxGoroutines = 100
@@ -17,10 +17,10 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
 	// Create a buffered channel as a semaphore
 	sem := make(chan struct{}, maxGoroutines)
 
-	resultsChan := make(chan *CacheItem.Item, len(sharedCache.Keys()))
+	resultsChan := make(chan *CacheItem.Item, len(SharedCache.Cache.Keys()))
 
-	for _, key := range sharedCache.Keys() {
-		searchKey(key, queryString, excludeElements, sem, resultsChan, sharedCache)
+	for _, key := range SharedCache.Cache.Keys() {
+		searchKey(key, queryString, excludeElements, sem, resultsChan)
 	}
 
 	// Wait for all goroutines to finish
@@ -28,7 +28,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
 		sem <- struct{}{}
 	}
 
-	for range sharedCache.Keys() {
+	for range SharedCache.Cache.Keys() {
 		item := <-resultsChan
 		if item != nil {
 			results = append(results, item)
@@ -41,7 +41,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
 	return results
 }
 
-func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item) {
 	// Acquire a token
 	sem <- struct{}{}
 
@@ -49,7 +49,7 @@ func searchKey(key string, queryString string, excludeElements []string, sem cha
 	// Release the token at the end
 	defer func() { <-sem }()
 
-	cacheItem, found := sharedCache.Get(key)
+	cacheItem, found := SharedCache.Cache.Get(key)
 	if !found {
 		resultsChan <- nil
 		return
diff --git a/src/config/config.go b/src/config/config.go
index 790ec9c..5948df5 100644
--- a/src/config/config.go
+++ b/src/config/config.go
@@ -38,7 +38,6 @@ type Config struct {
 	ElasticsearchIndex                string
 	ElasticsearchSyncThreads          int
 	ElasticsearchExcludePatterns      []string
-	ElasticsearchAllowConcurrentSyncs bool
 	ElasticsearchFullSyncOnStart      bool
 	ElasticsearchDefaultQueryField    string
 	HTTPRealIPHeader                  string
@@ -137,7 +136,6 @@ func SetConfig(configFile string) (*Config, error) {
 		ElasticsearchIndex:                viper.GetString("elasticsearch_index"),
 		ElasticsearchSyncThreads:          viper.GetInt("elasticsearch_sync_threads"),
 		ElasticsearchExcludePatterns:      viper.GetStringSlice("elasticsearch_exclude_patterns"),
-		ElasticsearchAllowConcurrentSyncs: viper.GetBool("elasticsearch_allow_concurrent_syncs"),
 		ElasticsearchFullSyncOnStart:      viper.GetBool("elasticsearch_full_sync_on_start"),
 		ElasticsearchDefaultQueryField:    viper.GetString("elasticsearch_default_query_field"),
 		HTTPRealIPHeader:                  viper.GetString("http_real_ip_header"),
diff --git a/src/crazyfs.go b/src/crazyfs.go
index 15a1bec..80fe170 100644
--- a/src/crazyfs.go
+++ b/src/crazyfs.go
@@ -1,7 +1,7 @@
 package main
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/Workers"
 	"crazyfs/api"
 	"crazyfs/cache"
@@ -12,7 +12,6 @@ import (
 	"flag"
 	"fmt"
 	"github.com/elastic/go-elasticsearch/v8"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/sirupsen/logrus"
 	"net/http"
 	//_ "net/http/pprof" // for profiling
@@ -84,7 +83,7 @@ func main() {
 		log.Fatalf("Failed to load config file: %s", err)
 	}
 
-	sharedCache, err := lru.New[string, *CacheItem.Item](cfg.CacheSize)
+	err = SharedCache.NewCache(cfg.CacheSize)
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -99,7 +98,7 @@ func main() {
 	cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
 
 	// Start the webserver before doing the long crawl
-	r := api.NewRouter(sharedCache)
+	r := api.NewRouter()
 	//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r))
 	go func() {
 		err := http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r)
@@ -112,14 +111,14 @@ func main() {
 	if cliArgs.initialCrawl || cfg.InitialCrawl {
 		log.Infoln("Preforming initial crawl...")
 		start := time.Now()
-		cache.InitialCrawl(sharedCache)
+		cache.InitialCrawl()
 		duration := time.Since(start).Round(time.Second)
-		keys := sharedCache.Keys()
+		keys := SharedCache.Cache.Keys()
 		config.InitialCrawlElapsed = int(duration)
 		log.Infof("Initial crawl completed in %s. %d items added to the cache.", duration, len(keys))
 	}
 
-	err = cache.StartCrawler(sharedCache)
+	err = cache.StartCrawler()
 	if err != nil {
 		log.Fatalf("Failed to start timed crawler process: %s", err)
 	}
@@ -144,7 +143,7 @@ func main() {
 	elastic.ElasticClient = es
 
 	if cfg.ElasticsearchSyncEnable && !cliArgs.disableElasticSync {
-		go elastic.ElasticsearchThread(sharedCache)
+		go elastic.ElasticsearchThread()
 		log.Info("Started the background Elasticsearch sync thread.")
 	} else {
 		log.Info("The background Elasticsearch sync thread is disabled.")
diff --git a/src/elastic/ElasticsearchThread.go b/src/elastic/ElasticsearchThread.go
index 5ce3b81..428ec54 100644
--- a/src/elastic/ElasticsearchThread.go
+++ b/src/elastic/ElasticsearchThread.go
@@ -1,15 +1,12 @@
 package elastic
 
 import (
-	"crazyfs/CacheItem"
+	"crazyfs/DirectoryCrawler"
 	"crazyfs/config"
-	lru "github.com/hashicorp/golang-lru/v2"
-	"slices"
-	"sync"
 	"time"
 )
 
-func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func ElasticsearchThread() {
 	createCrazyfsIndex()
 
 	// Test connection to Elastic.
@@ -20,95 +17,38 @@ func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item]) {
 	}
 	log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
 
-	var wg sync.WaitGroup
-	sem := make(chan bool, config.GetConfig().ElasticsearchSyncThreads)
-
 	// Run a partial sync at startup, unless configured to run a full one.
-	syncElasticsearch(sharedCache, &wg, sem, config.GetConfig().ElasticsearchFullSyncOnStart)
+	syncElasticsearch()
 
 	ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
-	fullSyncTicker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchFullSyncInterval) * time.Second)
-
-	var mutex sync.Mutex
 
 	for {
 		select {
 		case <-ticker.C:
-			if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
-				mutex.Lock()
-			}
-			syncElasticsearch(sharedCache, &wg, sem, false)
-			if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
-				mutex.Unlock()
-			}
-		case <-fullSyncTicker.C:
-			if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
-				mutex.Lock()
-			}
-			syncElasticsearch(sharedCache, &wg, sem, true)
-			if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
-				mutex.Unlock()
-			}
+			syncElasticsearch()
 		}
 	}
 }
 
 // TODO: make this use workers instead of starting a million threads
 // TODO: have the workers exit when the sync job is finished
-func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, sem chan bool, fullSync bool) {
-	var syncType string
-	var esContents []string
-	if fullSync {
-		ElasticRefreshSyncRunning = true
-		syncType = "full refresh"
-	} else {
-		ElasticNewSyncRunning = true
-		syncType = "refresh"
-
-		var err error
-		esContents, err = getPathsFromIndex()
-		if err != nil {
-			log.Errorf("ELASTIC - Failed to read the index: %s", err)
-			return
-		}
-	}
-	log.Infof("ELASTIC - starting a %s sync.", syncType)
-
+func syncElasticsearch() {
+	log.Infof("ELASTIC - started syncing.")
 	start := time.Now()
-	for _, key := range sharedCache.Keys() {
-		wg.Add(1)
-		go func(key string) {
-			defer wg.Done()
-			sem <- true
-			cacheItem, found := sharedCache.Get(key)
-			if !found {
-				log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
-			} else {
-				if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
-					if fullSync {
-						addToElasticsearch(cacheItem)
-					} else if !slices.Contains(esContents, key) {
-						addToElasticsearch(cacheItem)
-					}
-				} else {
-					deleteFromElasticsearch(key) // clean up
-				}
-			}
-			<-sem
-		}(key)
-	}
-	wg.Wait()
+
+	dc := DirectoryCrawler.NewDirectoryCrawler()
+	err := dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
+	if err != nil {
+		log.Errorf("ELASTIC - crawl failed: %s", err)
+		return
+	}
+
+	// TODO: use workers for this
 	log.Debugln("ELASTIC - Checking for removed items...")
-	removeStaleItemsFromElasticsearch(sharedCache)
-
-	if fullSync {
-		ElasticRefreshSyncRunning = false
-	} else {
-		ElasticNewSyncRunning = false
-	}
+	removeStaleItemsFromElasticsearch()
 
 	duration := time.Since(start)
-	log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
+	log.Infof("ELASTIC - sync finished in %s", duration)
 }
 
 func logElasticConnError(err error) {
diff --git a/src/elastic/add.go b/src/elastic/add.go
index ad18277..9e7a925 100644
--- a/src/elastic/add.go
+++ b/src/elastic/add.go
@@ -4,12 +4,30 @@ import (
 	"bytes"
 	"context"
 	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
+	"crazyfs/file"
 	"encoding/json"
 	"github.com/elastic/go-elasticsearch/v8/esapi"
+	"os"
 )
 
-func addToElasticsearch(item *CacheItem.Item) {
+func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
+	key := file.StripRootDir(fullPath)
+	cacheItem, found := SharedCache.Cache.Get(key)
+	if !found {
+		log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
+	} else {
+		if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
+			preformAddToElasticsearch(cacheItem)
+		} else {
+			deleteFromElasticsearch(key) // clean up
+		}
+	}
+	return nil
+}
+
+func preformAddToElasticsearch(item *CacheItem.Item) {
 	log.Debugf(`ELASTIC - Adding: "%s"`, item.Path)
 	prepareCacheItem(item)
 	data, err := json.Marshal(item)
@@ -42,10 +60,10 @@ func addToElasticsearch(item *CacheItem.Item) {
 
 // prepareCacheItem is used to get an item ready to insert into Elastic.
 func prepareCacheItem(item *CacheItem.Item) {
-	// We don't care about the children and this field's length may cause issues.
+	// We don't care about the children, and this field's length may cause issues.
 	item.Children = nil
 
-	// Length of this one also may cause issues.
+	// The length of this one also may cause issues.
 	item.Content = ""
 
 	// Don't need to return anything since `item` is a pointer.
diff --git a/src/elastic/delete.go b/src/elastic/delete.go
index 6e8ac63..5d87e54 100644
--- a/src/elastic/delete.go
+++ b/src/elastic/delete.go
@@ -2,15 +2,14 @@ package elastic
 
 import (
 	"context"
-	"crazyfs/CacheItem"
+	"crazyfs/SharedCache"
 	"crazyfs/config"
 	"encoding/json"
 	"github.com/elastic/go-elasticsearch/v8/esapi"
-	lru "github.com/hashicorp/golang-lru/v2"
 	"sync"
 )
 
-func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item]) {
+func removeStaleItemsFromElasticsearch() {
 	// Retrieve all keys from Elasticsearch
 	keys, err := getPathsFromIndex()
 	if err != nil {
@@ -39,7 +38,7 @@ func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem
 			wg.Done()
 		}()
 
-		if _, ok := sharedCache.Get(key); !ok {
+		if _, ok := SharedCache.Cache.Get(key); !ok {
 			// If a key does not exist in the LRU cache, delete it from Elasticsearch
 			deleteFromElasticsearch(key)
 			log.Debugf(`ELASTIC - Removed key "%s"`, key)
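
For orientation, a minimal sketch of the call pattern this patch moves to, using only names that appear in the diff above (illustrative only, not part of the commit; error handling condensed):

	// Sketch of the refactored wiring, assuming the package layout in this patch.
	package main

	import (
		"crazyfs/DirectoryCrawler"
		"crazyfs/SharedCache"
		"crazyfs/config"
	)

	func example() error {
		// The LRU cache is now a package-level singleton in SharedCache
		// instead of an *lru.Cache value threaded through every handler,
		// route, and crawler constructor.
		if err := SharedCache.NewCache(config.GetConfig().CacheSize); err != nil {
			return err
		}

		// Crawlers no longer hold a cache reference; they read and write
		// SharedCache.Cache directly.
		dc := DirectoryCrawler.NewDirectoryCrawler()

		// Crawl now takes a walkFunc. Passing nil selects the default
		// recursive walk (dc.walkRecursiveFunc); the Elasticsearch sync
		// instead passes addToElasticsearch so indexing runs on the same
		// worker pool as crawling, replacing the old per-key goroutines.
		return dc.Crawl(config.GetConfig().RootDir, nil)
	}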