125 lines
4.3 KiB
Python
125 lines
4.3 KiB
Python
# coding=utf-8
|
|
|
|
__all__ = ["Downloader"]
|
|
|
|
import logging
|
|
import traceback
|
|
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed
|
|
from os import environ, makedirs
|
|
from os.path import exists, join
|
|
from random import choice
|
|
from time import time
|
|
|
|
import requests
|
|
from kivy.clock import Clock
|
|
from kivy.logger import LOG_LEVELS, Logger
|
|
|
|
from mapview.constants import CACHE_DIR
|
|
|
|
# if "MAPVIEW_DEBUG_DOWNLOADER" in environ:
|
|
# Logger.setLevel(LOG_LEVELS['debug'])
|
|
# Logger.setLevel(LOG_LEVELS['error'])
|
|
|
|
# user agent is needed because since may 2019 OSM gives me a 429 or 403 server error
|
|
# I tried it with a simpler one (just Mozilla/5.0) this also gets rejected
|
|
USER_AGENT = 'Kivy-garden.mapview'
|
|
|
|
|
|
class Downloader:
|
|
_instance = None
|
|
MAX_WORKERS = 5
|
|
CAP_TIME = 0.064 # 15 FPS
|
|
|
|
@staticmethod
|
|
def instance(cache_dir=None):
|
|
if Downloader._instance is None:
|
|
if not cache_dir:
|
|
cache_dir = CACHE_DIR
|
|
Downloader._instance = Downloader(cache_dir=cache_dir)
|
|
return Downloader._instance
|
|
|
|
def __init__(self, max_workers=None, cap_time=None, **kwargs):
|
|
self.cache_dir = kwargs.get('cache_dir', CACHE_DIR)
|
|
if max_workers is None:
|
|
max_workers = Downloader.MAX_WORKERS
|
|
if cap_time is None:
|
|
cap_time = Downloader.CAP_TIME
|
|
self.is_paused = False
|
|
self.cap_time = cap_time
|
|
self.executor = ThreadPoolExecutor(max_workers=max_workers)
|
|
self._futures = []
|
|
Clock.schedule_interval(self._check_executor, 1 / 60.0)
|
|
if not exists(self.cache_dir):
|
|
makedirs(self.cache_dir)
|
|
|
|
def submit(self, f, *args, **kwargs):
|
|
future = self.executor.submit(f, *args, **kwargs)
|
|
self._futures.append(future)
|
|
|
|
def download_tile(self, tile):
|
|
# Logger.debug(
|
|
# "Downloader: queue(tile) zoom={} x={} y={}".format(
|
|
# tile.zoom, tile.tile_x, tile.tile_y
|
|
# )
|
|
# )
|
|
future = self.executor.submit(self._load_tile, tile)
|
|
self._futures.append(future)
|
|
|
|
def download(self, url, callback, **kwargs):
|
|
# Logger.debug("Downloader: queue(url) {}".format(url))
|
|
future = self.executor.submit(self._download_url, url, callback, kwargs)
|
|
self._futures.append(future)
|
|
|
|
def _download_url(self, url, callback, kwargs):
|
|
# Logger.debug("Downloader: download(url) {}".format(url))
|
|
response = requests.get(url, **kwargs)
|
|
response.raise_for_status()
|
|
return callback, (url, response)
|
|
|
|
def _load_tile(self, tile):
|
|
if tile.state == "done":
|
|
return
|
|
cache_fn = tile.cache_fn
|
|
if exists(cache_fn):
|
|
# Logger.debug("Downloader: use cache {}".format(cache_fn))
|
|
return tile.set_source, (cache_fn,)
|
|
tile_y = tile.map_source.get_row_count(tile.zoom) - tile.tile_y - 1
|
|
uri = tile.map_source.url.format(
|
|
z=tile.zoom, x=tile.tile_x, y=tile_y, s=choice(tile.map_source.subdomains)
|
|
)
|
|
# Logger.debug("Downloader: download(tile) {}".format(uri))
|
|
response = requests.get(uri, headers={'User-agent': USER_AGENT}, timeout=5)
|
|
try:
|
|
response.raise_for_status()
|
|
data = response.content
|
|
with open(cache_fn, "wb") as fd:
|
|
fd.write(data)
|
|
# Logger.debug("Downloaded {} bytes: {}".format(len(data), uri))
|
|
return tile.set_source, (cache_fn,)
|
|
except Exception as e:
|
|
print("Downloader error: {!r}".format(e))
|
|
|
|
def _check_executor(self, dt):
|
|
start = time()
|
|
try:
|
|
for future in as_completed(self._futures[:], 0):
|
|
self._futures.remove(future)
|
|
try:
|
|
result = future.result()
|
|
except Exception:
|
|
traceback.print_exc()
|
|
# make an error tile?
|
|
continue
|
|
if result is None:
|
|
continue
|
|
callback, args = result
|
|
callback(*args)
|
|
|
|
# capped executor in time, in order to prevent too much
|
|
# slowiness.
|
|
# seems to works quite great with big zoom-in/out
|
|
if time() - start > self.cap_time:
|
|
break
|
|
except TimeoutError:
|
|
pass
|