summaryrefslogtreecommitdiffstats
path: root/src/cephadm/cephadmlib/call_wrappers.py
diff options
context:
space:
mode:
authorJohn Mulligan <jmulligan@redhat.com>2023-08-16 21:25:03 +0200
committerJohn Mulligan <jmulligan@redhat.com>2023-08-30 20:02:15 +0200
commit34d2977a72985952a1dffa3b80e61a0fafd891ca (patch)
treee7af17fce92f45d81b5402527a4005f966b48bcd /src/cephadm/cephadmlib/call_wrappers.py
parentcephadm: black format exe_utils.py (diff)
downloadceph-34d2977a72985952a1dffa3b80e61a0fafd891ca.tar.xz
ceph-34d2977a72985952a1dffa3b80e61a0fafd891ca.zip
cephadm: black format call_wrappers.py
Signed-off-by: John Mulligan <jmulligan@redhat.com> Pair-programmed-with: Adam King <adking@redhat.com> Co-authored-by: Adam King <adking@redhat.com>
Diffstat (limited to 'src/cephadm/cephadmlib/call_wrappers.py')
-rw-r--r--src/cephadm/cephadmlib/call_wrappers.py110
1 files changed, 71 insertions, 39 deletions
diff --git a/src/cephadm/cephadmlib/call_wrappers.py b/src/cephadm/cephadmlib/call_wrappers.py
index d2881027bae..3fe2171e99d 100644
--- a/src/cephadm/cephadmlib/call_wrappers.py
+++ b/src/cephadm/cephadmlib/call_wrappers.py
@@ -61,7 +61,7 @@ class CallVerbosity(Enum):
self.DEBUG: logging.DEBUG,
self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL,
self.VERBOSE_ON_FAILURE: logging.DEBUG,
- self.VERBOSE: logging.INFO
+ self.VERBOSE: logging.INFO,
}
return _verbosity_level_to_log_level[self] # type: ignore
@@ -72,7 +72,7 @@ class CallVerbosity(Enum):
self.DEBUG: logging.DEBUG,
self.QUIET_UNLESS_ERROR: logging.INFO,
self.VERBOSE_ON_FAILURE: logging.INFO,
- self.VERBOSE: logging.INFO
+ self.VERBOSE: logging.INFO,
}
return _verbosity_level_to_log_level[self] # type: ignore
@@ -107,8 +107,11 @@ if sys.version_info < (3, 8): # pragma: no cover
def _join_threads(self) -> None:
"""Internal: Join all non-daemon threads"""
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive() and not thread.daemon]
+ threads = [
+ thread
+ for thread in list(self._threads.values())
+ if thread.is_alive() and not thread.daemon
+ ]
for thread in threads:
thread.join()
@@ -119,19 +122,28 @@ if sys.version_info < (3, 8): # pragma: no cover
pass
def __del__(self, _warn: Any = warnings.warn) -> None:
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive()]
+ threads = [
+ thread
+ for thread in list(self._threads.values())
+ if thread.is_alive()
+ ]
if threads:
- _warn(f'{self.__class__} has registered but not finished child processes',
- ResourceWarning,
- source=self)
-
- def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None:
+ _warn(
+ f'{self.__class__} has registered but not finished child processes',
+ ResourceWarning,
+ source=self,
+ )
+
+ def add_child_handler(
+ self, pid: Any, callback: Any, *args: Any
+ ) -> None:
loop = events.get_event_loop()
- thread = threading.Thread(target=self._do_waitpid,
- name=f'waitpid-{next(self._pid_counter)}',
- args=(loop, pid, callback, args),
- daemon=True)
+ thread = threading.Thread(
+ target=self._do_waitpid,
+ name=f'waitpid-{next(self._pid_counter)}',
+ args=(loop, pid, callback, args),
+ daemon=True,
+ )
self._threads[pid] = thread
thread.start()
@@ -144,7 +156,9 @@ if sys.version_info < (3, 8): # pragma: no cover
def attach_loop(self, loop: Any) -> None:
pass
- def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None:
+ def _do_waitpid(
+ self, loop: Any, expected_pid: Any, callback: Any, args: Any
+ ) -> None:
assert expected_pid > 0
try:
@@ -156,7 +170,8 @@ if sys.version_info < (3, 8): # pragma: no cover
returncode = 255
logger.warning(
'Unknown child process pid %d, will report returncode 255',
- pid)
+ pid,
+ )
else:
if os.WIFEXITED(status):
returncode = os.WEXITSTATUS(status)
@@ -165,11 +180,16 @@ if sys.version_info < (3, 8): # pragma: no cover
else:
raise ValueError(f'unknown wait status {status}')
if loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
+ logger.debug(
+ 'process %s exited with returncode %s',
+ expected_pid,
+ returncode,
+ )
if loop.is_closed():
- logger.warning('Loop %r that handles pid %r is closed', loop, pid)
+ logger.warning(
+ 'Loop %r that handles pid %r is closed', loop, pid
+ )
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)
@@ -183,7 +203,7 @@ if sys.version_info < (3, 8): # pragma: no cover
try:
- from asyncio import run as async_run # type: ignore[attr-defined]
+ from asyncio import run as async_run # type: ignore[attr-defined]
except ImportError: # pragma: no cover
# disable coverage for this block. it should be a copy-n-paste from
# from newer libs for compatibilty on older python versions
@@ -200,12 +220,14 @@ except ImportError: # pragma: no cover
loop.close()
-def call(ctx: CephadmContext,
- command: List[str],
- desc: Optional[str] = None,
- verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
- timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs: Any) -> Tuple[str, str, int]:
+def call(
+ ctx: CephadmContext,
+ command: List[str],
+ desc: Optional[str] = None,
+ verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
+ **kwargs: Any,
+) -> Tuple[str, str, int]:
"""
Wrap subprocess.Popen to
@@ -226,7 +248,8 @@ def call(ctx: CephadmContext,
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
- env=os.environ.copy())
+ env=os.environ.copy(),
+ )
assert process.stdout
assert process.stderr
try:
@@ -258,7 +281,10 @@ def call(ctx: CephadmContext,
log_level = verbosity.success_log_level()
if returncode != 0:
log_level = verbosity.error_log_level()
- logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}')
+ logger.log(
+ log_level,
+ f'Non-zero exit code {returncode} from {" ".join(command)}',
+ )
for line in stdout.splitlines():
logger.log(log_level, prefix + 'stdout ' + line)
for line in stderr.splitlines():
@@ -267,25 +293,29 @@ def call(ctx: CephadmContext,
def call_throws(
- ctx: CephadmContext,
- command: List[str],
- desc: Optional[str] = None,
- verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
- timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs: Any) -> Tuple[str, str, int]:
+ ctx: CephadmContext,
+ command: List[str],
+ desc: Optional[str] = None,
+ verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
+ **kwargs: Any,
+) -> Tuple[str, str, int]:
out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
if ret:
for s in (out, err):
if s.strip() and len(s.splitlines()) <= 2: # readable message?
- raise RuntimeError(f'Failed command: {" ".join(command)}: {s}')
+ raise RuntimeError(
+ f'Failed command: {" ".join(command)}: {s}'
+ )
raise RuntimeError('Failed command: %s' % ' '.join(command))
return out, err, ret
def call_timeout(ctx, command, timeout):
# type: (CephadmContext, List[str], int) -> int
- logger.debug('Running command (timeout=%s): %s'
- % (timeout, ' '.join(command)))
+ logger.debug(
+ 'Running command (timeout=%s): %s' % (timeout, ' '.join(command))
+ )
def raise_timeout(command, timeout):
# type: (List[str], int) -> NoReturn
@@ -294,6 +324,8 @@ def call_timeout(ctx, command, timeout):
raise TimeoutExpired(msg)
try:
- return subprocess.call(command, timeout=timeout, env=os.environ.copy())
+ return subprocess.call(
+ command, timeout=timeout, env=os.environ.copy()
+ )
except subprocess.TimeoutExpired:
raise_timeout(command, timeout)