Source code for stestr.scheduler

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import itertools
import multiprocessing
import operator
import random

import yaml

from stestr import selection


def partition_tests(test_ids, concurrency, repository, group_callback,
                    randomize=False):
    """Partition test_ids by concurrency.

    Test durations from the repository are used to get partitions which
    have roughly the same expected runtime. New tests - those with no
    recorded duration - are allocated in round-robin fashion to the
    partitions created using test durations.

    :param list test_ids: The list of test_ids to be partitioned
    :param int concurrency: The concurrency that will be used for running
        the tests. This is the number of partitions that test_ids will be
        split into. It must be at least 1.
    :param repository: A repository object that will be used for looking up
        timing data through its ``get_test_times()`` method. If it is
        falsey, every test is treated as having no recorded duration.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. If it is None (or it
        returns a falsey value for a test) the test forms its own group.
    :param bool randomize: If true each partition's test order will be
        randomized
    :return: A list where each element is a distinct subset of test_ids,
        and the union of all the elements is equal to set(test_ids).
    :raises ValueError: If concurrency is less than 1, since there would be
        no partition to place the tests in.
    """
    if concurrency < 1:
        # Without this guard an empty partition list either raises a bare
        # IndexError below or silently drops every test.
        raise ValueError(
            'concurrency must be at least 1, got %r' % (concurrency,))

    partitions = [[] for _ in range(concurrency)]
    # Each entry is [accumulated_time, partition]; the inner list is the very
    # same object as in ``partitions`` so that re-sorting ``timed_partitions``
    # never disturbs the order of the returned partitions.
    timed_partitions = [[0.0, partition] for partition in partitions]
    if repository:
        time_data = repository.get_test_times(test_ids)
        timed_tests = time_data['known']
        unknown_tests = time_data['unknown']
    else:
        timed_tests = {}
        unknown_tests = set(test_ids)

    # Group tests: generate group_id -> test_ids.
    group_ids = collections.defaultdict(list)
    for test_id in test_ids:
        group_id = None
        if group_callback is not None:
            group_id = group_callback(test_id)
        # A falsey group id means the test is scheduled on its own.
        group_ids[group_id or test_id].append(test_id)

    # Time groups: generate three sets of groups:
    # - fully timed dict(group_id -> time),
    # - partially timed dict(group_id -> time) and
    # - unknown (list of group_id)
    # We may in future treat partially timed different for scheduling, but
    # at least today we just schedule them after the fully timed groups.
    timed = {}
    partial = {}
    unknown = []
    for group_id, group_tests in group_ids.items():
        untimed_ids = unknown_tests.intersection(group_tests)
        # untimed_ids is a subset of group_tests, so the plain difference is
        # exactly the set of tests in the group with a recorded duration.
        timed_ids = set(group_tests).difference(untimed_ids)
        group_time = sum(timed_tests[test_id] for test_id in timed_ids)
        if not untimed_ids:
            timed[group_id] = group_time
        elif group_time:
            partial[group_id] = group_time
        else:
            unknown.append(group_id)

    # Scheduling is NP complete in general, so we avoid aiming for
    # perfection. A quick approximation that is sufficient for our general
    # needs:
    # sort the groups by time
    # allocate to partitions by putting each group in to the partition with
    # the current (lowest time, shortest length[in tests])
    def consume_queue(groups):
        queue = sorted(
            groups.items(), key=operator.itemgetter(1), reverse=True)
        for group_id, duration in queue:
            # After each sort the least loaded partition is at index 0.
            timed_partitions[0][0] = timed_partitions[0][0] + duration
            timed_partitions[0][1].extend(group_ids[group_id])
            timed_partitions.sort(key=lambda item: (item[0], len(item[1])))

    consume_queue(timed)
    consume_queue(partial)

    # Assign groups with entirely unknown times in round robin fashion to
    # the partitions.
    for partition, group_id in zip(itertools.cycle(partitions), unknown):
        partition.extend(group_ids[group_id])

    if randomize:
        out_parts = []
        for partition in partitions:
            # Shuffle a copy so the scheduled partition itself is untouched.
            shuffled = list(partition)
            random.shuffle(shuffled)
            out_parts.append(shuffled)
        return out_parts
    return partitions
def local_concurrency():
    """Report how many CPUs this system has.

    :return: The CPU count as an int, or None when the platform is unable
        to report one.
    """
    try:
        cpu_total = multiprocessing.cpu_count()
    except NotImplementedError:
        # The platform gives us nothing to base a concurrency level on.
        cpu_total = None
    return cpu_total
def generate_worker_partitions(ids, worker_path, repository=None,
                               group_callback=None, randomize=False):
    """Parse a worker yaml file and generate test groups

    The yaml document is expected to be a list of mappings. Each mapping
    must have a ``worker`` key holding a list, which is handed to
    ``selection.filter_tests`` together with ``ids`` to pick that worker's
    tests, and may have a ``concurrency`` key. When ``concurrency`` is
    greater than 1 the worker's tests are further split with
    ``partition_tests``.

    :param list ids: A list of test ids to be partitioned
    :param path worker_path: The path to a worker file
    :param repository: A repository object that will be used for looking up
        timing data. This is optional, and also will only be used for
        scheduling if there is a concurrency field on a worker.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. This is optional and
        also will only be used for scheduling if there is a concurrency
        field on a worker.
    :param bool randomize: If true each partition's test order will be
        randomized. This is optional and also will only be used for
        scheduling if there is a concurrency field on a worker.
    :returns: A list where each element is a distinct subset of test_ids.
    :raises TypeError: If the yaml document is not a list of mappings that
        each contain a ``worker`` list.
    """
    format_error = 'The input yaml is the incorrect format'

    with open(worker_path) as worker_file:
        workers_desc = yaml.safe_load(worker_file.read())

    # An empty or scalar document loads as a non-list (e.g. None); report it
    # as a format problem instead of failing obscurely while iterating.
    if not isinstance(workers_desc, list):
        raise TypeError(format_error)

    worker_groups = []
    for worker in workers_desc:
        if not isinstance(worker, dict) or 'worker' not in worker:
            raise TypeError(format_error)
        if not isinstance(worker['worker'], list):
            raise TypeError(format_error)

        local_worker_list = selection.filter_tests(worker['worker'], ids)
        if 'concurrency' in worker and worker['concurrency'] > 1:
            partitioned_tests = partition_tests(
                local_worker_list, worker['concurrency'], repository,
                group_callback, randomize)
            worker_groups.extend(partitioned_tests)
        elif local_worker_list:
            # If a worker partition is empty don't add it to the output
            worker_groups.append(local_worker_list)
    return worker_groups