oVirt (Open Virtual) 之 VDSM 学习笔记 (二) 流程追踪

概述:

追踪 VDSM 启动后的代码大概流程,是继 vdsm/vdsm 文件学习后的继续。

Let's go!

从 “def serve_clients(log)” 开始

 1 def serve_clients(log):
 2     cif = None
 3     irs = None
 4     scheduler = None
 5     running = [True]
 6 
 7     def sigtermHandler(signum, frame):
 8         log.debug("Received signal %s" % signum)
 9         running[0] = False
10 
11     def sigusr1Handler(signum, frame):
12         if irs:
13             log.debug("Received signal %s" % signum)
14             irs.spmStop(
15                 irs.getConnectedStoragePoolsList()['poollist'][0])
16 
17     sigutils.register()
18     signal.signal(signal.SIGTERM, sigtermHandler)
19     signal.signal(signal.SIGUSR1, sigusr1Handler)
20     zombiereaper.registerSignalHandler()
21 
22     profile.start()
23 
24     libvirtconnection.start_event_loop()
25 
26     try:
27         if config.getboolean('irs', 'irs_enable'):
28             try:
29                 irs = Dispatcher(HSM())
30             except:
31                 utils.panic("Error initializing IRS")
32 
33         from clientIF import clientIF  # must import after config is read
34         cif = clientIF.getInstance(irs, log)
35 
36         install_manhole({'irs': irs, 'cif': cif})
37 
38         scheduler = schedule.Scheduler(name="vdsm.Scheduler",
39                                        clock=utils.monotonic_time)
40         scheduler.start()
41         cif.start()
42         periodic.start(cif, scheduler)
43         try:
44             while running[0]:
45                 sigutils.wait_for_signal()
46 
47             profile.stop()
48         finally:
49             periodic.stop()
50             cif.prepareForShutdown()
51             scheduler.stop()
52     finally:
53         libvirtconnection.stop_event_loop(wait=False)

代码:

22     profile.start()

是调用 vdsm/lib/vdsm/profiling/profile.py 的 start() 方法,如下:

1 def start():
2     cpu.start()
3     memory.start()

再次调用同级目录的 cpu.py 和 memory.py 的 start() 方法,如下:

 1 def start():
 2     """ Starts application wide CPU profiling """
 3     if is_enabled():
 4         _start_profiling(_CLOCK, _BUILTINS, _THREADS)
 5 
 6 
 7 
 8 def start():
 9     """ Starts application memory profiling """
10     if is_enabled():
11         _start_profiling()

其本质是两个线程,分别监测 CPU 和内存。

代码:

24     libvirtconnection.start_event_loop()

调用 libvirtconnection.start_event_loop() 注册与 libvirt 的连接事件处理。

其代码在 vdsm/lib/vdsm/libvirtconnection.py

 1 def start_event_loop():
 2     __event_loop.start()
 3 
 4 
 5 __event_loop = _EventLoop()
 6 
 7 
 8 class _EventLoop:
 9     def __init__(self):
10         self.run = False
11         self.__thread = None
12 
13     def start(self):
14         assert not self.run
15         self.__thread = threading.Thread(target=self.__run,
16                                          name="libvirtEventLoop")
17         self.__thread.setDaemon(True)
18         self.run = True
19         self.__thread.start()
20 
21     @utils.traceback(on=log.name)
22     def __run(self):
23         try:
24             libvirt.virEventRegisterDefaultImpl()
25             while self.run:
26                 libvirt.virEventRunDefaultImpl()
27         finally:
28             self.run = False

方法 start_event_loop() 通过 _EventLoop 对象的实例调用其 start() 方法,

从而调用到 __run 方法,在 __run() 方法中调用 libvirt 的默认实现方法:

virEventRegisterDefaultImpl()。

其实质也是一个线程,用于处理与 libvirt 的数据交互。

备注:

libvirt 项目代码不包含在 VDSM 项目代码里,如需查看其实现,请下载 libvirt 代码查看。

下载地址:http://libvirt.org/downloads.html。

代码:

29                 irs = Dispatcher(HSM())

涉及两个类的初始化。

HSM 类在 vdsm/vdsm/storage/hsm.py 中,查看其 __init__ 方法,代码如下:

 1     def __init__(self):
 2         """
 3         The HSM Constructor
 4 
 5         :param defExcFunc: The function that will set the default exception
 6                            for this thread
 7         :type defExcFun: function
 8         """
 9         self._ready = False
10         rm.ResourceManager.getInstance().registerNamespace(
11             STORAGE, rm.SimpleResourceFactory())
12         self.storage_repository = config.get('irs', 'repository')
13         self.taskMng = taskManager.TaskManager()
14 
15         mountBasePath = os.path.join(self.storage_repository,
16                                      sd.DOMAIN_MNT_POINT)
17         fileUtils.createdir(mountBasePath)
18         storageServer.MountConnection.setLocalPathBase(mountBasePath)
19         storageServer.LocalDirectoryConnection.setLocalPathBase(mountBasePath)
20         self._connectionAliasRegistrar = 
21             storageServer.ConnectionAliasRegistrar(STORAGE_CONNECTION_DIR)
22         self._connectionMonitor = 
23             storageServer.ConnectionMonitor(self._connectionAliasRegistrar)
24         self._connectionMonitor.startMonitoring()
25 
26         sp.StoragePool.cleanupMasterMount()
27         self.__releaseLocks()
28 
29         self._preparedVolumes = defaultdict(list)
30 
31         self.__validateLvmLockingType()
32 
33         oop.setDefaultImpl(config.get('irs', 'oop_impl'))
34 
35         # cleanStorageRepoitory uses tasksDir value, this must be assigned
36         # before calling it
37         self.tasksDir = config.get('irs', 'hsm_tasks')
38 
39         # This part should be in same thread to prevent race on mounted path,
40         # otherwise, storageRefresh can unlink path that is used by another
41         # thread that was initiated in the same time and tried to use the
42         # same link.
43         try:
44             # This call won't get stuck if mount is inaccessible thanks to
45             # misc.walk, this sync call won't delay hsm initialization.
46             self.__cleanStorageRepository()
47         except Exception:
48             self.log.warn("Failed to clean Storage Repository.", exc_info=True)
49 
50         @utils.traceback(on=self.log.name)
51         def storageRefresh():
52             sdCache.refreshStorage()
53             lvm.bootstrap(refreshlvs=blockSD.SPECIAL_LVS)
54             self._ready = True
55             self.log.debug("HSM is ready")
56 
57         storageRefreshThread = threading.Thread(target=storageRefresh,
58                                                 name="storageRefresh")
59         storageRefreshThread.daemon = True
60         storageRefreshThread.start()
61 
62         monitorInterval = config.getint('irs', 'sd_health_check_delay')
63         self.domainMonitor = monitor.DomainMonitor(monitorInterval)

在初始化函数要看这么多代码,刚开始,我是拒绝的,你不能说代码走到这里,我就

要马上看啊,后来,我试着看了一下,duang ...

代码:

10         rm.ResourceManager.getInstance().registerNamespace(
11             STORAGE, rm.SimpleResourceFactory())

这里的 rm 是模块 resourceManager 的别名 (import <模块> as <别名>),

该模块文件在 vdsm.vdsm.storage.resourceManager (. 替换为 / 即可找到位置,大家都懂的)

调用其模块内的类 ResourceManager.getInstance() 方法,用于获取

ResourceManager 的实例。那代码就转换为调用 ResourceManager 实例的注册名称空间

registerNamespace() 方法。

补充:

这个方法一般用在某一个类只能有一个实例的场景,使用 getInstance 方法来保证单实例,

C++、Java 也有类似的使用。

代码:

