diff options
author | John Mulligan <jmulligan@redhat.com> | 2023-08-16 21:25:03 +0200 |
---|---|---|
committer | John Mulligan <jmulligan@redhat.com> | 2023-08-30 20:02:15 +0200 |
commit | 34d2977a72985952a1dffa3b80e61a0fafd891ca (patch) | |
tree | e7af17fce92f45d81b5402527a4005f966b48bcd /src/cephadm/cephadmlib/call_wrappers.py | |
parent | cephadm: black format exe_utils.py (diff) | |
download | ceph-34d2977a72985952a1dffa3b80e61a0fafd891ca.tar.xz ceph-34d2977a72985952a1dffa3b80e61a0fafd891ca.zip |
cephadm: black format call_wrappers.py
Signed-off-by: John Mulligan <jmulligan@redhat.com>
Pair-programmed-with: Adam King <adking@redhat.com>
Co-authored-by: Adam King <adking@redhat.com>
Diffstat (limited to 'src/cephadm/cephadmlib/call_wrappers.py')
-rw-r--r-- | src/cephadm/cephadmlib/call_wrappers.py | 110 |
1 files changed, 71 insertions, 39 deletions
diff --git a/src/cephadm/cephadmlib/call_wrappers.py b/src/cephadm/cephadmlib/call_wrappers.py index d2881027bae..3fe2171e99d 100644 --- a/src/cephadm/cephadmlib/call_wrappers.py +++ b/src/cephadm/cephadmlib/call_wrappers.py @@ -61,7 +61,7 @@ class CallVerbosity(Enum): self.DEBUG: logging.DEBUG, self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL, self.VERBOSE_ON_FAILURE: logging.DEBUG, - self.VERBOSE: logging.INFO + self.VERBOSE: logging.INFO, } return _verbosity_level_to_log_level[self] # type: ignore @@ -72,7 +72,7 @@ class CallVerbosity(Enum): self.DEBUG: logging.DEBUG, self.QUIET_UNLESS_ERROR: logging.INFO, self.VERBOSE_ON_FAILURE: logging.INFO, - self.VERBOSE: logging.INFO + self.VERBOSE: logging.INFO, } return _verbosity_level_to_log_level[self] # type: ignore @@ -107,8 +107,11 @@ if sys.version_info < (3, 8): # pragma: no cover def _join_threads(self) -> None: """Internal: Join all non-daemon threads""" - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + threads = [ + thread + for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon + ] for thread in threads: thread.join() @@ -119,19 +122,28 @@ if sys.version_info < (3, 8): # pragma: no cover pass def __del__(self, _warn: Any = warnings.warn) -> None: - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive()] + threads = [ + thread + for thread in list(self._threads.values()) + if thread.is_alive() + ] if threads: - _warn(f'{self.__class__} has registered but not finished child processes', - ResourceWarning, - source=self) - - def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None: + _warn( + f'{self.__class__} has registered but not finished child processes', + ResourceWarning, + source=self, + ) + + def add_child_handler( + self, pid: Any, callback: Any, *args: Any + ) -> None: loop = events.get_event_loop() - thread = threading.Thread(target=self._do_waitpid, - name=f'waitpid-{next(self._pid_counter)}', - args=(loop, pid, callback, args), - daemon=True) + thread = threading.Thread( + target=self._do_waitpid, + name=f'waitpid-{next(self._pid_counter)}', + args=(loop, pid, callback, args), + daemon=True, + ) self._threads[pid] = thread thread.start() @@ -144,7 +156,9 @@ if sys.version_info < (3, 8): # pragma: no cover def attach_loop(self, loop: Any) -> None: pass - def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None: + def _do_waitpid( + self, loop: Any, expected_pid: Any, callback: Any, args: Any + ) -> None: assert expected_pid > 0 try: @@ -156,7 +170,8 @@ if sys.version_info < (3, 8): # pragma: no cover returncode = 255 logger.warning( 'Unknown child process pid %d, will report returncode 255', - pid) + pid, + ) else: if os.WIFEXITED(status): returncode = os.WEXITSTATUS(status) @@ -165,11 +180,16 @@ if sys.version_info < (3, 8): # pragma: no cover else: raise ValueError(f'unknown wait status {status}') if loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) + logger.debug( + 'process %s exited with returncode %s', + expected_pid, + returncode, + ) if loop.is_closed(): - logger.warning('Loop %r that handles pid %r is closed', loop, pid) + logger.warning( + 'Loop %r that handles pid %r is closed', loop, pid + ) else: loop.call_soon_threadsafe(callback, pid, returncode, *args) @@ -183,7 +203,7 @@ if sys.version_info < (3, 8): # pragma: no cover try: - from asyncio import run as async_run # type: ignore[attr-defined] + from asyncio import run as async_run # type: ignore[attr-defined] except ImportError: # pragma: no cover # disable coverage for this block. it should be a copy-n-paste from # from newer libs for compatibilty on older python versions @@ -200,12 +220,14 @@ except ImportError: # pragma: no cover loop.close() -def call(ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: +def call( + ctx: CephadmContext, + command: List[str], + desc: Optional[str] = None, + verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, + timeout: Optional[int] = DEFAULT_TIMEOUT, + **kwargs: Any, +) -> Tuple[str, str, int]: """ Wrap subprocess.Popen to @@ -226,7 +248,8 @@ def call(ctx: CephadmContext, *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - env=os.environ.copy()) + env=os.environ.copy(), + ) assert process.stdout assert process.stderr try: @@ -258,7 +281,10 @@ def call(ctx: CephadmContext, log_level = verbosity.success_log_level() if returncode != 0: log_level = verbosity.error_log_level() - logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}') + logger.log( + log_level, + f'Non-zero exit code {returncode} from {" ".join(command)}', + ) for line in stdout.splitlines(): logger.log(log_level, prefix + 'stdout ' + line) for line in stderr.splitlines(): @@ -267,25 +293,29 @@ def call(ctx: CephadmContext, def call_throws( - ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: + ctx: CephadmContext, + command: List[str], + desc: Optional[str] = None, + verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, + timeout: Optional[int] = DEFAULT_TIMEOUT, + **kwargs: Any, +) -> Tuple[str, str, int]: out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) if ret: for s in (out, err): if s.strip() and len(s.splitlines()) <= 2: # readable message? - raise RuntimeError(f'Failed command: {" ".join(command)}: {s}') + raise RuntimeError( + f'Failed command: {" ".join(command)}: {s}' + ) raise RuntimeError('Failed command: %s' % ' '.join(command)) return out, err, ret def call_timeout(ctx, command, timeout): # type: (CephadmContext, List[str], int) -> int - logger.debug('Running command (timeout=%s): %s' - % (timeout, ' '.join(command))) + logger.debug( + 'Running command (timeout=%s): %s' % (timeout, ' '.join(command)) + ) def raise_timeout(command, timeout): # type: (List[str], int) -> NoReturn @@ -294,6 +324,8 @@ def call_timeout(ctx, command, timeout): raise TimeoutExpired(msg) try: - return subprocess.call(command, timeout=timeout, env=os.environ.copy()) + return subprocess.call( + command, timeout=timeout, env=os.environ.copy() + ) except subprocess.TimeoutExpired: raise_timeout(command, timeout) |