OpenStack源码学习笔记6

很久之前发现一个现象:在生产环境中配置了保留内存 reserved_host_memory_mb、但没有配置内存超分比 ram_allocation_ratio 的情况下,虚拟机使用的内存居然已经快将物理内存耗尽了。

比如物理机内存300G,方便举例忽略掉一些系统占用,当设置了reserved_host_memory_mb为20G,那么理论上所有虚拟机最大占用内存量为280G,而查看居然已经使用了290G的内存,还是在虚拟机并没有将各自申请的内存全部使用掉的情况下(比如申请一台4G的虚拟机,但物理机操作系统层面并没有分配4G给对应的进程,除非虚拟机内部把内存占满)。

单单就nova而言,如果在集群启动时就指定了保留内存大小和超分比为1的话,是不应该出现上述情形的。除非后期对这2个参数进行过修改,但由于年代久远已经没法追溯,这也就成了一桩悬案。

不过在追踪这个问题的过程中,顺便也学习了一下云主机(虚拟机)热迁移的过程,这里分析的代码是 N 版(Newton)。

根据之前文章写过的套路,这里直接定位到 api/openstack/compute/migrate_server.py 中的 _migrate_live 函数。这个函数很简单就不贴代码了,就是解析请求中的虚拟机 ID、目标主机等参数,然后调用 compute/api.py 中 API 类的 live_migrate 函数:

# NOTE(review): judging by their names these decorators guard on instance
# lock, cell and VM state; the state check is configured with ACTIVE and
# PAUSED. Their implementations are not part of this excerpt -- confirm.
@check_instance_lock
@check_instance_cell
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.PAUSED])
def live_migrate(self, context, instance, block_migration,
                    disk_over_commit, host_name, force=None, async=False):
    """Migrate a server lively to a new host.

    Marks the instance as MIGRATING, records the start of the
    LIVE_MIGRATION action, looks up the stored RequestSpec for the
    instance (``None`` when there is none) and forwards the request to
    ``self.compute_task_api.live_migrate_instance``.

    :param context: request context passed through to every call.
    :param instance: the instance to migrate; its ``task_state`` is
        modified and saved.
    :param block_migration: forwarded unchanged to the task API.
    :param disk_over_commit: forwarded unchanged to the task API.
    :param host_name: requested destination host, forwarded unchanged.
    :param force: not used in the visible part of this excerpt (its
        handling is elided below).
    :param async: forwarded unchanged to the task API.
    """

    instance.task_state = task_states.MIGRATING
    # NOTE(review): expected_task_state=[None] presumably makes the save
    # fail when another task is already running -- confirm in Instance.save.
    instance.save(expected_task_state=[None])

    self._record_action_start(context, instance,
                                instance_actions.LIVE_MIGRATION)
    try:
        request_spec = objects.RequestSpec.get_by_instance_uuid(
            context, instance.uuid)
    except exception.RequestSpecNotFound:
        # No stored request spec for this instance: continue without one.
        request_spec = None
    # (Code elided in this excerpt; according to the surrounding article it
    # is the handling of the ``force`` parameter.)
    ....
    try:
        self.compute_task_api.live_migrate_instance(context, instance,
            host_name, block_migration=block_migration,
            disk_over_commit=disk_over_commit,
            request_spec=request_spec, async=async)
    except oslo_exceptions.MessagingTimeout as messaging_timeout:
        # Attach the timeout to the instance as a fault; the context
        # manager's name indicates the exception is re-raised afterwards.
        with excutils.save_and_reraise_exception():
            compute_utils.add_instance_fault_from_exc(context,
                                                        instance,
                                                        messaging_timeout)

这个函数将虚拟机的任务状态(task_state)修改为 migrating,创建操作记录,并获取虚拟机的 request_spec(这里有个细节:request_spec 这个东西记录在 nova-api 库里而非 nova 库)。由于我们调用的 api 版本还不支持 force 参数,这部分代码就不贴了,具体可参考 nova 上游源码中的同名函数。

然后调用位于 conductor/api.py 中的 live_migrate_instance 函数:

def live_migrate_instance(self, context, instance, host_name,
                          block_migration, disk_over_commit,
                          request_spec=None, async=False):
    scheduler_hint = {'host': host_name}
    if async:
        self.conductor_compute_rpcapi.live_migrate_instance(
            context, instance, scheduler_hint, block_migration,
            disk_over_commit, request_spec)
    else:
        self.conductor_compute_rpcapi.migrate_server(
            context, instance, scheduler_hint, True, False, None,
            block_migration, disk_over_commit, None,
            request_spec=request_spec)

注意,可能是由于代码版本的原因,这里的 scheduler_hint 直接固定为传入的主机名称。然后进入 rpc 调用,位于 conductor/rpcapi.py:

def migrate_server(self, context, instance, scheduler_hint, live, rebuild,
                  flavor, block_migration, disk_over_commit,
                  reservations=None, clean_shutdown=True, request_spec=None):
    """Send the ``migrate_server`` RPC and return its result.

    Every argument except ``context`` is packed into the keyword payload
    ``kw`` under its own name and passed to the remote ``migrate_server``
    method.
    """
    kw = {'instance': instance, 'scheduler_hint': scheduler_hint,
            'live': live, 'rebuild': rebuild, 'flavor': flavor,
            'block_migration': block_migration,
            'disk_over_commit': disk_over_commit,
            'reservations': reservations,
            'clean_shutdown': clean_shutdown,
            'request_spec': request_spec,
            }
    # (Code elided in this excerpt; the surrounding article describes it as
    # version-compatibility checks. ``version`` used below is defined there.)
    ......
    cctxt = self.client.prepare(version=version)
    return cctxt.call(context, 'migrate_server', **kw)

这个函数进行了一堆版本兼容性判断,然后进入 conductor/manager.py 中的 migrate_server 函数:

def migrate_server(self, context, instance, scheduler_hint, live, rebuild,
            flavor, block_migration, disk_over_commit, reservations=None,
            clean_shutdown=True, request_spec=None):
    """Conductor-manager entry point for the ``migrate_server`` request.

    Only the live-migration branch is visible in this excerpt; the code
    before and after it is elided.
    """
    # (Code elided in this excerpt.)
    ......
    # Live migration: ``live`` is set while neither ``rebuild`` nor
    # ``flavor`` is.
    if live and not rebuild and not flavor:
        self._live_migrate(context, instance, scheduler_hint,
                            block_migration, disk_over_commit, request_spec)
    # (Code elided in this excerpt.)
    ......

由于这里我们仅关注热迁移就直接跟进_live_migrate:

def _live_migrate(self, context, instance, scheduler_hint,
                      block_migration, disk_over_commit, request_spec):
    """Create the Migration record and execute the live-migration task.

    The destination is taken from ``scheduler_hint['host']`` and is
    ``None`` when the hint has no such key.
    """
    destination = scheduler_hint.get("host")
    # The Migration object is built on an elevated copy of the context.
    migration = objects.Migration(context=context.elevated())
    # (Code elided in this excerpt; presumably fills in the migration's
    # fields before it is created -- confirm against the full source.)
    ......
    migration.create()
    task = self._build_live_migrate_task(context, instance, destination,
                                        block_migration, disk_over_commit,
                                        migration, request_spec)
    try:
        task.execute()
    # NOTE(review): the handler clauses and their bodies are elided in this
    # excerpt; as shown, this is a bare ``except:``.
    except:
        ......

def _build_live_migrate_task(self, context, instance, destination,
                            block_migration, disk_over_commit, migration,
                            request_spec=None):
    """Construct the LiveMigrationTask for one live-migration request.

    Besides the per-request arguments, the task receives this manager's
    compute RPC API, servicegroup API and scheduler client.
    """
    task = live_migrate.LiveMigrationTask(
        context, instance, destination, block_migration,
        disk_over_commit, migration,
        self.compute_rpcapi, self.servicegroup_api, self.scheduler_client,
        request_spec)
    return task

这个函数就是创建了一条迁移记录(Migration)并写入数据库,然后继续跟进到 conductor/tasks/live_migrate.py:

