From b5020a047fc487f35b76fc05f31e52665a1afda1 Mon Sep 17 00:00:00 2001 From: abhishekkekane Date: Mon, 6 Jul 2015 01:51:26 -0700 Subject: [PATCH] libvirt: Kill rsync/scp processes before deleting instance In the resize operation, during copying files from source to destination compute node scp/rsync processes are not aborted after the instance is deleted because linux kernel doesn't delete instance files physically until all processes using the file handle is closed completely. Hence rsync/scp process keeps on running until it transfers 100% of file data. Added new module instancejobtracker to libvirt driver which will add, remove or terminate the processes running against particular instances. Added callback methods to execute call which will store the pid of scp/rsync process in cache as a key: value pair and to remove the pid from the cache after process completion. Process id will be used to kill the process if it is running while deleting the instance. Instance uuid is used as a key in the cache and pid will be the value. Conflicts: nova/virt/libvirt/driver.py SecurityImpact Closes-bug: #1387543 Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9 (cherry picked from commit 7ab75d5b0b75fc3426323bef19bf436a258b9707) --- nova/tests/unit/virt/libvirt/test_driver.py | 38 +++++++++++ nova/tests/unit/virt/libvirt/test_utils.py | 9 ++- nova/virt/libvirt/driver.py | 18 +++++- nova/virt/libvirt/instancejobtracker.py | 98 +++++++++++++++++++++++++++++ nova/virt/libvirt/utils.py | 14 +++-- 5 files changed, 168 insertions(+), 9 deletions(-) create mode 100644 nova/virt/libvirt/instancejobtracker.py diff --git a/nova/tests/unit/virt/libvirt/test_driver.py b/nova/tests/unit/virt/libvirt/test_driver.py index 859df95..5ff978a 100644 --- a/nova/tests/unit/virt/libvirt/test_driver.py +++ b/nova/tests/unit/virt/libvirt/test_driver.py @@ -23,6 +23,7 @@ import random import re import shutil +import signal import threading import time import uuid @@ -9817,6 +9818,15 @@ def test_shared_storage_detection_easy(self): self.mox.ReplayAll() self.assertTrue(drvr._is_storage_shared_with('foo', '/path')) + def test_store_pid_remove_pid(self): + instance = objects.Instance(**self.test_instance) + drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + popen = mock.Mock(pid=3) + drvr.job_tracker.add_job(instance, popen.pid) + self.assertIn(3, drvr.job_tracker.jobs[instance.uuid]) + drvr.job_tracker.remove_job(instance, popen.pid) + self.assertNotIn(instance.uuid, drvr.job_tracker.jobs) + @mock.patch('nova.virt.libvirt.host.Host.get_domain') def test_get_domain_info_with_more_return(self, mock_get_domain): instance = objects.Instance(**self.test_instance) @@ -11316,12 +11326,18 @@ def fake_get_host_ip_addr(): def fake_execute(*args, **kwargs): pass + def fake_copy_image(src, dest, host=None, receive=False, + on_execute=None, on_completion=None): + self.assertIsNotNone(on_execute) + self.assertIsNotNone(on_completion) + self.stubs.Set(self.drvr, 'get_instance_disk_info', fake_get_instance_disk_info) self.stubs.Set(self.drvr, '_destroy', fake_destroy) self.stubs.Set(self.drvr, 'get_host_ip_addr', fake_get_host_ip_addr) self.stubs.Set(utils, 'execute', fake_execute) + self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image) ins_ref = self._create_instance(params=params_for_instance) @@ -12428,6 +12444,28 @@ def test_delete_instance_files(self, get_instance_path, exists, exe, @mock.patch('shutil.rmtree') @mock.patch('nova.utils.execute') @mock.patch('os.path.exists') + @mock.patch('os.kill') + @mock.patch('nova.virt.libvirt.utils.get_instance_path') + def test_delete_instance_files_kill_running( + self, get_instance_path, kill, exists, exe, shutil): + get_instance_path.return_value = '/path' + instance = objects.Instance(uuid='fake-uuid', id=1) + self.drvr.job_tracker.jobs[instance.uuid] = [3, 4] + + exists.side_effect = [False, False, True, False] + + result = self.drvr.delete_instance_files(instance) + get_instance_path.assert_called_with(instance) + exe.assert_called_with('mv', '/path', '/path_del') + kill.assert_has_calls([mock.call(3, signal.SIGKILL), mock.call(3, 0), + mock.call(4, signal.SIGKILL), mock.call(4, 0)]) + shutil.assert_called_with('/path_del') + self.assertTrue(result) + self.assertNotIn(instance.uuid, self.drvr.job_tracker.jobs) + + @mock.patch('shutil.rmtree') + @mock.patch('nova.utils.execute') + @mock.patch('os.path.exists') @mock.patch('nova.virt.libvirt.utils.get_instance_path') def test_delete_instance_files_resize(self, get_instance_path, exists, exe, shutil): diff --git a/nova/tests/unit/virt/libvirt/test_utils.py b/nova/tests/unit/virt/libvirt/test_utils.py index 7fa0326..14bf822 100644 --- a/nova/tests/unit/virt/libvirt/test_utils.py +++ b/nova/tests/unit/virt/libvirt/test_utils.py @@ -62,7 +62,8 @@ def test_copy_image_local_cp(self, mock_execute): mock_execute.assert_called_once_with('cp', 'src', 'dest') _rsync_call = functools.partial(mock.call, - 'rsync', '--sparse', '--compress') + 'rsync', '--sparse', '--compress', + on_execute=None, on_completion=None) @mock.patch('nova.utils.execute') def test_copy_image_rsync(self, mock_execute): @@ -85,7 +86,8 @@ def test_copy_image_scp(self, mock_execute): mock_execute.assert_has_calls([ self._rsync_call('--dry-run', 'src', 'host:dest'), - mock.call('scp', 'src', 'host:dest'), + mock.call('scp', 'src', 'host:dest', + on_execute=None, on_completion=None), ]) self.assertEqual(2, mock_execute.call_count) @@ -110,7 +112,8 @@ def test_copy_image_scp_ipv6(self, mock_execute): mock_execute.assert_has_calls([ self._rsync_call('--dry-run', 'src', '[2600::]:dest'), - mock.call('scp', 'src', '[2600::]:dest'), + mock.call('scp', 'src', '[2600::]:dest', + on_execute=None, on_completion=None), ]) self.assertEqual(2, mock_execute.call_count) diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 40ee080..0a94d5a 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -95,6 +95,7 @@ from nova.virt.libvirt import host from nova.virt.libvirt import imagebackend from nova.virt.libvirt import imagecache +from nova.virt.libvirt import instancejobtracker from nova.virt.libvirt import lvm from nova.virt.libvirt import rbd_utils from nova.virt.libvirt import utils as libvirt_utils @@ -465,6 +466,8 @@ def __init__(self, virtapi, read_only=False): 'expect': ', '.join("'%s'" % k for k in sysinfo_serial_funcs.keys())}) + self.job_tracker = instancejobtracker.InstanceJobTracker() + def _get_volume_drivers(self): return libvirt_volume_drivers @@ -6301,6 +6304,11 @@ def migrate_disk_and_power_off(self, context, instance, dest, # finish_migration/_create_image to re-create it for us. continue + on_execute = lambda process: self.job_tracker.add_job( + instance, process.pid) + on_completion = lambda process: self.job_tracker.remove_job( + instance, process.pid) + if info['type'] == 'qcow2' and info['backing_file']: tmp_path = from_path + "_rbase" # merge backing file @@ -6310,11 +6318,15 @@ def migrate_disk_and_power_off(self, context, instance, dest, if shared_storage: utils.execute('mv', tmp_path, img_path) else: - libvirt_utils.copy_image(tmp_path, img_path, host=dest) + libvirt_utils.copy_image(tmp_path, img_path, host=dest, + on_execute=on_execute, + on_completion=on_completion) utils.execute('rm', '-f', tmp_path) else: # raw or qcow2 with no backing file - libvirt_utils.copy_image(from_path, img_path, host=dest) + libvirt_utils.copy_image(from_path, img_path, host=dest, + on_execute=on_execute, + on_completion=on_completion) except Exception: with excutils.save_and_reraise_exception(): self._cleanup_remote_migration(dest, inst_base, @@ -6683,6 +6695,8 @@ def delete_instance_files(self, instance): # invocation failed due to the absence of both target and # target_resize. if not remaining_path and os.path.exists(target_del): + self.job_tracker.terminate_jobs(instance) + LOG.info(_LI('Deleting instance files %s'), target_del, instance=instance) remaining_path = target_del diff --git a/nova/virt/libvirt/instancejobtracker.py b/nova/virt/libvirt/instancejobtracker.py new file mode 100644 index 0000000..d47fb45 --- /dev/null +++ b/nova/virt/libvirt/instancejobtracker.py @@ -0,0 +1,98 @@ +# Copyright 2015 NTT corp. +# All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import collections +import errno +import os +import signal + +from oslo_log import log as logging + +from nova.i18n import _LE +from nova.i18n import _LW + + +LOG = logging.getLogger(__name__) + + +class InstanceJobTracker(object): + def __init__(self): + self.jobs = collections.defaultdict(list) + + def add_job(self, instance, pid): + """Appends process_id of instance to cache. + + This method will store the pid of a process in cache as + a key: value pair which will be used to kill the process if it + is running while deleting the instance. Instance uuid is used as + a key in the cache and pid will be the value. + + :param instance: Object of instance + :param pid: Id of the process + """ + self.jobs[instance.uuid].append(pid) + + def remove_job(self, instance, pid): + """Removes pid of process from cache. + + This method will remove the pid of a process from the cache. + + :param instance: Object of instance + :param pid: Id of the process + """ + uuid = instance.uuid + if uuid in self.jobs and pid in self.jobs[uuid]: + self.jobs[uuid].remove(pid) + + # remove instance.uuid if no pid's remaining + if not self.jobs[uuid]: + self.jobs.pop(uuid, None) + + def terminate_jobs(self, instance): + """Kills the running processes for given instance. + + This method is used to kill all running processes of the instance if + it is deleted in between. + + :param instance: Object of instance + """ + pids_to_remove = list(self.jobs.get(instance.uuid, [])) + for pid in pids_to_remove: + try: + # Try to kill the process + os.kill(pid, signal.SIGKILL) + except OSError as exc: + if exc.errno != errno.ESRCH: + LOG.error(_LE('Failed to kill process %(pid)s ' + 'due to %(reason)s, while deleting the ' + 'instance.'), {'pid': pid, 'reason': exc}, + instance=instance) + + try: + # Check if the process is still alive. + os.kill(pid, 0) + except OSError as exc: + if exc.errno != errno.ESRCH: + LOG.error(_LE('Unexpected error while checking process ' + '%(pid)s.'), {'pid': pid}, + instance=instance) + else: + # The process is still around + LOG.warn(_LW("Failed to kill a long running process " + "%(pid)s related to the instance when " + "deleting it."), {'pid': pid}, + instance=instance) + + self.remove_job(instance, pid) diff --git a/nova/virt/libvirt/utils.py b/nova/virt/libvirt/utils.py index 7b80464..83d5ba3 100644 --- a/nova/virt/libvirt/utils.py +++ b/nova/virt/libvirt/utils.py @@ -294,13 +294,16 @@ def get_disk_backing_file(path, basename=True): return backing_file -def copy_image(src, dest, host=None, receive=False): +def copy_image(src, dest, host=None, receive=False, + on_execute=None, on_completion=None): """Copy a disk image to an existing directory :param src: Source image :param dest: Destination path :param host: Remote host :param receive: Reverse the rsync direction + :param on_execute: Callback method to store pid of process in cache + :param on_completion: Callback method to remove pid of process from cache """ if not host: @@ -322,11 +325,14 @@ def copy_image(src, dest, host=None, receive=False): # Do a relatively light weight test first, so that we # can fall back to scp, without having run out of space # on the destination for example. - execute('rsync', '--sparse', '--compress', '--dry-run', src, dest) + execute('rsync', '--sparse', '--compress', '--dry-run', src, dest, + on_execute=on_execute, on_completion=on_completion) except processutils.ProcessExecutionError: - execute('scp', src, dest) + execute('scp', src, dest, on_execute=on_execute, + on_completion=on_completion) else: - execute('rsync', '--sparse', '--compress', src, dest) + execute('rsync', '--sparse', '--compress', src, dest, + on_execute=on_execute, on_completion=on_completion) def write_to_file(path, contents, umask=None):