fix elastic workers, fix misc bugs

Cyberes 2024-03-17 20:19:36 -06:00
parent c0faaab3f8
commit 74206cd6e5
13 changed files with 117 additions and 63 deletions

View File

@ -42,7 +42,7 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
elasticWorkers = map[string]interface{}{
"busy": globals.ElasticCrawlers.BusyWorkers,
"alive": config.GetConfig().ElasticsearchSyncThreads,
"queueSize": globals.ElasticCrawlers.Queue.GetQueueSize(),
"queueSize": globals.ElasticCrawlers.Queue.GetQueuedJobs(),
}
} else {
elasticWorkers = map[string]interface{}{
@ -62,7 +62,7 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
"alive": config.GetConfig().DirectoryCrawlers,
},
"queue": map[string]interface{}{
"items": globals.DirectoryCrawlers.Queue.GetQueueSize(),
"items": globals.DirectoryCrawlers.Queue.GetQueuedJobs(),
},
"initialCrawlElapsed": config.InitialCrawlElapsed,
"elastic": map[string]interface{}{

View File

@ -56,7 +56,7 @@ func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, a
case <-ticker.C:
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueuedJobs(), directorycrawler.GetTotalActiveCrawls())
}
}
}

View File

@ -41,6 +41,7 @@ type Config struct {
ElasticsearchExcludePatterns []string
ElasticsearchFullSyncOnStart bool
ElasticsearchDefaultQueryField string
ElasticPrintChanges bool
HTTPRealIPHeader string
HTTPNoMimeSniffHeader bool
HTTPAccessControlAllowOriginHeader string
@ -88,6 +89,7 @@ func SetConfig(configFile string) (*Config, error) {
viper.SetDefault("elasticsearch_full_sync_on_start", false)
viper.SetDefault("elasticsearch_query_fields", []string{"extension", "name", "path", "type", "size", "isDir"})
viper.SetDefault("elasticsearch_default_query_field", "name")
viper.SetDefault("elasticsearch_print_changes", false)
viper.SetDefault("http_real_ip_header", "X-Forwarded-For")
viper.SetDefault("http_no_mime_sniff_header", false)
viper.SetDefault("http_access_control_allow_origin_header", "*")
@ -141,6 +143,7 @@ func SetConfig(configFile string) (*Config, error) {
ElasticsearchExcludePatterns: viper.GetStringSlice("elasticsearch_exclude_patterns"),
ElasticsearchFullSyncOnStart: viper.GetBool("elasticsearch_full_sync_on_start"),
ElasticsearchDefaultQueryField: viper.GetString("elasticsearch_default_query_field"),
ElasticPrintChanges: viper.GetBool("elasticsearch_print_changes"),
HTTPRealIPHeader: viper.GetString("http_real_ip_header"),
HTTPNoMimeSniffHeader: viper.GetBool("http_no_mime_sniff_header"),
HTTPAccessControlAllowOriginHeader: viper.GetString("http_access_control_allow_origin_header"),
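
The new elasticsearch_print_changes option follows the usual viper flow shown in the three hunks above: declare the struct field, register a default, then copy the parsed value into the typed Config. A minimal standalone sketch of that flow (only the key and field names are taken from the hunks; everything else is simplified):

    package main

    import (
        "fmt"

        "github.com/spf13/viper"
    )

    // Config keeps only the field this sketch cares about.
    type Config struct {
        ElasticPrintChanges bool
    }

    func main() {
        // Register a default so the option may be omitted from the config file.
        viper.SetDefault("elasticsearch_print_changes", false)

        // Copy the (possibly user-overridden) value into the typed struct.
        cfg := Config{
            ElasticPrintChanges: viper.GetBool("elasticsearch_print_changes"),
        }
        fmt.Println("print Elasticsearch changes:", cfg.ElasticPrintChanges)
    }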

View File

@ -116,11 +116,16 @@ func main() {
// Start the Elastic connection, so it can initialize while we're doing the initial crawl.
// If we fail to establish a connection to Elastic, don't kill the entire server. Instead, just disable Elastic.
if cfg.ElasticsearchEnable && !cliArgs.disableElasticSync {
fmt.Println(config.GetConfig().ElasticsearchSyncThreads + 1)
esCfg := elasticsearch.Config{
Addresses: []string{
cfg.ElasticsearchEndpoint,
},
APIKey: cfg.ElasticsearchAPIKey,
//Transport: &http.Transport{
// MaxIdleConnsPerHost: config.GetConfig().ElasticsearchSyncThreads + 1,
// IdleConnTimeout: 30 * time.Second,
//},
}
es, err := elasticsearch.NewClient(esCfg)
if err != nil {

View File

@ -11,7 +11,7 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
if globals.DirectoryCrawlers != nil {
panic("DirectoryCrawlers has already been defined!")
}
dcWorkers := workers.InitializeWorkers(directoryCrawlerWorker) // *workers.CrawlWorkers
dcWorkers := workers.InitializeWorkers(config.GetConfig().DirectoryCrawlers, directoryCrawlerWorker)
d := &globals.DcWorkers{}
// Copy the fields given to us by InitializeWorkers() to the global object.

View File

@ -5,7 +5,6 @@ import (
"crazyfs/globals"
"crazyfs/sharedcache"
"crazyfs/workers"
"fmt"
"sync"
"sync/atomic"
)
@ -13,14 +12,27 @@ import (
// aliveWorkers is used by syncElasticsearch to know when it is safe to erase the queue.
var aliveWorkers sync.WaitGroup
type CrawlerExtras struct {
Added int32
}
type JobExtras struct {
Task string
Key string
}
func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
if globals.ElasticCrawlers != nil {
panic("ElasticCrawlers has already been defined!")
}
deleteWorkers := workers.InitializeWorkers(elasticDeleteWorker)
elWorkers := workers.InitializeWorkers(config.GetConfig().ElasticsearchSyncThreads, elasticDeleteWorker)
d := &globals.DcWorkers{}
d.Queue = deleteWorkers.Queue
deleteWorkers.BusyWorkers = &d.BusyWorkers
d.Queue = elWorkers.Queue
extra := make(map[string]interface{})
var completed int32
extra["completed"] = completed
d.Extra = &CrawlerExtras{}
elWorkers.BusyWorkers = &d.BusyWorkers
globals.ElasticCrawlers = d
log.Debugf("CRAWLERS - Started %d Elasticsearch sync workers.", config.GetConfig().ElasticsearchSyncThreads)
return d
@ -31,9 +43,7 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
defer aliveWorkers.Done()
for {
job := w.Queue.GetJob()
if job.Terminate {
fmt.Println("delete worker stopping")
return
}
@ -46,22 +56,27 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
log.Warnf("ELCrawlWorker:Add - %s - %s", job.StartPath, err)
}
job.Walker.Wg.Done()
// Only increment the completed counter when we're adding.
e := globals.ElasticCrawlers.Extra.(*CrawlerExtras)
atomic.AddInt32(&e.Added, 1)
} else {
e := *job.Extra
task := e["task"]
if task == TASKDELETE {
e := job.Extra.(JobExtras)
if e.Task == TASKDELETE {
if _, ok := sharedcache.Cache.Get(job.StartPath); !ok {
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
key := e["key"].(string)
key := e.Key
err := DeleteFromElasticsearch(key)
if err != nil {
log.Errorf(`ELCrawlWorker:Delete - Error deleting key "%s" - %s`, key, err)
} else {
log.Debugf(`ELCrawlWorker:Delete - Deleted path: "%s"`, job.StartPath)
if config.GetConfig().ElasticPrintChanges {
log.Debugf(`ELCrawlWorker:Delete - Deleted path: "%s"`, job.StartPath)
}
}
}
} else {
panic(task)
panic(e.Task)
}
}
atomic.AddInt32(w.BusyWorkers, -1)
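
The worker above now recovers its per-job metadata from the untyped Extra field with a type assertion on the new JobExtras struct instead of indexing into a *map[string]interface{}. A small detached sketch of that pattern (the task and key values are made up; the comma-ok form is used here so a mismatched payload fails with a clear message):

    package main

    import "fmt"

    // Job mirrors the shape of queuedwalk.Job: Extra is an untyped slot that
    // the producer and consumer agree on.
    type Job struct {
        StartPath string
        Extra     interface{}
    }

    // JobExtras is the typed payload carried by delete jobs.
    type JobExtras struct {
        Task string
        Key  string
    }

    func handle(job Job) {
        // Recover the typed payload; the comma-ok form makes a bad payload explicit.
        e, ok := job.Extra.(JobExtras)
        if !ok {
            panic(fmt.Sprintf("unexpected Extra type %T", job.Extra))
        }
        fmt.Println(e.Task, e.Key, job.StartPath)
    }

    func main() {
        handle(Job{
            StartPath: "/srv/files/stale-entry",
            Extra:     JobExtras{Task: "delete", Key: "ZXhhbXBsZQ=="},
        })
    }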

View File

@ -6,6 +6,7 @@ import (
"crazyfs/directorycrawler"
"crazyfs/globals"
"sync"
"sync/atomic"
"time"
)
@ -73,9 +74,22 @@ func syncElasticsearch(doFullSync bool) {
InitializeElasticCrawlerWorkers()
// Refresh the global variables for the workers.
var err error
globalPathsByKeyMutex.Lock()
globalKeysByPathMutex.Lock()
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex(true, 100)
globalPathsByKeyMutex.Unlock()
globalKeysByPathMutex.Unlock()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
}
frozenIndexSize := len(globalKeysByPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(60 * time.Second)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
go func() {
@ -85,33 +99,26 @@ func syncElasticsearch(doFullSync bool) {
return
case <-ticker.C:
elapsed := time.Since(start)
logStr := "ELASTIC - Sync in progress. Elapsed: %s. Busy workers: %d. Jobs queued: %d"
log.Debugf(logStr, elapsed, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
e := globals.ElasticCrawlers.Extra.(*CrawlerExtras)
logStr := "ELASTIC - Sync in progress. Completed: %d/%d, Elapsed: %s. Busy workers: %d. Jobs queued: %d"
log.Debugf(logStr, atomic.LoadInt32(&e.Added), frozenIndexSize, elapsed, atomic.LoadInt32(&globals.ElasticCrawlers.BusyWorkers), globals.ElasticCrawlers.Queue.GetQueuedJobs())
}
}
}()
// Refresh the global variables for the workers.
var err error
globalPathsByKeyMutex.Lock()
globalKeysByPathMutex.Lock()
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex()
globalPathsByKeyMutex.Unlock()
globalKeysByPathMutex.Unlock()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
}
startRemoveStaleItemsFromElasticsearch()
dc := directorycrawler.NewDirectoryCrawler(globals.ElasticCrawlers.Queue)
err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
var crawlFailed bool
if err != nil {
crawlFailed = true
log.Errorf("ELASTIC - Crawl failed: %s", err)
return
}
e := globals.ElasticCrawlers.Extra.(*CrawlerExtras)
addedItems := atomic.LoadInt32(&e.Added)
// Shut down the elastic sync workers once we've finished.
globals.ElasticCrawlers.Queue.Terminate()
aliveWorkers.Wait()
@ -123,8 +130,11 @@ func syncElasticsearch(doFullSync bool) {
globalPathsByKeyMutex.Unlock()
globalKeysByPathMutex.Unlock()
duration := time.Since(start)
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
if !crawlFailed {
duration := time.Since(start)
log.Infof("ELASTIC - %s sync finished in %s and added %d items.", syncType, duration, addedItems)
}
}
func logElasticConnError(err error) {
@ -133,14 +143,15 @@ func logElasticConnError(err error) {
// EnableElasticsearchConnection tests the connection to Elastic and enables the backend if it's successful.
func EnableElasticsearchConnection() {
esSize, err := getElasticSize()
if err != nil || esSize == -1 {
_, _, err := getPathsFromIndex(false, 10) // query a very small sample
if err != nil {
logElasticConnError(err)
Enabled = false
return
}
Enabled = true
log.Infof(`ELASTIC - Connected to index "%s". Contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
//http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = config.GetConfig().ElasticsearchSyncThreads
log.Infof(`ELASTIC - Connected to index "%s".`, config.GetConfig().ElasticsearchIndex)
}
func LogElasticQuit() {
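
The progress logging added above follows a common Go reporting pattern: a background goroutine selects over a cancellation context and a ticker, and the caller cancels the context when the sync finishes. A self-contained sketch of just that loop (the sleep stands in for the real crawl):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        start := time.Now()
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // the sync is over; stop reporting
                case <-ticker.C:
                    fmt.Printf("sync in progress, elapsed: %s\n", time.Since(start).Round(time.Second))
                }
            }
        }()

        time.Sleep(3500 * time.Millisecond) // stand-in for the real work
        cancel()
    }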

View File

@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"io"
"os"
"sync"
)
@ -72,22 +73,30 @@ func performAddToElasticsearch(item *cacheitem.Item) error {
DocumentID: encodeToBase64(item.Path),
Body: bytes.NewReader(data),
Refresh: "true",
Timeout: 100,
}
res, err := req.Do(context.Background(), ElasticClient)
if err != nil {
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
res.Body.Close()
reader := bytes.NewReader(body)
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
if err := json.NewDecoder(reader).Decode(&e); err != nil {
return errors.New(fmt.Sprintf("Error parsing the response body: %s", err))
}
return errors.New(fmt.Sprintf(`Error indexing document "%s" - Status code: %d - Response: %s`, item.Path, res.StatusCode, e))
}
log.Debugf(`ELASTIC:Add - Added: "%s"`, preparedItem.Path)
if config.GetConfig().ElasticPrintChanges {
log.Debugf(`ELASTIC:Add - Added: "%s"`, preparedItem.Path)
}
return nil
}
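
One practical benefit of reading the whole response with io.ReadAll before decoding, as the hunk above now does, is that the body gets drained and closed promptly, which is what allows net/http to reuse the underlying connection. A generic sketch of that read-then-decode pattern (the URL is only a placeholder):

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "net/http"
    )

    // readAndClose drains and closes an HTTP response body up front, then hands
    // back an in-memory reader that can be decoded (or re-read) at leisure.
    func readAndClose(res *http.Response) (*bytes.Reader, error) {
        body, err := io.ReadAll(res.Body)
        res.Body.Close()
        if err != nil {
            return nil, err
        }
        return bytes.NewReader(body), nil
    }

    func main() {
        res, err := http.Get("https://example.com/") // placeholder endpoint
        if err != nil {
            panic(err)
        }
        reader, err := readAndClose(res)
        if err != nil {
            panic(err)
        }
        var payload map[string]interface{}
        // example.com returns HTML, so a decode error here is expected; the point
        // is that the connection is already free for reuse.
        if err := json.NewDecoder(reader).Decode(&payload); err != nil {
            fmt.Println("decode error:", err)
        }
    }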

View File

@ -1,6 +1,7 @@
package elastic
import (
"bytes"
"context"
"crazyfs/config"
"crazyfs/globals"
@ -9,6 +10,7 @@ import (
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"io"
)
type DeleteJob struct {
@ -29,12 +31,11 @@ func startRemoveStaleItemsFromElasticsearch() {
for path, key := range globalPathsByKey {
job := queuedwalk.Job{
StartPath: path,
Extra: JobExtras{
Task: TASKDELETE,
Key: key,
},
}
extra := make(map[string]interface{})
extra["task"] = TASKDELETE
extra["key"] = key
job.Extra = &extra
globals.ElasticCrawlers.Queue.AddJob(job)
}
}
@ -49,12 +50,17 @@ func DeleteFromElasticsearch(key string) error {
if err != nil {
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
res.Body.Close()
reader := bytes.NewReader(body)
// If we tried to delete a key that doesn't exist in Elastic, it will return an error.
if res.IsError() && res.StatusCode != 404 {
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
if err := json.NewDecoder(reader).Decode(&e); err != nil {
text := fmt.Sprintf("failed to parse the response body: %s", err)
return errors.New(text)
}

View File

@ -11,14 +11,14 @@ import (
)
func getElasticSize() (int, error) {
keysByPath, _, err := getPathsFromIndex()
keysByPath, _, err := getPathsFromIndex(true, 100)
if err != nil {
return -1, err
}
return len(keysByPath), nil
}
func getPathsFromIndex() (map[string]string, map[string]string, error) {
func getPathsFromIndex(doScroll bool, withSize int) (map[string]string, map[string]string, error) {
// This may take a bit if the index is very large, so avoid calling this.
// Print a debug message so the user doesn't think we're frozen.
@ -32,7 +32,8 @@ func getPathsFromIndex() (map[string]string, map[string]string, error) {
ElasticClient.Search.WithContext(context.Background()),
ElasticClient.Search.WithIndex(config.GetConfig().ElasticsearchIndex),
ElasticClient.Search.WithScroll(time.Minute),
ElasticClient.Search.WithSize(1000),
ElasticClient.Search.WithSize(withSize),
ElasticClient.Search.WithSourceIncludes("path"), // Only return the 'path' field
)
if err != nil {
msg := fmt.Sprintf("Error getting response: %s", err)
@ -70,6 +71,10 @@ func getPathsFromIndex() (map[string]string, map[string]string, error) {
}
}
if !doScroll {
break
}
// Next scroll
res, err = ElasticClient.Scroll(ElasticClient.Scroll.WithScrollID(scrollID), ElasticClient.Scroll.WithScroll(time.Minute))
if err != nil {

View File

@ -10,4 +10,5 @@ var DirectoryCrawlers *DcWorkers
type DcWorkers struct {
Queue *queuedwalk.JobQueue
BusyWorkers int32
Extra interface{} // Used to store additional info.
}

View File

@ -11,7 +11,7 @@ import (
type Job struct {
StartPath string
Walker *Walker // A pointer to the shared Walker object is passed as well.
Extra *map[string]interface{}
Extra interface{}
Terminate bool
}
@ -48,6 +48,11 @@ func (q *JobQueue) AddJob(job Job) bool {
// GetJob is how a worker pulls a job from the queue.
func (q *JobQueue) GetJob() Job {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.GetQueuedJobs() == 0 && !q.terminate {
q.cond.Wait()
}
if q.terminate {
// Return an empty job that tells the worker to quit.
return Job{
@ -55,12 +60,6 @@ func (q *JobQueue) GetJob() Job {
Terminate: true,
}
}
q.mutex.Lock()
defer q.mutex.Unlock()
for q.GetQueueSize() == 0 {
q.cond.Wait()
}
job, err := q.fifo.DequeueOrWaitForNextElement()
if err != nil {
panic(err)
@ -68,8 +67,8 @@ func (q *JobQueue) GetJob() Job {
return job.(Job)
}
// GetQueueSize returns the size of the queue.
func (q *JobQueue) GetQueueSize() int {
// GetQueuedJobs returns the size of the queue.
func (q *JobQueue) GetQueuedJobs() int {
return q.fifo.GetLen()
}
@ -78,4 +77,5 @@ func (q *JobQueue) GetQueueSize() int {
func (q *JobQueue) Terminate() {
q.terminate = true
q.fifo.Lock()
q.cond.Broadcast()
}
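
The corrected GetJob/Terminate pair above is the classic sync.Cond pattern: hold the lock, wait in a loop while there is no work and no shutdown, and have Terminate broadcast so every blocked worker wakes up, sees the flag, and exits. A minimal runnable sketch of that pattern, with a plain slice standing in for the FIFO used here:

    package main

    import (
        "fmt"
        "sync"
    )

    type Queue struct {
        mu        sync.Mutex
        cond      *sync.Cond
        jobs      []string
        terminate bool
    }

    func NewQueue() *Queue {
        q := &Queue{}
        q.cond = sync.NewCond(&q.mu)
        return q
    }

    func (q *Queue) Add(job string) {
        q.mu.Lock()
        q.jobs = append(q.jobs, job)
        q.mu.Unlock()
        q.cond.Signal() // wake one waiting worker
    }

    // Get blocks until a job arrives or Terminate is called; ok is false on shutdown.
    func (q *Queue) Get() (job string, ok bool) {
        q.mu.Lock()
        defer q.mu.Unlock()
        for len(q.jobs) == 0 && !q.terminate {
            q.cond.Wait()
        }
        if q.terminate {
            return "", false
        }
        job = q.jobs[0]
        q.jobs = q.jobs[1:]
        return job, true
    }

    // Terminate flips the flag and wakes every waiting worker so it can exit.
    func (q *Queue) Terminate() {
        q.mu.Lock()
        q.terminate = true
        q.mu.Unlock()
        q.cond.Broadcast()
    }

    func main() {
        q := NewQueue()
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for {
                    job, ok := q.Get()
                    if !ok {
                        return
                    }
                    fmt.Println("worker", id, "processed", job)
                }
            }(i)
        }
        q.Add("a")
        q.Add("b")
        q.Terminate() // jobs still queued at this point are simply dropped in this sketch
        wg.Wait()
    }

In the pre-fix version, GetJob only waited on the queue size and Terminate never broadcast, so an idle worker could stay blocked in cond.Wait() indefinitely; the combined wait condition and the added q.cond.Broadcast() are what make shutdown reliable.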

View File

@ -1,7 +1,6 @@
package workers
import (
"crazyfs/config"
"crazyfs/queuedwalk"
)
@ -16,12 +15,12 @@ type CrawlWorkers struct {
}
// InitializeWorkers starts the number of workers defined by the config.
func InitializeWorkers(workerFunc CrawlWorkerFunc) *CrawlWorkers {
func InitializeWorkers(workerCount int, workerFunc CrawlWorkerFunc) *CrawlWorkers {
w := &CrawlWorkers{
WorkerFunc: workerFunc,
}
w.Queue = queuedwalk.NewJobQueue()
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
for n := 1; n <= workerCount; n++ {
go w.WorkerFunc(w)
}
return w