summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJohannes Berg <johannes@sipsolutions.net>2013-03-30 00:37:13 +0100
committerJohannes Berg <johannes@sipsolutions.net>2013-03-30 01:07:52 +0100
commit4b8bdcd1de46e7b7cfabcd971adac444510171a1 (patch)
tree4ed07bd295c53a68a89a7d96294a4e0a549649d1 /lib
parent44706a520f6527c67e7f8b741812f11464babda2 (diff)
add first attempt at new copy script and build infrastructure
Signed-off-by: Johannes Berg <johannes@sipsolutions.net>
Diffstat (limited to 'lib')
-rw-r--r--lib/git.py40
-rw-r--r--lib/kconfig.py78
-rw-r--r--lib/patch.py1051
3 files changed, 1169 insertions, 0 deletions
diff --git a/lib/git.py b/lib/git.py
new file mode 100644
index 00000000..19fbc30e
--- /dev/null
+++ b/lib/git.py
@@ -0,0 +1,40 @@
+import subprocess, re, os
+
+class GitError(Exception):
+ pass
+class SHAError(GitError):
+ pass
+class ExecutionError(GitError):
+ def __init__(self, errcode):
+ self.error_code = errcode
+
+def _check(process):
+ if process.returncode != 0:
+ raise ExecutionError(process.returncode)
+
+_sha_re = re.compile('^[0-9a-fA-F]*$')
+
+def rev_parse(rev='HEAD', tree=None):
+ olddir = os.getcwd()
+ process = subprocess.Popen(['git', 'rev-parse', rev],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ close_fds=True, universal_newlines=True, cwd=tree)
+ stdout = process.communicate()[0]
+ process.wait()
+ _check(process)
+
+ sha = stdout.strip()
+ if not _sha_re.match(sha):
+ raise SHAError()
+ return sha
+
+def describe(rev='HEAD', tree=None):
+ olddir = os.getcwd()
+ process = subprocess.Popen(['git', 'describe', '--always', '--long', rev],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ close_fds=True, universal_newlines=True, cwd=tree)
+ stdout = process.communicate()[0]
+ process.wait()
+ _check(process)
+
+ return stdout.strip()
diff --git a/lib/kconfig.py b/lib/kconfig.py
new file mode 100644
index 00000000..25bc8113
--- /dev/null
+++ b/lib/kconfig.py
@@ -0,0 +1,78 @@
+#
+# Small helper library to manipulate Kconfig files
+#
+
+import os, re
+
+src_line = re.compile(r'^source\s+"?(?P<src>[^\s"]*)"?\s*$')
+tri_line = re.compile(r'^(?P<spc>\s+)tristate')
+cfg_line = re.compile(r'^config\s+(?P<sym>[^\s]*)')
+sel_line = re.compile(r'^(?P<spc>\s+)select\s+(?P<sym>[^\s]*)\s*$')
+
+class ConfigTree(object):
+ def __init__(self, rootfile):
+ self.basedir = os.path.dirname(rootfile)
+ self.rootfile = os.path.basename(rootfile)
+
+ def _walk(self, f):
+ list = [f]
+ for l in open(os.path.join(self.basedir, f), 'r'):
+ m = src_line.match(l)
+ if m and os.path.exists(os.path.join(self.basedir, m.group('src'))):
+ list.extend(self._walk(m.group('src')))
+ return list
+
+ def _prune_sources(self, f, ignore):
+ for nf in self._walk(f):
+ out = ''
+ for l in open(os.path.join(self.basedir, nf), 'r'):
+ m = src_line.match(l)
+ if not m:
+ out += l
+ continue
+ src = m.group('src')
+ if src in ignore or os.path.exists(os.path.join(self.basedir, src)):
+ out += l
+ else:
+ out += '#' + l
+ outf = open(os.path.join(self.basedir, nf), 'w')
+ outf.write(out)
+ outf.close()
+
+ def prune_sources(self, ignore=[]):
+ self._prune_sources(self.rootfile, ignore)
+
+ def force_tristate_modular(self):
+ for nf in self._walk(self.rootfile):
+ out = ''
+ for l in open(os.path.join(self.basedir, nf), 'r'):
+ m = tri_line.match(l)
+ out += l
+ if m:
+ out += m.group('spc') + "depends on m\n"
+ outf = open(os.path.join(self.basedir, nf), 'w')
+ outf.write(out)
+ outf.close()
+
+ def symbols(self):
+ syms = []
+ for nf in self._walk(self.rootfile):
+ for l in open(os.path.join(self.basedir, nf), 'r'):
+ m = cfg_line.match(l)
+ if m:
+ syms.append(m.group('sym'))
+ return syms
+
+ def modify_selects(self):
+ syms = self.symbols()
+ for nf in self._walk(self.rootfile):
+ out = ''
+ for l in open(os.path.join(self.basedir, nf), 'r'):
+ m = sel_line.match(l)
+ if m and not m.group('sym') in syms:
+ out += m.group('spc') + "depends on " + m.group('sym') + '\n'
+ else:
+ out += l
+ outf = open(os.path.join(self.basedir, nf), 'w')
+ outf.write(out)
+ outf.close()
diff --git a/lib/patch.py b/lib/patch.py
new file mode 100644
index 00000000..878aa3c3
--- /dev/null
+++ b/lib/patch.py
@@ -0,0 +1,1051 @@
+#!/usr/bin/env python
+""" Patch utility to apply unified diffs
+
+ Brute-force line-by-line non-recursive parsing
+
+ Copyright (c) 2008-2012 anatoly techtonik
+ Available under the terms of MIT license
+
+ Project home: http://code.google.com/p/python-patch/
+
+
+ $Id: patch.py 181 2012-11-23 16:03:05Z techtonik $
+ $HeadURL: https://python-patch.googlecode.com/svn/trunk/patch.py $
+"""
+
+__author__ = "anatoly techtonik <techtonik@gmail.com>"
+__version__ = "1.12.11"
+
+import copy
+import logging
+import re
+# cStringIO doesn't support unicode in 2.5
+from StringIO import StringIO
+import urllib2
+
+from os.path import exists, isfile, abspath
+import os
+import shutil
+
+
+#------------------------------------------------
+# Logging is controlled by logger named after the
+# module name (e.g. 'patch' for patch.py module)
+
+debugmode = False
+
+logger = logging.getLogger(__name__)
+
+debug = logger.debug
+info = logger.info
+warning = logger.warning
+
+class NullHandler(logging.Handler):
+ """ Copied from Python 2.7 to avoid getting
+ `No handlers could be found for logger "patch"`
+ http://bugs.python.org/issue16539
+ """
+ def handle(self, record):
+ pass
+ def emit(self, record):
+ pass
+ def createLock(self):
+ self.lock = None
+
+logger.addHandler(NullHandler())
+
+#------------------------------------------------
+# Constants for Patch/PatchSet types
+
+DIFF = PLAIN = "plain"
+GIT = "git"
+HG = MERCURIAL = "mercurial"
+SVN = SUBVERSION = "svn"
+# mixed type is only actual when PatchSet contains
+# Patches of different type
+MIXED = MIXED = "mixed"
+
+
+#------------------------------------------------
+# Helpers (these could come with Python stdlib)
+
+# x...() function are used to work with paths in
+# cross-platform manner - all paths use forward
+# slashes even on Windows.
+
+def xisabs(filename):
+ """ Cross-platform version of `os.path.isabs()`
+ Returns True if `filename` is absolute on
+ Linux, OS X or Windows.
+ """
+ if filename.startswith('/'): # Linux/Unix
+ return True
+ elif filename.startswith('\\'): # Windows
+ return True
+ elif re.match(r'\w:[\\/]', filename): # Windows
+ return True
+ return False
+
+def xnormpath(path):
+ """ Cross-platform version of os.path.normpath """
+ return os.path.normpath(path).replace(os.sep, '/')
+
+def xstrip(filename):
+ """ Make relative path out of absolute by stripping
+ prefixes used on Linux, OS X and Windows.
+
+ This function is critical for security.
+ """
+ while xisabs(filename):
+ # strip windows drive with all slashes
+ if re.match(r'\w:[\\/]', filename):
+ filename = re.sub(r'^\w+:[\\/]+', '', filename)
+ # strip all slashes
+ elif re.match(r'[\\/]', filename):
+ filename = re.sub(r'^[\\/]+', '', filename)
+ return filename
+
+#-----------------------------------------------
+# Main API functions
+
+def fromfile(filename):
+ """ Parse patch file. If successful, returns
+ PatchSet() object. Otherwise returns False.
+ """
+ patchset = PatchSet()
+ debug("reading %s" % filename)
+ fp = open(filename, "rb")
+ res = patchset.parse(fp)
+ fp.close()
+ if res == True:
+ return patchset
+ return False
+
+
+def fromstring(s):
+ """ Parse text string and return PatchSet()
+ object (or False if parsing fails)
+ """
+ ps = PatchSet( StringIO(s) )
+ if ps.errors == 0:
+ return ps
+ return False
+
+
+def fromurl(url):
+ """ Parse patch from an URL, return False
+ if an error occured. Note that this also
+ can throw urlopen() exceptions.
+ """
+ ps = PatchSet( urllib2.urlopen(url) )
+ if ps.errors == 0:
+ return ps
+ return False
+
+
+# --- Utility functions ---
+# [ ] reuse more universal pathsplit()
+def pathstrip(path, n):
+ """ Strip n leading components from the given path """
+ pathlist = [path]
+ while os.path.dirname(pathlist[0]) != '':
+ pathlist[0:1] = os.path.split(pathlist[0])
+ return '/'.join(pathlist[n:])
+# --- /Utility function ---
+
+
+class Hunk(object):
+ """ Parsed hunk data container (hunk starts with @@ -R +R @@) """
+
+ def __init__(self):
+ self.startsrc=None #: line count starts with 1
+ self.linessrc=None
+ self.starttgt=None
+ self.linestgt=None
+ self.invalid=False
+ self.text=[]
+
+# def apply(self, estream):
+# """ write hunk data into enumerable stream
+# return strings one by one until hunk is
+# over
+#
+# enumerable stream are tuples (lineno, line)
+# where lineno starts with 0
+# """
+# pass
+
+
+class Patch(object):
+ """ Patch for a single file """
+ def __init__(self):
+ self.source = None
+ self.target = None
+ self.hunks = []
+ self.hunkends = []
+ self.header = []
+
+ self.type = None
+
+
+class PatchSet(object):
+
+ def __init__(self, stream=None):
+ # --- API accessible fields ---
+
+ # name of the PatchSet (filename or ...)
+ self.name = None
+ # patch set type - one of constants
+ self.type = None
+
+ # list of Patch objects
+ self.items = []
+
+ self.errors = 0 # fatal parsing errors
+ self.warnings = 0 # non-critical warnings
+ # --- /API ---
+
+ if stream:
+ self.parse(stream)
+
+ def __len__(self):
+ return len(self.items)
+
+ def parse(self, stream):
+ """ parse unified diff
+ return True on success
+ """
+ lineends = dict(lf=0, crlf=0, cr=0)
+ nexthunkno = 0 #: even if index starts with 0 user messages number hunks from 1
+
+ p = None
+ hunk = None
+ # hunkactual variable is used to calculate hunk lines for comparison
+ hunkactual = dict(linessrc=None, linestgt=None)
+
+
+ class wrapumerate(enumerate):
+ """Enumerate wrapper that uses boolean end of stream status instead of
+ StopIteration exception, and properties to access line information.
+ """
+
+ def __init__(self, *args, **kwargs):
+ # we don't call parent, it is magically created by __new__ method
+
+ self._exhausted = False
+ self._lineno = False # after end of stream equal to the num of lines
+ self._line = False # will be reset to False after end of stream
+
+ def next(self):
+ """Try to read the next line and return True if it is available,
+ False if end of stream is reached."""
+ if self._exhausted:
+ return False
+
+ try:
+ self._lineno, self._line = super(wrapumerate, self).next()
+ except StopIteration:
+ self._exhausted = True
+ self._line = False
+ return False
+ return True
+
+ @property
+ def is_empty(self):
+ return self._exhausted
+
+ @property
+ def line(self):
+ return self._line
+
+ @property
+ def lineno(self):
+ return self._lineno
+
+ # define states (possible file regions) that direct parse flow
+ headscan = True # start with scanning header
+ filenames = False # lines starting with --- and +++
+
+ hunkhead = False # @@ -R +R @@ sequence
+ hunkbody = False #
+ hunkskip = False # skipping invalid hunk mode
+
+ hunkparsed = False # state after successfully parsed hunk
+
+ # regexp to match start of hunk, used groups - 1,3,4,6
+ re_hunk_start = re.compile("^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))?")
+
+ self.errors = 0
+ # temp buffers for header and filenames info
+ header = []
+ srcname = None
+ tgtname = None
+
+ # start of main cycle
+ # each parsing block already has line available in fe.line
+ fe = wrapumerate(stream)
+ while fe.next():
+
+ # -- deciders: these only switch state to decide who should process
+ # -- line fetched at the start of this cycle
+ if hunkparsed:
+ hunkparsed = False
+ if re_hunk_start.match(fe.line):
+ hunkhead = True
+ elif fe.line.startswith("--- "):
+ filenames = True
+ else:
+ headscan = True
+ # -- ------------------------------------
+
+ # read out header
+ if headscan:
+ while not fe.is_empty and not fe.line.startswith("--- "):
+ header.append(fe.line)
+ fe.next()
+ if fe.is_empty:
+ if p == None:
+ debug("no patch data found") # error is shown later
+ self.errors += 1
+ else:
+ info("%d unparsed bytes left at the end of stream" % len(''.join(header)))
+ self.warnings += 1
+ # TODO check for \No new line at the end..
+ # TODO test for unparsed bytes
+ # otherwise error += 1
+ # this is actually a loop exit
+ continue
+
+ headscan = False
+ # switch to filenames state
+ filenames = True
+
+ line = fe.line
+ lineno = fe.lineno
+
+
+ # hunkskip and hunkbody code skipped until definition of hunkhead is parsed
+ if hunkbody:
+ # [x] treat empty lines inside hunks as containing single space
+ # (this happens when diff is saved by copy/pasting to editor
+ # that strips trailing whitespace)
+ if line.strip("\r\n") == "":
+ debug("expanding empty line in a middle of hunk body")
+ self.warnings += 1
+ line = ' ' + line
+
+ # process line first
+ if re.match(r"^[- \+\\]", line):
+ # gather stats about line endings
+ if line.endswith("\r\n"):
+ p.hunkends["crlf"] += 1
+ elif line.endswith("\n"):
+ p.hunkends["lf"] += 1
+ elif line.endswith("\r"):
+ p.hunkends["cr"] += 1
+
+ if line.startswith("-"):
+ hunkactual["linessrc"] += 1
+ elif line.startswith("+"):
+ hunkactual["linestgt"] += 1
+ elif not line.startswith("\\"):
+ hunkactual["linessrc"] += 1
+ hunkactual["linestgt"] += 1
+ hunk.text.append(line)
+ # todo: handle \ No newline cases
+ else:
+ warning("invalid hunk no.%d at %d for target file %s" % (nexthunkno, lineno+1, p.target))
+ # add hunk status node
+ hunk.invalid = True
+ p.hunks.append(hunk)
+ self.errors += 1
+ # switch to hunkskip state
+ hunkbody = False
+ hunkskip = True
+
+ # check exit conditions
+ if hunkactual["linessrc"] > hunk.linessrc or hunkactual["linestgt"] > hunk.linestgt:
+ warning("extra lines for hunk no.%d at %d for target %s" % (nexthunkno, lineno+1, p.target))
+ # add hunk status node
+ hunk.invalid = True
+ p.hunks.append(hunk)
+ self.errors += 1
+ # switch to hunkskip state
+ hunkbody = False
+ hunkskip = True
+ elif hunk.linessrc == hunkactual["linessrc"] and hunk.linestgt == hunkactual["linestgt"]:
+ # hunk parsed successfully
+ p.hunks.append(hunk)
+ # switch to hunkparsed state
+ hunkbody = False
+ hunkparsed = True
+
+ # detect mixed window/unix line ends
+ ends = p.hunkends
+ if ((ends["cr"]!=0) + (ends["crlf"]!=0) + (ends["lf"]!=0)) > 1:
+ warning("inconsistent line ends in patch hunks for %s" % p.source)
+ self.warnings += 1
+ if debugmode:
+ debuglines = dict(ends)
+ debuglines.update(file=p.target, hunk=nexthunkno)
+ debug("crlf: %(crlf)d lf: %(lf)d cr: %(cr)d\t - file: %(file)s hunk: %(hunk)d" % debuglines)
+ # fetch next line
+ continue
+
+ if hunkskip:
+ if re_hunk_start.match(line):
+ # switch to hunkhead state
+ hunkskip = False
+ hunkhead = True
+ elif line.startswith("--- "):
+ # switch to filenames state
+ hunkskip = False
+ filenames = True
+ if debugmode and len(self.items) > 0:
+ debug("- %2d hunks for %s" % (len(p.hunks), p.source))
+
+ if filenames:
+ if line.startswith("--- "):
+ if srcname != None:
+ # XXX testcase
+ warning("skipping false patch for %s" % srcname)
+ srcname = None
+ # XXX header += srcname
+ # double source filename line is encountered
+ # attempt to restart from this second line
+ re_filename = "^--- ([^\t]+)"
+ match = re.match(re_filename, line)
+ # todo: support spaces in filenames
+ if match:
+ srcname = match.group(1).strip()
+ else:
+ warning("skipping invalid filename at line %d" % lineno)
+ self.errors += 1
+ # XXX p.header += line
+ # switch back to headscan state
+ filenames = False
+ headscan = True
+ elif not line.startswith("+++ "):
+ if srcname != None:
+ warning("skipping invalid patch with no target for %s" % srcname)
+ self.errors += 1
+ srcname = None
+ # XXX header += srcname
+ # XXX header += line
+ else:
+ # this should be unreachable
+ warning("skipping invalid target patch")
+ filenames = False
+ headscan = True
+ else:
+ if tgtname != None:
+ # XXX seems to be a dead branch
+ warning("skipping invalid patch - double target at line %d" % lineno)
+ self.errors += 1
+ srcname = None
+ tgtname = None
+ # XXX header += srcname
+ # XXX header += tgtname
+ # XXX header += line
+ # double target filename line is encountered
+ # switch back to headscan state
+ filenames = False
+ headscan = True
+ else:
+ re_filename = "^\+\+\+ ([^\t]+)"
+ match = re.match(re_filename, line)
+ if not match:
+ warning("skipping invalid patch - no target filename at line %d" % lineno)
+ self.errors += 1
+ srcname = None
+ # switch back to headscan state
+ filenames = False
+ headscan = True
+ else:
+ if p: # for the first run p is None
+ self.items.append(p)
+ p = Patch()
+ p.source = srcname
+ srcname = None
+ p.target = match.group(1).strip()
+ p.header = header
+ header = []
+ # switch to hunkhead state
+ filenames = False
+ hunkhead = True
+ nexthunkno = 0
+ p.hunkends = lineends.copy()
+ continue
+
+ if hunkhead:
+ match = re.match("^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))?", line)
+ if not match:
+ if not p.hunks:
+ warning("skipping invalid patch with no hunks for file %s" % p.source)
+ self.errors += 1
+ # XXX review switch
+ # switch to headscan state
+ hunkhead = False
+ headscan = True
+ continue
+ else:
+ # TODO review condition case
+ # switch to headscan state
+ hunkhead = False
+ headscan = True
+ else:
+ hunk = Hunk()
+ hunk.startsrc = int(match.group(1))
+ hunk.linessrc = 1
+ if match.group(3): hunk.linessrc = int(match.group(3))
+ hunk.starttgt = int(match.group(4))
+ hunk.linestgt = 1
+ if match.group(6): hunk.linestgt = int(match.group(6))
+ hunk.invalid = False
+ hunk.text = []
+
+ hunkactual["linessrc"] = hunkactual["linestgt"] = 0
+
+ # switch to hunkbody state
+ hunkhead = False
+ hunkbody = True
+ nexthunkno += 1
+ continue
+
+ # /while fe.next()
+
+ if p:
+ self.items.append(p)
+
+ if not hunkparsed:
+ if hunkskip:
+ warning("warning: finished with errors, some hunks may be invalid")
+ elif headscan:
+ if len(self.items) == 0:
+ warning("error: no patch data found!")
+ return False
+ else: # extra data at the end of file
+ pass
+ else:
+ warning("error: patch stream is incomplete!")
+ self.errors += 1
+ if len(self.items) == 0:
+ return False
+
+ if debugmode and len(self.items) > 0:
+ debug("- %2d hunks for %s" % (len(p.hunks), p.source))
+
+ # XXX fix total hunks calculation
+ debug("total files: %d total hunks: %d" % (len(self.items),
+ sum(len(p.hunks) for p in self.items)))
+
+ # ---- detect patch and patchset types ----
+ for idx, p in enumerate(self.items):
+ self.items[idx].type = self._detect_type(p)
+
+ types = set([p.type for p in self.items])
+ if len(types) > 1:
+ self.type = MIXED
+ else:
+ self.type = types.pop()
+ # --------
+
+ self._normalize_filenames()
+
+ return (self.errors == 0)
+
+ def _detect_type(self, p):
+ """ detect and return type for the specified Patch object
+ analyzes header and filenames info
+
+ NOTE: must be run before filenames are normalized
+ """
+
+ # check for SVN
+ # - header starts with Index:
+ # - next line is ===... delimiter
+ # - filename is followed by revision number
+ # TODO add SVN revision
+ if (len(p.header) > 1 and p.header[-2].startswith("Index: ")
+ and p.header[-1].startswith("="*67)):
+ return SVN
+
+ # common checks for both HG and GIT
+ DVCS = ((p.source.startswith('a/') or p.source == '/dev/null')
+ and (p.target.startswith('b/') or p.target == '/dev/null'))
+
+ # GIT type check
+ # - header[-2] is like "diff --git a/oldname b/newname"
+ # - header[-1] is like "index <hash>..<hash> <mode>"
+ # TODO add git rename diffs and add/remove diffs
+ # add git diff with spaced filename
+ # TODO http://www.kernel.org/pub/software/scm/git/docs/git-diff.html
+
+ # detect the start of diff header - there might be some comments before
+ if len(p.header) > 1:
+ for idx in reversed(range(len(p.header))):
+ if p.header[idx].startswith("diff --git"):
+ break
+ if re.match(r'diff --git a/[\w/.]+ b/[\w/.]+', p.header[idx]):
+ if (idx+1 < len(p.header)
+ and re.match(r'index \w{7}..\w{7} \d{6}', p.header[idx+1])):
+ if DVCS:
+ return GIT
+
+ # HG check
+ #
+ # - for plain HG format header is like "diff -r b2d9961ff1f5 filename"
+ # - for Git-style HG patches it is "diff --git a/oldname b/newname"
+ # - filename starts with a/, b/ or is equal to /dev/null
+ # - exported changesets also contain the header
+ # # HG changeset patch
+ # # User name@example.com
+ # ...
+ # TODO add MQ
+ # TODO add revision info
+ if len(p.header) > 0:
+ if DVCS and re.match(r'diff -r \w{12} .*', p.header[-1]):
+ return HG
+ if DVCS and p.header[-1].startswith('diff --git a/'):
+ if len(p.header) == 1: # native Git patch header len is 2
+ return HG
+ elif p.header[0].startswith('# HG changeset patch'):
+ return HG
+
+ return PLAIN
+
+
+ def _normalize_filenames(self):
+ """ sanitize filenames, normalizing paths, i.e.:
+ 1. strip a/ and b/ prefixes from GIT and HG style patches
+ 2. remove all references to parent directories (with warning)
+ 3. translate any absolute paths to relative (with warning)
+
+ [x] always use forward slashes to be crossplatform
+ (diff/patch were born as a unix utility after all)
+
+ return None
+ """
+ for i,p in enumerate(self.items):
+ if p.type in (HG, GIT):
+ # TODO: figure out how to deal with /dev/null entries
+ debug("stripping a/ and b/ prefixes")
+ if p.source != '/dev/null':
+ if not p.source.startswith("a/"):
+ warning("invalid source filename")
+ else:
+ p.source = p.source[2:]
+ if p.target != '/dev/null':
+ if not p.target.startswith("b/"):
+ warning("invalid target filename")
+ else:
+ p.target = p.target[2:]
+
+ p.source = xnormpath(p.source)
+ p.target = xnormpath(p.target)
+
+ sep = '/' # sep value can be hardcoded, but it looks nice this way
+
+ # references to parent are not allowed
+ if p.source.startswith(".." + sep):
+ warning("error: stripping parent path for source file patch no.%d" % (i+1))
+ self.warnings += 1
+ while p.source.startswith(".." + sep):
+ p.source = p.source.partition(sep)[2]
+ if p.target.startswith(".." + sep):
+ warning("error: stripping parent path for target file patch no.%d" % (i+1))
+ self.warnings += 1
+ while p.target.startswith(".." + sep):
+ p.target = p.target.partition(sep)[2]
+ # absolute paths are not allowed
+ if xisabs(p.source) or xisabs(p.target):
+ warning("error: absolute paths are not allowed - file no.%d" % (i+1))
+ self.warnings += 1
+ if xisabs(p.source):
+ warning("stripping absolute path from source name '%s'" % p.source)
+ p.source = xstrip(p.source)
+ if xisabs(p.target):
+ warning("stripping absolute path from target name '%s'" % p.target)
+ p.target = xstrip(p.target)
+
+ self.items[i].source = p.source
+ self.items[i].target = p.target
+
+
+ def diffstat(self):
+ """ calculate diffstat and return as a string
+ Notes:
+ - original diffstat ouputs target filename
+ - single + or - shouldn't escape histogram
+ """
+ names = []
+ insert = []
+ delete = []
+ namelen = 0
+ maxdiff = 0 # max number of changes for single file
+ # (for histogram width calculation)
+ for patch in self.items:
+ i,d = 0,0
+ for hunk in patch.hunks:
+ for line in hunk.text:
+ if line.startswith('+'):
+ i += 1
+ elif line.startswith('-'):
+ d += 1
+ names.append(patch.target)
+ insert.append(i)
+ delete.append(d)
+ namelen = max(namelen, len(patch.target))
+ maxdiff = max(maxdiff, i+d)
+ output = ''
+ statlen = len(str(maxdiff)) # stats column width
+ for i,n in enumerate(names):
+ # %-19s | %-4d %s
+ format = " %-" + str(namelen) + "s | %" + str(statlen) + "s %s\n"
+
+ hist = ''
+ # -- calculating histogram --
+ width = len(format % ('', '', ''))
+ histwidth = max(2, 80 - width)
+ if maxdiff < histwidth:
+ hist = "+"*insert[i] + "-"*delete[i]
+ else:
+ iratio = (float(insert[i]) / maxdiff) * histwidth
+ dratio = (float(delete[i]) / maxdiff) * histwidth
+
+ # make sure every entry gets at least one + or -
+ iwidth = 1 if 0 < iratio < 1 else int(iratio)
+ dwidth = 1 if 0 < dratio < 1 else int(dratio)
+ #print iratio, dratio, iwidth, dwidth, histwidth
+ hist = "+"*int(iwidth) + "-"*int(dwidth)
+ # -- /calculating +- histogram --
+ output += (format % (names[i], insert[i] + delete[i], hist))
+
+ output += (" %d files changed, %d insertions(+), %d deletions(-)"
+ % (len(names), sum(insert), sum(delete)))
+ return output
+
+
+ def apply(self, strip=0):
+ """ apply parsed patch
+ return True on success
+ """
+
+ total = len(self.items)
+ errors = 0
+ if strip:
+ # [ ] test strip level exceeds nesting level
+ # [ ] test the same only for selected files
+ # [ ] test if files end up being on the same level
+ try:
+ strip = int(strip)
+ except ValueError:
+ errors += 1
+ warning("error: strip parameter '%s' must be an integer" % strip)
+ strip = 0
+
+ #for fileno, filename in enumerate(self.source):
+ for i,p in enumerate(self.items):
+ f2patch = p.source
+ if strip:
+ debug("stripping %s leading component from '%s'" % (strip, f2patch))
+ f2patch = pathstrip(f2patch, strip)
+ if not exists(f2patch):
+ f2patch = p.target
+ if strip:
+ debug("stripping %s leading component from '%s'" % (strip, f2patch))
+ f2patch = pathstrip(f2patch, strip)
+ if not exists(f2patch):
+ warning("source/target file does not exist\n--- %s\n+++ %s" % (p.source, f2patch))
+ errors += 1
+ continue
+ if not isfile(f2patch):
+ warning("not a file - %s" % f2patch)
+ errors += 1
+ continue
+ filename = f2patch
+
+ debug("processing %d/%d:\t %s" % (i+1, total, filename))
+
+ # validate before patching
+ f2fp = open(filename)
+ hunkno = 0
+ hunk = p.hunks[hunkno]
+ hunkfind = []
+ hunkreplace = []
+ validhunks = 0
+ canpatch = False
+ for lineno, line in enumerate(f2fp):
+ if lineno+1 < hunk.startsrc:
+ continue
+ elif lineno+1 == hunk.startsrc:
+ hunkfind = [x[1:].rstrip("\r\n") for x in hunk.text if x[0] in " -"]
+ hunkreplace = [x[1:].rstrip("\r\n") for x in hunk.text if x[0] in " +"]
+ #pprint(hunkreplace)
+ hunklineno = 0
+
+ # todo \ No newline at end of file
+
+ # check hunks in source file
+ if lineno+1 < hunk.startsrc+len(hunkfind)-1:
+ if line.rstrip("\r\n") == hunkfind[hunklineno]:
+ hunklineno+=1
+ else:
+ info("file %d/%d:\t %s" % (i+1, total, filename))
+ info(" hunk no.%d doesn't match source file at line %d" % (hunkno+1, lineno))
+ info(" expected: %s" % hunkfind[hunklineno])
+ info(" actual : %s" % line.rstrip("\r\n"))
+ # not counting this as error, because file may already be patched.
+ # check if file is already patched is done after the number of
+ # invalid hunks if found
+ # TODO: check hunks against source/target file in one pass
+ # API - check(stream, srchunks, tgthunks)
+ # return tuple (srcerrs, tgterrs)
+
+ # continue to check other hunks for completeness
+ hunkno += 1
+ if hunkno < len(p.hunks):
+ hunk = p.hunks[hunkno]
+ continue
+ else:
+ break
+
+ # check if processed line is the last line
+ if lineno+1 == hunk.startsrc+len(hunkfind)-1:
+ debug(" hunk no.%d for file %s -- is ready to be patched" % (hunkno+1, filename))
+ hunkno+=1
+ validhunks+=1
+ if hunkno < len(p.hunks):
+ hunk = p.hunks[hunkno]
+ else:
+ if validhunks == len(p.hunks):
+ # patch file
+ canpatch = True
+ break
+ else:
+ if hunkno < len(p.hunks):
+ warning("premature end of source file %s at hunk %d" % (filename, hunkno+1))
+ errors += 1
+
+ f2fp.close()
+
+ if validhunks < len(p.hunks):
+ if self._match_file_hunks(filename, p.hunks):
+ warning("already patched %s" % filename)
+ else:
+ warning("source file is different - %s" % filename)
+ errors += 1
+ if canpatch:
+ backupname = filename+".orig"
+ if exists(backupname):
+ warning("can't backup original file to %s - aborting" % backupname)
+ else:
+ import shutil
+ shutil.move(filename, backupname)
+ if self.write_hunks(backupname, filename, p.hunks):
+ info("successfully patched %d/%d:\t %s" % (i+1, total, filename))
+ os.unlink(backupname)
+ else:
+ errors += 1
+ warning("error patching file %s" % filename)
+ shutil.copy(filename, filename+".invalid")
+ warning("invalid version is saved to %s" % filename+".invalid")
+ # todo: proper rejects
+ shutil.move(backupname, filename)
+
+ # todo: check for premature eof
+ return (errors == 0)
+
+
+ def can_patch(self, filename):
+ """ Check if specified filename can be patched. Returns None if file can
+ not be found among source filenames. False if patch can not be applied
+ clearly. True otherwise.
+
+ :returns: True, False or None
+ """
+ filename = abspath(filename)
+ for p in self.items:
+ if filename == abspath(p.source):
+ return self._match_file_hunks(filename, p.hunks)
+ return None
+
+
+ def _match_file_hunks(self, filepath, hunks):
+ matched = True
+ fp = open(abspath(filepath))
+
+ class NoMatch(Exception):
+ pass
+
+ lineno = 1
+ line = fp.readline()
+ hno = None
+ try:
+ for hno, h in enumerate(hunks):
+ # skip to first line of the hunk
+ while lineno < h.starttgt:
+ if not len(line): # eof
+ debug("check failed - premature eof before hunk: %d" % (hno+1))
+ raise NoMatch
+ line = fp.readline()
+ lineno += 1
+ for hline in h.text:
+ if hline.startswith("-"):
+ continue
+ if not len(line):
+ debug("check failed - premature eof on hunk: %d" % (hno+1))
+ # todo: \ No newline at the end of file
+ raise NoMatch
+ if line.rstrip("\r\n") != hline[1:].rstrip("\r\n"):
+ debug("file is not patched - failed hunk: %d" % (hno+1))
+ raise NoMatch
+ line = fp.readline()
+ lineno += 1
+
+ except NoMatch:
+ matched = False
+ # todo: display failed hunk, i.e. expected/found
+
+ fp.close()
+ return matched
+
+
+ def patch_stream(self, instream, hunks):
+ """ Generator that yields stream patched with hunks iterable
+
+ Converts lineends in hunk lines to the best suitable format
+ autodetected from input
+ """
+
+ # todo: At the moment substituted lineends may not be the same
+ # at the start and at the end of patching. Also issue a
+ # warning/throw about mixed lineends (is it really needed?)
+
+ hunks = iter(hunks)
+
+ srclineno = 1
+
+ lineends = {'\n':0, '\r\n':0, '\r':0}
+ def get_line():
+ """
+ local utility function - return line from source stream
+ collecting line end statistics on the way
+ """
+ line = instream.readline()
+ # 'U' mode works only with text files
+ if line.endswith("\r\n"):
+ lineends["\r\n"] += 1
+ elif line.endswith("\n"):
+ lineends["\n"] += 1
+ elif line.endswith("\r"):
+ lineends["\r"] += 1
+ return line
+
+ for hno, h in enumerate(hunks):
+ debug("hunk %d" % (hno+1))
+ # skip to line just before hunk starts
+ while srclineno < h.startsrc:
+ yield get_line()
+ srclineno += 1
+
+ for hline in h.text:
+ # todo: check \ No newline at the end of file
+ if hline.startswith("-") or hline.startswith("\\"):
+ get_line()
+ srclineno += 1
+ continue
+ else:
+ if not hline.startswith("+"):
+ get_line()
+ srclineno += 1
+ line2write = hline[1:]
+ # detect if line ends are consistent in source file
+ if sum([bool(lineends[x]) for x in lineends]) == 1:
+ newline = [x for x in lineends if lineends[x] != 0][0]
+ yield line2write.rstrip("\r\n")+newline
+ else: # newlines are mixed
+ yield line2write
+
+ for line in instream:
+ yield line
+
+
+ def write_hunks(self, srcname, tgtname, hunks):
+ src = open(srcname, "rb")
+ tgt = open(tgtname, "wb")
+
+ debug("processing target file %s" % tgtname)
+
+ tgt.writelines(self.patch_stream(src, hunks))
+
+ tgt.close()
+ src.close()
+ # [ ] TODO: add test for permission copy
+ shutil.copymode(srcname, tgtname)
+ return True
+
+
+
+if __name__ == "__main__":
+ from optparse import OptionParser
+ from os.path import exists
+ import sys
+
+ opt = OptionParser(usage="1. %prog [options] unified.diff\n"
+ " 2. %prog [options] http://host/patch\n"
+ " 3. %prog [options] -- < unified.diff",
+ version="python-patch %s" % __version__)
+ opt.add_option("-q", "--quiet", action="store_const", dest="verbosity",
+ const=0, help="print only warnings and errors", default=1)
+ opt.add_option("-v", "--verbose", action="store_const", dest="verbosity",
+ const=2, help="be verbose")
+ opt.add_option("--debug", action="store_true", dest="debugmode", help="debug mode")
+ opt.add_option("--diffstat", action="store_true", dest="diffstat",
+ help="print diffstat and exit")
+ opt.add_option("-p", "--strip", type="int", metavar='N', default=0,
+ help="strip N path components from filenames")
+ (options, args) = opt.parse_args()
+
+ if not args and sys.argv[-1:] != ['--']:
+ opt.print_version()
+ opt.print_help()
+ sys.exit()
+ readstdin = (sys.argv[-1:] == ['--'] and not args)
+
+ debugmode = options.debugmode
+
+ verbosity_levels = {0:logging.WARNING, 1:logging.INFO, 2:logging.DEBUG}
+ loglevel = verbosity_levels[options.verbosity]
+ logformat = "%(message)s"
+ if debugmode:
+ loglevel = logging.DEBUG
+ logformat = "%(levelname)8s %(message)s"
+ logger.setLevel(loglevel)
+ loghandler = logging.StreamHandler()
+ loghandler.setFormatter(logging.Formatter(logformat))
+ logger.addHandler(loghandler)
+
+
+ if readstdin:
+ patch = PatchSet(sys.stdin)
+ else:
+ patchfile = args[0]
+ urltest = patchfile.split(':')[0]
+ if (':' in patchfile and urltest.isalpha()
+ and len(urltest) > 1): # one char before : is a windows drive letter
+ patch = fromurl(patchfile)
+ else:
+ if not exists(patchfile) or not isfile(patchfile):
+ sys.exit("patch file does not exist - %s" % patchfile)
+ patch = fromfile(patchfile)
+
+ if options.diffstat:
+ print patch.diffstat()
+ sys.exit(0)
+
+ #pprint(patch)
+ patch.apply(options.strip) or sys.exit(-1)
+
+ # todo: document and test line ends handling logic - patch.py detects proper line-endings
+ # for inserted hunks and issues a warning if patched file has incosistent line ends