summaryrefslogtreecommitdiff
blob: 24835e02277ef6b947adcc53582bace357a03c9e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
From b5020a047fc487f35b76fc05f31e52665a1afda1 Mon Sep 17 00:00:00 2001
From: abhishekkekane <abhishek.kekane@nttdata.com>
Date: Mon, 6 Jul 2015 01:51:26 -0700
Subject: [PATCH] libvirt: Kill rsync/scp processes before deleting instance

In the resize operation, during copying files from source to
destination compute node scp/rsync processes are not aborted after
the instance is deleted because linux kernel doesn't delete instance
files physically until all processes using the file handle is closed
completely. Hence rsync/scp process keeps on running until it
transfers 100% of file data.

Added new module instancejobtracker to libvirt driver which will add,
remove or terminate the processes running against particular instances.
Added callback methods to execute call which will store the pid of
scp/rsync process in cache as a key: value pair and to remove the
pid from the cache after process completion. Process id will be used to
kill the process if it is running while deleting the instance. Instance
uuid is used as a key in the cache and pid will be the value.

Conflicts:
        nova/virt/libvirt/driver.py

SecurityImpact

Closes-bug: #1387543
Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9
(cherry picked from commit 7ab75d5b0b75fc3426323bef19bf436a258b9707)
---
 nova/tests/unit/virt/libvirt/test_driver.py | 38 +++++++++++
 nova/tests/unit/virt/libvirt/test_utils.py  |  9 ++-
 nova/virt/libvirt/driver.py                 | 18 +++++-
 nova/virt/libvirt/instancejobtracker.py     | 98 +++++++++++++++++++++++++++++
 nova/virt/libvirt/utils.py                  | 14 +++--
 5 files changed, 168 insertions(+), 9 deletions(-)
 create mode 100644 nova/virt/libvirt/instancejobtracker.py

diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py
index 859df95..5ff978a 100644
--- a/nova/tests/unit/virt/libvirt/test_driver.py
+++ b/nova/tests/unit/virt/libvirt/test_driver.py
@@ -23,6 +23,7 @@
 import random
 import re
 import shutil
+import signal
 import threading
 import time
 import uuid
@@ -9817,6 +9818,15 @@ def test_shared_storage_detection_easy(self):
         self.mox.ReplayAll()
         self.assertTrue(drvr._is_storage_shared_with('foo', '/path'))
 
+    def test_store_pid_remove_pid(self):
+        instance = objects.Instance(**self.test_instance)
+        drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
+        popen = mock.Mock(pid=3)
+        drvr.job_tracker.add_job(instance, popen.pid)
+        self.assertIn(3, drvr.job_tracker.jobs[instance.uuid])
+        drvr.job_tracker.remove_job(instance, popen.pid)
+        self.assertNotIn(instance.uuid, drvr.job_tracker.jobs)
+
     @mock.patch('nova.virt.libvirt.host.Host.get_domain')
     def test_get_domain_info_with_more_return(self, mock_get_domain):
         instance = objects.Instance(**self.test_instance)
@@ -11316,12 +11326,18 @@ def fake_get_host_ip_addr():
         def fake_execute(*args, **kwargs):
             pass
 
+        def fake_copy_image(src, dest, host=None, receive=False,
+                            on_execute=None, on_completion=None):
+            self.assertIsNotNone(on_execute)
+            self.assertIsNotNone(on_completion)
+
         self.stubs.Set(self.drvr, 'get_instance_disk_info',
                        fake_get_instance_disk_info)
         self.stubs.Set(self.drvr, '_destroy', fake_destroy)
         self.stubs.Set(self.drvr, 'get_host_ip_addr',
                        fake_get_host_ip_addr)
         self.stubs.Set(utils, 'execute', fake_execute)
+        self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image)
 
         ins_ref = self._create_instance(params=params_for_instance)
 
