#!/usr/bin/env python2 # Copyright 2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import curses.ascii import itertools import optparse import os import re import subprocess import sys import textwrap import xml.etree sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'third_party', 'pybugz-0.9.3')) import bugz.bugzilla import portage.versions from common import Bug, chunks def unicode_sanitize(text): """Converts a possibly unicode text to a regular string.""" if type(text) == unicode: real_output = text else: real_output = unicode(text, errors='replace') return real_output.encode("utf-8") class TermTooSmall(Exception): pass class BugQueue: def __init__(self): self.__bug_list = [] self.__bug_set = set() def add_bug(self, bug): if self.has_bug(bug): return self.__bug_list.append(bug) self.__bug_set.add(bug.id_number()) def has_bug(self, bug): return bug.id_number() in self.__bug_set def generate_stabilization_list(self): result = [] for bug in self.__bug_list: result.append("# Bug %d: %s\n" % (bug.id_number(), bug.summary())) for cpv in bug.cpvs(): result.append("=" + cpv + "\n") return ''.join(result) # Main class (called with curses.wrapper later). class MainWindow: def __init__(self, screen, bugs, bugs_dict, related_bugs, repoman_dict, bug_queue): self.bugs = bugs self.bugs_dict = bugs_dict self.related_bugs = related_bugs self.repoman_dict = repoman_dict self.bug_queue = bug_queue curses.curs_set(0) self.screen = screen curses.use_default_colors() self.init_screen() c = self.screen.getch() while c not in (ord("q"), curses.ascii.ESC): if c == ord("j"): self.scroll_bugs_pad(1) elif c == ord("k"): self.scroll_bugs_pad(-1) elif c == curses.KEY_DOWN: self.scroll_contents_pad(1) elif c == curses.KEY_UP: self.scroll_contents_pad(-1) elif c == curses.KEY_RESIZE: self.init_screen() elif c == ord("a"): self.add_bug_to_queue() c = self.screen.getch() def bug_for_id(self, bug_id): if bug_id in self.bugs_dict: return self.bugs_dict[bug_id] return Bug(id_number=bug_id, summary='(summary unavailable)', status='UNKNOWN') def init_screen(self): (self.height, self.width) = self.screen.getmaxyx() self.bugs_pad_width = self.width / 3 - 1 self.contents_pad_width = self.width - self.bugs_pad_width - 1 if self.height < 12 or self.width < 80: raise TermTooSmall() self.screen.border() self.screen.vline(1, self.bugs_pad_width + 1, curses.ACS_VLINE, self.height - 2) self.screen.refresh() self.fill_bugs_pad() self.refresh_bugs_pad() self.fill_contents_pad() self.refresh_contents_pad() def fill_bugs_pad(self): self.bugs_pad = curses.newpad(len(self.bugs),self.width) self.bugs_pad.erase() self.bugs_pad_pos = 0 for i in range(len(self.bugs)): self.bugs_pad.addstr(i, 0, unicode_sanitize(" %d %s" % (self.bugs[i].id_number(), self.bugs[i].summary()))) def scroll_bugs_pad(self, amount): height = len(self.bugs) self.bugs_pad_pos += amount if self.bugs_pad_pos < 0: self.bugs_pad_pos = 0 if self.bugs_pad_pos >= height: self.bugs_pad_pos = height - 1 self.refresh_bugs_pad() self.fill_contents_pad() self.refresh_contents_pad() def refresh_bugs_pad(self): (height, width) = self.bugs_pad.getmaxyx() for i in range(height): self.bugs_pad.addstr(i, 0, " ") if self.bug_queue.has_bug(self.bugs[i]): self.bugs_pad.addch(i, 2, "+") self.bugs_pad.addch(self.bugs_pad_pos, 0, "*") pos = min(height - self.height + 2, max(0, self.bugs_pad_pos - (self.height / 2))) self.bugs_pad.refresh( pos, 0, 1, 1, self.height - 2, self.bugs_pad_width) def fill_contents_pad(self): bug = self.bugs[self.bugs_pad_pos] output = [] output += textwrap.wrap(bug.summary(), width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) cpvs = bug.cpvs() if cpvs: output += textwrap.wrap("Found package cpvs:", width=self.contents_pad_width-2) for cpv in cpvs: output += textwrap.wrap(cpv, width=self.contents_pad_width-2) output += textwrap.wrap("Press 'a' to add them to the stabilization queue.", width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) deps = [self.bug_for_id(dep_id) for dep_id in bug.depends_on()] if deps: output += textwrap.wrap("Depends on:", width=self.contents_pad_width-2) for dep in deps: desc = "%d %s %s" % (dep.id_number(), dep.status(), dep.summary()) output += textwrap.wrap(desc, width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) related = self.related_bugs[bug.id_number()] if related: output += textwrap.wrap("Related bugs:", width=self.contents_pad_width-2) for related_bug in related: if str(related_bug['bugid']) == str(bug.id_number()): continue desc = related_bug['bugid'] + " " + related_bug['desc'] output += textwrap.wrap(desc, width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) if bug.id_number() in repoman_dict and repoman_dict[bug.id_number()]: output += textwrap.wrap("Repoman output:", width=self.contents_pad_width-2) lines = repoman_dict[bug.id_number()].split("\n") for line in lines: output += textwrap.wrap(line, width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) for comment in bug.comments(): for line in comment.split("\n"): output += textwrap.wrap(line, width=self.contents_pad_width-2) output.append("-" * (self.contents_pad_width - 2)) self.contents_pad_length = len(output) self.contents_pad = curses.newpad(max(self.contents_pad_length, self.height), self.contents_pad_width) self.contents_pad.erase() self.contents_pad_pos = 0 for i in range(len(output)): self.contents_pad.addstr(i, 0, unicode_sanitize(output[i])) def scroll_contents_pad(self, amount): height = self.contents_pad_length - self.height + 3 self.contents_pad_pos += amount if self.contents_pad_pos < 0: self.contents_pad_pos = 0 if self.contents_pad_pos >= height: self.contents_pad_pos = height - 1 self.refresh_contents_pad() def refresh_contents_pad(self): self.contents_pad.refresh( self.contents_pad_pos, 0, 1, self.bugs_pad_width + 2, self.height - 2, self.width - 2) self.screen.refresh() def add_bug_to_queue(self): bug = self.bugs[self.bugs_pad_pos] # For now we only support auto-detected CPVs. if not bug.cpvs(): return self.bug_queue.add_bug(bug) self.refresh_bugs_pad() if __name__ == "__main__": parser = optparse.OptionParser() parser.add_option("--arch", dest="arch", help="Gentoo arch to use, e.g. x86, amd64, ...") parser.add_option("-o", "--output", dest="output_filename", default="package.keywords", help="Output filename for generated package.keywords file [default=%default]") parser.add_option("--repo", dest="repo", help="Path to portage CVS repository") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Include more output, e.g. related bugs") parser.add_option("--security", dest="security", action="store_true", default=False, help="Restrict search to security bugs.") (options, args) = parser.parse_args() if not options.arch: parser.error("--arch option is required") if args: parser.error("unrecognized command-line args") bug_queue = BugQueue() bugzilla = bugz.bugzilla.Bugz('http://bugs.gentoo.org', skip_auth=True) print "Searching for arch bugs..." criteria = { 'cc': '%s@gentoo.org' % options.arch, 'keywords': 'STABLEREQ', 'status': None } if options.security: criteria['assigned_to'] = 'security@gentoo.org' bugs = [] raw_bugs = bugzilla.search("", **criteria) for chunk in chunks(raw_bugs, 100): bugs += [Bug(xml) for xml in bugzilla.get([bug['bugid'] for bug in chunk]).findall("bug")] if not bugs: print 'The bug list is empty. Exiting.' sys.exit(0) dep_bug_ids = [] bugs_dict = {} related_bugs = {} repoman_dict = {} for bug in bugs: print "Processing bug %d: %s" % (bug.id_number(), bug.summary()) bugs_dict[bug.id_number()] = bug related_bugs[bug.id_number()] = [] repoman_dict[bug.id_number()] = "" bug.detect_cpvs() for cpv in bug.cpvs(): pv = portage.versions.cpv_getkey(cpv) if options.verbose: related_bugs[bug.id_number()] += bugzilla.search(pv, status=None) if options.repo: to_restore = {} try: output = repoman_dict[bug.id_number()] for cpv in bug.cpvs(): pv = portage.versions.cpv_getkey(cpv) cvs_path = os.path.join(options.repo, pv) ebuild_name = portage.versions.catsplit(cpv)[1] + ".ebuild" ebuild_path = os.path.join(cvs_path, ebuild_name) manifest_path = os.path.join(cvs_path, 'Manifest') if os.path.exists(ebuild_path): if ebuild_path not in to_restore: to_restore[ebuild_path] = open(ebuild_path).read() if manifest_path not in to_restore: to_restore[manifest_path] = open(manifest_path).read() output += subprocess.Popen(["ekeyword", options.arch, ebuild_name], cwd=cvs_path, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0] # repoman manifest may fail if files are unfetchable. It shouldn't abort this script. output += subprocess.Popen(["repoman", "manifest"], cwd=cvs_path, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0] pvs = list(set([portage.versions.cpv_getkey(cpv) for cpv in bug.cpvs()])) for pv in pvs: cvs_path = os.path.join(options.repo, pv) output += subprocess.Popen(["repoman", "full"], cwd=cvs_path, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0] repoman_dict[bug.id_number()] = output finally: for path in to_restore: with open(path, "w") as f: f.write(to_restore[path]) dep_bug_ids += bug.depends_on() dep_bug_ids = list(set(dep_bug_ids)) dep_bugs = [Bug(xml) for xml in bugzilla.get(dep_bug_ids).findall("bug")] for bug in dep_bugs: bugs_dict[bug.id_number()] = bug try: curses.wrapper(MainWindow, bugs=bugs, bugs_dict=bugs_dict, related_bugs=related_bugs, repoman_dict=repoman_dict, bug_queue=bug_queue) except TermTooSmall: print "Your terminal window is too small, please try to enlarge it" sys.exit(1) stabilization_list = bug_queue.generate_stabilization_list() if stabilization_list: with open(options.output_filename, "w") as f: f.write(stabilization_list) print "Writing stabilization list to %s" % options.output_filename