82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Copyright 2018 New Vector Ltd
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
|
||
|
from twisted.internet import defer
|
||
|
|
||
|
from synapse.rest.media.v1._base import FileInfo
|
||
|
from synapse.rest.media.v1.media_storage import MediaStorage
|
||
|
from synapse.rest.media.v1.filepath import MediaFilePaths
|
||
|
from synapse.rest.media.v1.storage_provider import FileStorageProviderBackend
|
||
|
|
||
|
from tests import unittest
|
||
|
|
||
|
import os
|
||
|
import shutil
|
||
|
import tempfile
|
||
|
|
||
|
|
||
|
class MediaStorageTests(unittest.TestCase):
|
||
|
def setUp(self):
|
||
|
self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
|
||
|
|
||
|
self.primary_base_path = os.path.join(self.test_dir, "primary")
|
||
|
self.secondary_base_path = os.path.join(self.test_dir, "secondary")
|
||
|
|
||
|
storage_providers = [FileStorageProviderBackend(
|
||
|
self.primary_base_path, self.secondary_base_path
|
||
|
)]
|
||
|
|
||
|
self.filepaths = MediaFilePaths(self.primary_base_path)
|
||
|
self.media_storage = MediaStorage(
|
||
|
self.primary_base_path, self.filepaths, storage_providers,
|
||
|
)
|
||
|
|
||
|
def tearDown(self):
|
||
|
shutil.rmtree(self.test_dir)
|
||
|
|
||
|
@defer.inlineCallbacks
|
||
|
def test_ensure_media_is_in_local_cache(self):
|
||
|
media_id = "some_media_id"
|
||
|
test_body = "Test\n"
|
||
|
|
||
|
# First we create a file that is in a storage provider but not in the
|
||
|
# local primary media store
|
||
|
rel_path = self.filepaths.local_media_filepath_rel(media_id)
|
||
|
secondary_path = os.path.join(self.secondary_base_path, rel_path)
|
||
|
|
||
|
os.makedirs(os.path.dirname(secondary_path))
|
||
|
|
||
|
with open(secondary_path, "w") as f:
|
||
|
f.write(test_body)
|
||
|
|
||
|
# Now we run ensure_media_is_in_local_cache, which should copy the file
|
||
|
# to the local cache.
|
||
|
file_info = FileInfo(None, media_id)
|
||
|
local_path = yield self.media_storage.ensure_media_is_in_local_cache(file_info)
|
||
|
|
||
|
self.assertTrue(os.path.exists(local_path))
|
||
|
|
||
|
# Asserts the file is under the expected local cache directory
|
||
|
self.assertEquals(
|
||
|
os.path.commonprefix([self.primary_base_path, local_path]),
|
||
|
self.primary_base_path,
|
||
|
)
|
||
|
|
||
|
with open(local_path) as f:
|
||
|
body = f.read()
|
||
|
|
||
|
self.assertEqual(test_body, body)
|