reorganize data on admin pages, adjust global vars
This commit is contained in:
parent
7004e3935c
commit
11edbeadc3
|
@ -4,7 +4,6 @@ import (
|
||||||
"crazyfs/SharedCache"
|
"crazyfs/SharedCache"
|
||||||
"crazyfs/api/helpers"
|
"crazyfs/api/helpers"
|
||||||
"crazyfs/config"
|
"crazyfs/config"
|
||||||
"crazyfs/elastic"
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"crypto/subtle"
|
"crypto/subtle"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -25,14 +24,12 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
cacheLen := SharedCache.Cache.Len()
|
cacheLen := SharedCache.Cache.Len()
|
||||||
|
|
||||||
response := map[string]interface{}{
|
response := map[string]interface{}{
|
||||||
"cachedItems": cacheLen,
|
"cachedItems": cacheLen,
|
||||||
"cacheMax": config.GetConfig().CacheSize,
|
"cacheMax": config.GetConfig().CacheSize,
|
||||||
"recacheCrawlLimit": config.GetConfig().CacheRecacheCrawlerLimit,
|
"recacheCrawlLimit": config.GetConfig().CacheRecacheCrawlerLimit,
|
||||||
"newSyncRunning": elastic.FullSyncRunning,
|
|
||||||
"refreshSyncRunning": elastic.RefreshSyncRunning,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Cache-Control", "no-store")
|
w.Header().Set("Cache-Control", "no-store")
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
helpers.WriteJsonResponse(response, false, w, r)
|
helpers.WriteJsonResponse(response, false, w, r)
|
||||||
|
|
|
@ -44,8 +44,15 @@ func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
|
||||||
"queue": map[string]interface{}{
|
"queue": map[string]interface{}{
|
||||||
"size": elastic.Queue.GetQueueSize(),
|
"size": elastic.Queue.GetQueueSize(),
|
||||||
},
|
},
|
||||||
|
"syncRunning": map[string]interface{}{
|
||||||
|
"refresh": !elastic.RefreshSyncRunning.TryAcquire(1),
|
||||||
|
"full": !elastic.FullSyncRunning.TryAcquire(1),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
defer elastic.RefreshSyncRunning.Release(1)
|
||||||
|
defer elastic.FullSyncRunning.Release(1)
|
||||||
|
|
||||||
w.Header().Set("Cache-Control", "no-store")
|
w.Header().Set("Cache-Control", "no-store")
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
helpers.WriteJsonResponse(response, false, w, r)
|
helpers.WriteJsonResponse(response, false, w, r)
|
||||||
|
|
|
@ -42,28 +42,37 @@ func SyncThread() {
|
||||||
// TODO: have the workers exit when the sync job is finished
|
// TODO: have the workers exit when the sync job is finished
|
||||||
func syncElasticsearch(doFullSync bool) {
|
func syncElasticsearch(doFullSync bool) {
|
||||||
if !ElasticEnabled {
|
if !ElasticEnabled {
|
||||||
log.Errorln("ELASTIC - disabled, not syncing.")
|
log.Debugln("ELASTIC - disabled, not syncing.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only one sync at a time. Also helps to prevent races with the global variables.
|
// Only one sync at a time. Also helps to prevent races with the global variables.
|
||||||
syncLock.Lock()
|
syncLock.Lock()
|
||||||
|
|
||||||
|
defer syncLock.Unlock()
|
||||||
|
|
||||||
var syncType string
|
var syncType string
|
||||||
if fullSync {
|
if doFullSync {
|
||||||
FullSyncRunning = true
|
if !FullSyncRunning.TryAcquire(1) {
|
||||||
|
log.Fatalln("ELASTIC - failed to acquire the FullSyncRunning semaphore. This is a logic error.")
|
||||||
|
}
|
||||||
|
defer FullSyncRunning.Release(1)
|
||||||
syncType = "full refresh"
|
syncType = "full refresh"
|
||||||
} else {
|
} else {
|
||||||
RefreshSyncRunning = true
|
if !RefreshSyncRunning.TryAcquire(1) {
|
||||||
syncType = "refresh"
|
log.Fatalln("ELASTIC - failed to acquire the RefreshSyncRunning semaphore. This is a logic error.")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
defer RefreshSyncRunning.Release(1)
|
||||||
|
syncType = "refresh"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set global variables for the workers to read.
|
||||||
|
defer func() { fullSync = false }()
|
||||||
|
fullSync = doFullSync
|
||||||
|
|
||||||
log.Infof("ELASTIC - started a %s sync.", syncType)
|
log.Infof("ELASTIC - started a %s sync.", syncType)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// Set global variables for the workers to read.
|
|
||||||
fullSync = doFullSync
|
|
||||||
var err error
|
var err error
|
||||||
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex()
|
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -82,9 +91,6 @@ func syncElasticsearch(doFullSync bool) {
|
||||||
|
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
|
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
|
||||||
FullSyncRunning = false
|
|
||||||
RefreshSyncRunning = false
|
|
||||||
syncLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func logElasticConnError(err error) {
|
func logElasticConnError(err error) {
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
var globalKeysByPath map[string]string
|
var globalKeysByPath map[string]string
|
||||||
var globalPathsByKey map[string]string
|
var globalPathsByKey map[string]string
|
||||||
|
|
||||||
// fullSync is another global variable accessed by the workers and set by syncElasticsearch()
|
// fullSync is another global variable accessed by the workers and only set by syncElasticsearch()
|
||||||
var fullSync bool
|
var fullSync bool
|
||||||
|
|
||||||
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
|
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
|
||||||
|
@ -27,7 +27,10 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
|
||||||
cacheItem, found := SharedCache.Cache.Get(relPath)
|
cacheItem, found := SharedCache.Cache.Get(relPath)
|
||||||
if !found {
|
if !found {
|
||||||
log.Warnf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache! Deleting this item from Elastic. This error can probably be ignored.`, relPath)
|
log.Warnf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache! Deleting this item from Elastic. This error can probably be ignored.`, relPath)
|
||||||
deleteFromElasticsearch(encodeToBase64(relPath))
|
err := deleteFromElasticsearch(encodeToBase64(relPath))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("ELASTIC - failed to delete \"%s\" - %s", relPath, err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if _, ok := globalPathsByKey[relPath]; ok {
|
if _, ok := globalPathsByKey[relPath]; ok {
|
||||||
// Item already exists.
|
// Item already exists.
|
||||||
|
|
|
@ -4,16 +4,16 @@ import (
|
||||||
"crazyfs/logging"
|
"crazyfs/logging"
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
"github.com/elastic/go-elasticsearch/v8"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log *logrus.Logger
|
var log *logrus.Logger
|
||||||
var ElasticClient *elasticsearch.Client
|
var ElasticClient *elasticsearch.Client
|
||||||
|
|
||||||
var RefreshSyncRunning bool
|
// Global variables used by the Admin HTTP routes to show info.
|
||||||
var FullSyncRunning bool
|
var RefreshSyncRunning = semaphore.NewWeighted(1)
|
||||||
|
var FullSyncRunning = semaphore.NewWeighted(1)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
log = logging.GetLogger()
|
log = logging.GetLogger()
|
||||||
RefreshSyncRunning = false
|
|
||||||
FullSyncRunning = false
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ require (
|
||||||
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d
|
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sirupsen/logrus v1.9.3
|
||||||
github.com/spf13/viper v1.16.0
|
github.com/spf13/viper v1.16.0
|
||||||
|
golang.org/x/sync v0.1.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|
|
@ -301,6 +301,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
||||||
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
|
Loading…
Reference in New Issue