summaryrefslogtreecommitdiffstats
path: root/git_remote_helpers/util.py
blob: dce83e60660825ba115c503ce0776955d90c1a44 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#!/usr/bin/env python

"""Misc. useful functionality used by the rest of this package.

This module provides common functionality used by the other modules in
this package.

"""

import sys
import os
import subprocess


# Whether or not to show debug messages
DEBUG = False

def notify(msg, *args):
    """Print a message to stderr."""
    print >> sys.stderr, msg % args

def debug (msg, *args):
    """Print a debug message to stderr when DEBUG is enabled."""
    if DEBUG:
        print >> sys.stderr, msg % args

def error (msg, *args):
    """Print an error message to stderr."""
    print >> sys.stderr, "ERROR:", msg % args

def warn(msg, *args):
    """Print a warning message to stderr."""
    print >> sys.stderr, "warning:", msg % args

def die (msg, *args):
    """Print as error message to stderr and exit the program."""
    error(msg, *args)
    sys.exit(1)


class ProgressIndicator(object):

    """Simple progress indicator.

    Displayed as a spinning character by default, but can be customized
    by passing custom messages that overrides the spinning character.

    """

    States = ("|", "/", "-", "\\")

    def __init__ (self, prefix = "", f = sys.stdout):
        """Create a new ProgressIndicator, bound to the given file object."""
        self.n = 0  # Simple progress counter
        self.f = f  # Progress is written to this file object
        self.prev_len = 0  # Length of previous msg (to be overwritten)
        self.prefix = prefix  # Prefix prepended to each progress message
        self.prefix_lens = [] # Stack of prefix string lengths

    def pushprefix (self, prefix):
        """Append the given prefix onto the prefix stack."""
        self.prefix_lens.append(len(self.prefix))
        self.prefix += prefix

    def popprefix (self):
        """Remove the last prefix from the prefix stack."""
        prev_len = self.prefix_lens.pop()
        self.prefix = self.prefix[:prev_len]

    def __call__ (self, msg = None, lf = False):
        """Indicate progress, possibly with a custom message."""
        if msg is None:
            msg = self.States[self.n % len(self.States)]
        msg = self.prefix + msg
        print >> self.f, "\r%-*s" % (self.prev_len, msg),
        self.prev_len = len(msg.expandtabs())
        if lf:
            print >> self.f
            self.prev_len = 0
        self.n += 1

    def finish (self, msg = "done", noprefix = False):
        """Finalize progress indication with the given message."""
        if noprefix:
            self.prefix = ""
        self(msg, True)


def start_command (args, cwd = None, shell = False, add_env = None,
                   stdin = subprocess.PIPE, stdout = subprocess.PIPE,
                   stderr = subprocess.PIPE):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    """
    env = None
    if add_env is not None:
        env = os.environ.copy()
        env.update(add_env)
    return subprocess.Popen(args, bufsize = 1, stdin = stdin, stdout = stdout,
                            stderr = stderr, cwd = cwd, shell = shell,
                            env = env, universal_newlines = True)


def run_command (args, cwd = None, shell = False, add_env = None,
                 flag_error = True):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.

    The results are formatted as a 3-tuple: (exit_code, output, errors)

    If flag_error is enabled, Error messages will be produced if the
    subprocess terminated with a non-zero exit code and/or stderr
    output.

    The other arguments are passed on to start_command().

    """
    process = start_command(args, cwd, shell, add_env)
    (output, errors) = process.communicate()
    exit_code = process.returncode
    if flag_error and errors:
        error("'%s' returned errors:\n---\n%s---", " ".join(args), errors)
    if flag_error and exit_code:
        error("'%s' returned exit code %i", " ".join(args), exit_code)
    return (exit_code, output, errors)


def file_reader_method (missing_ok = False):
    """Decorator for simplifying reading of files.

    If missing_ok is True, a failure to open a file for reading will
    not raise the usual IOError, but instead the wrapped method will be
    called with f == None.  The method must in this case properly
    handle f == None.

    """
    def _wrap (method):
        """Teach given method to handle both filenames and file objects.

        The given method must take a file object as its second argument
        (the first argument being 'self', of course).  This decorator
        will take a filename given as the second argument and promote
        it to a file object.

        """
        def _wrapped_method (self, filename, *args, **kwargs):
            if isinstance(filename, file):
                f = filename
            else:
                try:
                    f = open(filename, 'r')
                except IOError:
                    if missing_ok:
                        f = None
                    else:
                        raise
            try:
                return method(self, f, *args, **kwargs)
            finally:
                if not isinstance(filename, file) and f:
                    f.close()
        return _wrapped_method
    return _wrap


def file_writer_method (method):
    """Decorator for simplifying writing of files.

    Enables the given method to handle both filenames and file objects.

    The given method must take a file object as its second argument
    (the first argument being 'self', of course).  This decorator will
    take a filename given as the second argument and promote it to a
    file object.

    """
    def _new_method (self, filename, *args, **kwargs):
        if isinstance(filename, file):
            f = filename
        else:
            # Make sure the containing directory exists
            parent_dir = os.path.dirname(filename)
            if not os.path.isdir(parent_dir):
                os.makedirs(parent_dir)
            f = open(filename, 'w')
        try:
            return method(self, f, *args, **kwargs)
        finally:
            if not isinstance(filename, file):
                f.close()
    return _new_method