pep-mklive/pylibraries/tendo/tee.py

278 lines
8.5 KiB
Python
Raw Normal View History

2024-10-27 15:16:46 -01:00
#!/usr/bin/env python
# encoding: utf-8
import codecs
import logging
import os
import pipes
from six import string_types
import subprocess
import sys
import time
import types
import unittest
global logger
global stdout
global stderr
global timing
global log_command
logger = None
stdout = False
stderr = False
# print execution time of each command in the log, just after the return code
timing = True
# outputs the command being executed to the log (before command output)
log_command = True
_sentinel = object()
def quote_command(cmd):
"""This function does assure that the command line is entirely quoted.
This is required in order to prevent getting "The input line is too long" error message.
"""
if not (os.name == "nt" or os.name == "dos"):
# the escaping is required only on Windows platforms, in fact it will
# break cmd line on others
return cmd
if '"' in cmd[1:-1]:
cmd = '"' + cmd + '"'
return cmd
def system2(cmd, cwd=None, logger=_sentinel, stdout=_sentinel, log_command=_sentinel, timing=_sentinel):
# def tee(cmd, cwd=None, logger=tee_logger, console=tee_console):
"""Works exactly like :func:`system` but it returns both the exit code and the output as a list of lines.
This method returns a tuple: (return_code, output_lines_as_list). The return code of 0 means success.
"""
# if isinstance(cmd, collections.Iterable): # -- this line was replaced
# because collections.Iterable seems to be missing on Debian Python 2.5.5
# (but not on OS X 10.8 with Python 2.5.6)
if hasattr(cmd, '__iter__'):
cmd = " ".join(pipes.quote(s) for s in cmd)
t = time.process_time()
output = []
if log_command is _sentinel:
log_command = globals().get('log_command')
if timing is _sentinel:
timing = globals().get('timing')
# default to python native logger if logger parameter is not used
if logger is _sentinel:
logger = globals().get('logger')
if stdout is _sentinel:
stdout = globals().get('stdout')
# logging.debug("logger=%s stdout=%s" % (logger, stdout))
f = sys.stdout
if not f.encoding or f.encoding == 'ascii':
# `ascii` is not a valid encoding by our standards, it's better to output to UTF-8 because it can encoding any Unicode text
encoding = 'utf_8'
else:
encoding = f.encoding
def filelogger(msg):
try:
# we'll use the same endline on all platforms, you like it or not
msg += '\n'
try:
f.write(msg)
except TypeError:
f.write(msg.encode("utf-8"))
except Exception:
sys.exc_info()[1]
import traceback
print(' ****** ERROR: Exception: %s\nencoding = %s' %
(e, encoding))
traceback.print_exc(file=sys.stderr)
sys.exit(-1)
pass
def nop(msg):
pass
if not logger:
mylogger = nop
elif isinstance(logger, string_types):
f = codecs.open(logger, "a+b", 'utf_8')
mylogger = filelogger
elif isinstance(logger, (types.FunctionType, types.MethodType, types.BuiltinFunctionType)):
mylogger = logger
else:
method_write = getattr(logger, "write", None)
# if we can call write() we'll aceppt it :D
# this should work for filehandles
if hasattr(method_write, '__call__'):
f = logger
mylogger = filelogger
else:
sys.exit("tee() does not support this type of logger=%s" %
type(logger))
if cwd is not None and not os.path.isdir(cwd):
os.makedirs(cwd) # this throws exception if fails
cmd = quote_command(cmd) # to prevent _popen() bug
p = subprocess.Popen(
cmd, cwd=cwd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if log_command:
mylogger("Running: %s" % cmd)
while True:
line = ""
try:
line = p.stdout.readline()
line = line.decode(encoding)
except Exception:
e = sys.exc_info()[1]
logging.error(e)
logging.error("The output of the command could not be decoded as %s\ncmd: %s\n line ignored: %s" %
(encoding, cmd, repr(line)))
pass
output.append(line)
if not line:
break
line = line.rstrip('\n\r')
mylogger(line) # they are added by logging anyway
if stdout:
print(line)
returncode = p.wait()
if log_command:
if timing:
def secondsToStr(t):
return time.strftime('%H:%M:%S', time.gmtime(t))
mylogger("Returned: %d (execution time %s)\n" %
(returncode, secondsToStr(time.process_time() - t)))
else:
mylogger("Returned: %d\n" % returncode)
# running a tool that returns non-zero? this deserves a warning
if not returncode == 0:
logging.warning("Returned: %d from: %s\nOutput %s" %
(returncode, cmd, '\n'.join(output)))
return returncode, output
def system(cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timing=_sentinel):
"""This works similar to :py:func:`os.system` but add some useful optional parameters.
* ``cmd`` - command to be executed
* ``cwd`` - optional working directory to be set before running cmd
* ``logger`` - None, a filename, handle or a function like print or :py:meth:`logging.Logger.warning`
Returns the exit code reported by the execution of the command, 0 means success.
>>> import os, logging
... import tendo.tee
... tee.system("echo test", logger=logging.error) # output using python logging
... tee.system("echo test", logger="log.txt") # output to a file
... f = open("log.txt", "w")
... tee.system("echo test", logger=f) # output to a filehandle
... tee.system("echo test", logger=print) # use the print() function for output
"""
(returncode, output) = system2(cmd, cwd=cwd, logger=logger,
stdout=stdout, log_command=log_command, timing=timing)
return returncode
class testTee(unittest.TestCase):
def test_1(self):
"""No CMD os.system()
1 sort /? ok ok
2 "sort" /? ok ok
3 sort "/?" ok ok
4 "sort" "/?" ok [bad]
5 ""sort /?"" ok [bad]
6 "sort /?" [bad] ok
7 "sort "/?"" [bad] ok
8 ""sort" "/?"" [bad] ok
"""
quotes = {
'dir >nul': 'dir >nul',
'cd /D "C:\\Program Files\\"': '"cd /D "C:\\Program Files\\""',
'python -c "import os" dummy': '"python -c "import os" dummy"',
'sort': 'sort',
}
# we fake the os name because we want to run the test on any platform
save = os.name
os.name = 'nt'
for key, value in quotes.items():
resulted_value = quote_command(key)
self.assertEqual(
value, resulted_value, "Returned <%s>, expected <%s>" % (resulted_value, value))
# ret = os.system(resulted_value)
# if not ret==0:
# print("failed")
os.name = save
def test_2(self):
self.assertEqual(system(['python', '-V']), 0)
def test_3(self):
self.assertEqual(system2(['python', '-V'])[0], 0)
def test_4(self):
self.assertEqual(system(['python', '-c', "print('c c')"]), 0)
if __name__ == '__main__':
# unittest.main()
import pytest
pytest.main([__file__])
# import pytest
# pytest.main(['--pyargs', __name__])
"""
import colorer
import tempfile, os
logging.basicConfig(level=logging.NOTSET,
format='%(message)s')
# default (stdout)
print("#1")
system("python --version")
# function/method
print("#2")
system("python --version", logger=logging.error)
# function (this is the same as default)
print("#3")
system("python --version", logger=print)
# handler
print("#4")
f = tempfile.NamedTemporaryFile()
system("python --version", logger=f)
f.close()
# test with string (filename)
print("#5")
(f, fname) = tempfile.mkstemp()
system("python --version", logger=fname)
os.close(f)
os.unlink(fname)
print("#6")
stdout = False
logger = None
system("echo test")
print("#7")
stdout = True
system("echo test2")
"""