OpenStack源码学习笔记2
上次学习了Nova创建虚拟机的过程,这次来看一下Glance是如何上传镜像的。相比于Nova,Glance源码大量使用了代理模式和装饰器模式,阅读代码时稍不留神就会看得一头雾水。按照上次总结的OpenStack源码阅读套路,我们通过setup.cfg中定义的入口点直奔主题——glance/cmd/api.py:
def main():
    """Entry point of the glance-api service.

    Parses configuration, sets up logging, notifications and (optionally)
    osprofiler, then creates a WSGI server for the 'glance-api' paste app
    and blocks in server.wait().  Exceptions listed in KNOWN_EXCEPTIONS are
    printed and passed to fail().
    """
    try:
        config.parse_args()
        config.set_config_defaults()
        wsgi.set_eventlet_hub()
        logging.setup(CONF, 'glance')
        notifier.set_defaults()

        if cfg.CONF.profiler.enabled:
            # Profiling traces are sent through an oslo.messaging-based
            # ("Messaging") osprofiler notifier.
            _notifier = osprofiler.notifier.create("Messaging",
                                                   oslo_messaging, {},
                                                   notifier.get_transport(),
                                                   "glance", "api",
                                                   cfg.CONF.bind_host)
            osprofiler.notifier.set(_notifier)
            osprofiler.web.enable(cfg.CONF.profiler.hmac_keys)
        else:
            osprofiler.web.disable()

        # initialize_glance_store=True asks the server to initialize the
        # glance_store backends while it is being configured.
        server = wsgi.Server(initialize_glance_store=True)
        # NOTE(review): 9292 is passed as default_port; presumably a
        # configured bind port takes precedence — confirm in wsgi.Server.
        server.start(config.load_paste_app('glance-api'), default_port=9292)
        server.wait()
    except KNOWN_EXCEPTIONS as e:
        print(e)
        # fail() is defined elsewhere in this module; presumably it exits
        # with a non-zero status — confirm.
        fail(e)
配置加载与路由绑定
和Nova一样,这个文件的主要作用是加载配置、创建WSGI Server并运行。这里需要注意initialize_glance_store=True这个参数:新版中与存储相关的部分已经独立成一个单独的项目glance_store,这里会对它进行初始化。我们跟进glance/common/wsgi.py:
def initialize_glance_store():
    """Initialize glance store.

    Registers glance_store's configuration options on CONF, creates the
    configured store drivers, then calls verify_default_store()
    (presumably checks that the configured default store is usable —
    confirm in glance_store).
    """
    glance_store.register_opts(CONF)
    glance_store.create_stores(CONF)
    glance_store.verify_default_store()
class Server(object):
    """Glance WSGI server (excerpt; ``......`` marks elided code)."""
    ......
    def __init__(self, threads=1000, initialize_glance_store=False):
        ......
        # Flag read by configure() to decide whether glance_store must be
        # initialized; glance-api's main() passes True.
        self.initialize_glance_store = initialize_glance_store
        ......

    def start(self, application, default_port):
        """Remember the WSGI app and default port, configure, then serve.

        :param application: the WSGI application loaded from paste config.
        :param default_port: port to use when none is configured
            (presumably — confirm in configure_socket()).
        """
        self.application = application
        self.default_port = default_port
        self.configure()
        self.start_wsgi()

    def configure(self, old_conf=None, has_changed=None):
        """Apply server settings from CONF and optionally init glance_store.

        Sets eventlet's maximum header line length, the client socket
        timeout and the listening socket, then initializes glance_store if
        the server was created with initialize_glance_store=True.
        """
        eventlet.wsgi.MAX_HEADER_LINE = CONF.max_header_line
        # A falsy configured timeout (e.g. 0) is normalized to None.
        self.client_socket_timeout = CONF.client_socket_timeout or None
        self.configure_socket(old_conf, has_changed)
        if self.initialize_glance_store:
            # The bare name is the module-level initialize_glance_store()
            # function, not the boolean attribute tested above.
            initialize_glance_store()
其中initialize_glance_store()调用了glance_store.create_stores(),我们继续跟进glance_store/backend.py:
def create_stores(conf=CONF):
    """Load, configure and register every store driver enabled in ``conf``.

    For each (entry name, store instance) pair yielded by _load_stores(),
    every URL scheme the store supports is mapped to a dict holding the
    store instance, its location class and its entry name, and that mapping
    is passed to location.register_scheme_map().

    :param conf: configuration object (defaults to the global CONF).
    :returns: the number of stores that were registered.
    :raises exceptions.BackendException: if a store reports no schemes.
    """
    store_count = 0

    for (store_entry, store_instance) in _load_stores(conf):
        try:
            schemes = store_instance.get_schemes()
            store_instance.configure(re_raise_bsc=False)
        except NotImplementedError:
            # Stores raising NotImplementedError here are skipped silently.
            continue
        if not schemes:
            raise exceptions.BackendException('Unable to register store %s. '
                                              'No schemes associated with it.'
                                              % store_entry)
        else:
            LOG.debug("Registering store %s with schemes %s",
                      store_entry, schemes)

            # All schemes of one store share the same instance and
            # location class.
            scheme_map = {}
            loc_cls = store_instance.get_store_location_class()
            for scheme in schemes:
                scheme_map[scheme] = {
                    'store': store_instance,
                    'location_class': loc_cls,
                    'store_entry': store_entry
                }
            location.register_scheme_map(scheme_map)
            store_count += 1

    return store_count
这里会根据/etc/glance/glance-api.conf中的配置加载对应的driver(位于glance_store/_drivers目录),并调用其configure()进行配置;然后为该driver支持的每个scheme构造一条映射,调用register_scheme_map()进行绑定:
# Module-level registry: URL scheme -> dict describing the store serving it
# (create_stores() stores 'store', 'location_class' and 'store_entry' keys).
SCHEME_TO_CLS_MAP = {}


def register_scheme_map(scheme_map):
    """Merge ``scheme_map`` into the module-level SCHEME_TO_CLS_MAP.

    An existing entry for the same scheme is overwritten.

    :param scheme_map: dict mapping a scheme name to its store description.
    """
    for (k, v) in scheme_map.items():
        LOG.debug("Registering scheme %s with %s", k, v)
        SCHEME_TO_CLS_MAP[k] = v
这样就完成了所需的准备工作。
镜像上传
Glance镜像上传分为两个步骤:首先在数据库中创建一条记录并返回镜像信息,此时使用glance image-list命令可以看到一个状态为queued的空镜像;然后上传镜像数据,上传完成后镜像进入active状态。以下代码均来自Rocky版本。
create
根据API文档,创建镜像需要向/v2/images发送POST请求。再结合glance-api-paste.ini中的定义:
# Top-level pipeline: every entry except the last is a filter (middleware);
# the last entry, rootapp, is the application that finally handles requests.
[pipeline:glance-api]
pipeline = cors healthcheck http_proxy_to_wsgi versionnegotiation osprofiler unauthenticated-context rootapp
# Composite app built by glance.api.root_app_factory. The "path: app" lines
# map URL prefixes to the [app:...] sections below (presumably handed to a
# paste URL map by root_app_factory — confirm).
[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app
# App mounted at "/".
[app:apiversions]
paste.app_factory = glance.api.versions:create_resource
# App mounted at "/v1".
[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory
# App mounted at "/v2"; e.g. POST /v2/images is routed by glance.api.v2.router:API.
[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
根据glance/api/v2/router.py中API类的路由定义,可以找到实际处理该POST请求的是glance/api/v2/images.py中ImagesController类的create方法:
def create(self, req, image, extra_properties, tags):
    """Handle POST /v2/images: create an image record and return it.

    The factory and repo come from the gateway, already wrapped in its
    proxy layers. Domain exceptions are translated into webob HTTP errors:

    * DuplicateLocation, Invalid, TypeError -> HTTPBadRequest (400)
    * ReservedProperty, ReadonlyProperty, Forbidden -> HTTPForbidden (403)
    * LimitExceeded -> HTTPRequestEntityTooLarge (413)
    * Duplicate -> HTTPConflict (409)
    * NotAuthenticated -> HTTPUnauthorized (401)

    :param image: mapping of image fields (unpacked with ``**`` below).
    :param extra_properties: additional image properties.
    :param tags: image tags.
    :returns: the newly created (proxied) image object.
    """
    image_factory = self.gateway.get_image_factory(req.context)
    image_repo = self.gateway.get_repo(req.context)
    try:
        # ``image`` is rebound from the request mapping to the new domain
        # object produced by the proxied factory.
        image = image_factory.new_image(extra_properties=extra_properties,
                                        tags=tags, **image)
        # Persists the new record (queued image) through the proxied repo.
        image_repo.add(image)
    # NOTE(review): clause order matters if any of these exception types
    # subclass one another (e.g. DuplicateLocation vs. Duplicate) — the
    # first matching clause wins; confirm before reordering.
    except (exception.DuplicateLocation,
            exception.Invalid) as e:
        raise webob.exc.HTTPBadRequest(explanation=e.msg)
    except (exception.ReservedProperty,
            exception.ReadonlyProperty) as e:
        raise webob.exc.HTTPForbidden(explanation=e.msg)
    except exception.Forbidden as e:
        LOG.debug("User not permitted to create image")
        raise webob.exc.HTTPForbidden(explanation=e.msg)
    except exception.LimitExceeded as e:
        # NOTE(review): ``warn`` is a deprecated alias of ``warning`` in the
        # stdlib logging API (assuming LOG wraps a stdlib logger).
        LOG.warn(encodeutils.exception_to_unicode(e))
        raise webob.exc.HTTPRequestEntityTooLarge(
            explanation=e.msg, request=req, content_type='text/plain')
    except exception.Duplicate as e:
        raise webob.exc.HTTPConflict(explanation=e.msg)
    except exception.NotAuthenticated as e:
        raise webob.exc.HTTPUnauthorized(explanation=e.msg)
    except TypeError as e:
        LOG.debug(encodeutils.exception_to_unicode(e))
        # NOTE(review): passes the exception object itself, unlike the other
        # clauses which pass e.msg.
        raise webob.exc.HTTPBadRequest(explanation=e)

    return image
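这段代码看上去简单,实际上内含玄机。如果跟进gateway.get_image_factory和gateway.get_repo函数,会发现作者大量使用了装饰器模式和代理模式——每一层Proxy都包裹着上一层对象,依次叠加了location(存储)、quota、policy、notifier、可选的属性保护(property protection)以及authorization等逻辑: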
# glance/gateway.py
def get_image_factory(self, context):
    """Build the image factory used by the API for ``context``.

    The plain glance.domain.ImageFactory is wrapped, innermost first, by
    the location (store), quota, policy and notifier proxies. When property
    protection is enabled it is then wrapped by the property-protection
    proxy. The authorization proxy is always the outermost layer and is
    what gets returned.
    """
    image_factory = glance.domain.ImageFactory()
    store_image_factory = glance.location.ImageFactoryProxy(
        image_factory, context, self.store_api, self.store_utils)
    quota_image_factory = glance.quota.ImageFactoryProxy(
        store_image_factory, context, self.db_api, self.store_utils)
    policy_image_factory = policy.ImageFactoryProxy(
        quota_image_factory, context, self.policy)
    notifier_image_factory = glance.notifier.ImageFactoryProxy(
        policy_image_factory, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        # Optional layer: property-protection rules derived from policy.
        property_rules = property_utils.PropertyRules(self.policy)
        pif = property_protections.ProtectedImageFactoryProxy(
            notifier_image_factory, context, property_rules)
        authorized_image_factory = authorization.ImageFactoryProxy(
            pif, context)
    else:
        authorized_image_factory = authorization.ImageFactoryProxy(
            notifier_image_factory, context)
    return authorized_image_factory
def get_repo(self, context):
    """Build the image repository used by the API for ``context``.

    Same layering as get_image_factory(): glance.db.ImageRepo is wrapped,
    innermost first, by the location (store), quota, policy and notifier
    proxies, then by the property-protection proxy when property protection
    is enabled, and finally by the authorization proxy, which is returned.
    """
    image_repo = glance.db.ImageRepo(context, self.db_api)
    store_image_repo = glance.location.ImageRepoProxy(
        image_repo, context, self.store_api, self.store_utils)
    quota_image_repo = glance.quota.ImageRepoProxy(
        store_image_repo, context, self.db_api, self.store_utils)
    policy_image_repo = policy.ImageRepoProxy(
        quota_image_repo, context, self.policy)
    notifier_image_repo = glance.notifier.ImageRepoProxy(
        policy_image_repo, context, self.notifier)
    if property_utils.is_property_protection_enabled():
        # Optional layer: property-protection rules derived from policy.
        property_rules = property_utils.PropertyRules(self.policy)
        pir = property_protections.ProtectedImageRepoProxy(
            notifier_image_repo, context, property_rules)
        authorized_image_repo = authorization.ImageRepoProxy(
            pir, context)
    else:
        authorized_image_repo = authorization.ImageRepoProxy(
            notifier_image_repo, context)
    return authorized_image_repo
阅读这里时稍不留神逻辑就断了,要像剥洋葱一样一层一层地剥开。这里不再记录追踪细节:经过各层代理之后,image_factory.new_image最终调用到glance/domain/__init__.py中ImageFactory的new_image方法,返回一个Image类型的实例。
然后将这个实例传给image_repo.add,它同样经过层层代理,最终进入glance/db/__init__.py中ImageRepo的add()方法;该方法调用glance/db/sqlalchemy/api.py中的image_create()函数,在数据库中创建新记录。
如果没有发生错误,则将新建的空镜像信息返回给客户端。
upload
根据API文档,上传镜像数据需要向/v2/images/{image_id}/file发送PUT请求,实际处理函数是glance/api/v2/image_data.py中ImageDataController类的upload方法:
......
@utils.mutating
def upload(self, req, image_id, data, size):
    """Handle PUT /v2/images/{image_id}/file (excerpt; ``......`` = elided).

    Enforces the 'upload_image' policy, loads the image, marks it 'saving'
    and saves it expecting the 'queued' state, streams the data with
    image.set_data(), then saves it again expecting the 'saving' state.
    """
    # Stays None in this excerpt, i.e. the single-backend case.
    backend = None
    image_repo = self.gateway.get_repo(req.context)
    image = None
    refresher = None
    cxt = req.context
    try:
        self.policy.enforce(cxt, 'upload_image', {})
        image = image_repo.get(image_id)
        image.status = 'saving'
        try:
            # NOTE(review): from_state presumably names the state the stored
            # record must still be in for the save to succeed — confirm in
            # ImageRepo.save().
            image_repo.save(image, from_state='queued')
            image.set_data(data, size, backend=backend)

            try:
                image_repo.save(image, from_state='saving')
            except exception.NotAuthenticated:
                # NOTE(review): in this excerpt refresher is never reassigned
                # from None, so this branch always re-raises; presumably the
                # elided upstream code can set it — confirm.
                if refresher is not None:
                    # request a new token to update an image in database
                    cxt.auth_token = refresher.refresh_token()
                    image_repo = self.gateway.get_repo(req.context)
                    image_repo.save(image, from_state='saving')
                else:
                    raise
        ......
新版本中Glance已经支持配置多个存储后端(multi-store),但该功能还不稳定。这里以单后端为例:upload首先将镜像状态改为saving并保存,然后调用image.set_data()。由于某些不可描述的原因,我没法启动程序进行单步调试,只能借助IDE的跳转功能,结果在这里兜兜转转绕了很久。这里直接给出答案:最终调用的是glance/location.py中ImageProxy类的set_data方法。关键在于image其实是被ImageProxy实例代理的,这一转换发生在初始化代理对象时创建的Helper类中——它的proxy方法负责把原始的Image对象包装成ImageProxy对象。具体细节不再展开,回到set_data函数的定义:
def set_data(self, data, size=None, backend=None):
    """Write ``data`` through the store API and record the result.

    Excerpt of glance.location.ImageProxy.set_data (``......`` marks
    elided code). The data is wrapped in utils.CooperativeReader and then
    utils.LimitingReader bounded by CONF.image_size_cap before it is handed
    to the store API. Afterwards the wrapped image gets a single 'active'
    location, plus the size, checksum, multihash and hash algorithm, and
    its status is set to 'active'.

    :param data: image data, as passed in by ImageDataController.upload().
    :param size: expected size or None; rebound to the size the store
        returns.
    :param backend: store identifier; only forwarded in the multi-backend
        (CONF.enabled_backends) branch.
    """
    ......  # elided; presumably where ``verifier`` (used below) is set up
    hashing_algo = CONF['hashing_algorithm']
    if CONF.enabled_backends:
        # Multi-backend deployment: the target store is chosen by backend.
        (location, size, checksum,
         multihash, loc_meta) = self.store_api.add_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                 CONF.image_size_cap),
            size,
            backend,
            hashing_algo,
            context=self.context,
            verifier=verifier)
    else:
        # Single-backend deployment: the store is resolved from the scheme.
        (location, size, checksum,
         multihash, loc_meta) = self.store_api.add_to_backend_with_multihash(
            CONF,
            self.image.image_id,
            utils.LimitingReader(utils.CooperativeReader(data),
                                 CONF.image_size_cap),
            size,
            hashing_algo,
            context=self.context,
            verifier=verifier)
    # Record where and how the data was stored on the wrapped image; the
    # caller (upload) persists it with image_repo.save().
    self.image.locations = [{'url': location, 'metadata': loc_meta, 'status': 'active'}]
    self.image.size = size
    self.image.checksum = checksum
    self.image.os_hash_value = multihash
    self.image.os_hash_algo = hashing_algo
    self.image.status = 'active'
这里的store_api默认就是glance_store。其中add_with_multihash在启用多后端(配置了enabled_backends)时调用,add_to_backend_with_multihash在单后端时调用。下面以单后端为例:
def add_to_backend_with_multihash(conf, image_id, data, size, hashing_algo,
                                  scheme=None, context=None, verifier=None):
    """Store ``data`` in the store registered for ``scheme``.

    The store is looked up with get_store_from_scheme(). The write is
    delegated to store_add_to_backend_with_multihash(), and its result is
    returned unchanged.

    :param scheme: store scheme; defaults to
        conf['glance_store']['default_store'] when None.
    :param hashing_algo: hash algorithm name forwarded to the store.
    :param verifier: optional verifier forwarded to the store.
    """
    if scheme is None:
        scheme = conf['glance_store']['default_store']
    store = get_store_from_scheme(scheme)
    return store_add_to_backend_with_multihash(
        image_id, data, size, hashing_algo, store, context, verifier)
其中get_store_from_scheme函数的作用是从文章开头提到的SCHEME_TO_CLS_MAP中取出该scheme对应的driver(未指定scheme时使用配置项glance_store.default_store);随后store_add_to_backend_with_multihash函数会调用该driver的add方法。这里以Ceph的块存储RBD(RADOS Block Device)为例,其add方法位于glance_store/_drivers/rbd.py:
@driver.back_compat_add
@capabilities.check
def add(self, image_id, image_file, image_size, hashing_algo, context=None,
        verifier=None):
    """Create an RBD image named after ``image_id`` and stream data into it.

    Excerpt of the RBD driver's add() (``......`` marks elided code).

    :param image_id: Glance image id; str(image_id) is the RBD image name.
    :param image_file: data source, read via utils.chunkreadable in
        pieces of self.WRITE_CHUNKSIZE.
    :param image_size: expected size in bytes. 0 means "unknown": the RBD
        image is then resized before every chunk is written, and the number
        of bytes written is reported as the size.
    :param hashing_algo: hashlib algorithm name used for the multihash.
    :param verifier: optional object whose update() receives every chunk.
    :returns: tuple of (location URI, image size, MD5 hex digest,
        multihash hex digest, location metadata dict).
    :raises exceptions.Duplicate: if an RBD image with that name exists.
    """
    checksum = hashlib.md5()
    os_hash_value = hashlib.new(str(hashing_algo))
    image_name = str(image_id)
    with self.get_connection(conffile=self.conf_file,
                             rados_id=self.user) as conn:
        fsid = None
        # The hasattr guard implies some bindings lack get_fsid(); the
        # returned value is decoded to text with safe_decode.
        if hasattr(conn, 'get_fsid'):
            fsid = encodeutils.safe_decode(conn.get_fsid())
        with conn.open_ioctx(self.pool) as ioctx:
            # log2 of the write chunk size; presumably the RBD object-size
            # order (objects of 2**order bytes) — confirm in _create_image().
            order = int(math.log(self.WRITE_CHUNKSIZE, 2))
            LOG.debug('creating image %s with order %d and size %d',
                      image_name, order, image_size)
            if image_size == 0:
                LOG.warning(_("since image size is zero we will be doing "
                              "resize-before-write for each chunk which "
                              "will be considerably slower than normal"))
            try:
                loc = self._create_image(fsid, conn, ioctx, image_name,
                                         image_size, order)
            except rbd.ImageExists:
                msg = _('RBD image %s already exists') % image_id
                raise exceptions.Duplicate(message=msg)
            try:
                with rbd.Image(ioctx, image_name) as image:
                    bytes_written = 0
                    offset = 0
                    chunks = utils.chunkreadable(image_file,
                                                 self.WRITE_CHUNKSIZE)
                    for chunk in chunks:
                        # If the image size provided is zero we need to do
                        # a resize for the amount we are writing. This will
                        # be slower so setting a higher chunk size may
                        # speed things up a bit.
                        if image_size == 0:
                            chunk_length = len(chunk)
                            length = offset + chunk_length
                            bytes_written += chunk_length
                            # NOTE(review): these debug messages are
                            # translated and %-formatted eagerly, even when
                            # debug logging is off.
                            LOG.debug(_("resizing image to %s KiB") %
                                      (length / units.Ki))
                            image.resize(length)
                        LOG.debug(_("writing chunk at offset %s") %
                                  (offset))
                        # Advance by the value image.write() returns
                        # (presumably the number of bytes written).
                        offset += image.write(chunk, offset)
                        # Hashes are computed over the data as it is written.
                        os_hash_value.update(chunk)
                        checksum.update(chunk)
                        if verifier:
                            verifier.update(chunk)
                    if loc.snapshot:
                        image.create_snap(loc.snapshot)
                        image.protect_snap(loc.snapshot)
            ......  # except/finally clause(s) of the try above are elided
    # When the size was unknown, report how many bytes were actually written.
    if image_size == 0:
        image_size = bytes_written
    metadata = {}
    # Presumably set only in multi-backend mode — confirm.
    if self.backend_group:
        metadata['store'] = u"%s" % self.backend_group
    return (loc.get_uri(),
            image_size,
            checksum.hexdigest(),
            os_hash_value.hexdigest(),
            metadata)
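这个函数首先初始化MD5和指定算法的hash对象,然后连接Ceph集群、打开存储池,并尝试创建RBD镜像(若同名镜像已存在则抛出Duplicate异常)。接着分块写入数据,写入的同时更新hash值;如果镜像大小未知(为0),每写一块之前都要先resize。如有需要,再创建并保护快照。最后将location、镜像大小、checksum、multihash和元数据返回给调用者。回到Glance中,set_data将镜像状态置为active,upload再调用image_repo.save()把结果写回数据库,整个镜像上传过程就结束了。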