@@ -12428,6 +12444,28 @@ def test_delete_instance_files(self, get_instance_path, exists, exe,
     @mock.patch('shutil.rmtree')
     @mock.patch('nova.utils.execute')
     @mock.patch('os.path.exists')
+    @mock.patch('os.kill')
+    @mock.patch('nova.virt.libvirt.utils.get_instance_path')
+    def test_delete_instance_files_kill_running(
+            self, get_instance_path, kill, exists, exe, shutil):
+        get_instance_path.return_value = '/path'
+        instance = objects.Instance(uuid='fake-uuid', id=1)
+        self.drvr.job_tracker.jobs[instance.uuid] = [3, 4]
+
+        exists.side_effect = [False, False, True, False]
+
+        result = self.drvr.delete_instance_files(instance)
+        get_instance_path.assert_called_with(instance)
+        exe.assert_called_with('mv', '/path', '/path_del')
+        kill.assert_has_calls([mock.call(3, signal.SIGKILL), mock.call(3, 0),
+                               mock.call(4, signal.SIGKILL), mock.call(4, 0)])
+        shutil.assert_called_with('/path_del')
+        self.assertTrue(result)
+        self.assertNotIn(instance.uuid, self.drvr.job_tracker.jobs)
+
+    @mock.patch('shutil.rmtree')
+    @mock.patch('nova.utils.execute')
+    @mock.patch('os.path.exists')
     @mock.patch('nova.virt.libvirt.utils.get_instance_path')
     def test_delete_instance_files_resize(self, get_instance_path, exists,
                                           exe, shutil):
diff --git a/nova/tests/unit/virt/libvirt/test_utils.py b/nova/tests/unit/virt/libvirt/test_utils.py
index 7fa0326..14bf822 100644
--- a/nova/tests/unit/virt/libvirt/test_utils.py
+++ b/nova/tests/unit/virt/libvirt/test_utils.py
@@ -62,7 +62,8 @@ def test_copy_image_local_cp(self, mock_execute):
         mock_execute.assert_called_once_with('cp', 'src', 'dest')
 
     _rsync_call = functools.partial(mock.call,
-                                    'rsync', '--sparse', '--compress')
+                                    'rsync', '--sparse', '--compress',
+                                    on_execute=None, on_completion=None)
 
     @mock.patch('nova.utils.execute')
     def test_copy_image_rsync(self, mock_execute):
@@ -85,7 +86,8 @@ def test_copy_image_scp(self, mock_execute):
 
         mock_execute.assert_has_calls([
             self._rsync_call('--dry-run', 'src', 'host:dest'),
-            mock.call('scp', 'src', 'host:dest'),
+            mock.call('scp', 'src', 'host:dest',
+                      on_execute=None, on_completion=None),
         ])
         self.assertEqual(2, mock_execute.call_count)
 
@@ -110,7 +112,8 @@ def test_copy_image_scp_ipv6(self, mock_execute):
 
         mock_execute.assert_has_calls([
             self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
-            mock.call('scp', 'src', '[2600::]:dest'),
+            mock.call('scp', 'src', '[2600::]:dest',
+                      on_execute=None, on_completion=None),
         ])
         self.assertEqual(2, mock_execute.call_count)
 
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
index 40ee080..0a94d5a 100644
--- a/nova/virt/libvirt/driver.py
+++ b/nova/virt/libvirt/driver.py
@@ -95,6 +95,7 @@
 from nova.virt.libvirt import host
 from nova.virt.libvirt import imagebackend
 from nova.virt.libvirt import imagecache
+from nova.virt.libvirt import instancejobtracker
 from nova.virt.libvirt import lvm
 from nova.virt.libvirt import rbd_utils
 from nova.virt.libvirt import utils as libvirt_utils
@@ -465,6 +466,8 @@ def __init__(self, virtapi, read_only=False):
                    'expect': ', '.join("'%s'" % k for k in
                                        sysinfo_serial_funcs.keys())})
 
+        self.job_tracker = instancejobtracker.InstanceJobTracker()
+
     def _get_volume_drivers(self):
         return libvirt_volume_drivers
 
