Source code for stestr.repository.memory

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""In memory storage of test results."""

from extras import try_import

from io import BytesIO
from operator import methodcaller

import subunit
import testtools

from stestr.repository import abstract as repository

OrderedDict = try_import('collections.OrderedDict', dict)


[docs]class RepositoryFactory(repository.AbstractRepositoryFactory): """A factory that can initialise and open memory repositories. This is used for testing where a repository may be created and later opened, but tests should not see each others repositories. """ def __init__(self): self.repos = {}
[docs] def initialise(self, url): self.repos[url] = Repository() return self.repos[url]
[docs] def open(self, url): try: return self.repos[url] except KeyError: raise repository.RepositoryNotFound(url)
[docs]class Repository(repository.AbstractRepository): """In memory storage of test results.""" def __init__(self): # Test runs: self._runs = [] self._failing = OrderedDict() # id -> test self._times = {} # id -> duration
[docs] def count(self): return len(self._runs)
[docs] def get_failing(self): return _Failures(self)
[docs] def get_test_run(self, run_id): if run_id < 0: raise KeyError("No such run.") return self._runs[run_id]
[docs] def get_run_ids(self): return list(self._runs.keys())
[docs] def remove_run_id(self, run_id): if run_id not in self._runs: raise KeyError("No run %s in repository" % run_id) del self._run[run_id]
[docs] def latest_id(self): result = self.count() - 1 if result < 0: raise KeyError("No tests in repository") return result
def _get_inserter(self, partial, run_id=None, metadata=None): return _Inserter(self, partial, run_id, metadata) def _get_test_times(self, test_ids): result = {} for test_id in test_ids: duration = self._times.get(test_id, None) if duration is not None: result[test_id] = duration return result
# XXX: Too much duplication between this and _Inserter class _Failures(repository.AbstractTestRun): """Report on failures from a memory repository.""" def __init__(self, repository): self._repository = repository def get_id(self): return None def get_subunit_stream(self): result = BytesIO() serialiser = subunit.v2.StreamResultToBytes(result) serialiser = testtools.ExtendedToStreamDecorator(serialiser) serialiser.startTestRun() try: self.run(serialiser) finally: serialiser.stopTestRun() result.seek(0) return result def get_test(self): def wrap_result(result): # Wrap in a router to mask out startTestRun/stopTestRun from the # ExtendedToStreamDecorator. result = testtools.StreamResultRouter(result, do_start_stop_run=False) # Wrap that in ExtendedToStreamDecorator to convert v1 calls to # StreamResult. return testtools.ExtendedToStreamDecorator(result) return testtools.DecorateTestCaseResult( self, wrap_result, methodcaller('startTestRun'), methodcaller('stopTestRun')) def run(self, result): # Speaks original V1 protocol. for case in self._repository._failing.values(): case.run(result) class _Inserter(repository.AbstractTestRun): """Insert test results into a memory repository.""" def __init__(self, repository, partial, run_id=None, metadata=None): self._repository = repository self._partial = partial self._tests = [] # Subunit V2 stream for get_subunit_stream self._subunit = None self._run_id = run_id self._metadata = metadata def startTestRun(self): self._subunit = BytesIO() serialiser = subunit.v2.StreamResultToBytes(self._subunit) self._hook = testtools.CopyStreamResult([ testtools.StreamToDict(self._handle_test), serialiser]) self._hook.startTestRun() def _handle_test(self, test_dict): self._tests.append(test_dict) start, stop = test_dict['timestamps'] if test_dict['status'] == 'exists' or None in (start, stop): return duration_delta = stop - start duration_seconds = ( (duration_delta.microseconds + ( duration_delta.seconds + duration_delta.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6) self._repository._times[test_dict['id']] = duration_seconds def stopTestRun(self): self._hook.stopTestRun() self._repository._runs.append(self) if not self._run_id: self._run_id = len(self._repository._runs) - 1 if not self._partial: self._repository._failing = OrderedDict() for test_dict in self._tests: test_id = test_dict['id'] if test_dict['status'] == 'fail': case = testtools.testresult.real.test_dict_to_case(test_dict) self._repository._failing[test_id] = case else: self._repository._failing.pop(test_id, None) return self._run_id def status(self, *args, **kwargs): self._hook.status(*args, **kwargs) def get_id(self): return self._run_id def get_subunit_stream(self): self._subunit.seek(0) return self._subunit def get_test(self): def wrap_result(result): # Wrap in a router to mask out startTestRun/stopTestRun from the # ExtendedToStreamDecorator. result = testtools.StreamResultRouter(result, do_start_stop_run=False) # Wrap that in ExtendedToStreamDecorator to convert v1 calls to # StreamResult. return testtools.ExtendedToStreamDecorator(result) return testtools.DecorateTestCaseResult( self, wrap_result, methodcaller('startTestRun'), methodcaller('stopTestRun')) def run(self, result): # Speaks original. for test_dict in self._tests: case = testtools.testresult.real.test_dict_to_case(test_dict) case.run(result)