12         self.storage_repository = config.get('irs', 'repository')
13         self.taskMng = taskManager.TaskManager()

从配置文件里读取存储路径

创建任务管理器,TaskManager 会创建线程池以及有关线程池操作、维护需要的锁等资源

代码:

15         mountBasePath = os.path.join(self.storage_repository,
16                                      sd.DOMAIN_MNT_POINT)
17         fileUtils.createdir(mountBasePath)
18         storageServer.MountConnection.setLocalPathBase(mountBasePath)
19         storageServer.LocalDirectoryConnection.setLocalPathBase(mountBasePath)
20         self._connectionAliasRegistrar = 
21             storageServer.ConnectionAliasRegistrar(STORAGE_CONNECTION_DIR)
22         self._connectionMonitor = 
23             storageServer.ConnectionMonitor(self._connectionAliasRegistrar)
24         self._connectionMonitor.startMonitoring()

根据从配置中获取的 [irs] 的 'repository' 的值来复制挂载的根路径

创建该目录

设置挂载目录连接的主目录

设置本地目录连接的主目录

连接的别名注册(打开 connnection 目录下的 '*.con' 文件,并保存文件信息, 使用了 pickle 模块 )

根据别名对象,生成连接管理对象

创建线程,定时检查连接、并根据连接情况进行处理

代码:

26         sp.StoragePool.cleanupMasterMount()
27         self.__releaseLocks()

清除目录 mnt/blockSD/ 目录下的挂载

释放所有的锁(在初始化 VDSM 需要释放所有的锁,例如:之前有 VDSM 在运行)

代码:

29         self._preparedVolumes = defaultdict(list)
30 
31         self.__validateLvmLockingType()
32 
33         oop.setDefaultImpl(config.get('irs', 'oop_impl'))

创建一个 defaultdict 类型(参见: Python defaultdict

检查 lvm 的 LockingType(执行 lvm 命令检查 lvm 的配置项:global/locking_type)

根据配置:irs/oop_impl 设置 oop 的默认实现

代码:

35         # cleanStorageRepoitory uses tasksDir value, this must be assigned
36         # before calling it
37         self.tasksDir = config.get('irs', 'hsm_tasks')

 设置 tasks 目录

代码:

39         # This part should be in same thread to prevent race on mounted path,
40         # otherwise, storageRefresh can unlink path that is used by another
41         # thread that was initiated in the same time and tried to use the
42         # same link.
43         try:
44             # This call won't get stuck if mount is inaccessible thanks to
45             # misc.walk, this sync call won't delay hsm initialization.
46             self.__cleanStorageRepository()
47         except Exception:
48             self.log.warn("Failed to clean Storage Repository.", exc_info=True)

调用清除存储目录

代码:

50         @utils.traceback(on=self.log.name)
51         def storageRefresh():
52             sdCache.refreshStorage()
53             lvm.bootstrap(refreshlvs=blockSD.SPECIAL_LVS)
54             self._ready = True
55             self.log.debug("HSM is ready")
56 
57         storageRefreshThread = threading.Thread(target=storageRefresh,
58                                                 name="storageRefresh")
59         storageRefreshThread.daemon = True
60         storageRefreshThread.start()

创建线程,线程执行的函数为 storageRefresh(),而该函数调用 sdCache 模块的 refreshStorage

来进行实际的刷新存储相关的,

代码:

62         monitorInterval = config.getint('irs', 'sd_health_check_delay')
63         self.domainMonitor = monitor.DomainMonitor(monitorInterval)

存储域管理

[欲知后事如何,请听下回分解...]

基于:

 1. 文章代码流程分析篇幅较长,太长会冗长、乏味;

 2. 没有一个实际的 oVirt 系统运行,像纸上谈兵、凭空捏造;

 3. 后续流程涉及部分相关的业务。

 VDSM 流程追踪将在后续文章中继续进行分析,而最近的将是 oVirt 系统搭建的简介。