class LiveMigrationTask(base.TaskBase):
    """Task that validates a live-migration request and starts it.

    Only part of the class is shown in this excerpt; helpers such as
    ``_check_instance_is_active`` or ``_find_destination`` are defined
    elsewhere.
    """

    def __init__(self, context, instance, destination,
                 block_migration, disk_over_commit, migration, compute_rpcapi,
                 servicegroup_api, scheduler_client, request_spec=None):
        super(LiveMigrationTask, self).__init__(context, instance)

        # Collaborators handed in by the caller.
        self.compute_rpcapi = compute_rpcapi
        self.servicegroup_api = servicegroup_api
        self.scheduler_client = scheduler_client

        # Description of the requested migration.
        self.source = instance.host
        self.destination = destination
        self.block_migration = block_migration
        self.disk_over_commit = disk_over_commit
        self.migration = migration
        self.request_spec = request_spec
        self.migrate_data = None

    def _execute(self):
        """Run the pre-checks, settle the destination and start migrating.

        Returns whatever ``compute_rpcapi.live_migration`` returns.
        """
        self._check_instance_is_active()
        self._check_host_is_up(self.source)

        if self.destination:
            # The caller named a host: validate it.
            self._check_requested_destination()
        else:
            # No host requested: pick one and persist it on the migration.
            chosen = self._find_destination()
            self.destination = chosen
            self.migration.dest_compute = chosen
            self.migration.save()

        rpc_kwargs = dict(
            host=self.source,
            instance=self.instance,
            dest=self.destination,
            block_migration=self.block_migration,
            migration=self.migration,
            migrate_data=self.migrate_data,
        )
        return self.compute_rpcapi.live_migration(self.context, **rpc_kwargs)

    def _check_requested_destination(self):
        """Run every check required for an explicitly requested host."""
        self._check_destination_is_not_source()
        self._check_host_is_up(self.destination)
        self._check_destination_has_enough_memory()
        self._check_compatible_with_source_hypervisor(self.destination)
        self._call_livem_checks_on_host(self.destination)

    def _check_destination_has_enough_memory(self):
        """Raise MigrationPreCheckError if the instance does not fit.

        Available memory is ``memory_mb * ram_allocation_ratio`` minus the
        memory already in use (``memory_mb - free_ram_mb``) on the
        destination. The check fails when the instance's ``memory_mb`` is
        falsy or when the available amount is not strictly greater than it.
        """
        compute = self._get_compute_info(self.destination)
        free_ram_mb = compute.free_ram_mb
        total_ram_mb = compute.memory_mb
        mem_inst = self.instance.memory_mb
        # NOTE(sbauza): Now the ComputeNode object reports an allocation ratio
        # that can be provided by the compute_node if new or by the controller
        ram_ratio = compute.ram_allocation_ratio

        # NOTE(sbauza): Mimic the RAMFilter logic in order to have the same
        # ram validation
        used_ram_mb = total_ram_mb - free_ram_mb
        avail = total_ram_mb * ram_ratio - used_ram_mb

        if not mem_inst or avail <= mem_inst:
            details = {
                'instance_uuid': self.instance.uuid,
                'dest': self.destination,
                'avail': avail,
                'mem_inst': mem_inst,
            }
            reason = _("Unable to migrate %(instance_uuid)s to %(dest)s: "
                       "Lack of memory(host:%(avail)s <= "
                       "instance:%(mem_inst)s)")
            raise exception.MigrationPreCheckError(reason=reason % details)

重点是 _execute 这个函数:首先进行状态检查,然后判断是否传入了目标计算节点。如果传入了,则依次检查源节点和目标节点是否是同一台机器、目标节点是否运行、目标节点内存是否充足、虚拟化类型是否一致、是否准许热迁移;如果没传入,则通过 _find_destination 调用 scheduler 的 select_destinations 函数来筛选目标节点。

然后再调用 compute/rpcapi.py 中 ComputeAPI 类的 live_migration 函数,这个函数又调用了 compute/manager.py 中 ComputeManager 类的 live_migration 函数。后者先把迁移状态设置为 queued,然后开新线程处理请求:

def live_migration(self, context, dest, instance, block_migration,
                       migration, migrate_data):
    """Queue a live migration and start it in the background.

    The migration status is set to 'queued', then the real work
    (``_do_live_migration``) is spawned so that this method returns
    immediately. The spawned function holds
    ``self._live_migration_semaphore`` for as long as the migration runs.
    """
    self._set_migration_status(migration, 'queued')

    def _run_under_semaphore(*args, **kwargs):
        with self._live_migration_semaphore:
            self._do_live_migration(*args, **kwargs)

    # NOTE(danms): We spawn here to return the RPC worker thread back to
    # the pool. Since what follows could take a really long time, we don't
    # want to tie up RPC workers.
    worker_args = (context, dest, instance,
                   block_migration, migration, migrate_data)
    utils.spawn_n(_run_under_semaphore, *worker_args)


