aboutsummaryrefslogtreecommitdiff
blob: 03ace915a217cb17d428752f28953cbf45ec6253 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Copyright 2018-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import time
from portage.tests import TestCase
from portage.util._async.ForkProcess import ForkProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from portage.util.futures.compat_coroutine import coroutine, coroutine_return
from portage.util.futures.iter_completed import (
	iter_completed,
	async_iter_completed,
)


class SleepProcess(ForkProcess):
	__slots__ = ('future', 'seconds')
	def _start(self):
		self.addExitListener(self._future_done)
		ForkProcess._start(self)

	def _future_done(self, task):
		if not self.future.cancelled():
			self.future.set_result(self.seconds)

	def _run(self):
		time.sleep(self.seconds)


class IterCompletedTestCase(TestCase):

	def testIterCompleted(self):

		# Mark this as todo, since we don't want to fail if heavy system
		# load causes the tasks to finish in an unexpected order.
		self.todo = True

		loop = global_event_loop()
		tasks = [
			SleepProcess(seconds=0.200),
			SleepProcess(seconds=0.100),
			SleepProcess(seconds=0.001),
		]

		expected_order = sorted(task.seconds for task in tasks)

		@coroutine
		def task_coroutine(task):
			task.future = loop.create_future()
			task.scheduler = loop
			yield task.async_start()
			result = yield task.future
			coroutine_return(result)

		def future_generator():
			for task in tasks:
				yield task_coroutine(task)

		for seconds, future in zip(expected_order, iter_completed(future_generator(),
			max_jobs=True, max_load=None, loop=loop)):
			self.assertEqual(seconds, future.result())

	def testAsyncCancel(self):

		loop = global_event_loop()
		input_futures = set()
		future_count = 3

		def future_generator():
			for i in range(future_count):
				future = loop.create_future()
				loop.call_soon(lambda future: None if future.done()
					else future.set_result(None), future)
				input_futures.add(future)
				yield future

		for future_done_set in async_iter_completed(future_generator(),
			max_jobs=True, max_load=True, loop=loop):
			while not input_futures:
				loop.run_until_complete(asyncio.sleep(0, loop=loop))
			future_done_set.cancel()
			break

		# With max_jobs=True, async_iter_completed should have executed
		# the generator until it raised StopIteration.
		self.assertEqual(future_count, len(input_futures))

		loop.run_until_complete(asyncio.wait(input_futures, loop=loop))

		# The futures may have results or they may have been cancelled
		# by TaskScheduler, and behavior varies depending on the python
		# interpreter.
		for future in input_futures:
			future.cancelled() or future.result()