redo the job queue again and reorganize the workers
parent 88fd63bfb9
commit 8a21665e83
@@ -3,15 +3,13 @@ package CacheItem
 import (
     "crazyfs/config"
     "crazyfs/file"
-    "fmt"
     "os"
     "path/filepath"
-    "strings"
     "time"
 )
 
 func NewItem(fullPath string, info os.FileInfo) *Item {
-    RetardCheck(fullPath)
+    file.RetardCheck(fullPath)
     if config.GetConfig().CachePrintNew {
         log.Debugf("CACHEITEM:New - New cache item: %s", fullPath)
     }
@@ -86,14 +84,3 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
         Encoding: &encoding,
     }
 }
-
-func PathOutsideRoot(fullPath string) bool {
-    return !strings.HasPrefix(fullPath, config.GetConfig().RootDir)
-}
-
-// RetardCheck makes sure we never do anything outside the root dir.
-func RetardCheck(fullPath string) {
-    if PathOutsideRoot(fullPath) {
-        panic(fmt.Sprintf("NewItem was not passed an absolute path. The path must start with the RootDir (%s). Failing path: %s", config.GetConfig().RootDir, fullPath))
-    }
-}
@@ -0,0 +1,36 @@
+package Crawlers
+
+import (
+    "crazyfs/Workers"
+    "crazyfs/config"
+    "crazyfs/globals"
+    "sync/atomic"
+)
+
+func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
+    if globals.DirectoryCrawlers != nil {
+        panic("DirectoryCrawlers has already been defined!")
+    }
+    workers := Workers.InitializeWorkers(directoryCrawlerWorker)
+    d := &globals.DcWorkers{}
+    d.Queue = workers.Queue
+    d.BusyWorkers = workers.BusyWorkers
+    globals.DirectoryCrawlers = d
+    log.Debugf("CRAWLERS - Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
+    return d
+}
+
+func directoryCrawlerWorker(w *Workers.CrawlWorkers) {
+    for {
+        job := w.Queue.GetJob()
+        atomic.AddInt32(&w.BusyWorkers, 1)
+
+        err := job.Walker.ProcessPath(job.StartPath)
+        if err != nil {
+            log.Warnf("WORKER - %s - %s", job.StartPath, err)
+        }
+
+        job.Walker.Wg.Done()
+        atomic.AddInt32(&w.BusyWorkers, -1)
+    }
+}
@@ -0,0 +1,44 @@
+package Crawlers
+
+import (
+    "crazyfs/SharedCache"
+    "crazyfs/Workers"
+    "crazyfs/config"
+    "crazyfs/elastic"
+    "crazyfs/globals"
+    "sync/atomic"
+)
+
+func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
+    if globals.ElasticCrawlers != nil {
+        panic("ElasticCrawlers has already been defined!")
+    }
+    workers := Workers.InitializeWorkers(elasticDeleteWorker)
+    d := &globals.DcWorkers{}
+    d.Queue = workers.Queue
+    d.BusyWorkers = workers.BusyWorkers
+    globals.ElasticCrawlers = d
+    log.Debugf("CRAWLERS - Started %d elastic sync workers.", config.GetConfig().ElasticsearchSyncThreads)
+    return d
+}
+
+func elasticDeleteWorker(w *Workers.CrawlWorkers) {
+    for {
+        job := w.Queue.GetJob()
+        atomic.AddInt32(&w.BusyWorkers, 1)
+
+        if _, ok := SharedCache.Cache.Get(job.StartPath); !ok {
+            // If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
+            e := *job.Extra
+            key := e["key"].(string)
+            err := elastic.DeleteFromElasticsearch(key)
+            if err != nil {
+                log.Errorf(`ELASTIC:Delete - Error deleting key "%s" - %s`, key, err)
+            } else {
+                log.Debugf(`ELASTIC:Delete - Deleted path: "%s"`, job.StartPath)
+            }
+        }
+
+        atomic.AddInt32(&w.BusyWorkers, -1)
+    }
+}
@@ -0,0 +1,12 @@
+package Crawlers
+
+import (
+    "crazyfs/logging"
+    "github.com/sirupsen/logrus"
+)
+
+var log *logrus.Logger
+
+func init() {
+    log = logging.GetLogger()
+}
@@ -3,6 +3,7 @@ package DirectoryCrawler
 import (
     "crazyfs/CacheItem"
     "crazyfs/SharedCache"
+    "crazyfs/Walk"
     "crazyfs/file"
     "os"
     "path/filepath"
@@ -35,11 +36,13 @@ type FinishedCrawl struct {
 type DirectoryCrawler struct {
     visited sync.Map
     wg      sync.WaitGroup
+    queue   *Walk.JobQueue
 }
 
-func NewDirectoryCrawler() *DirectoryCrawler {
+func NewDirectoryCrawler(queue *Walk.JobQueue) *DirectoryCrawler {
     return &DirectoryCrawler{
         visited: sync.Map{},
+        queue:   queue,
     }
 }
 
@@ -3,7 +3,7 @@ package DirectoryCrawler
 import (
     "crazyfs/CacheItem"
     "crazyfs/SharedCache"
-    "crazyfs/Workers"
+    "crazyfs/Walk"
     "crazyfs/config"
     "crazyfs/file"
     "errors"
@@ -16,7 +16,7 @@ import (
 )
 
 func (dc *DirectoryCrawler) walkRecursiveFunc(fullPath string, info os.FileInfo, err error) error {
-    CacheItem.RetardCheck(fullPath)
+    file.RetardCheck(fullPath)
     processErr := dc.processPath(fullPath, info)
     if processErr != nil {
         log.Errorf(`CRAWLER:walkRecursiveFunc - failed on "%s": %s`, fullPath, processErr)
@@ -26,7 +26,7 @@ func (dc *DirectoryCrawler) walkRecursiveFunc(fullPath string, info os.FileInfo,
 }
 
 func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntry, err error) error {
-    CacheItem.RetardCheck(fullPath)
+    file.RetardCheck(fullPath)
     info, infoErr := dir.Info()
     if infoErr != nil {
         log.Errorf(`CRAWLER:walkNonRecursiveFunc - Get info failed on "%s": %s`, fullPath, infoErr)
@@ -41,7 +41,7 @@ func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntr
 }
 
 func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.FileInfo, error) error) error {
-    CacheItem.RetardCheck(fullPath)
+    file.RetardCheck(fullPath)
 
     // Set default value.
     if walkFunc == nil {
@@ -93,7 +93,7 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
     }
 
     // If the path is a directory, start a walk
-    err := Workers.Walk(fullPath, config.FollowSymlinks, walkFunc)
+    err := Walk.Walk(fullPath, config.FollowSymlinks, walkFunc, dc.queue)
     if err != nil {
         log.Errorf(`CRAWLER:Crawl - Crawl for "%s" failed: %s`, fullPath, err)
     }
@@ -106,7 +106,7 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
 
 // CrawlNoRecursion this function crawls a file or directory and does not recurse into any subdirectories. Also returns the result of the crawl.
 func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item, error) {
-    CacheItem.RetardCheck(fullPath)
+    file.RetardCheck(fullPath)
 
     // TODO: check if symlink and reject if it is
 
@@ -5,6 +5,7 @@ import (
     "crazyfs/DirectoryCrawler"
     "crazyfs/SharedCache"
     "crazyfs/config"
+    "crazyfs/globals"
     "crazyfs/logging"
     "github.com/sirupsen/logrus"
     "path/filepath"
@@ -84,7 +85,7 @@ func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
     crawlRelPath := filepath.Join(config.GetConfig().RootDir, child)
     // TODO: when does this get triggered?
     log.Debugf(`NewResponseItem:Crawl - Not in cache, crawling: "%s" ("%s")`, child, crawlRelPath)
-    dc := DirectoryCrawler.NewDirectoryCrawler()
+    dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
     item, err := dc.CrawlNoRecursion(crawlRelPath)
     if err != nil {
         log.Errorf("NewResponseItem:Crawl - %s", err)
@@ -1,12 +1,18 @@
-package Workers
+package Walk
 
-import "path/filepath"
+import (
+    "crazyfs/file"
+    "path/filepath"
+)
 
 // Walk is a wrapper function for the Walker object that mimics the behavior of filepath.Walk, and doesn't follow symlinks.
-func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc) error {
+func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc, queue *JobQueue) error {
+    file.RetardCheck(root)
     w := Walker{
         root:           root,
         followSymlinks: followSymlinks,
+        walkFunc:       walkFn,
+        queue:          queue,
     }
     return w.Walk("", walkFn)
 }
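For orientation, a minimal sketch of how a caller drives the new wrapper (the path and walk function here are hypothetical, and imports are omitted). A worker pool must already be draining the same queue, otherwise the Wg.Wait() inside Walker.Walk never returns:

    queue := Walk.NewJobQueue() // normally created for you by Workers.InitializeWorkers()
    walkFn := func(path string, info os.FileInfo, err error) error {
        return nil // inspect each visited entry here
    }
    err := Walk.Walk("/srv/root/some/dir", false, walkFn, queue) // blocks until the workers finish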
@@ -0,0 +1,60 @@
+package Walk
+
+import (
+    "github.com/enriquebris/goconcurrentqueue"
+    "sync"
+)
+
+// This is a queue implementation that doesn't rely on channels. This way of doing things should be more memory-efficient.
+
+// Job is an individual job passed to the Workers.
+type Job struct {
+    StartPath string
+    Walker    *Walker // A pointer to the shared Walker object is passed as well.
+    Extra     *map[string]interface{}
+}
+
+// JobQueue is the Queue that workers pull jobs from.
+type JobQueue struct {
+    fifo *goconcurrentqueue.FIFO
+
+    // Use our own condition to notify workers since `DequeueOrWaitForNextElement()` is limited to max 1000.
+    mutex sync.RWMutex
+    cond  *sync.Cond
+}
+
+func NewJobQueue() *JobQueue {
+    q := &JobQueue{}
+    q.fifo = goconcurrentqueue.NewFIFO()
+    q.cond = sync.NewCond(&q.mutex)
+    return q
+}
+
+// AddJob adds a job to the queue and signals the workers so they know to pick it up.
+func (q *JobQueue) AddJob(job Job) {
+    err := q.fifo.Enqueue(job)
+    if err != nil {
+        // Some sort of logic error or timeout occurred.
+        panic(err)
+    }
+    q.cond.Signal()
+}
+
+// GetJob is how a worker pulls a job from the queue.
+func (q *JobQueue) GetJob() Job {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    for q.GetQueueSize() == 0 {
+        q.cond.Wait()
+    }
+    job, err := q.fifo.DequeueOrWaitForNextElement()
+    if err != nil {
+        panic(err)
+    }
+    return job.(Job)
+}
+
+// GetQueueSize returns the size of the queue.
+func (q *JobQueue) GetQueueSize() int {
+    return q.fifo.GetLen()
+}
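A minimal usage sketch of the new queue, assuming one producer and one worker goroutine (the Walker wiring the real workers rely on is omitted):

    q := Walk.NewJobQueue()

    go func() {
        for {
            job := q.GetJob() // blocks on the condition variable until a job arrives
            fmt.Println("got job for", job.StartPath)
        }
    }()

    q.AddJob(Walk.Job{StartPath: "some/relative/path"}) // wakes exactly one waiting worker

Pairing a sync.Cond with the third-party FIFO sidesteps the 1000-waiter cap on DequeueOrWaitForNextElement(), at the cost of managing the wakeups manually.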
@@ -1,4 +1,4 @@
-package Workers
+package Walk
 
 import (
     "errors"
@@ -15,20 +15,21 @@ var ErrNotDir = errors.New("not a directory")
 
 // Walker is constructed for each Walk() function invocation.
 type Walker struct {
-    wg             sync.WaitGroup // used to sync the progress of all workers.
+    Wg             sync.WaitGroup // used to sync the progress of all workers.
     root           string
     followSymlinks bool
     walkFunc       filepath.WalkFunc
+    queue          *JobQueue
 }
 
 // addJob increments the job counter and pushes the path to the job queue.
-func (w *Walker) addJob(job WalkJob) {
+func (w *Walker) addJob(job Job) {
-    w.wg.Add(1)
+    w.Wg.Add(1)
-    Queue.AddJob(job)
+    w.queue.AddJob(job)
 }
 
-// processPath processes one path.
+// ProcessPath processes one path.
-func (w *Walker) processPath(relPath string) error {
+func (w *Walker) ProcessPath(relPath string) error {
     fullPath := filepath.Join(w.root, relPath)
     names, err := readDirNames(fullPath)
     if err != nil {
@@ -59,7 +60,7 @@ func (w *Walker) processPath(relPath string) error {
 
     // If this child is a directory, add it to the queue then move on.
     if err == nil && info.Mode().IsDir() {
-        w.addJob(WalkJob{
+        w.addJob(Job{
             StartPath: subPath,
             Walker:    w,
         })
@@ -73,8 +74,6 @@ func (w *Walker) processPath(relPath string) error {
 func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
     // TODO: compare with filepath.WalkDir()
 
-    w.walkFunc = walkFn
-
     // Parse the beginning path.
     fullPath := filepath.Join(w.root, relPath)
     info, err := w.lstat(relPath)
@@ -95,12 +94,12 @@ func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
     }
 
     // Let the workers handle everything else.
-    w.addJob(WalkJob{
+    w.addJob(Job{
         StartPath: relPath,
         Walker:    w,
     })
 
     // Wait for the workers to finish reading the file system.
-    w.wg.Wait()
+    w.Wg.Wait()
     return nil
 }
@@ -1,4 +1,4 @@
-package Workers
+package Walk
 
 import (
     "os"
@@ -0,0 +1,12 @@
+package Walk
+
+import (
+    "crazyfs/logging"
+    "github.com/sirupsen/logrus"
+)
+
+var log *logrus.Logger
+
+func init() {
+    log = logging.GetLogger()
+}
@@ -1,56 +0,0 @@
-package Workers
-
-import (
-    "sync"
-)
-
-// This is a queue implementation that doesn't rely on channels. This way of doing things should be more memory-efficient.
-
-// WalkJob is an individual job passed to the Workers.
-type WalkJob struct {
-    StartPath string
-    Walker    *Walker // A pointer to the shared Walker object is passed as well.
-}
-
-// WalkJobQueue is the Queue that workers pull jobs from.
-type WalkJobQueue struct {
-    jobs  []WalkJob
-    mutex sync.RWMutex
-    cond  *sync.Cond
-}
-
-func NewJobQueue() *WalkJobQueue {
-    q := &WalkJobQueue{}
-    q.cond = sync.NewCond(&q.mutex)
-    return q
-}
-
-// AddJob adds a job to the queue and signals the workers so they know to pick it up.
-func (q *WalkJobQueue) AddJob(job WalkJob) {
-    q.mutex.Lock()
-    defer q.mutex.Unlock()
-    q.jobs = append(q.jobs, job)
-    q.cond.Signal()
-}
-
-// GetJob is how a worker pulls a job from the queue.
-func (q *WalkJobQueue) GetJob() WalkJob {
-    q.mutex.Lock()
-    defer q.mutex.Unlock()
-
-    for len(q.jobs) == 0 {
-        q.cond.Wait()
-    }
-
-    job := q.jobs[0]
-    q.jobs = q.jobs[1:]
-
-    return job
-}
-
-// GetQueueSize returns the size of the queue.
-func (q *WalkJobQueue) GetQueueSize() int {
-    q.mutex.RLock()
-    defer q.mutex.RUnlock()
-    return len(q.jobs)
-}
@@ -1,39 +1,28 @@
 package Workers
 
 import (
+    "crazyfs/Walk"
     "crazyfs/config"
-    "sync/atomic"
 )
 
 // worker.go holds the worker function for `Walk()`.
 
-// Queue is the global walk job queue.
-var Queue *WalkJobQueue
+type CrawlWorkerFunc func(workerData *CrawlWorkers)
 
-// BusyWorkers is an atomic counter for the number of active Workers
-var BusyWorkers int32
+type CrawlWorkers struct {
+    Queue       *Walk.JobQueue
+    BusyWorkers int32
+    WorkerFunc  CrawlWorkerFunc
+}
 
 // InitializeWorkers starts the number of Workers defined by the config.
-func InitializeWorkers() {
-    Queue = NewJobQueue()
+func InitializeWorkers(workerFunc CrawlWorkerFunc) *CrawlWorkers {
+    w := &CrawlWorkers{
+        WorkerFunc: workerFunc,
+    }
+    w.Queue = Walk.NewJobQueue()
     for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
-        go worker()
+        go w.WorkerFunc(w)
-    }
-    log.Debugf("WORKERS - Started %d directory crawler Workers.", config.GetConfig().DirectoryCrawlers)
-}
-
-// worker processes jobs forever.
-func worker() {
-    for {
-        job := Queue.GetJob()
-
-        atomic.AddInt32(&BusyWorkers, 1)
-        err := job.Walker.processPath(job.StartPath)
-        if err != nil {
-            log.Warnf("WORKER - %s - %s", job.StartPath, err)
-        }
-
-        job.Walker.wg.Done()
-        atomic.AddInt32(&BusyWorkers, -1)
     }
+    return w
 }
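The pool is now generic: InitializeWorkers takes any CrawlWorkerFunc and spawns DirectoryCrawlers copies of it. A hypothetical no-op worker, just to show the contract the Crawlers package implements:

    func noopWorker(w *Workers.CrawlWorkers) {
        for {
            job := w.Queue.GetJob()
            atomic.AddInt32(&w.BusyWorkers, 1)
            // ... handle the job ...
            job.Walker.Wg.Done() // lets Walker.Walk() return once every queued job is done
            atomic.AddInt32(&w.BusyWorkers, -1)
        }
    }

    pool := Workers.InitializeWorkers(noopWorker)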
@@ -4,6 +4,7 @@ import (
     "crazyfs/CacheItem"
     "crazyfs/DirectoryCrawler"
     "crazyfs/SharedCache"
+    "crazyfs/globals"
     "net/http"
     "os"
     "time"
@@ -14,7 +15,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
     // TODO: implement some sort of backoff or delay for repeated calls to recache the same path.
 
     log.Debugf(`HELPERS:HandleFileNotFound:Crawl - Not in cache, crawling: "%s"`, fullPath)
-    dc := DirectoryCrawler.NewDirectoryCrawler()
+    dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
 
     // Check if this is a symlink. We do this before CrawlNoRecursion() because we want to tell the end user that
     // we're not going to resolve this symlink.
@@ -83,7 +84,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
     // Start a recursive crawl in the background.
     go func() {
         log.Debugf("HELPERS:HandleFileNotFound:Crawl - Starting background recursive crawl for %s", fullPath)
-        dc := DirectoryCrawler.NewDirectoryCrawler()
+        dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
         start := time.Now()
         err := dc.Crawl(fullPath, nil)
         if err != nil {
@@ -2,10 +2,10 @@ package admin
 
 import (
     "crazyfs/DirectoryCrawler"
-    "crazyfs/Workers"
     "crazyfs/api/helpers"
     "crazyfs/config"
     "crazyfs/elastic"
+    "crazyfs/globals"
     "crypto/sha256"
     "crypto/subtle"
     "net/http"
@@ -42,18 +42,18 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
         "finished": DirectoryCrawler.GetFinishedCrawls(),
     },
     "crawlWorkers": map[string]interface{}{
-        "busy": atomic.LoadInt32(&Workers.BusyWorkers),
+        "busy": atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers),
         "alive": config.GetConfig().DirectoryCrawlers,
     },
     "queue": map[string]interface{}{
-        "size": Workers.Queue.GetQueueSize(),
+        "size": globals.DirectoryCrawlers.Queue.GetQueueSize(),
     },
     "initialCrawlElapsed": config.InitialCrawlElapsed,
     "elastic": map[string]interface{}{
         "deleteWorkers": map[string]interface{}{
-            "busy": elastic.BusyWorkers,
+            "busy": globals.ElasticCrawlers.BusyWorkers,
             "alive": config.GetConfig().ElasticsearchSyncThreads,
-            "queueSize": elastic.Queue.GetQueueSize(),
+            "queueSize": globals.ElasticCrawlers.Queue.GetQueueSize(),
         },
         "syncRunning": map[string]interface{}{
             "refresh": refreshSyncRunning,
@@ -3,9 +3,8 @@ package cache
 import (
     "crazyfs/DirectoryCrawler"
     "crazyfs/SharedCache"
-    "crazyfs/Workers"
     "crazyfs/config"
-    "crazyfs/elastic"
+    "crazyfs/globals"
     "sync"
     "sync/atomic"
     "time"
@@ -13,9 +12,7 @@ import (
 
 func StartCrawler() error {
     var wg sync.WaitGroup
-    crawlerChan := make(chan struct{}, config.GetConfig().DirectoryCrawlers)
-
-    go startCrawl(&wg, crawlerChan)
+    go startCrawl(&wg)
 
     ticker := time.NewTicker(60 * time.Second)
     go logCacheStatus("CACHE STATUS", ticker, log.Debugf)
@@ -23,29 +20,27 @@ func StartCrawler() error {
     return nil
 }
 
-func startCrawl(wg *sync.WaitGroup, crawlerChan chan struct{}) {
+func startCrawl(wg *sync.WaitGroup) {
     ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
     defer ticker.Stop()
 
     time.Sleep(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
 
     for range ticker.C {
-        crawlerChan <- struct{}{}
         wg.Add(1)
         go func() {
             defer wg.Done()
-            dc := DirectoryCrawler.NewDirectoryCrawler()
+            dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
-            log.Infoln("CRAWLER - Starting a crawl...")
+            log.Infoln("CRAWLER - Recurring - Starting a crawl...")
             start := time.Now()
             err := dc.Crawl(config.GetConfig().RootDir, nil)
             if err != nil {
-                log.Warnf("CRAWLER - Crawl failed: %s", err)
+                log.Warnf("CRAWLER - Recurring - Crawl failed: %s", err)
             } else {
                 duration := time.Since(start).Round(time.Second)
-                log.Infof("CRAWLER - Crawl completed in %s", duration)
+                log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration)
-                log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(SharedCache.Cache.Keys()))
+                log.Debugf("%d/%d items in the cache.", len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize)
             }
-            <-crawlerChan
         }()
     }
 }
@@ -56,11 +51,11 @@ func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, a
     if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress {
         logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d."
         logFn(logStr,
-            msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&Workers.BusyWorkers), Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
+            msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
     } else {
         logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d"
         logFn(logStr,
-            msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&Workers.BusyWorkers), Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), elastic.BusyWorkers, elastic.Queue.GetQueueSize())
+            msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
     }
 }
 }
@@ -3,6 +3,7 @@ package cache
 import (
     "crazyfs/DirectoryCrawler"
     "crazyfs/config"
+    "crazyfs/globals"
     "crazyfs/logging"
     "sync"
     "time"
@@ -27,7 +28,7 @@ func InitialCrawl() {
 
     InitialCrawlLock.Lock()
     InitialCrawlInProgress = true
-    dc := DirectoryCrawler.NewDirectoryCrawler()
+    dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
     //start := time.Now()
     err := dc.Crawl(config.GetConfig().RootDir, nil)
     if err != nil {
@@ -6,6 +6,7 @@ import (
     "crazyfs/SharedCache"
     "crazyfs/config"
     "crazyfs/file"
+    "crazyfs/globals"
     "errors"
     "os"
     "path/filepath"
@@ -25,7 +26,7 @@ func CheckAndRecache(path string) {
     sem <- struct{}{} // acquire a token
     go func() {
         defer func() { <-sem }() // release the token when done
-        dc := DirectoryCrawler.NewDirectoryCrawler()
+        dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
         err := dc.Crawl(path, nil)
         if err != nil {
             log.Errorf("CACHE:Recache - %s", err.Error())
@@ -36,7 +37,7 @@ func CheckAndRecache(path string) {
 }
 
 func Recache(path string) error {
-    dc := DirectoryCrawler.NewDirectoryCrawler()
+    dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
     if dc.IsCrawlActive(path, nil) {
         return errors.New("rejecting crawl, already in progress for this path")
     }
@@ -46,7 +47,7 @@ func Recache(path string) error {
     sem <- struct{}{} // acquire a token
     go func() {
         defer func() { <-sem }() // release the token when done
-        dc := DirectoryCrawler.NewDirectoryCrawler()
+        dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
         err := dc.Crawl(path, nil)
         if err != nil {
             log.Errorf("CACHE:Recache - %s", err.Error())
@@ -87,7 +88,7 @@ func Recache(path string) error {
             // Update the parent directory in the cache
             SharedCache.Cache.Add(parentDir, parentItem)
         }
-    } else if !CacheItem.PathOutsideRoot(parentDir) {
+    } else if !file.PathOutsideRoot(parentDir) {
         // If the parent directory isn't in the cache, crawl it.
         log.Infof("CACHE:Recache - Crawling parent directory since it isn't in the cache yet: %s", parentDir)
         _, err := dc.CrawlNoRecursion(parentDir)
@@ -151,11 +151,11 @@ func SetConfig(configFile string) (*Config, error) {
     }
 
     if config.DirectoryCrawlers < 1 {
-        return nil, errors.New("crawl_mode_crawl_interval must be more than 1")
+        return nil, errors.New("crawl_mode_crawl_interval must be greater than or equal to 1")
     }
 
     if config.CacheSize < 1 {
-        return nil, errors.New("crawl_workers must be more than 1")
+        return nil, errors.New("cache_size must be greater than or equal to 1")
     }
 
     if config.CacheRecacheCrawlerLimit < 1 {
@@ -1,8 +1,8 @@
 package main
 
 import (
+    "crazyfs/Crawlers"
     "crazyfs/SharedCache"
-    "crazyfs/Workers"
     "crazyfs/api"
     "crazyfs/cache"
     "crazyfs/config"
@@ -115,7 +115,7 @@ func main() {
 
     // Start the Elastic connection, so it can initialize while we're doing the initial crawl.
     // If we fail to establish a connection to Elastic, don't kill the entire server. Instead, just disable Elastic.
-    if cfg.ElasticsearchEnable {
+    if cfg.ElasticsearchEnable && !cliArgs.disableElasticSync {
         esCfg := elasticsearch.Config{
             Addresses: []string{
                 cfg.ElasticsearchEndpoint,
@@ -126,10 +126,10 @@ func main() {
         if err != nil {
             log.Errorf("Error creating the Elasticsearch client: %s", err)
             elastic.LogElasticQuit()
-            elastic.ElasticEnabled = false
+            elastic.Enabled = false
         } else {
             elastic.ElasticClient = es
-            elastic.InitializeWorkers()
+            Crawlers.InitializeElasticCrawlerWorkers()
             // This could take a minute, so we do this in the background while we crawl.
             go func() {
                 elastic.EnableElasticsearchConnection()
@@ -149,9 +149,9 @@ func main() {
         }
     }
 
-    log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable)
+    log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable && !cliArgs.disableElasticSync)
 
-    Workers.InitializeWorkers()
+    Crawlers.InitializeDirectoryCrawlerWorkers()
 
     cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
 
@@ -1,50 +0,0 @@
-package elastic
-
-import "sync"
-
-// More or less like the other queue implementation.
-
-type DeleteJob struct {
-    Key  string
-    Path string
-}
-
-type DeleteJobQueue struct {
-    jobs  []DeleteJob
-    mutex sync.Mutex
-    cond  *sync.Cond
-}
-
-func NewJobQueue() *DeleteJobQueue {
-    q := &DeleteJobQueue{}
-    q.cond = sync.NewCond(&q.mutex)
-    return q
-}
-
-// AddJob adds a job to the queue and signals the workers so they know to pick it up.
-func (q *DeleteJobQueue) AddJob(job DeleteJob) {
-    q.mutex.Lock()
-    q.jobs = append(q.jobs, job)
-    q.mutex.Unlock()
-    q.cond.Signal()
-}
-
-// GetJob is how a worker pulls a job from the queue.
-func (q *DeleteJobQueue) GetJob() DeleteJob {
-    q.mutex.Lock()
-    defer q.mutex.Unlock()
-
-    for len(q.jobs) == 0 {
-        q.cond.Wait()
-    }
-
-    job := q.jobs[0]
-    q.jobs = q.jobs[1:]
-
-    return job
-}
-
-// GetQueueSize returns the size of the queue.
-func (q *DeleteJobQueue) GetQueueSize() int {
-    return len(q.jobs)
-}
@@ -1,39 +0,0 @@
-package elastic
-
-import (
-    "crazyfs/SharedCache"
-    "crazyfs/config"
-    "sync/atomic"
-)
-
-// BusyWorkers is an atomic counter for the number of active Workers
-var BusyWorkers int32
-
-// InitializeWorkers starts the number of Workers defined by the config.
-func InitializeWorkers() {
-    Queue = NewJobQueue()
-    for n := 1; n <= config.GetConfig().ElasticsearchSyncThreads; n++ {
-        go worker()
-    }
-    log.Debugf("ELASTIC - Started %d sync workers.", config.GetConfig().ElasticsearchSyncThreads)
-}
-
-// worker processes jobs forever.
-func worker() {
-    for {
-        job := Queue.GetJob()
-        atomic.AddInt32(&BusyWorkers, 1)
-
-        if _, ok := SharedCache.Cache.Get(job.Path); !ok {
-            // If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
-            err := deleteFromElasticsearch(job.Key)
-            if err != nil {
-                log.Errorf(`ELASTIC:Delete - Error deleting key "%s" - %s`, job.Key, err)
-            } else {
-                log.Debugf(`ELASTIC:Delete - Deleted path: "%s"`, job.Path)
-            }
-        }
-
-        atomic.AddInt32(&BusyWorkers, -1)
-    }
-}
@@ -3,18 +3,17 @@ package elastic
 import (
     "crazyfs/DirectoryCrawler"
     "crazyfs/config"
+    "crazyfs/globals"
     "sync"
     "time"
 )
 
-var Queue *DeleteJobQueue
-
 var syncLock sync.Mutex
 
-var ElasticEnabled bool
+var Enabled bool
 
 func SyncThread() {
-    if !ElasticEnabled {
+    if !Enabled {
         LogElasticQuit()
         return
     }
@@ -41,7 +40,7 @@ func SyncThread() {
 
 // TODO: have the workers exit when the sync job is finished
 func syncElasticsearch(doFullSync bool) {
-    if !ElasticEnabled {
+    if !Enabled {
         log.Debugln("ELASTIC - Disabled, not syncing.")
         return
     }
@@ -81,7 +80,7 @@ func syncElasticsearch(doFullSync bool) {
 
     startRemoveStaleItemsFromElasticsearch(globalPathsByKey)
 
-    dc := DirectoryCrawler.NewDirectoryCrawler()
+    dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) // TODO: replace with proper elastic queue
     err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
     if err != nil {
         log.Errorf("ELASTIC - Crawl failed: %s", err)
@@ -101,14 +100,14 @@ func EnableElasticsearchConnection() {
     esSize, err := getElasticSize()
     if err != nil {
         logElasticConnError(err)
-        ElasticEnabled = false
+        Enabled = false
         return
     }
-    ElasticEnabled = true
+    Enabled = true
     log.Infof(`ELASTIC - Connected to index "%s". Contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
 }
 
 func LogElasticQuit() {
-    ElasticEnabled = false
+    Enabled = false
     log.Errorln("ELASTIC - Background thread exiting, Elastic indexing and search will not be available.")
 }
@@ -29,7 +29,7 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
     // This sort of thing can happen if new files have been added on disk but a scan has not been run to refresh the cache.
     log.Debugf(`ELASTIC:Add - Path "%s" exists on disk, but not in the LRU cache. Deleting from Elastic.`, relPath)
     // Delete this item from Elastic in order to avoid any strange inconsistencies.
-    err := deleteFromElasticsearch(encodeToBase64(relPath))
+    err := DeleteFromElasticsearch(encodeToBase64(relPath))
     if err != nil {
         log.Errorf("ELASTIC:Add - Failed to delete \"%s\" - %s", relPath, err)
     }
@@ -2,13 +2,20 @@ package elastic
 
 import (
     "context"
+    "crazyfs/Walk"
     "crazyfs/config"
+    "crazyfs/globals"
     "encoding/json"
     "errors"
     "fmt"
     "github.com/elastic/go-elasticsearch/v8/esapi"
 )
 
+type DeleteJob struct {
+    Key  string
+    Path string
+}
+
 func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
     log.Debugln("ELASTIC:Delete - Checking for deleted items that need to be removed from Elastic...")
 
@@ -16,11 +23,18 @@ func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
 
     // For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
     for path, key := range pathsByKey {
-        Queue.AddJob(DeleteJob{Key: key, Path: path})
+        job := Walk.Job{
+            StartPath: path,
+        }
+        extra := make(map[string]interface{})
+        extra["key"] = key
+        job.Extra = &extra
+
+        globals.ElasticCrawlers.Queue.AddJob(job)
     }
 }
 
-func deleteFromElasticsearch(key string) error {
+func DeleteFromElasticsearch(key string) error {
     req := esapi.DeleteRequest{
         Index:      config.GetConfig().ElasticsearchIndex,
         DocumentID: key,
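Elastic delete jobs now ride the generic Walk.Job by stashing the document key in the Extra map, and elasticDeleteWorker type-asserts it back out. Condensed, with a hypothetical key value:

    // Producer side (startRemoveStaleItemsFromElasticsearch):
    extra := make(map[string]interface{})
    extra["key"] = "ZG9jLWlk" // hypothetical base64 document ID
    job := Walk.Job{StartPath: "relative/path"}
    job.Extra = &extra
    globals.ElasticCrawlers.Queue.AddJob(job)

    // Consumer side (elasticDeleteWorker):
    e := *job.Extra
    key := e["key"].(string) // panics if the producer forgot to set "key"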
@@ -0,0 +1,18 @@
+package file
+
+import (
+    "crazyfs/config"
+    "fmt"
+    "strings"
+)
+
+func PathOutsideRoot(fullPath string) bool {
+    return !strings.HasPrefix(fullPath, config.GetConfig().RootDir)
+}
+
+// RetardCheck makes sure we never do anything outside the root dir.
+func RetardCheck(fullPath string) {
+    if PathOutsideRoot(fullPath) {
+        panic(fmt.Sprintf("NewItem was not passed an absolute path. The path must start with the RootDir (%s). Failing path: %s", config.GetConfig().RootDir, fullPath))
+    }
+}
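These guards moved out of CacheItem so any package can assert a path stays under the configured root. A usage sketch with hypothetical paths:

    file.RetardCheck(filepath.Join(config.GetConfig().RootDir, "some/child")) // inside the root: no-op
    file.RetardCheck("/etc/passwd")                                           // outside the root: panics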
@@ -0,0 +1,14 @@
+package globals
+
+import (
+    "crazyfs/Walk"
+)
+
+var ElasticCrawlers *DcWorkers
+
+var DirectoryCrawlers *DcWorkers
+
+type DcWorkers struct {
+    Queue       *Walk.JobQueue
+    BusyWorkers int32
+}
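Both worker pools register themselves in these globals, which is how the admin API and the cache status logger read worker stats without importing the worker packages. Mirroring what APIAdminCrawlsInfo does:

    busy := atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers)
    queued := globals.DirectoryCrawlers.Queue.GetQueueSize()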
@@ -6,6 +6,7 @@ require (
     github.com/chai2010/webp v1.1.1
     github.com/disintegration/imaging v1.6.2
     github.com/elastic/go-elasticsearch/v8 v8.11.1
+    github.com/enriquebris/goconcurrentqueue v0.7.0
     github.com/gabriel-vasile/mimetype v1.4.2
     github.com/gorilla/mux v1.8.0
     github.com/hashicorp/golang-lru/v2 v2.0.4
@@ -57,6 +57,8 @@ github.com/elastic/elastic-transport-go/v8 v8.3.0 h1:DJGxovyQLXGr62e9nDMPSxRyWIO
 github.com/elastic/elastic-transport-go/v8 v8.3.0/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
 github.com/elastic/go-elasticsearch/v8 v8.11.1 h1:1VgTgUTbpqQZ4uE+cPjkOvy/8aw1ZvKcU0ZUE5Cn1mc=
 github.com/elastic/go-elasticsearch/v8 v8.11.1/go.mod h1:GU1BJHO7WeamP7UhuElYwzzHtvf9SDmeVpSSy9+o6Qg=
+github.com/enriquebris/goconcurrentqueue v0.7.0 h1:JYrDa45N3xo3Sr9mjvlRaWiBHvBEJIhAdLXO3VGVghA=
+github.com/enriquebris/goconcurrentqueue v0.7.0/go.mod h1:OZ+KC2BcRYzjg0vgoUs1GFqdAjkD9mz2Ots7Jbm1yS4=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=