@@ -6301,6 +6304,11 @@ def migrate_disk_and_power_off(self, context, instance, dest,
                     # finish_migration/_create_image to re-create it for us.
                     continue
 
+                on_execute = lambda process: self.job_tracker.add_job(
+                    instance, process.pid)
+                on_completion = lambda process: self.job_tracker.remove_job(
+                    instance, process.pid)
+
                 if info['type'] == 'qcow2' and info['backing_file']:
                     tmp_path = from_path + "_rbase"
                     # merge backing file
@@ -6310,11 +6318,15 @@ def migrate_disk_and_power_off(self, context, instance, dest,
                     if shared_storage:
                         utils.execute('mv', tmp_path, img_path)
                     else:
-                        libvirt_utils.copy_image(tmp_path, img_path, host=dest)
+                        libvirt_utils.copy_image(tmp_path, img_path, host=dest,
+                                                 on_execute=on_execute,
+                                                 on_completion=on_completion)
                         utils.execute('rm', '-f', tmp_path)
 
                 else:  # raw or qcow2 with no backing file
-                    libvirt_utils.copy_image(from_path, img_path, host=dest)
+                    libvirt_utils.copy_image(from_path, img_path, host=dest,
+                                             on_execute=on_execute,
+                                             on_completion=on_completion)
         except Exception:
             with excutils.save_and_reraise_exception():
                 self._cleanup_remote_migration(dest, inst_base,
@@ -6683,6 +6695,8 @@ def delete_instance_files(self, instance):
         # invocation failed due to the absence of both target and
         # target_resize.
         if not remaining_path and os.path.exists(target_del):
+            self.job_tracker.terminate_jobs(instance)
+
             LOG.info(_LI('Deleting instance files %s'), target_del,
                      instance=instance)
             remaining_path = target_del
diff --git a/nova/virt/libvirt/instancejobtracker.py b/nova/virt/libvirt/instancejobtracker.py
new file mode 100644
index 0000000..d47fb45
--- /dev/null
+++ b/nova/virt/libvirt/instancejobtracker.py
@@ -0,0 +1,98 @@
+# Copyright 2015 NTT corp.
+# All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+import collections
+import errno
+import os
+import signal
+
+from oslo_log import log as logging
+
+from nova.i18n import _LE
+from nova.i18n import _LW
+
+
+LOG = logging.getLogger(__name__)
+
+
+class InstanceJobTracker(object):
+    def __init__(self):
+        self.jobs = collections.defaultdict(list)
+
+    def add_job(self, instance, pid):
+        """Appends process_id of instance to cache.
+
+        This method will store the pid of a process in cache as
+        a key: value pair which will be used to kill the process if it
+        is running while deleting the instance. Instance uuid is used as
+        a key in the cache and pid will be the value.
+
+        :param instance: Object of instance
+        :param pid: Id of the process
+        """
+        self.jobs[instance.uuid].append(pid)
+
+    def remove_job(self, instance, pid):
+        """Removes pid of process from cache.
+
+        This method will remove the pid of a process from the cache.
+
+        :param instance: Object of instance
+        :param pid: Id of the process
+        """
+        uuid = instance.uuid
+        if uuid in self.jobs and pid in self.jobs[uuid]:
+            self.jobs[uuid].remove(pid)
+
+        # remove instance.uuid if no pid's remaining
+        if not self.jobs[uuid]:
+            self.jobs.pop(uuid, None)
+
+    def terminate_jobs(self, instance):
+        """Kills the running processes for given instance.
+
+        This method is used to kill all running processes of the instance if
+        it is deleted in between.
+
+        :param instance: Object of instance
+        """
+        pids_to_remove = list(self.jobs.get(instance.uuid, []))
+        for pid in pids_to_remove:
+            try:
+                # Try to kill the process
+                os.kill(pid, signal.SIGKILL)
+            except OSError as exc:
+                if exc.errno != errno.ESRCH:
+                    LOG.error(_LE('Failed to kill process %(pid)s '
+                                  'due to %(reason)s, while deleting the '
+                                  'instance.'), {'pid': pid, 'reason': exc},
+                              instance=instance)
+
+            try:
+                # Check if the process is still alive.
+                os.kill(pid, 0)
+            except OSError as exc:
+                if exc.errno != errno.ESRCH:
+                    LOG.error(_LE('Unexpected error while checking process '
+                                  '%(pid)s.'), {'pid': pid},
+                              instance=instance)
+            else:
+                # The process is still around
+                LOG.warn(_LW("Failed to kill a long running process "
+                             "%(pid)s related to the instance when "
+                             "deleting it."), {'pid': pid},
+                         instance=instance)
+
+            self.remove_job(instance, pid)
diff --git a/nova/virt/libvirt/utils.py b/nova/virt/libvirt/utils.py
index 7b80464..83d5ba3 100644
--- a/nova/virt/libvirt/utils.py
+++ b/nova/virt/libvirt/utils.py
@@ -294,13 +294,16 @@ def get_disk_backing_file(path, basename=True):
     return backing_file
 
 
-def copy_image(src, dest, host=None, receive=False):
+def copy_image(src, dest, host=None, receive=False,
+               on_execute=None, on_completion=None):
     """Copy a disk image to an existing directory
 
     :param src: Source image
     :param dest: Destination path
     :param host: Remote host
     :param receive: Reverse the rsync direction
+    :param on_execute: Callback method to store pid of process in cache
+    :param on_completion: Callback method to remove pid of process from cache
     """
 
     if not host:
@@ -322,11 +325,14 @@ def copy_image(src, dest, host=None, receive=False):
             # Do a relatively light weight test first, so that we
             # can fall back to scp, without having run out of space
             # on the destination for example.
-            execute('rsync', '--sparse', '--compress', '--dry-run', src, dest)
+            execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
+                    on_execute=on_execute, on_completion=on_completion)
         except processutils.ProcessExecutionError:
-            execute('scp', src, dest)
+            execute('scp', src, dest, on_execute=on_execute,
+                    on_completion=on_completion)
         else:
-            execute('rsync', '--sparse', '--compress', src, dest)
+            execute('rsync', '--sparse', '--compress', src, dest,
+                    on_execute=on_execute, on_completion=on_completion)
 
 
 def write_to_file(path, contents, umask=None):