import data to feature store

This commit is contained in:
Cyberes 2024-09-28 16:33:03 -06:00
parent 2ec9053a4c
commit 328d2e8684
12 changed files with 167 additions and 52 deletions

View File

@ -0,0 +1,4 @@
```sql
GRANT ALL ON SCHEMA public TO geobackend;
GRANT ALL ON SCHEMA public TO public;
```

View File

@ -5,6 +5,7 @@ from django.db import models
class ImportQueue(models.Model):
id = models.AutoField(primary_key=True)
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
imported = models.BooleanField(default=False)
geofeatures = models.JSONField(default=list)
original_filename = models.TextField()
raw_kml = models.TextField()
@ -12,3 +13,11 @@ class ImportQueue(models.Model):
data = models.JSONField(default=dict)
log = models.JSONField(default=list)
timestamp = models.DateTimeField(auto_now_add=True)
class FeatureStore(models.Model):
id = models.AutoField(primary_key=True)
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
source = models.ForeignKey(ImportQueue, on_delete=models.SET_NULL, null=True)
geojson = models.JSONField(null=False)
timestamp = models.DateTimeField(auto_now_add=True)

View File

@ -1,6 +1,6 @@
from django.urls import path
from data.views.import_item import upload_item, fetch_import_queue, fetch_import_waiting, delete_import_item, update_import_item, fetch_import_history, fetch_import_history_item
from data.views.import_item import upload_item, fetch_import_queue, fetch_import_waiting, delete_import_item, update_import_item, fetch_import_history, fetch_import_history_item, import_to_featurestore
urlpatterns = [
path('item/import/upload', upload_item),
@ -10,4 +10,5 @@ urlpatterns = [
path('item/import/get/history/<int:item_id>', fetch_import_history_item),
path('item/import/delete/<int:id>', delete_import_item),
path('item/import/update/<int:item_id>', update_import_item),
path('item/import/perform/<int:item_id>', import_to_featurestore),
]

View File

@ -9,7 +9,7 @@ from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.http import require_http_methods
from data.models import ImportQueue
from data.models import ImportQueue, FeatureStore
from geo_lib.daemon.database.locking import DBLockManager
from geo_lib.daemon.workers.workers_lib.importer.kml import kmz_to_kml
from geo_lib.daemon.workers.workers_lib.importer.tagging import generate_auto_tags
@ -38,7 +38,7 @@ def upload_item(request):
try:
kml_doc = kmz_to_kml(file_data)
except Exception as e:
except:
print(traceback.format_exc()) # TODO: logging
return JsonResponse({'success': False, 'msg': 'failed to parse KML/KMZ', 'id': None}, status=400)
@ -50,7 +50,6 @@ def upload_item(request):
import_queue = ImportQueue.objects.get(
raw_kml=kml_doc,
raw_kml_hash=_hash_kml(kml_doc),
# original_filename=file_name,
user=request.user
)
msg = 'upload successful'
@ -73,8 +72,11 @@ def fetch_import_queue(request, item_id):
lock_manager = DBLockManager()
try:
item = ImportQueue.objects.get(id=item_id)
if item.user_id != request.user.id:
return JsonResponse({'success': False, 'processing': False, 'msg': 'not authorized to view this item', 'code': 403}, status=400)
if item.imported:
return JsonResponse({'success': False, 'processing': False, 'msg': 'item already imported', 'code': 400}, status=400)
if not lock_manager.is_locked('data_importqueue', item.id) and (len(item.geofeatures) or len(item.log)):
return JsonResponse({'success': True, 'processing': False, 'geofeatures': item.geofeatures, 'log': item.log, 'msg': None, 'original_filename': item.original_filename}, status=200)
return JsonResponse({'success': True, 'processing': True, 'geofeatures': [], 'log': [], 'msg': 'uploaded data still processing'}, status=200)
@ -84,7 +86,7 @@ def fetch_import_queue(request, item_id):
@login_required_401
def fetch_import_waiting(request):
user_items = ImportQueue.objects.exclude(data__contains='[]').filter(user=request.user).values('id', 'geofeatures', 'original_filename', 'raw_kml_hash', 'data', 'log', 'timestamp')
user_items = ImportQueue.objects.exclude(data__contains='[]').filter(user=request.user, imported=False).values('id', 'geofeatures', 'original_filename', 'raw_kml_hash', 'data', 'log', 'timestamp', 'imported')
data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder))
lock_manager = DBLockManager()
for i, item in enumerate(data):
@ -97,7 +99,7 @@ def fetch_import_waiting(request):
@login_required_401
def fetch_import_history(request):
user_items = ImportQueue.objects.filter(geofeatures__contains='[]', user=request.user).values('id', 'original_filename', 'timestamp')
user_items = ImportQueue.objects.filter(imported=True).values('id', 'original_filename', 'timestamp')
data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder))
return JsonResponse({'data': data})
@ -159,9 +161,6 @@ def update_import_item(request, item_id):
c.properties.tags = generate_auto_tags(c)
parsed_data.append(json.loads(c.model_dump_json()))
# Erase the geofeatures column
queue.geofeatures = []
# Update the data column with the new data
queue.data = parsed_data
@ -169,6 +168,43 @@ def update_import_item(request, item_id):
return JsonResponse({'success': True, 'msg': 'Item updated successfully'})
@login_required_401
@csrf_protect # TODO: put this on all routes
@require_http_methods(["POST"])
def import_to_featurestore(request, item_id):
try:
import_item = ImportQueue.objects.get(id=item_id)
except ImportQueue.DoesNotExist:
return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400)
if import_item.user_id != request.user.id:
return JsonResponse({'success': False, 'msg': 'not authorized to edit this item', 'code': 403}, status=403)
import_item.imported = True
i = 0
for feature in import_item.geofeatures:
match feature['type'].lower():
case 'point':
c = GeoPoint(**feature)
case 'linestring':
c = GeoLineString(**feature)
case 'polygon':
c = GeoPolygon(**feature)
case _:
continue
data = json.loads(c.model_dump_json())
feature = FeatureStore.objects.create(geojson=data, source=import_item, user=request.user)
feature.save()
i += 1
# Erase the geofeatures column
import_item.geofeatures = []
import_item.save()
return JsonResponse({'success': True, 'msg': f'Successfully imported {i} items'})
def _hash_kml(b: str):
if not isinstance(b, bytes):
b = b.encode()

