improve elastic setup, add fields arg to search route, improve list endpoint page handling, fix some minor issues with elastic

Cyberes 2024-03-24 22:43:56 -06:00
parent 6a5526ba3e
commit cccfe6fa1a
11 changed files with 210 additions and 33 deletions

View File

@ -17,4 +17,74 @@ Why we don't store the cache in Elasticsearch? Because Elastic is not as fast as
### Searching
We do an Elastic [simple query string search](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-simple-query-string-query.html).
### Custom Analyzer
An Elasticsearch analyzer is used to break down text data into searchable terms. The default Elasticsearch analyzer may
have issues with filenames. For example, the default analyzer will turn `testfile160.txt` into `["testfile160", "txt"]`,
meaning a search for `testfile160` will return a hit but a search for `160` will not.
We can create a custom analyzer that better fits our expected field type: file names. This new analyzer will
transform `testfile160.txt` into `["testfile", "160", "txt"]`.
<br>
You must create a new index to change analyzers. The command below will create a new index with this custom analyzer.
Use the Kibana Dev Console to easily run queries: [/app/dev_tools#/console](/app/dev_tools#/console)
<br>
**Create a new index with the custom filename analyzer:**
```json
PUT /crazyfs_search
{
"settings": {
"analysis": {
"analyzer": {
"default": {
"type": "custom",
"tokenizer": "char_group_tokenizer",
"filter": [
"lowercase",
"word_delimiter"
]
}
},
"tokenizer": {
"char_group_tokenizer": {
"type": "char_group",
"tokenize_on_chars": [
"."
]
}
}
}
},
"mappings": {
"properties": {
"filename": {
"type": "text"
}
}
}
}
```
<br>
You can also migrate the documents from an existing index into the new one:
```json
POST /_reindex
{
"source": {
"index": "old_index"
},
"dest": {
"index": "new_index"
}
}
```
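<br>
**Verify the analyzer (optional):** you can test the new analyzer directly with the `_analyze` API against the index created above. For `testfile160.txt` it should return the tokens `testfile`, `160`, and `txt`, matching the behavior described earlier:
```json
POST /crazyfs_search/_analyze
{
  "text": "testfile160.txt"
}
```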

View File

@ -50,9 +50,9 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
limitResults = 0
}
+// TODO: update this to match how List does it
sortArg := r.URL.Query().Get("sort")
var folderSorting string
switch sortArg {
case "default", "":
folderSorting = "default"
@ -69,8 +69,14 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
results = make([]*cacheitem.Item, 0)
if config.GetConfig().ElasticsearchEnable {
+fieldsArg := r.URL.Query().Get("fields")
+var queryFields []string
+if fieldsArg != "" {
+queryFields = strings.Split(fieldsArg, ",")
+}
// Perform the Elasticsearch query
-resp, err := elastic.SimpleQuery(queryString, excludeElements)
+resp, err := elastic.SimpleQuery(queryString, excludeElements, queryFields)
if err != nil {
log.Errorf(`ROUTES:Search - Failed to perform Elasticsearch query "%s" - %s`, queryString, err)
helpers.Return500Msg(w)
@ -95,6 +101,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
} else {
clientResp = "Query failed"
}
+log.Warnf(`ROUTES:Search - query failed: "%s"`, clientResp)
helpers.Return400Msg(clientResp, w)
return
}

View File

@ -116,17 +116,15 @@ func APIList(w http.ResponseWriter, r *http.Request) {
if resolveItemName != "" {
var resolvedChild *responseitem.ResponseItem
var resolvedChildPage int
-for i := range pages {
-for _, child := range pages[i] {
+outer:
+for i, page := range pages {
+for _, child := range page {
if child.Name == resolveItemName {
resolvedChild = child
resolvedChildPage = i
-break
+break outer
}
}
-if resolvedChild != nil {
-break
-}
}
if resolvedChild == nil {
helpers.Return400Msg("failed to find item", w)
@ -134,10 +132,10 @@ func APIList(w http.ResponseWriter, r *http.Request) {
}
item.Children = make([]*responseitem.ResponseItem, 1)
item.Children[0] = resolvedChild
-response["resolved_page"] = resolvedChildPage
+response["resolved_page"] = resolvedChildPage + 1
}
-if r.URL.Query().Get("page") != "" || resolveItemName != "" {
+if pageParam != "" || resolveItemName != "" {
response["total_pages"] = len(pages) + 1 // We add 1 to the count because arrays are zero-indexed.
}
@ -152,7 +150,12 @@ func generateListing(cacheItem *cacheitem.Item, paginationLimit int, sortType st
item := responseitem.NewResponseItem(cacheItem)
totalItems := len(item.Children)
-totalPages := totalItems / paginationLimit
+var totalPages int
+if totalItems < paginationLimit {
+totalPages = 1
+} else {
+totalPages = totalItems / paginationLimit
+}
if sortType == sortFolders {
var dirs, files []*responseitem.ResponseItem

View File

@ -21,9 +21,10 @@ type JobExtras struct {
Key string
}
+// InitializeElasticCrawlerWorkers creates an Elastic worker pool only if one does not already exist.
func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
if globals.ElasticCrawlers != nil {
-panic("ElasticCrawlers has already been defined!")
+panic("A pool for ElasticCrawlers is already running!")
}
elWorkers := workers.InitializeWorkers(config.GetConfig().ElasticsearchSyncWorkers, elasticDeleteWorker)
d := &globals.DcWorkers{}

View File

@ -20,8 +20,6 @@ func SyncThread() {
return
}
-createCrazyfsIndex()
// Run a partial sync at startup, unless configured to run a full one.
syncElasticsearch(config.GetConfig().ElasticsearchFullSyncOnStart)
@ -72,8 +70,6 @@ func syncElasticsearch(doFullSync bool) {
log.Infof("ELASTIC - Started a %s sync.", syncType) log.Infof("ELASTIC - Started a %s sync.", syncType)
start := time.Now() start := time.Now()
InitializeElasticCrawlerWorkers()
// Refresh the global variables for the workers. // Refresh the global variables for the workers.
var err error var err error
globalPathsByKeyMutex.Lock() globalPathsByKeyMutex.Lock()
@ -106,6 +102,7 @@ func syncElasticsearch(doFullSync bool) {
}
}()
+InitializeElasticCrawlerWorkers()
startRemoveStaleItemsFromElasticsearch()
dc := directorycrawler.NewDirectoryCrawler(globals.ElasticCrawlers.Queue)
@ -143,6 +140,8 @@ func logElasticConnError(err error) {
// EnableElasticsearchConnection tests the connection to Elastic and enables the backend if it's successful.
func EnableElasticsearchConnection() {
+createCrazyfsIndex()
+waitForElasticGreen()
_, _, err := getPathsFromIndex(false, 10) // query a very small sample
if err != nil {
logElasticConnError(err)

View File

@ -14,6 +14,7 @@ import (
"io" "io"
"os" "os"
"sync" "sync"
"time"
) )
// TODO: is there a better way to store this data? It appears to eat up a lot of memory. // TODO: is there a better way to store this data? It appears to eat up a lot of memory.
@ -74,7 +75,7 @@ func performAddToElasticsearch(item *cacheitem.Item) error {
DocumentID: encodeToBase64(item.Path),
Body: bytes.NewReader(data),
Refresh: "true",
-Timeout: 100,
+Timeout: 30 * time.Second,
}
res, err := req.Do(context.Background(), ElasticClient)
if err != nil {

View File

@ -1,7 +1,11 @@
package elastic
import (
+"crazyfs/cacheitem"
"crazyfs/config"
+"encoding/json"
+"github.com/sethvargo/go-password/password"
+"time"
)
func createCrazyfsIndex() {
@ -21,11 +25,88 @@ func createCrazyfsIndex() {
log.Fatalf("ELASTIC - Error creating index: %s", err) log.Fatalf("ELASTIC - Error creating index: %s", err)
} }
defer res.Body.Close() defer res.Body.Close()
if res.IsError() { if res.IsError() {
log.Fatalf("ELASTIC - Elasticsearch returned error when trying to create index: %s", res.String()) log.Fatalf("ELASTIC - Elasticsearch returned error when trying to create index: %s", res.String())
} }
log.Infof(`ELASTIC - Created a new index named "%s"`, config.GetConfig().ElasticsearchIndex) log.Infof(`ELASTIC - Created a new index named "%s"`, config.GetConfig().ElasticsearchIndex)
} }
} }
// waitForElasticGreen manually checks that the index is ready by waiting until a test document can be added
// successfully, then deleting that document and waiting for confirmation that it was removed.
// Always returns true, or exits the program on failure.
func waitForElasticGreen() bool {
// Generate a random item to add to the index.
testItemPath, err := password.Generate(256, 10, 0, false, true)
if err != nil {
log.Fatalf(`ELASTIC:Startup - failed to generate test item path: %s`, err)
}
nullStr := "abc123"
children := make([]string, 2)
children[0] = nullStr
children[1] = nullStr
testItem := &cacheitem.Item{
Path: testItemPath,
Name: testItemPath,
Size: 10,
Extension: &nullStr,
Modified: "2022-04-06T23:54:01Z",
Mode: 436,
IsDir: false,
IsSymlink: false,
MimeType: &nullStr,
Encoding: &nullStr,
Children: children,
Content: nullStr,
Cached: 1711338690232,
}
// Try adding that item until it is successfully added.
for {
addErr := performAddToElasticsearch(testItem)
if addErr != nil {
log.Infof("ELASTIC:Startup - waiting for index to be ready...")
log.Debugf(`ELASTIC:Startup - Got error from Elastic: "%s"`, addErr)
} else {
break
}
time.Sleep(5 * time.Second)
}
// Delete the test item.
testItemPathB64 := encodeToBase64(testItemPath)
delErr := DeleteFromElasticsearch(testItemPathB64)
if delErr != nil {
log.Fatalf(`ELASTIC:Startup - failed to clean up connection test: %+v`, delErr)
}
// Wait until Elastic confirms it was deleted.
for {
var excludeElements []string
var queryFields []string
resp, qErr := SimpleQuery(testItemPath, excludeElements, queryFields)
if qErr != nil {
log.Fatalf(`ELASTIC:Startup - query failed: %s`, qErr)
}
var respData map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&respData)
if err != nil {
log.Fatalf(`ELASTIC:Startup - failed to read query response: %s`, err)
}
if resp.IsError() || resp.StatusCode != 200 {
var errorMsg string
errorMsg, err = GetSearchFailureReason(respData)
if err == nil {
log.Fatalf(`ELASTIC:Startup - confirmation query failed (elastic reported error): %s`, errorMsg)
} else {
log.Fatalf(`ELASTIC:Startup - confirmation query failed: %s`, err)
}
}
hitsData, ok := respData["hits"].(map[string]interface{})
if !ok || hitsData["hits"] == nil {
break
}
hits := hitsData["hits"].([]interface{})
if len(hits) == 0 {
break
}
log.Infof("ELASTIC:Startup - waiting for elastic to confirm test item deletion...")
time.Sleep(5 * time.Second)
}
return true
}

View File

@ -82,9 +82,16 @@ func getPathsFromIndex(doScroll bool, withSize int) (map[string]string, map[stri
}
// Clear the scroll
-clearScrollRequest := esapi.ClearScrollRequest{
-ScrollID: []string{r["_scroll_id"].(string)},
+var scrollID string
+if r["_scroll_id"] != nil {
+scrollID = r["_scroll_id"].(string)
+} else {
+return nil, nil, errors.New("_scroll_id was null")
}
+clearScrollRequest := esapi.ClearScrollRequest{
+ScrollID: []string{scrollID},
+}
clearScrollResponse, err := clearScrollRequest.Do(context.Background(), ElasticClient)
if err != nil {
msg := fmt.Sprintf("Error clearing scroll: %s", err)

View File

@ -10,7 +10,7 @@ import (
"strings" "strings"
) )
func SimpleQuery(query string, exclude []string) (*esapi.Response, error) { func SimpleQuery(query string, exclude []string, fields []string) (*esapi.Response, error) {
var excludeQuery string var excludeQuery string
if len(exclude) > 0 { if len(exclude) > 0 {
var excludeConditions []string var excludeConditions []string
@ -20,18 +20,23 @@ func SimpleQuery(query string, exclude []string) (*esapi.Response, error) {
excludeQuery = fmt.Sprintf(`, "must_not": [%s]`, strings.Join(excludeConditions, ","))
}
+var fieldsQuery string
+if len(fields) > 0 {
+fieldsQuery = fmt.Sprintf(`, "fields": ["%s"]`, strings.Join(fields, `","`))
+}
esQuery := fmt.Sprintf(`{
"query": {
"bool": {
"must": {
"simple_query_string": {
"query": "%s",
-"default_operator": "and"
+"default_operator": "and"%s
}
}%s
}
}
-}`, query, excludeQuery)
+}`, query, fieldsQuery, excludeQuery)
return ElasticClient.Search(
ElasticClient.Search.WithContext(context.Background()),
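For illustration, with `query` set to `testfile`, `fields` set to `["filename"]`, and no excludes (values chosen only as an example), the template above expands to a request body like:
```json
{
  "query": {
    "bool": {
      "must": {
        "simple_query_string": {
          "query": "testfile",
          "default_operator": "and",
          "fields": ["filename"]
        }
      }
    }
  }
}
```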

View File

@ -15,6 +15,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d
+github.com/sethvargo/go-password v0.2.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0
golang.org/x/sync v0.1.0

View File

@ -175,6 +175,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU=
+github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
+github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM=