aboutsummaryrefslogtreecommitdiff
blob: 640eead91c7d0adb7e8d46a070f5db45b05ad6c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright 1999-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import functools

from _emerge.AsynchronousLock import AsynchronousLock
from _emerge.CompositeTask import CompositeTask
from _emerge.SpawnProcess import SpawnProcess
try:
	from urllib.parse import urlparse as urllib_parse_urlparse
except ImportError:
	from urlparse import urlparse as urllib_parse_urlparse
import stat
import sys
import portage
from portage import os
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._pty import _create_pty_or_pipe
from portage.util.futures.compat_coroutine import coroutine

if sys.hexversion >= 0x3000000:
	long = int


class BinpkgFetcher(CompositeTask):

	__slots__ = ("pkg", "pretend", "logfile", "pkg_path")

	def __init__(self, **kwargs):
		CompositeTask.__init__(self, **kwargs)
		pkg = self.pkg
		self.pkg_path = pkg.root_config.trees["bintree"].getname(
			pkg.cpv) + ".partial"

	def _start(self):
		fetcher = _BinpkgFetcherProcess(background=self.background,
			logfile=self.logfile, pkg=self.pkg, pkg_path=self.pkg_path,
			pretend=self.pretend, scheduler=self.scheduler)

		if not self.pretend:
			portage.util.ensure_dirs(os.path.dirname(self.pkg_path))
			if "distlocks" in self.pkg.root_config.settings.features:
				self._start_task(
					AsyncTaskFuture(future=fetcher.async_lock()),
					functools.partial(self._start_locked, fetcher))
				return

		self._start_task(fetcher, self._fetcher_exit)

	def _start_locked(self, fetcher, lock_task):
		self._assert_current(lock_task)
		if lock_task.cancelled:
			self._default_final_exit(lock_task)
			return

		lock_task.future.result()
		self._start_task(fetcher, self._fetcher_exit)

	def _fetcher_exit(self, fetcher):
		self._assert_current(fetcher)
		if not self.pretend and fetcher.returncode == os.EX_OK:
			fetcher.sync_timestamp()
		if fetcher.locked:
			self._start_task(
				AsyncTaskFuture(future=fetcher.async_unlock()),
				functools.partial(self._fetcher_exit_unlocked, fetcher))
		else:
			self._fetcher_exit_unlocked(fetcher)

	def _fetcher_exit_unlocked(self, fetcher, unlock_task=None):
		if unlock_task is not None:
			self._assert_current(unlock_task)
			if unlock_task.cancelled:
				self._default_final_exit(unlock_task)
				return

			unlock_task.future.result()

		self._current_task = None
		self.returncode = fetcher.returncode
		self._async_wait()


class _BinpkgFetcherProcess(SpawnProcess):

	__slots__ = ("pkg", "pretend", "locked", "pkg_path", "_lock_obj")

	def _start(self):
		self.scheduler.run_until_complete(self._async_start())

	@coroutine
	def _async_start(self):
		pkg = self.pkg
		pretend = self.pretend
		bintree = pkg.root_config.trees["bintree"]
		settings = bintree.settings
		pkg_path = self.pkg_path

		exists = os.path.exists(pkg_path)
		resume = exists and os.path.basename(pkg_path) in bintree.invalids
		if not (pretend or resume):
			# Remove existing file or broken symlink.
			try:
				os.unlink(pkg_path)
			except OSError:
				pass

		# urljoin doesn't work correctly with
		# unrecognized protocols like sftp
		if bintree._remote_has_index:
			instance_key = bintree.dbapi._instance_key(pkg.cpv)
			rel_uri = bintree._remotepkgs[instance_key].get("PATH")
			if not rel_uri:
				rel_uri = pkg.cpv + ".tbz2"
			remote_base_uri = bintree._remotepkgs[
				instance_key]["BASE_URI"]
			uri = remote_base_uri.rstrip("/") + "/" + rel_uri.lstrip("/")
		else:
			uri = settings["PORTAGE_BINHOST"].rstrip("/") + \
				"/" + pkg.pf + ".tbz2"

		if pretend:
			portage.writemsg_stdout("\n%s\n" % uri, noiselevel=-1)
			self.returncode = os.EX_OK
			self._async_wait()
			return

		protocol = urllib_parse_urlparse(uri)[0]
		fcmd_prefix = "FETCHCOMMAND"
		if resume:
			fcmd_prefix = "RESUMECOMMAND"
		fcmd = settings.get(fcmd_prefix + "_" + protocol.upper())
		if not fcmd:
			fcmd = settings.get(fcmd_prefix)

		fcmd_vars = {
			"DISTDIR" : os.path.dirname(pkg_path),
			"URI"     : uri,
			"FILE"    : os.path.basename(pkg_path)
		}

		for k in ("PORTAGE_SSH_OPTS",):
			v = settings.get(k)
			if v is not None:
				fcmd_vars[k] = v

		fetch_env = dict(settings.items())
		fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \
			for x in portage.util.shlex_split(fcmd)]

		if self.fd_pipes is None:
			self.fd_pipes = {}
		fd_pipes = self.fd_pipes

		# Redirect all output to stdout since some fetchers like
		# wget pollute stderr (if portage detects a problem then it
		# can send it's own message to stderr).
		fd_pipes.setdefault(0, portage._get_stdin().fileno())
		fd_pipes.setdefault(1, sys.__stdout__.fileno())
		fd_pipes.setdefault(2, sys.__stdout__.fileno())

		self.args = fetch_args
		self.env = fetch_env
		if settings.selinux_enabled():
			self._selinux_type = settings["PORTAGE_FETCH_T"]
		yield SpawnProcess._async_start(self)

	def _pipe(self, fd_pipes):
		"""When appropriate, use a pty so that fetcher progress bars,
		like wget has, will work properly."""
		if self.background or not sys.__stdout__.isatty():
			# When the output only goes to a log file,
			# there's no point in creating a pty.
			return os.pipe()
		stdout_pipe = None
		if not self.background:
			stdout_pipe = fd_pipes.get(1)
		got_pty, master_fd, slave_fd = \
			_create_pty_or_pipe(copy_term_size=stdout_pipe)
		return (master_fd, slave_fd)

	def sync_timestamp(self):
			# If possible, update the mtime to match the remote package if
			# the fetcher didn't already do it automatically.
			bintree = self.pkg.root_config.trees["bintree"]
			if bintree._remote_has_index:
				remote_mtime = bintree._remotepkgs[
					bintree.dbapi._instance_key(
					self.pkg.cpv)].get("_mtime_")
				if remote_mtime is not None:
					try:
						remote_mtime = long(remote_mtime)
					except ValueError:
						pass
					else:
						try:
							local_mtime = os.stat(self.pkg_path)[stat.ST_MTIME]
						except OSError:
							pass
						else:
							if remote_mtime != local_mtime:
								try:
									os.utime(self.pkg_path,
										(remote_mtime, remote_mtime))
								except OSError:
									pass

	def async_lock(self):
		"""
		This raises an AlreadyLocked exception if lock() is called
		while a lock is already held. In order to avoid this, call
		unlock() or check whether the "locked" attribute is True
		or False before calling lock().
		"""
		if self._lock_obj is not None:
			raise self.AlreadyLocked((self._lock_obj,))

		result = self.scheduler.create_future()

		def acquired_lock(async_lock):
			if async_lock.wait() == os.EX_OK:
				self.locked = True
				result.set_result(None)
			else:
				result.set_exception(AssertionError(
					"AsynchronousLock failed with returncode %s"
					% (async_lock.returncode,)))

		self._lock_obj = AsynchronousLock(path=self.pkg_path,
			scheduler=self.scheduler)
		self._lock_obj.addExitListener(acquired_lock)
		self._lock_obj.start()
		return result

	class AlreadyLocked(portage.exception.PortageException):
		pass

	def async_unlock(self):
		if self._lock_obj is None:
			raise AssertionError('already unlocked')
		result = self._lock_obj.async_unlock()
		self._lock_obj = None
		self.locked = False
		return result