def _do_live_migration(self, context, dest, instance, block_migration,
                        migration, migrate_data):
    """Prepare the destination, then hand the migration to the virt driver.

    The migration status moves through 'preparing' and 'running'. If the
    pre-live-migration step raises, the status becomes 'error' and
    ``_rollback_live_migration`` is called.
    """
    # NOTE(danms): We should enhance the RT to account for migrations
    # and use the status field to denote when the accounting has been
    # done on source/destination. For now, this is just here for status
    # reporting
    self._set_migration_status(migration, 'preparing')

    # If the caller did not pass a LiveMigrateData object, let
    # detect_implementation build one from what was passed
    # (presumably a legacy dict form -- confirm).
    got_migrate_data_object = isinstance(migrate_data,
                                         migrate_data_obj.LiveMigrateData)
    if not got_migrate_data_object:
        migrate_data = \
            migrate_data_obj.LiveMigrateData.detect_implementation(
                migrate_data)

    try:
        # (Code elided in this excerpt; ``disk`` used below is defined
        # there.)
        ......
        migrate_data = self.compute_rpcapi.pre_live_migration(context, instance,
                                                              block_migration, disk, dest, migrate_data)

    except Exception:
        # Log, flag the migration as failed and roll back; the context
        # manager's name indicates the exception is re-raised afterwards.
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Pre live migration failed at %s'),
                            dest, instance=instance)
            self._set_migration_status(migration, 'error')
            self._rollback_live_migration(context, instance, dest,
                                            block_migration, migrate_data)

    self._set_migration_status(migration, 'running')

    # Attach the migration to migrate_data (when there is any) before
    # calling the driver.
    if migrate_data:
        migrate_data.migration = migration
    try:
        # The driver is given the post-migration and rollback callbacks.
        self.driver.live_migration(context, instance, dest,
                                    self._post_live_migration,
                                    self._rollback_live_migration,
                                    block_migration, migrate_data)
    except Exception:
    # (Handler body elided in this excerpt.)
    ......

上面通过 rpc 调用的 pre_live_migration 函数最终位于 compute/manager.py(在目标节点上执行),代码如下:

def pre_live_migration(self, context, instance, block_migration, disk,
                           migrate_data):
    """Preparations for live migration at dest host.

    Refreshes the instance's block device and network information, lets
    the virt driver prepare, sets up networks on this host and emits the
    "live_migration.pre.start" / "live_migration.pre.end" notifications.

    :param context: request context.
    :param instance: the instance about to be migrated.
    :param block_migration: accepted but not used in this function body.
    :param disk: passed through to the driver's pre_live_migration.
    :param migrate_data: a LiveMigrateData object, or a value that
        LiveMigrateData.detect_implementation can convert into one.
    :returns: the driver's migrate_data. If the caller did not pass a
        LiveMigrateData object and the result is truthy, it is converted
        back with ``to_legacy_dict(pre_migration_result=True)`` and only
        its 'pre_live_migration_result' entry is returned.
    """
    LOG.debug('pre_live_migration data is %s', migrate_data)
    # TODO(tdurakov): remove dict to object conversion once RPC API version
    # is bumped to 5.x
    got_migrate_data_object = isinstance(migrate_data,
                                            migrate_data_obj.LiveMigrateData)
    if not got_migrate_data_object:
        migrate_data = \
            migrate_data_obj.LiveMigrateData.detect_implementation(
                migrate_data)

    block_device_info = self._get_instance_block_device_info(
                        context, instance, refresh_conn_info=True)

    network_info = self.network_api.get_instance_nw_info(context, instance)
    self._notify_about_instance_usage(
                    context, instance, "live_migration.pre.start",
                    network_info=network_info)

    migrate_data = self.driver.pre_live_migration(context,
                                    instance,
                                    block_device_info,
                                    network_info,
                                    disk,
                                    migrate_data)
    # Pass the value as a logging argument (as the other debug calls in
    # this function do) so it is only formatted when debug logging is on.
    LOG.debug('driver pre_live_migration data is %s', migrate_data)

    # NOTE(tr3buchet): setup networks on destination host
    self.network_api.setup_networks_on_host(context, instance, self.host)

    self.driver.ensure_filtering_rules_for_instance(instance, network_info)

    self._notify_about_instance_usage(
                    context, instance, "live_migration.pre.end",
                    network_info=network_info)
    # TODO(tdurakov): remove dict to object conversion once RPC API version
    # is bumped to 5.x
    if not got_migrate_data_object and migrate_data:
        migrate_data = migrate_data.to_legacy_dict(
            pre_migration_result=True)
        migrate_data = migrate_data['pre_live_migration_result']
    LOG.debug('pre_live_migration result data is %s', migrate_data)
    return migrate_data

具体的位于 virt/libvirt/driver.py 的同名函数我这里就不贴了,就是进行磁盘、网络相关操作。最后依次调用 virt/libvirt/driver.py 中的 _live_migration、_live_migration_operation,进入 virt/libvirt/guest.py 中 Guest 类的 migrate 函数,最终交给了 libvirt 处理。

至此,nova的使命大部分完成,剩下的就是等待迁移状态回写数据库了。