move elastic crawlers to workers
This commit is contained in:
parent
17c96e45c3
commit
6377b8b6bc
|
@ -2,8 +2,8 @@ package DirectoryCrawler
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/file"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
@ -31,28 +31,22 @@ type FinishedCrawl struct {
|
|||
}
|
||||
|
||||
type DirectoryCrawler struct {
|
||||
cache *lru.Cache[string, *CacheItem.Item]
|
||||
visited sync.Map
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex // lock for the visted map
|
||||
}
|
||||
|
||||
func NewDirectoryCrawler(cache *lru.Cache[string, *CacheItem.Item]) *DirectoryCrawler {
|
||||
func NewDirectoryCrawler() *DirectoryCrawler {
|
||||
return &DirectoryCrawler{
|
||||
cache: cache,
|
||||
visited: sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DirectoryCrawler) Get(path string) (*CacheItem.Item, bool) {
|
||||
return dc.cache.Get(path)
|
||||
}
|
||||
|
||||
func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
|
||||
dc.visited.Range(func(key, value interface{}) bool {
|
||||
keyStr := key.(string)
|
||||
if isSubpath(file.StripRootDir(path), keyStr) && value.(bool) {
|
||||
dc.cache.Remove(keyStr)
|
||||
SharedCache.Cache.Remove(keyStr)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
@ -63,7 +57,7 @@ func (dc *DirectoryCrawler) AddCacheItem(fullPath string, info os.FileInfo) {
|
|||
item := CacheItem.NewItem(fullPath, info)
|
||||
if item != nil {
|
||||
// Sometimes CacheItem.NewItem will return nil if the path fails its checks
|
||||
dc.cache.Add(strippedPath, item)
|
||||
SharedCache.Cache.Add(strippedPath, item)
|
||||
} else {
|
||||
//log.Errorf("NewItem returned nil for %s", fullPath)
|
||||
}
|
|
@ -2,6 +2,7 @@ package DirectoryCrawler
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/Workers"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
|
@ -36,7 +37,7 @@ func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntr
|
|||
return nil
|
||||
}
|
||||
|
||||
func (dc *DirectoryCrawler) Crawl(fullPath string) error {
|
||||
func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.FileInfo, error) error) error {
|
||||
CacheItem.RetardCheck(fullPath)
|
||||
readyToStart := dc.startCrawl(fullPath)
|
||||
if !readyToStart {
|
||||
|
@ -44,6 +45,10 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
|
|||
}
|
||||
defer dc.endCrawl(fullPath)
|
||||
|
||||
if walkFunc == nil {
|
||||
walkFunc = dc.walkRecursiveFunc
|
||||
}
|
||||
|
||||
info, err := os.Lstat(fullPath)
|
||||
if os.IsNotExist(err) {
|
||||
// If the path doesn't exist, just silently exit
|
||||
|
@ -54,20 +59,14 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
//if !config.FollowSymlinks && info.Mode()&os.ModeSymlink > 0 {
|
||||
// msg := fmt.Sprintf("CRAWL - tried to crawl a symlink (not allowed in config): %s", fullPath)
|
||||
// log.Warnf(msg)
|
||||
// return errors.New(msg)
|
||||
//}
|
||||
//relPath := file.StripRootDir(fullPath)
|
||||
|
||||
relPath := file.StripRootDir(fullPath)
|
||||
|
||||
dc.cache.Remove(relPath)
|
||||
//SharedCache.Cache.Remove(relPath)
|
||||
|
||||
if info.IsDir() {
|
||||
// Get a list of all keys in the cache that belong to this directory
|
||||
keys := make([]string, 0)
|
||||
for _, key := range dc.cache.Keys() {
|
||||
for _, key := range SharedCache.Cache.Keys() {
|
||||
if isSubpath(fullPath, key) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
@ -75,18 +74,15 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
|
|||
|
||||
// Remove all entries in the cache that belong to this directory, so we can start fresh.
|
||||
for _, key := range keys {
|
||||
dc.cache.Remove(key)
|
||||
SharedCache.Cache.Remove(key)
|
||||
}
|
||||
|
||||
// If the path is a directory, start a walk
|
||||
err := Workers.Walk(fullPath, config.FollowSymlinks, dc.walkRecursiveFunc)
|
||||
err := Workers.Walk(fullPath, config.FollowSymlinks, walkFunc)
|
||||
if err != nil {
|
||||
log.Errorf("CRAWLER - crawl for %s failed: %s", fullPath, err)
|
||||
}
|
||||
|
||||
// TODO: don't think this is needed since we remove all the children of this item
|
||||
// After crawling, remove any keys that are still in the list (these are items that were not found on the filesystem)
|
||||
//dc.CleanupDeletedFiles(fullPath)
|
||||
} else {
|
||||
// If the path is a file, add it to the cache directly
|
||||
dc.AddCacheItem(fullPath, info)
|
||||
|
@ -121,12 +117,12 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
|
|||
var item *CacheItem.Item
|
||||
relPath := file.StripRootDir(fullPath)
|
||||
|
||||
dc.cache.Remove(relPath)
|
||||
SharedCache.Cache.Remove(relPath)
|
||||
|
||||
if info.IsDir() {
|
||||
// Get a list of all keys in the cache that belong to this directory
|
||||
keys := make([]string, 0)
|
||||
for _, key := range dc.cache.Keys() {
|
||||
for _, key := range SharedCache.Cache.Keys() {
|
||||
if isSubpath(fullPath, key) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
@ -134,7 +130,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
|
|||
|
||||
// Remove all entries in the cache that belong to this directory so we can start fresh
|
||||
for _, key := range keys {
|
||||
dc.cache.Remove(key)
|
||||
SharedCache.Cache.Remove(key)
|
||||
}
|
||||
|
||||
err := filepath.WalkDir(fullPath, dc.walkNonRecursiveFunc)
|
||||
|
@ -142,7 +138,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
|
|||
log.Errorf("CRAWLER - non-recursive crawl for %s failed: %s", fullPath, err)
|
||||
return nil, err
|
||||
}
|
||||
item, _ = dc.cache.Get(relPath)
|
||||
item, _ = SharedCache.Cache.Get(relPath)
|
||||
} else {
|
||||
item = CacheItem.NewItem(fullPath, info)
|
||||
dc.AddCacheItem(fullPath, info)
|
|
@ -2,6 +2,7 @@ package DirectoryCrawler
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
"os"
|
||||
|
@ -27,7 +28,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
|
|||
}
|
||||
|
||||
// Add the directory to the cache after all of its children have been processed
|
||||
dc.cache.Add(relPath, dirItem)
|
||||
SharedCache.Cache.Add(relPath, dirItem)
|
||||
|
||||
// If the directory is not the root directory, update the parent directory's Children field
|
||||
// This block of code ensures that the parent directory's Children field is always up-to-date with
|
||||
|
@ -36,7 +37,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
|
|||
if fullPath != config.GetConfig().RootDir {
|
||||
parentDir := filepath.Dir(fullPath)
|
||||
strippedParentDir := file.StripRootDir(parentDir)
|
||||
parentItem, found := dc.cache.Get(strippedParentDir)
|
||||
parentItem, found := SharedCache.Cache.Get(strippedParentDir)
|
||||
if found {
|
||||
// Remove the old version of the directory from the parent's Children field
|
||||
newChildren, foundOldDir := removeOldDir(parentItem.Children, relPath)
|
||||
|
@ -46,7 +47,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
|
|||
parentItem.Children = append(newChildren, relPath)
|
||||
}
|
||||
// Update the parent directory in the cache
|
||||
dc.cache.Add(strippedParentDir, parentItem)
|
||||
SharedCache.Cache.Add(strippedParentDir, parentItem)
|
||||
}
|
||||
}
|
||||
} else {
|
|
@ -2,10 +2,10 @@ package ResponseItem
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"crazyfs/logging"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
"path/filepath"
|
||||
)
|
||||
|
@ -31,7 +31,7 @@ type ResponseItem struct {
|
|||
Cached int64 `json:"cached"`
|
||||
}
|
||||
|
||||
func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item]) *ResponseItem {
|
||||
func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
|
||||
item := &ResponseItem{
|
||||
Path: cacheItem.Path,
|
||||
Name: cacheItem.Name,
|
||||
|
@ -50,7 +50,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
|
|||
if len(cacheItem.Children) > 0 { // avoid a null entry for the children key in the JSON
|
||||
var children []*CacheItem.Item
|
||||
for _, child := range cacheItem.Children {
|
||||
childItem, found := sharedCache.Get(child)
|
||||
childItem, found := SharedCache.Cache.Get(child)
|
||||
|
||||
// Do a quick crawl since the path could have been modfied since the last crawl.
|
||||
// This also be triggered if we encounter a broken symlink. We don't check for broken symlinks when scanning
|
||||
|
@ -58,7 +58,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
|
|||
if !found {
|
||||
log.Debugf("CRAWLER - %s not in cache, crawling", child)
|
||||
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child))
|
||||
if err != nil {
|
||||
log.Errorf("NewResponseItem - CrawlNoRecursion - %s", err)
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package SharedCache
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/logging"
|
||||
"errors"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var Cache *lru.Cache[string, *CacheItem.Item]
|
||||
var cacheCreated bool
|
||||
var log *logrus.Logger
|
||||
|
||||
func init() {
|
||||
log = logging.GetLogger()
|
||||
}
|
||||
|
||||
func NewCache(size int) error {
|
||||
if cacheCreated {
|
||||
return errors.New("cache has already been created")
|
||||
}
|
||||
cache, err := lru.New[string, *CacheItem.Item](size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Cache = cache
|
||||
cacheCreated = true
|
||||
return nil
|
||||
}
|
|
@ -2,20 +2,20 @@ package helpers
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/cache"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HandleFileNotFound if the data is not in the cache, start a new crawler
|
||||
func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[string, *CacheItem.Item], w http.ResponseWriter) *CacheItem.Item {
|
||||
func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter) *CacheItem.Item {
|
||||
// TODO: implement some sort of backoff or delay for repeated calls to recache the same path.
|
||||
|
||||
log.Debugf("CRAWLER - %s not in cache, crawling", fullPath)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
|
||||
// Check if this is a symlink. We do this before CrawlNoRecursion() because we want to tell the end user that
|
||||
// we're not going to resolve this symlink.
|
||||
|
@ -34,9 +34,9 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
|
|||
// a chance to kick of a recursive crawl.
|
||||
go func() {
|
||||
log.Debugf("Starting background recursive crawl for %s", fullPath)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
start := time.Now()
|
||||
err := dc.Crawl(fullPath)
|
||||
err := dc.Crawl(fullPath, nil)
|
||||
if err != nil {
|
||||
log.Errorf("LIST - background recursive crawl failed: %s", err)
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
|
|||
}
|
||||
|
||||
// Try to get the data from the cache again.
|
||||
item, found := sharedCache.Get(relPath)
|
||||
item, found := SharedCache.Cache.Get(relPath)
|
||||
if !found {
|
||||
// TODO: let's not re-check the disk if the file is still not in the cache. Instead, let's just assume that it doesn't exist.
|
||||
ReturnFake404Msg("path not found", w)
|
||||
|
@ -82,6 +82,6 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
|
|||
Return500Msg(w)
|
||||
return nil
|
||||
}
|
||||
cache.CheckAndRecache(fullPath, sharedCache)
|
||||
cache.CheckAndRecache(fullPath)
|
||||
return item
|
||||
}
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
package helpers
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/file"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
kzip "github.com/klauspost/compress/zip"
|
||||
"io"
|
||||
"net/http"
|
||||
|
@ -48,16 +47,16 @@ func ZipHandlerCompress(dirPath string, w http.ResponseWriter, r *http.Request)
|
|||
log.Errorf("ZIPSTREM - failed to close zipwriter: %s", err)
|
||||
}
|
||||
}
|
||||
func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request) {
|
||||
zipWriter := kzip.NewWriter(w)
|
||||
// Walk through each file and add it to the zip
|
||||
for _, fullPath := range paths {
|
||||
relPath := file.StripRootDir(fullPath)
|
||||
|
||||
// Try to get the data from the cache
|
||||
item, found := sharedCache.Get(relPath)
|
||||
item, found := SharedCache.Cache.Get(relPath)
|
||||
if !found {
|
||||
item = HandleFileNotFound(relPath, fullPath, sharedCache, w)
|
||||
item = HandleFileNotFound(relPath, fullPath, w)
|
||||
}
|
||||
if item == nil {
|
||||
// The errors have already been handled in handleFileNotFound() so we're good to just exit
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/api/routes"
|
||||
"crazyfs/api/routes/admin"
|
||||
"crazyfs/api/routes/client"
|
||||
|
@ -10,7 +9,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
|
@ -23,7 +21,7 @@ type Route struct {
|
|||
|
||||
type Routes []Route
|
||||
|
||||
type AppHandler func(http.ResponseWriter, *http.Request, *lru.Cache[string, *CacheItem.Item])
|
||||
type AppHandler func(http.ResponseWriter, *http.Request)
|
||||
|
||||
var httpRoutes = Routes{
|
||||
Route{
|
||||
|
@ -102,7 +100,7 @@ func setHeaders(next http.Handler) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
|
||||
func NewRouter() *mux.Router {
|
||||
r := mux.NewRouter().StrictSlash(true)
|
||||
for _, route := range httpRoutes {
|
||||
var handler http.Handler
|
||||
|
@ -111,7 +109,7 @@ func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
|
|||
currentRoute := route
|
||||
|
||||
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
currentRoute.HandlerFunc(w, r, sharedCache)
|
||||
currentRoute.HandlerFunc(w, r)
|
||||
})
|
||||
handler = setHeaders(handler)
|
||||
handler = logging.LogRequest(handler)
|
||||
|
@ -137,7 +135,7 @@ func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
|
|||
}
|
||||
|
||||
func wrongMethod(expectedMethod string, next AppHandler) AppHandler {
|
||||
return func(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
package routes
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
"fmt"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func Download(w http.ResponseWriter, r *http.Request) {
|
||||
if helpers.CheckInitialCrawl() {
|
||||
helpers.HandleRejectDuringInitialCrawl(w)
|
||||
return
|
||||
|
@ -46,7 +45,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
|
|||
}
|
||||
|
||||
// Multiple files, zip them
|
||||
helpers.ZipHandlerCompressMultiple(cleanPaths, w, r, sharedCache)
|
||||
helpers.ZipHandlerCompressMultiple(cleanPaths, w, r)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -67,9 +66,9 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
|
|||
}
|
||||
|
||||
// Try to get the data from the cache
|
||||
item, found := sharedCache.Get(relPath)
|
||||
item, found := SharedCache.Cache.Get(relPath)
|
||||
if !found {
|
||||
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
|
||||
item = helpers.HandleFileNotFound(relPath, fullPath, w)
|
||||
}
|
||||
if item == nil {
|
||||
// The errors have already been handled in handleFileNotFound() so we're good to just exit
|
||||
|
@ -105,7 +104,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
|
|||
// GetMimeType() returns an empty string if it was a directory.
|
||||
// Update the CacheItem's MIME in the sharedCache.
|
||||
item.MimeType = &mimeType
|
||||
sharedCache.Add(relPath, item)
|
||||
SharedCache.Cache.Add(relPath, item)
|
||||
} else {
|
||||
log.Errorf("Download.go failed to match a condition when checking a file's MIME - %s", fullPath)
|
||||
helpers.Return500Msg(w)
|
||||
|
@ -147,7 +146,7 @@ func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[str
|
|||
// Open the file
|
||||
openFile, err := os.Open(fullPath)
|
||||
if err != nil {
|
||||
sharedCache.Remove(relPath) // remove it from the cache
|
||||
SharedCache.Cache.Remove(relPath) // remove it from the cache
|
||||
helpers.ReturnFake404Msg("file missing from disk, cache out of date", w)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -3,15 +3,15 @@ package routes
|
|||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/ResponseItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func ListDir(w http.ResponseWriter, r *http.Request) {
|
||||
if helpers.CheckInitialCrawl() {
|
||||
helpers.HandleRejectDuringInitialCrawl(w)
|
||||
return
|
||||
|
@ -46,16 +46,16 @@ func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[stri
|
|||
|
||||
relPath := file.StripRootDir(fullPath)
|
||||
// Try to get the data from the cache
|
||||
cacheItem, found := sharedCache.Get(relPath)
|
||||
cacheItem, found := SharedCache.Cache.Get(relPath)
|
||||
if !found {
|
||||
cacheItem = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
|
||||
cacheItem = helpers.HandleFileNotFound(relPath, fullPath, w)
|
||||
}
|
||||
if cacheItem == nil {
|
||||
return // The errors have already been handled in handleFileNotFound() so we're good to just exit
|
||||
}
|
||||
|
||||
// Create a copy of the cached Item so we don't modify the Item in the cache
|
||||
item := ResponseItem.NewResponseItem(cacheItem, sharedCache)
|
||||
// Create a copy of the cached Item, so we don't modify the Item in the cache
|
||||
item := ResponseItem.NewResponseItem(cacheItem)
|
||||
|
||||
// Get the MIME type of the file if the 'mime' argument is present
|
||||
mime := r.URL.Query().Get("mime")
|
||||
|
@ -79,7 +79,7 @@ func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[stri
|
|||
// Update the original cached CacheItem's MIME in the sharedCache
|
||||
cacheItem.MimeType = &mimeType
|
||||
cacheItem.Extension = &ext
|
||||
sharedCache.Add(relPath, cacheItem) // take the address of CacheItem
|
||||
SharedCache.Cache.Add(relPath, cacheItem) // take the address of CacheItem
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"crazyfs/elastic"
|
||||
"crazyfs/logging"
|
||||
"encoding/json"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
@ -16,7 +15,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func SearchFile(w http.ResponseWriter, r *http.Request) {
|
||||
if helpers.CheckInitialCrawl() {
|
||||
helpers.HandleRejectDuringInitialCrawl(w)
|
||||
return
|
||||
|
@ -143,7 +142,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[s
|
|||
results = append(results, items...)
|
||||
}
|
||||
} else {
|
||||
results = cache.SearchLRU(queryString, excludeElements, limitResults, sharedCache)
|
||||
results = cache.SearchLRU(queryString, excludeElements, limitResults)
|
||||
}
|
||||
|
||||
if folderSorting == "folders" {
|
||||
|
|
|
@ -2,7 +2,7 @@ package routes
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/cache"
|
||||
"crazyfs/config"
|
||||
|
@ -10,7 +10,6 @@ import (
|
|||
"crazyfs/logging"
|
||||
"fmt"
|
||||
"github.com/disintegration/imaging"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/nfnt/resize"
|
||||
"strconv"
|
||||
|
||||
|
@ -22,7 +21,7 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func Thumbnail(w http.ResponseWriter, r *http.Request) {
|
||||
if cache.InitialCrawlInProgress && !config.GetConfig().HttpAllowDuringInitialCrawl {
|
||||
helpers.HandleRejectDuringInitialCrawl(w)
|
||||
returnDummyPNG(w)
|
||||
|
@ -63,9 +62,9 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[st
|
|||
}
|
||||
|
||||
// Try to get the data from the cache
|
||||
item, found := sharedCache.Get(relPath)
|
||||
item, found := SharedCache.Cache.Get(relPath)
|
||||
if !found {
|
||||
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
|
||||
item = helpers.HandleFileNotFound(relPath, fullPath, w)
|
||||
}
|
||||
if item == nil {
|
||||
returnDummyPNG(w)
|
||||
|
@ -91,7 +90,7 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[st
|
|||
// Update the CacheItem's MIME in the sharedCache
|
||||
item.MimeType = &mimeType
|
||||
item.Extension = &ext
|
||||
sharedCache.Add(relPath, item)
|
||||
SharedCache.Cache.Add(relPath, item)
|
||||
|
||||
// Check if the file is an image
|
||||
if !strings.HasPrefix(mimeType, "image/") {
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
package admin
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/config"
|
||||
"crazyfs/elastic"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
"encoding/json"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func AdminCacheInfo(w http.ResponseWriter, r *http.Request) {
|
||||
username, password, ok := r.BasicAuth()
|
||||
if ok {
|
||||
usernameHash := sha256.Sum256([]byte(username))
|
||||
|
@ -26,7 +25,7 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cac
|
|||
helpers.Return401Msg("unauthorized", w)
|
||||
return
|
||||
} else {
|
||||
cacheLen := sharedCache.Len()
|
||||
cacheLen := SharedCache.Cache.Len()
|
||||
|
||||
response := map[string]interface{}{
|
||||
"cachedItems": cacheLen,
|
||||
|
|
|
@ -1,19 +1,17 @@
|
|||
package admin
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/Workers"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"crazyfs/config"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
"encoding/json"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
|
||||
username, password, ok := r.BasicAuth()
|
||||
if ok {
|
||||
usernameHash := sha256.Sum256([]byte(username))
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
package admin
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/cache"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func AdminReCache(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost {
|
||||
helpers.Return400Msg("this is a POST endpoint", w)
|
||||
return
|
||||
|
@ -48,7 +46,7 @@ func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache
|
|||
"error": nil,
|
||||
}
|
||||
// Check and re-cache the directory
|
||||
err = cache.Recache(fullPath, sharedCache)
|
||||
err = cache.Recache(fullPath)
|
||||
if err != nil {
|
||||
response["message"] = fmt.Sprintf("recache failed")
|
||||
response["error"] = err.Error()
|
||||
|
|
|
@ -1,17 +1,15 @@
|
|||
package admin
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/config"
|
||||
"crazyfs/logging"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func AdminSysInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func AdminSysInfo(w http.ResponseWriter, r *http.Request) {
|
||||
username, password, ok := r.BasicAuth()
|
||||
if ok {
|
||||
usernameHash := sha256.Sum256([]byte(username))
|
||||
|
|
|
@ -1,17 +1,15 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/cache"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// TODO: show the time the initial crawl started
|
||||
|
||||
func HealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func HealthCheck(w http.ResponseWriter, r *http.Request) {
|
||||
response := map[string]interface{}{}
|
||||
response["scanRunning"] = DirectoryCrawler.GetTotalActiveCrawls() > 0
|
||||
response["initialScanRunning"] = cache.InitialCrawlInProgress
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/api/helpers"
|
||||
"crazyfs/config"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func RestrictedDownloadPaths(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func RestrictedDownloadPaths(w http.ResponseWriter, r *http.Request) {
|
||||
response := config.GetConfig().RestrictedDownloadPaths
|
||||
helpers.WriteJsonResponse(response, false, w, r)
|
||||
}
|
||||
|
|
|
@ -1,36 +1,27 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/Workers"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"crazyfs/config"
|
||||
"crazyfs/logging"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var log *logrus.Logger
|
||||
|
||||
func init() {
|
||||
log = logging.GetLogger()
|
||||
}
|
||||
|
||||
func StartCrawler(sharedCache *lru.Cache[string, *CacheItem.Item]) error {
|
||||
func StartCrawler() error {
|
||||
var wg sync.WaitGroup
|
||||
crawlerChan := make(chan struct{}, config.GetConfig().DirectoryCrawlers)
|
||||
|
||||
go startCrawl(sharedCache, &wg, crawlerChan)
|
||||
go startCrawl(&wg, crawlerChan)
|
||||
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
go logCacheStatus("CACHE STATUS", ticker, sharedCache, log.Debugf)
|
||||
go logCacheStatus("CACHE STATUS", ticker, log.Debugf)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, crawlerChan chan struct{}) {
|
||||
func startCrawl(wg *sync.WaitGroup, crawlerChan chan struct{}) {
|
||||
ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
@ -41,26 +32,26 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
|
|||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
log.Infoln("CRAWLER - Starting a crawl...")
|
||||
start := time.Now()
|
||||
err := dc.Crawl(config.GetConfig().RootDir)
|
||||
err := dc.Crawl(config.GetConfig().RootDir, nil)
|
||||
if err != nil {
|
||||
log.Warnf("CRAWLER - Crawl failed: %s", err)
|
||||
} else {
|
||||
duration := time.Since(start).Round(time.Second)
|
||||
log.Infof("CRAWLER - Crawl completed in %s", duration)
|
||||
log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(sharedCache.Keys()))
|
||||
log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(SharedCache.Cache.Keys()))
|
||||
}
|
||||
<-crawlerChan
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
|
||||
func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{})) {
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
logFn("%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d.",
|
||||
msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
|
||||
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"crazyfs/logging"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var log *logrus.Logger
|
||||
|
||||
func init() {
|
||||
log = logging.GetLogger()
|
||||
}
|
|
@ -1,11 +1,9 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/config"
|
||||
"crazyfs/logging"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -15,18 +13,18 @@ func init() {
|
|||
InitialCrawlInProgress = false
|
||||
}
|
||||
|
||||
func InitialCrawl(sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func InitialCrawl() {
|
||||
log = logging.GetLogger()
|
||||
|
||||
log.Infof("INITIAL CRAWL - starting the crawl for %s", config.GetConfig().RootDir)
|
||||
|
||||
ticker := time.NewTicker(3 * time.Second)
|
||||
go logCacheStatus("INITIAL CRAWL", ticker, sharedCache, log.Infof)
|
||||
go logCacheStatus("INITIAL CRAWL", ticker, log.Infof)
|
||||
|
||||
InitialCrawlInProgress = true
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
//start := time.Now()
|
||||
err := dc.Crawl(config.GetConfig().RootDir)
|
||||
err := dc.Crawl(config.GetConfig().RootDir, nil)
|
||||
if err != nil {
|
||||
log.Errorf("LIST - background recursive crawl failed: %s", err)
|
||||
}
|
||||
|
|
|
@ -2,11 +2,11 @@ package cache
|
|||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/cache/DirectoryCrawler"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
"errors"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
@ -18,15 +18,15 @@ func InitRecacheSemaphore(limit int) {
|
|||
sem = make(chan struct{}, limit)
|
||||
}
|
||||
|
||||
func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
item, found := sharedCache.Get(path)
|
||||
func CheckAndRecache(path string) {
|
||||
item, found := SharedCache.Cache.Get(path)
|
||||
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
|
||||
log.Debugf("Re-caching: %s", path)
|
||||
sem <- struct{}{} // acquire a token
|
||||
go func() {
|
||||
defer func() { <-sem }() // release the token when done
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
err := dc.Crawl(path)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
err := dc.Crawl(path, nil)
|
||||
if err != nil {
|
||||
log.Errorf("RECACHE ERROR: %s", err.Error())
|
||||
return
|
||||
|
@ -35,8 +35,8 @@ func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item
|
|||
}
|
||||
}
|
||||
|
||||
func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error {
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
func Recache(path string) error {
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
if dc.IsCrawlActive(path) {
|
||||
return errors.New("rejecting crawl, already in progress for this path")
|
||||
}
|
||||
|
@ -46,8 +46,8 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
|
|||
sem <- struct{}{} // acquire a token
|
||||
go func() {
|
||||
defer func() { <-sem }() // release the token when done
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
|
||||
err := dc.Crawl(path)
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
err := dc.Crawl(path, nil)
|
||||
if err != nil {
|
||||
log.Errorf("RECACHE ERROR: %s", err.Error())
|
||||
return
|
||||
|
@ -56,7 +56,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
|
|||
// Get the parent directory from the cache
|
||||
parentDir := filepath.Dir(path)
|
||||
parentDirRel := file.StripRootDir(parentDir)
|
||||
parentItem, found := sharedCache.Get(parentDirRel)
|
||||
parentItem, found := SharedCache.Cache.Get(parentDirRel)
|
||||
if found {
|
||||
// Remove the old subdirectory from the parent directory's Children field
|
||||
for i, child := range parentItem.Children {
|
||||
|
@ -85,7 +85,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error
|
|||
// Assign the newChildren slice to the Children field
|
||||
parentItem.Children = newChildren
|
||||
// Update the parent directory in the cache
|
||||
sharedCache.Add(parentDir, parentItem)
|
||||
SharedCache.Cache.Add(parentDir, parentItem)
|
||||
}
|
||||
} else if !CacheItem.PathOutsideRoot(parentDir) {
|
||||
// If the parent directory isn't in the cache, crawl it
|
||||
|
|
|
@ -3,13 +3,13 @@ package cache
|
|||
import (
|
||||
"bytes"
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"encoding/gob"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func SearchLRU(queryString string, excludeElements []string, limitResults int, sharedCache *lru.Cache[string, *CacheItem.Item]) []*CacheItem.Item {
|
||||
func SearchLRU(queryString string, excludeElements []string, limitResults int) []*CacheItem.Item {
|
||||
results := make([]*CacheItem.Item, 0)
|
||||
|
||||
const maxGoroutines = 100
|
||||
|
@ -17,10 +17,10 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
|
|||
// Create a buffered channel as a semaphore
|
||||
sem := make(chan struct{}, maxGoroutines)
|
||||
|
||||
resultsChan := make(chan *CacheItem.Item, len(sharedCache.Keys()))
|
||||
resultsChan := make(chan *CacheItem.Item, len(SharedCache.Cache.Keys()))
|
||||
|
||||
for _, key := range sharedCache.Keys() {
|
||||
searchKey(key, queryString, excludeElements, sem, resultsChan, sharedCache)
|
||||
for _, key := range SharedCache.Cache.Keys() {
|
||||
searchKey(key, queryString, excludeElements, sem, resultsChan)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to finish
|
||||
|
@ -28,7 +28,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
|
|||
sem <- struct{}{}
|
||||
}
|
||||
|
||||
for range sharedCache.Keys() {
|
||||
for range SharedCache.Cache.Keys() {
|
||||
item := <-resultsChan
|
||||
if item != nil {
|
||||
results = append(results, item)
|
||||
|
@ -41,7 +41,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
|
|||
return results
|
||||
}
|
||||
|
||||
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item) {
|
||||
// Acquire a token
|
||||
sem <- struct{}{}
|
||||
|
||||
|
@ -49,7 +49,7 @@ func searchKey(key string, queryString string, excludeElements []string, sem cha
|
|||
// Release the token at the end
|
||||
defer func() { <-sem }()
|
||||
|
||||
cacheItem, found := sharedCache.Get(key)
|
||||
cacheItem, found := SharedCache.Cache.Get(key)
|
||||
if !found {
|
||||
resultsChan <- nil
|
||||
return
|
||||
|
|
|
@ -38,7 +38,6 @@ type Config struct {
|
|||
ElasticsearchIndex string
|
||||
ElasticsearchSyncThreads int
|
||||
ElasticsearchExcludePatterns []string
|
||||
ElasticsearchAllowConcurrentSyncs bool
|
||||
ElasticsearchFullSyncOnStart bool
|
||||
ElasticsearchDefaultQueryField string
|
||||
HTTPRealIPHeader string
|
||||
|
@ -137,7 +136,6 @@ func SetConfig(configFile string) (*Config, error) {
|
|||
ElasticsearchIndex: viper.GetString("elasticsearch_index"),
|
||||
ElasticsearchSyncThreads: viper.GetInt("elasticsearch_sync_threads"),
|
||||
ElasticsearchExcludePatterns: viper.GetStringSlice("elasticsearch_exclude_patterns"),
|
||||
ElasticsearchAllowConcurrentSyncs: viper.GetBool("elasticsearch_allow_concurrent_syncs"),
|
||||
ElasticsearchFullSyncOnStart: viper.GetBool("elasticsearch_full_sync_on_start"),
|
||||
ElasticsearchDefaultQueryField: viper.GetString("elasticsearch_default_query_field"),
|
||||
HTTPRealIPHeader: viper.GetString("http_real_ip_header"),
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/Workers"
|
||||
"crazyfs/api"
|
||||
"crazyfs/cache"
|
||||
|
@ -12,7 +12,6 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/sirupsen/logrus"
|
||||
"net/http"
|
||||
//_ "net/http/pprof" // for profiling
|
||||
|
@ -84,7 +83,7 @@ func main() {
|
|||
log.Fatalf("Failed to load config file: %s", err)
|
||||
}
|
||||
|
||||
sharedCache, err := lru.New[string, *CacheItem.Item](cfg.CacheSize)
|
||||
err = SharedCache.NewCache(cfg.CacheSize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -99,7 +98,7 @@ func main() {
|
|||
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
|
||||
|
||||
// Start the webserver before doing the long crawl
|
||||
r := api.NewRouter(sharedCache)
|
||||
r := api.NewRouter()
|
||||
//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r))
|
||||
go func() {
|
||||
err := http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r)
|
||||
|
@ -112,14 +111,14 @@ func main() {
|
|||
if cliArgs.initialCrawl || cfg.InitialCrawl {
|
||||
log.Infoln("Preforming initial crawl...")
|
||||
start := time.Now()
|
||||
cache.InitialCrawl(sharedCache)
|
||||
cache.InitialCrawl()
|
||||
duration := time.Since(start).Round(time.Second)
|
||||
keys := sharedCache.Keys()
|
||||
keys := SharedCache.Cache.Keys()
|
||||
config.InitialCrawlElapsed = int(duration)
|
||||
log.Infof("Initial crawl completed in %s. %d items added to the cache.", duration, len(keys))
|
||||
}
|
||||
|
||||
err = cache.StartCrawler(sharedCache)
|
||||
err = cache.StartCrawler()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start timed crawler process: %s", err)
|
||||
}
|
||||
|
@ -144,7 +143,7 @@ func main() {
|
|||
elastic.ElasticClient = es
|
||||
|
||||
if cfg.ElasticsearchSyncEnable && !cliArgs.disableElasticSync {
|
||||
go elastic.ElasticsearchThread(sharedCache)
|
||||
go elastic.ElasticsearchThread()
|
||||
log.Info("Started the background Elasticsearch sync thread.")
|
||||
} else {
|
||||
log.Info("The background Elasticsearch sync thread is disabled.")
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
package elastic
|
||||
|
||||
import (
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/DirectoryCrawler"
|
||||
"crazyfs/config"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func ElasticsearchThread() {
|
||||
createCrazyfsIndex()
|
||||
|
||||
// Test connection to Elastic.
|
||||
|
@ -20,95 +17,38 @@ func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
|||
}
|
||||
log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
sem := make(chan bool, config.GetConfig().ElasticsearchSyncThreads)
|
||||
|
||||
// Run a partial sync at startup, unless configured to run a full one.
|
||||
syncElasticsearch(sharedCache, &wg, sem, config.GetConfig().ElasticsearchFullSyncOnStart)
|
||||
syncElasticsearch()
|
||||
|
||||
ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
|
||||
fullSyncTicker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchFullSyncInterval) * time.Second)
|
||||
|
||||
var mutex sync.Mutex
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
|
||||
mutex.Lock()
|
||||
}
|
||||
syncElasticsearch(sharedCache, &wg, sem, false)
|
||||
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
|
||||
mutex.Unlock()
|
||||
}
|
||||
case <-fullSyncTicker.C:
|
||||
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
|
||||
mutex.Lock()
|
||||
}
|
||||
syncElasticsearch(sharedCache, &wg, sem, true)
|
||||
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
|
||||
mutex.Unlock()
|
||||
}
|
||||
syncElasticsearch()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: make this use workers instead of starting a million threads
|
||||
// TODO: have the workers exit when the sync job is finished
|
||||
func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, sem chan bool, fullSync bool) {
|
||||
var syncType string
|
||||
var esContents []string
|
||||
if fullSync {
|
||||
ElasticRefreshSyncRunning = true
|
||||
syncType = "full refresh"
|
||||
} else {
|
||||
ElasticNewSyncRunning = true
|
||||
syncType = "refresh"
|
||||
func syncElasticsearch() {
|
||||
log.Infof("ELASTIC - started syncing.")
|
||||
start := time.Now()
|
||||
|
||||
var err error
|
||||
esContents, err = getPathsFromIndex()
|
||||
dc := DirectoryCrawler.NewDirectoryCrawler()
|
||||
err := dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
|
||||
if err != nil {
|
||||
log.Errorf("ELASTIC - Failed to read the index: %s", err)
|
||||
log.Errorf("ELASTIC - crawl failed: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
log.Infof("ELASTIC - starting a %s sync.", syncType)
|
||||
|
||||
start := time.Now()
|
||||
for _, key := range sharedCache.Keys() {
|
||||
wg.Add(1)
|
||||
go func(key string) {
|
||||
defer wg.Done()
|
||||
sem <- true
|
||||
cacheItem, found := sharedCache.Get(key)
|
||||
if !found {
|
||||
log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
|
||||
} else {
|
||||
if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
|
||||
if fullSync {
|
||||
addToElasticsearch(cacheItem)
|
||||
} else if !slices.Contains(esContents, key) {
|
||||
addToElasticsearch(cacheItem)
|
||||
}
|
||||
} else {
|
||||
deleteFromElasticsearch(key) // clean up
|
||||
}
|
||||
}
|
||||
<-sem
|
||||
}(key)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// TODO: use workers for this
|
||||
log.Debugln("ELASTIC - Checking for removed items...")
|
||||
removeStaleItemsFromElasticsearch(sharedCache)
|
||||
|
||||
if fullSync {
|
||||
ElasticRefreshSyncRunning = false
|
||||
} else {
|
||||
ElasticNewSyncRunning = false
|
||||
}
|
||||
removeStaleItemsFromElasticsearch()
|
||||
|
||||
duration := time.Since(start)
|
||||
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
|
||||
log.Infof("ELASTIC - sync finished in %s", duration)
|
||||
}
|
||||
|
||||
func logElasticConnError(err error) {
|
||||
|
|
|
@ -4,12 +4,30 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"crazyfs/file"
|
||||
"encoding/json"
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
"os"
|
||||
)
|
||||
|
||||
func addToElasticsearch(item *CacheItem.Item) {
|
||||
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
|
||||
key := file.StripRootDir(fullPath)
|
||||
cacheItem, found := SharedCache.Cache.Get(key)
|
||||
if !found {
|
||||
log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
|
||||
} else {
|
||||
if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
|
||||
preformAddToElasticsearch(cacheItem)
|
||||
} else {
|
||||
deleteFromElasticsearch(key) // clean up
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func preformAddToElasticsearch(item *CacheItem.Item) {
|
||||
log.Debugf(`ELASTIC - Adding: "%s"`, item.Path)
|
||||
prepareCacheItem(item)
|
||||
data, err := json.Marshal(item)
|
||||
|
@ -42,10 +60,10 @@ func addToElasticsearch(item *CacheItem.Item) {
|
|||
|
||||
// prepareCacheItem is used to get an item ready to insert into Elastic.
|
||||
func prepareCacheItem(item *CacheItem.Item) {
|
||||
// We don't care about the children and this field's length may cause issues.
|
||||
// We don't care about the children, and this field's length may cause issues.
|
||||
item.Children = nil
|
||||
|
||||
// Length of this one also may cause issues.
|
||||
// The length of this one also may cause issues.
|
||||
item.Content = ""
|
||||
|
||||
// Don't need to return anything since `item` is a pointer.
|
||||
|
|
|
@ -2,15 +2,14 @@ package elastic
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crazyfs/CacheItem"
|
||||
"crazyfs/SharedCache"
|
||||
"crazyfs/config"
|
||||
"encoding/json"
|
||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item]) {
|
||||
func removeStaleItemsFromElasticsearch() {
|
||||
// Retrieve all keys from Elasticsearch
|
||||
keys, err := getPathsFromIndex()
|
||||
if err != nil {
|
||||
|
@ -39,7 +38,7 @@ func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
if _, ok := sharedCache.Get(key); !ok {
|
||||
if _, ok := SharedCache.Cache.Get(key); !ok {
|
||||
// If a key does not exist in the LRU cache, delete it from Elasticsearch
|
||||
deleteFromElasticsearch(key)
|
||||
log.Debugf(`ELASTIC - Removed key "%s"`, key)
|
||||
|
|
Loading…
Reference in New Issue