xemu/python/qemu/aqmp/legacy.py
John Snow b0b662bb2b python/aqmp: add socket bind step to legacy.py
The synchronous QMP library would bind to the server address during
__init__(). The new library delays this to the accept() call, because
binding occurs inside of the call to start_[unix_]server(), which is an
async method -- so it cannot happen during __init__ anymore.

Python 3.7+ adds the ability to create the server (and thus the bind()
call) and begin the active listening in separate steps, but we don't
have that functionality in 3.6, our current minimum.

Therefore, add a temporary workaround that allows the synchronous
version of the client to bind the socket in advance, guaranteeing that
there will be a UNIX socket in the filesystem ready for the QEMU client
to connect to without a race condition.

(Yes, it's a bit ugly. Fixing it more nicely will have to wait until our
minimum Python version is 3.7+.)

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Message-id: 20220201041134.1237016-5-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
2022-02-02 14:12:22 -05:00

180 lines
5 KiB
Python

"""
Sync QMP Wrapper
This class pretends to be qemu.qmp.QEMUMonitorProtocol.
"""
import asyncio
from typing import (
Any,
Awaitable,
Dict,
List,
Optional,
TypeVar,
Union,
)
import qemu.qmp
from .error import QMPError
from .protocol import Runstate, SocketAddrT
from .qmp_client import QMPClient
# (Temporarily) Re-export QMPBadPortError
QMPBadPortError = qemu.qmp.QMPBadPortError

# Type aliases describing the QMP data handled by this module.
#
# A QMPMessage can be an outgoing command or an incoming event/return.
# A QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'returns-whitelist', it can actually be anything. For example:
#
#   {'return': {}} is a QMPMessage,
#   {} is the QMPReturnValue.

#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]

#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object

#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]
# pylint: disable=missing-docstring
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
    """
    Synchronous wrapper around the asynchronous `QMPClient`.

    Every blocking method drives the wrapped client's coroutine to
    completion on the event loop captured in ``__init__``, so callers
    can keep using the qemu.qmp.QEMUMonitorProtocol-style interface.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):
        """
        Create the wrapped client; optionally bind the server socket.

        :param address: Address later handed to connect()/accept().
        :param server: If True, bind ``address`` immediately.
        :param nickname: Passed through to `QMPClient`.
        """
        # pylint: disable=super-init-not-called
        self._aqmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Timeout applied to cmd_obj()/command(); None means no limit.
        self._timeout: Optional[float] = None

        if server:
            # Bind now instead of waiting for accept(), so that the
            # socket already exists when the peer tries to connect.
            self._aqmp._bind_hack(address)  # pylint: disable=protected-access

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run ``future`` to completion on the stored event loop.

        :param future: The awaitable to execute.
        :param timeout: Seconds handed to asyncio.wait_for();
                        None disables the time limit.
        :return: Whatever ``future`` evaluates to.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the client's greeting as a dict, or None if unset."""
        if self._aqmp.greeting is not None:
            # pylint: disable=protected-access
            return self._aqmp.greeting._asdict()
        return None

    # __enter__ and __exit__ need no changes
    # parse_address needs no changes

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the stored address.

        :param negotiate: Stored on the client as both
                          ``await_greeting`` and ``negotiate``.
        :return: The greeting as a dict, or None if the client has none.
        """
        self._aqmp.await_greeting = negotiate
        self._aqmp.negotiate = negotiate

        self._sync(
            self._aqmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Accept an incoming connection on the stored address.

        :param timeout: Seconds to wait; None waits without limit.
        :return: The greeting as a dict.
        """
        self._aqmp.await_greeting = True
        self._aqmp.negotiate = True

        self._sync(
            self._aqmp.accept(self._address),
            timeout
        )

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send ``qmp_cmd`` as-is and return the reply as a dict.

        The timeout configured via settimeout() applies.
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._aqmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    # Default impl of cmd() delegates to cmd_obj

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Execute ``cmd`` with ``kwds`` as its arguments.

        The timeout configured via settimeout() applies.

        :return: The result of the client's execute() call.
        """
        return self._sync(
            self._aqmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Pull a single event from the client's event queue.

        :param wait:
            If False or 0, do not wait; return None if no event is ready.
            If True, wait without a time limit for the next event.
            Any other number (int or float) is a timeout in seconds.
        :return: The event as a dict, or None if not waiting and the
                 queue is empty.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._aqmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        # Any truthy non-bool number is a timeout. Checking only for
        # 'float' would make an integer such as wait=5 block forever.
        if wait and not isinstance(wait, bool):
            timeout = wait

        return dict(
            self._sync(
                self._aqmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Return all queued events, or pull one if none are queued.

        :param wait: Forwarded to pull_event() when the queue was empty.
        :return: A list of event dicts; empty if no event was obtained.
        """
        events = [dict(x) for x in self._aqmp.events.clear()]
        if events:
            return events

        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Discard every event currently queued on the client."""
        self._aqmp.events.clear()

    def close(self) -> None:
        """Disconnect the wrapped client, blocking until it finishes."""
        self._sync(
            self._aqmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """Set the timeout (seconds, or None) for cmd_obj()/command()."""
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """Forward ``fd`` to the wrapped client's send_fd_scm()."""
        self._aqmp.send_fd_scm(fd)

    def __del__(self) -> None:
        """
        Close the connection if the caller forgot to, when possible.

        :raise QMPError: If the client is not idle and the event loop
                         is running, so close() cannot be run here.
        """
        if self._aqmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )