278 lines
8.5 KiB
278 lines
8.5 KiB
#!/usr/bin/env python
# encoding: utf-8
import codecs
import logging
import os
import pipes
import shlex
import subprocess
import sys
import time
import types
import unittest

from six import string_types
# Module-level defaults. system2() falls back to these (via globals()) for
# every argument the caller leaves at its sentinel default.

# Default log target; None means "do not log".
logger = None
# When true, each line of command output is also printed.
stdout = False
# Not read by any code visible in this module.
stderr = False
# print execution time of each command in the log, just after the return code
timing = True
# outputs the command being executed to the log (before command output)
log_command = True
# Unique marker for "argument not supplied", so that an explicit None can be
# told apart from an omitted argument.
_sentinel = object()
def quote_command(cmd):
    """Wrap the whole command line in double quotes when Windows needs it.

    This is required in order to prevent getting the
    "The input line is too long" error message on Windows.

    :param cmd: the command line, as a single string.
    :returns: ``cmd`` unchanged on non-Windows platforms, or when it has no
        double quote strictly between its first and last character;
        otherwise ``cmd`` enclosed in one extra pair of double quotes.
    """
    if os.name not in ("nt", "dos"):
        # the escaping is required only on Windows platforms, in fact it will
        # break cmd line on others
        return cmd
    # Only the interior is inspected: quotes at the very first/last position
    # do not trigger the extra wrapping.
    if '"' in cmd[1:-1]:
        return '"' + cmd + '"'
    return cmd