View File

@ -14,7 +14,7 @@ from geo_lib.logging.database import log_to_db, DatabaseLogLevel, DatabaseLogSou
from geo_lib.time import get_time_ms
from geo_lib.types.feature import geojson_to_geofeature
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geofeatures = '[]'::jsonb ORDER BY id ASC"
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geofeatures = '[]'::jsonb AND imported = false ORDER BY id ASC"
_SQL_INSERT_PROCESSED_ITEM = "UPDATE public.data_importqueue SET geofeatures = %s, log = %s WHERE id = %s"
_SQL_DELETE_ITEM = "DELETE FROM public.data_importqueue WHERE id = %s"
@ -62,7 +62,6 @@ def import_worker():
features = [] # dummy data
if success:
features = [json.loads(x.model_dump_json()) for x in geofetures]
time.sleep(1)
import_log.add(f'Processing finished {"un" if not success else ""}successfully')
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
data = json.dumps(features)

View File

@ -1,12 +1,13 @@
import datetime
import json
from typing import List, Optional
from typing import List
from typing import Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
class ImportLogMsg(BaseModel):
timestamp: Optional[str] = datetime.datetime.now(datetime.timezone.utc).isoformat()
timestamp: Optional[str] = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc).isoformat())
msg: str

View File

@ -6,7 +6,8 @@ import {ImportQueueItem} from "@/assets/js/types/import-types";
export default createStore({
state: {
userInfo: UserInfo,
importQueue: ImportQueueItem
importQueue: ImportQueueItem,
importQueueRefreshTrigger: false,
}, mutations: {
userInfo(state, payload) {
@ -14,10 +15,21 @@ export default createStore({
},
importQueue(state, payload) {
state.importQueue = payload
}
},
setImportQueue(state, importQueue) {
state.importQueue = importQueue;
},
triggerImportQueueRefresh(state) {
state.importQueueRefreshTrigger = !state.importQueueRefreshTrigger;
},
}, getters: {
// alertExists: (state) => (message) => {
// return state.siteAlerts.includes(message);
// },
}
},
actions: {
refreshImportQueue({commit}) {
commit('triggerImportQueueRefresh');
},
},
})

View File

@ -88,9 +88,14 @@
</div>
<div v-if="itemsForUser.length > 0">
<button class="m-2 bg-green-500 hover:bg-green-600 text-white font-bold py-2 px-4 rounded"
<button :disabled="lockButtons"
class="m-2 bg-green-500 hover:bg-green-600 disabled:bg-green-300 text-white font-bold py-2 px-4 rounded"
@click="saveChanges">Save
</button>
<button :disabled="lockButtons"
class="m-2 bg-blue-500 hover:bg-blue-600 disabled:bg-blue-300 text-white font-bold py-2 px-4 rounded"
@click="performImport">Import
</button>
</div>
@ -129,6 +134,7 @@ export default {
itemsForUser: [],
originalItems: [],
workerLog: [],
lockButtons: false,
flatpickrConfig: {
enableTime: true,
time_24hr: true,
@ -184,38 +190,69 @@ export default {
this.itemsForUser[index].properties.created = selectedDates[0];
},
saveChanges() {
const csrftoken = getCookie('csrftoken');
this.lockButtons = true
const csrftoken = getCookie('csrftoken')
axios.put('/api/data/item/import/update/' + this.id, this.itemsForUser, {
headers: {
'X-CSRFToken': csrftoken
}
}).then(response => {
if (response.data.success) {
this.msg = 'Changes saved successfully.';
window.alert(this.msg);
window.alert(response.data.msg);
} else {
this.msg = 'Error saving changes: ' + response.data.msg;
window.alert(this.msg);
}
this.lockButtons = false
}).catch(error => {
this.msg = 'Error saving changes: ' + error.message;
window.alert(this.msg);
});
},
}
,
async performImport() {
this.lockButtons = true
const csrftoken = getCookie('csrftoken')
// Save changes first.
await axios.put('/api/data/item/import/update/' + this.id, this.itemsForUser, {
headers: {
'X-CSRFToken': csrftoken
}
})
axios.post('/api/data/item/import/perform/' + this.id, [], {
headers: {
'X-CSRFToken': csrftoken
}
}).then(response => {
if (response.data.success) {
this.$store.dispatch('refreshImportQueue')
window.alert(response.data.msg);
} else {
this.msg = 'Error performing import: ' + response.data.msg;
window.alert(this.msg);
}
this.lockButtons = false
}).catch(error => {
this.msg = 'Error performing import: ' + error.message;
window.alert(this.msg);
this.lockButtons = false
});
},
},
beforeRouteEnter(to, from, next) {
const now = new Date().toISOString()
let ready = false
next(async vm => {
if (vm.currentId !== vm.id) {
vm.msg = ""
vm.currentId = null
vm.originalFilename = null
vm.itemsForUser = []
vm.originalItems = []
vm.workerLog = []
vm.lockButtons = false
while (!ready) {
vm.msg = ""
vm.currentId = null
vm.originalFilename = null
vm.itemsForUser = []
vm.originalItems = []
vm.workerLog = []
try {
const response = await axios.get('/api/data/item/import/get/' + vm.id)
if (!response.data.success) {
@ -246,7 +283,8 @@ export default {
}
}
})
},
}
,
}
</script>

View File

@ -31,12 +31,12 @@
<div class="text-center mt-2">{{ uploadProgress }}%</div>
</div>
<div class="prose" v-html="uploadResponse"></div>
<div v-if="uploadMsg !== ''" class="mt-10 max-h-40 overflow-y-auto bg-gray-200 rounded-s p-5">
<strong>Message from Server:</strong><br>
<div v-if="uploadMsg !== ''" class="max-h-40 overflow-y-auto bg-gray-200 rounded-s p-5">
<!-- <strong>Message from Server:</strong><br>-->
{{ uploadMsg }}
</div>
<div class="prose mt-5" v-html="uploadResponse"></div>
</div>
<div class="prose mt-10">
@ -56,6 +56,7 @@ import {capitalizeFirstLetter} from "@/assets/js/string.js";
import {IMPORT_QUEUE_LIST_URL} from "@/assets/js/import/url.js";
import {ImportQueueItem} from "@/assets/js/types/import-types"
import Importqueue from "@/components/import/parts/importqueue.vue";
import {getCookie} from "@/assets/js/auth.js";
// TODO: after import, don't disable the upload, instead add the new item to a table at the button and then prompt the user to continue
@ -104,10 +105,13 @@ export default {
const response = await axios.post('/api/data/item/import/upload', formData, {
headers: {
'Content-Type': 'multipart/form-data',
'X-CSRFToken': this.userInfo.csrftoken
'X-CSRFToken': getCookie('csrftoken')
},
onUploadProgress: (progressEvent) => { // Add this block
onUploadProgress: (progressEvent) => {
this.uploadProgress = Math.round((progressEvent.loaded * 100) / progressEvent.total)
if (this.uploadProgress === 100) {
this.uploadMsg = "Processing..."
}
},
})
this.uploadMsg = capitalizeFirstLetter(response.data.msg).trim(".") + "."
@ -136,6 +140,8 @@ export default {
vm.file = null
vm.disableUpload = false
vm.uploadMsg = ""
vm.uploadProgress = 0
vm.uploadResponse = ""
})
},
watch: {},

View File

@ -1,8 +1,4 @@
<template>
<!-- <div class="mt-4">-->
<!-- <button class="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600" @click="fetchQueueList">Refresh</button>-->
<!-- </div>-->
<table class="mt-6 w-full border-collapse">
<thead>
<tr class="bg-gray-100">
@ -50,6 +46,7 @@ import {authMixin} from "@/assets/js/authMixin.js";
import axios from "axios";
import {IMPORT_QUEUE_LIST_URL} from "@/assets/js/import/url.js";
import {ImportQueueItem} from "@/assets/js/types/import-types";
import {getCookie} from "@/assets/js/auth.js";
export default {
computed: {
@ -63,34 +60,47 @@ export default {
}
},
methods: {
subscribeToRefreshMutation() {
this.$store.subscribe((mutation, state) => {
if (mutation.type === 'triggerImportQueueRefresh') {
this.refreshData();
}
});
},
async refreshData() {
console.log("IMPORT QUEUE: refreshing")
await this.fetchQueueList()
},
async fetchQueueList() {
this.isLoading = true
const response = await axios.get(IMPORT_QUEUE_LIST_URL)
const ourImportQueue = response.data.data.map((item) => new ImportQueueItem(item))
this.$store.commit('importQueue', ourImportQueue)
this.$store.commit('setImportQueue', ourImportQueue)
this.isLoading = false
},
async deleteItem(item, index) {
if (window.confirm(`Delete "${item.original_filename}" (#${item.id})`))
if (window.confirm(`Delete "${item.original_filename}" (#${item.id})`)) {
try {
this.importQueue.splice(index, 1)
// TODO: add a message popup when delete is completed
this.importQueue.splice(index, 1);
const response = await axios.delete('/api/data/item/import/delete/' + item.id, {
headers: {
'X-CSRFToken': this.userInfo.csrftoken
'X-CSRFToken': getCookie('csrftoken')
}
})
});
if (!response.data.success) {
throw new Error("server reported failure")
throw new Error("server reported failure");
}
await this.fetchQueueList()
await this.refreshData(); // Refresh the data after deleting an item
} catch (error) {
alert(`Failed to delete ${item.id}: ${error.message}`)
this.importQueue.splice(index, 0, item)
alert(`Failed to delete ${item.id}: ${error.message}`);
this.importQueue.splice(index, 0, item);
}
}
}
},
},
async created() {
await this.fetchQueueList()
this.subscribeToRefreshMutation()
},
}
</script>