def system2(cmd, cwd=None, logger=_sentinel, stdout=_sentinel, log_command=_sentinel, timing=_sentinel):
    """Run ``cmd`` through the shell, tee its output and return it.

    Works like :func:`system` but returns both the exit code and the output
    as a list of lines. The command's stderr is merged into its stdout.

    :param cmd: the command line as a string, or an iterable of arguments
        which are shell-quoted and joined with spaces.
    :param cwd: working directory for the command; created when missing.
    :param logger: where each output line goes: a falsy value (no logging),
        a filename (opened for append, UTF-8), a function/method called with
        each line, or an object with a callable ``write``. When omitted, the
        module-level ``logger`` is used.
    :param stdout: when true, every output line is also printed. When
        omitted, the module-level ``stdout`` is used.
    :param log_command: when true, a "Running: ..." line is logged before
        the output and a "Returned: ..." line after it. When omitted, the
        module-level ``log_command`` is used.
    :param timing: when true (and ``log_command`` is true), the
        "Returned: ..." line includes the elapsed wall-clock time. When
        omitted, the module-level ``timing`` is used.
    :returns: ``(return_code, output_lines_as_list)``. A return code of 0
        means success. Each line keeps its line terminator and the final
        element is the empty string read at end of output.
    :raises SystemExit: when ``logger`` is of an unsupported type.
    :raises OSError: when ``cwd`` does not exist and cannot be created.
    """
    # A plain string is iterable too, but it already is a complete command
    # line; only real argument sequences are quoted and joined.
    if hasattr(cmd, '__iter__') and not isinstance(cmd, string_types + (bytes,)):
        cmd = " ".join(shlex.quote(s) for s in cmd)

    # Wall-clock start. The command runs in a child process, so the CPU time
    # of this process says nothing about how long the command took.
    started = time.monotonic()
    output = []

    # Arguments left at the sentinel fall back to the module-level settings.
    module_defaults = globals()
    if log_command is _sentinel:
        log_command = module_defaults.get('log_command')
    if timing is _sentinel:
        timing = module_defaults.get('timing')
    if logger is _sentinel:
        logger = module_defaults.get('logger')
    if stdout is _sentinel:
        stdout = module_defaults.get('stdout')

    console_encoding = sys.stdout.encoding
    if not console_encoding or console_encoding == 'ascii':
        # `ascii` is too narrow for arbitrary command output; UTF-8 can
        # represent any Unicode text.
        encoding = 'utf_8'
    else:
        encoding = console_encoding

    sink = sys.stdout     # file-like target used by filelogger
    opened_here = False   # True only when this call opened `sink` itself

    def filelogger(msg):
        # we'll use the same endline on all platforms, you like it or not
        msg += '\n'
        try:
            try:
                sink.write(msg)
            except TypeError:
                # Binary-mode handles reject text; hand them bytes instead.
                sink.write(msg.encode("utf-8"))
        except Exception as e:
            # A broken log target is reported but must not abort the command.
            import traceback
            print(' ****** ERROR: Exception: %s\nencoding = %s' %
                  (e, encoding))
            traceback.print_exc(file=sys.stderr)

    def nop(msg):
        pass

    if not logger:
        mylogger = nop
    elif isinstance(logger, string_types):
        sink = codecs.open(logger, "a+b", 'utf_8')
        opened_here = True
        mylogger = filelogger
    elif isinstance(logger, (types.FunctionType, types.MethodType, types.BuiltinFunctionType)):
        mylogger = logger
    elif callable(getattr(logger, "write", None)):
        # if we can call write() we'll accept it; this should work for
        # filehandles
        sink = logger
        mylogger = filelogger
    else:
        sys.exit("tee() does not support this type of logger=%s" %
                 type(logger))

    try:
        if cwd is not None and not os.path.isdir(cwd):
            os.makedirs(cwd)  # this throws exception if fails

        cmd = quote_command(cmd)  # to prevent _popen() bug
        # The context manager closes the pipe and reaps the child even when
        # a logger callback raises.
        with subprocess.Popen(
                cmd, cwd=cwd, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as p:
            if log_command:
                mylogger("Running: %s" % cmd)
            while True:
                raw = b""
                try:
                    raw = p.stdout.readline()
                    line = raw.decode(encoding)
                except Exception as e:
                    logging.error(e)
                    logging.error("The output of the command could not be decoded as %s\ncmd: %s\n line ignored: %s" %
                                  (encoding, cmd, repr(raw)))
                    if raw:
                        # Undecodable line: drop it and keep reading.
                        continue
                    # The read itself failed: treat it as end of output.
                    line = ""

                output.append(line)
                if not line:
                    break
                line = line.rstrip('\n\r')
                mylogger(line)  # they are added by logging anyway
                if stdout:
                    print(line)
            returncode = p.wait()

        if log_command:
            if timing:
                elapsed = time.strftime(
                    '%H:%M:%S', time.gmtime(time.monotonic() - started))
                mylogger("Returned: %d (execution time %s)\n" %
                         (returncode, elapsed))
            else:
                mylogger("Returned: %d\n" % returncode)
    finally:
        # Only close what this call opened; caller-supplied handles stay open.
        if opened_here:
            sink.close()

    # running a tool that returns non-zero? this deserves a warning
    if returncode != 0:
        logging.warning("Returned: %d from: %s\nOutput %s" %
                        (returncode, cmd, '\n'.join(output)))

    return returncode, output
def system(cmd, cwd=None, logger=None, stdout=None, log_command=_sentinel, timing=_sentinel):
    """This works similar to :py:func:`os.system` but adds some useful optional parameters.

    All arguments are forwarded unchanged to :func:`system2`; only the exit
    code is returned and the captured output is discarded.

    * ``cmd`` - command to be executed
    * ``cwd`` - optional working directory to be set before running cmd
    * ``logger`` - None, a filename, handle or a function like print or
      :py:meth:`logging.Logger.warning`
    * ``stdout`` - forwarded to :func:`system2`
    * ``log_command`` / ``timing`` - forwarded to :func:`system2`; when
      omitted, :func:`system2` uses the module-level settings

    ``logger`` and ``stdout`` default to ``None`` here and are always passed
    on explicitly, so the module-level ``logger`` / ``stdout`` settings are
    not consulted for calls made through this function.

    Returns the exit code reported by the execution of the command, 0 means
    success.

    Usage::

        import logging
        from tendo import tee

        tee.system("echo test", logger=logging.error)  # output using python logging
        tee.system("echo test", logger="log.txt")  # output to a file
        f = open("log.txt", "w")
        tee.system("echo test", logger=f)  # output to a filehandle
        tee.system("echo test", logger=print)  # use the print() function for output
    """
    returncode, _output = system2(
        cmd, cwd=cwd, logger=logger, stdout=stdout,
        log_command=log_command, timing=timing)
    return returncode
class testTee(unittest.TestCase):
    """Unit tests for quote_command(), system() and system2()."""

    def test_1(self):
        """Check quote_command() against the expected Windows quoting.

        Reference table of which invocations work where::

            No  CMD              os.system()
            1   sort /?          ok       ok
            2   "sort" /?        ok       ok
            3   sort "/?"        ok       ok
            4   "sort" "/?"      ok       [bad]
            5   ""sort /?""      ok       [bad]
            6   "sort /?"        [bad]    ok
            7   "sort "/?""      [bad]    ok
            8   ""sort" "/?""    [bad]    ok
        """
        quotes = {
            'dir >nul': 'dir >nul',
            'cd /D "C:\\Program Files\\"': '"cd /D "C:\\Program Files\\""',
            'python -c "import os" dummy': '"python -c "import os" dummy"',
            'sort': 'sort',
        }

        # we fake the os name because we want to run the test on any platform
        save = os.name
        os.name = 'nt'
        try:
            for key, value in quotes.items():
                resulted_value = quote_command(key)
                self.assertEqual(
                    value, resulted_value, "Returned <%s>, expected <%s>" % (resulted_value, value))
        finally:
            # Restore even when an assertion fails, so the fake name cannot
            # leak into the other tests.
            os.name = save

    def test_2(self):
        """system() returns 0 for a command given as an argument list."""
        self.assertEqual(system(['python', '-V']), 0)

    def test_3(self):
        """system2() returns 0 as the first element of its result."""
        self.assertEqual(system2(['python', '-V'])[0], 0)

    def test_4(self):
        """An argument containing spaces and quotes is passed through intact."""
        self.assertEqual(system(['python', '-c', "print('c c')"]), 0)
if __name__ == '__main__':
    # Manual smoke test: runs a few commands through system() with each
    # supported kind of logger.
    # unittest.main()
    import pytest
    # pytest.main(['--pyargs', __name__])
    import colorer
    import os
    import tempfile

    # default (stdout)
    system("python --version")

    # function/method
    system("python --version", logger=logging.error)

    # function (this is the same as default)
    system("python --version", logger=print)

    # handler; the context manager closes (and deletes) the temporary file
    with tempfile.NamedTemporaryFile() as f:
        system("python --version", logger=f)

    # test with string (filename)
    (fd, fname) = tempfile.mkstemp()
    # mkstemp() hands back an open OS-level descriptor; only the name is
    # needed here, so release the descriptor right away.
    os.close(fd)
    try:
        system("python --version", logger=fname)
    finally:
        os.remove(fname)

    # NOTE(review): system() always passes logger/stdout explicitly (None by
    # default), so these module-level settings do not affect the two calls
    # below - confirm whether system2() was intended here.
    stdout = False
    logger = None
    system("echo test")

    stdout = True
    system("echo test2")