You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
pacgraph/pacgraph

1156 lines
41 KiB

#! /usr/bin/env python3
import io, os, copy, math, codecs, tarfile, optparse, subprocess
from math import cos, sin, atan2, hypot
from random import random, randint
from itertools import *
from collections import deque, defaultdict
# one full turn in radians; used for random headings and quarter-turn offsets
tau = math.pi * 2
# short alias for os.path.join
pj = os.path.join
# depends contains %CONFLICTS%, %DEPENDS%, %OPTDEPENDS%, %PROVIDES%
# desc contains %URL%, %REPLACES%, %LICENSE%,
# %NAME%, %GROUPS%, %BUILDDATE%, %REASON%, %DESC%,
# %SIZE%, %PACKAGER%, %ARCH%, %INSTALLDATE%, %VERSION%
class Node(object):
    """A single vertex of the package graph.

    Every attribute has a default; keyword arguments passed to the
    constructor override (or add to) those defaults.
    """

    def __init__(self, **kwargs):
        attrs = {
            'name': '',            # matches dict key
            'links': set(),        # of name strings
            'inverse': set(),      # of name strings
            'size': 0,             # bytes, usually
            'font_pt': 0,          # scaled from size
            'explicit': False,     # True if explicitly installed package
            'center': (0, 0),      # (x,y)
            'color': '#000000',
        }
        attrs.update(kwargs)
        self.__dict__.update(attrs)

    def __repr__(self):
        # only links and size are shown
        summary = {'links': self.links,
                   'size': self.size}
        return repr(summary)

    # cache the slowest of these?
    @property
    def all(self):
        "union of outgoing and incoming link names"
        return self.links.union(self.inverse)

    @property
    def dim(self):
        "text dimensions from pt2dim() for this name at this font size"
        return pt2dim(self.name, self.font_pt)

    @property
    def box(self):
        "bounding box from bbox() around center with the current dim"
        return bbox(self.center, self.dim)

    @property
    def cd(self):
        "fresh [center, dim] list"
        # phase this out
        return [self.center, self.dim]
class MockOptions(object):
    """Bare options-like object exposing only ``opt_deps`` (always False).

    Nothing in the visible source instantiates it.
    """

    def __init__(self):
        setattr(self, 'opt_deps', False)
def unrip(path):
    """path -> dict of Nodes

    Reads a Python-literal dict of ``{name: {attribute: value}}`` from
    *path*, builds a Node from each entry and fills in the inverse links
    with bilink_tree().
    """
    # Read inside a context manager so the handle is closed promptly.
    with open(path) as rip_file:
        text = rip_file.read()
    # SECURITY: eval() executes whatever expression the file contains.
    # Only point this at rip files you produced yourself.
    tree = eval(text)
    return bilink_tree(dict((p, Node(**n)) for p, n in tree.items()))
class Arch(object):
    "Loader for pacman style package databases (directory trees or tarballs)."

    def clean(self, n):
        "strips whitespace and everything from the first of > < : = onward"
        n = n.strip()
        for c in '><:=':
            n = n.partition(c)[0]
        return n

    def load_info(self, arch_file):
        """file object -> dict of {'%HEADER%': [cleaned lines]}

        Blank lines are skipped.  A line starting with '%' opens a new
        section; every other line is appended to the current section.
        The file object is always closed, even if reading fails.
        """
        info = defaultdict(list)
        mode = None
        try:
            for raw in arch_file:
                line = self.clean(raw)
                if not line:
                    continue
                if line.startswith('%'):
                    mode = line
                    continue
                info[mode].append(line)
        finally:
            arch_file.close()
        return info

    def strip_info(self, info):
        """load_info dict -> (name, trimmed info)

        Keeps only the dependency, size and reason fields.  'SIZE' becomes
        an int (taken from ISIZE when present, 0 when unknown).  'REASON'
        becomes the bool that merge_tree() stores as Node.explicit:
        pacman records install reason 1 for packages pulled in as a
        dependency and leaves the field out for explicit installs, so the
        flag is True unless the recorded reason is '1'.
        """
        keep = set(['DEPENDS', 'OPTDEPENDS', 'PROVIDES', 'SIZE', 'ISIZE', 'REASON'])
        # lines that preceded any header were filed under None, ignore them
        info = dict((k.strip('%'), v) for k, v in info.items() if k is not None)
        name = info['NAME'][0]
        info = dict((k, v) for k, v in info.items() if k in keep)
        if 'ISIZE' in info:
            info['SIZE'] = info['ISIZE']
        if 'SIZE' in info:
            info['SIZE'] = int(info['SIZE'][0], 10)
        else:
            info['SIZE'] = 0
        # The values are lists of lines, so compare the first line rather
        # than the list itself (list == '1' is never true).
        reason = info.get('REASON') or ['0']
        info['REASON'] = reason[0] != '1'
        return name, info

    def load_tarball(self, tgz):
        "database tarball -> dict of {package name: trimmed info}"
        packages = defaultdict(dict)
        # the archive is closed as soon as every member has been read
        with tarfile.open(tgz) as db:
            for member in db.getmembers():
                if not member.isfile():
                    continue
                text = db.extractfile(member).read().decode('utf8')
                name, sub = os.path.split(member.name)
                packages[name][sub] = io.StringIO(text)
        tree = {}
        for files in packages.values():
            info = {}
            for handle in files.values():
                info.update(self.load_info(handle))
            name, info = self.strip_info(info)
            tree[name] = info
        return tree

    def load_tree(self, dirs):
        """list of database directories -> dict of {package name: trimmed info}

        Every sub-directory that contains files is treated as one package.
        Packages that cannot be read are reported and skipped.
        """
        dirs = set(filter(os.path.isdir, dirs))
        packages = set(p for r in dirs for p, d, f in os.walk(r) if f)
        tree = {}
        for p in packages - dirs:
            try:
                info = {}
                depends_path = pj(p, 'depends')
                if os.path.isfile(depends_path):
                    # utf8 to agree with load_tarball(); load_info closes it
                    info.update(self.load_info(open(depends_path, 'r', encoding='utf8')))
                info.update(self.load_info(open(pj(p, 'desc'), 'r', encoding='utf8')))
                name, info = self.strip_info(info)
            except Exception:
                # best effort: KeyboardInterrupt and SystemExit still propagate
                print('Error reading package', p)
                continue
            tree[name] = info
        return tree

    def actually_installed_fn(self, tree):
        "use only on load_tree data, returns function"
        p_set = set(tree)
        errors = {}
        # capability -> sorted providers, built once instead of rescanning
        # every package for every dependency
        providers = defaultdict(list)
        for p in sorted(tree):
            for capability in set(tree[p].get('PROVIDES', ())):
                providers[capability].append(p)

        def actually(packages):
            "names -> list of installed packages that satisfy them"
            wanted = set(packages)
            installed = wanted & p_set
            for pack in wanted - installed:
                if pack in errors:
                    installed.add(errors[pack])
                    continue
                provides = providers.get(pack, [])
                if len(provides) > 1:
                    print('warning: %s found in %s, assuming %s' % (pack, provides, provides[0]))
                    # remember the choice so the warning is printed once
                    errors[pack] = provides[0]
                if provides:
                    installed.add(provides[0])
                # len 0 means not installed optdep
            return list(installed)
        return actually

    def merge_tree(self, tree):
        "merge provides, depends, optdepends"
        options, args = parse()
        tree2 = {}
        actually_installed = self.actually_installed_fn(tree)
        # merge
        for p, info in tree.items():
            # copy, so adding optdeps never grows the caller's DEPENDS list
            deps = list(info.get('DEPENDS', ()))
            if options.opt_deps:
                deps += info.get('OPTDEPENDS', ())
            # remove unused optdeps
            deps = actually_installed(deps)
            tree2[p] = Node(name=p, size=info['SIZE'], links=set(deps), explicit=info['REASON'])
        return tree2

    def dbpath(self):
        "super hacky"
        default = '/var/lib/pacman/'
        if not os.path.isfile('/etc/pacman.conf'):
            return default
        with open('/etc/pacman.conf') as conf:
            for line in conf:
                line = line.strip()
                if not line.startswith('DBPath'):
                    continue
                return line.partition('=')[2].strip()
        return default

    def local_load(self):
        "tree of installed packages, chain compressed and without dangling links"
        dbpath = self.dbpath()
        dirs = ['local']
        dirs = [pj(dbpath, d) for d in dirs]
        tree = compress_chains(bilink_tree(self.merge_tree(self.load_tree(dirs))))
        return legal_links(tree)

    def repo_load(self):
        """tree of repository packages

        Without package arguments the whole (chain compressed) repository
        is returned.  With arguments only those packages, their
        dependencies and, with --show-req-by, their direct requirers are
        kept.  Arguments that are not in the repositories are reported and
        ignored.
        """
        options, args = parse()
        packages = args
        dbpath = self.dbpath()
        repos = ['community', 'core', 'extra', 'multilib']
        dirs = [pj(dbpath, 'sync', r) for r in repos]
        tars = [pj(dbpath, 'sync', '%s.db' % r) for r in repos]
        #tars = [pj(dbpath, '%s.db.tar.gz' % t) for t in tars]
        tars = [t for t in tars if os.path.isfile(t)]
        if tars:
            tree = {}
            for t in tars:
                tree.update(self.load_tarball(t))
        else:
            tree = self.load_tree(dirs)
        tree = self.merge_tree(tree)
        if not packages:
            return legal_links(compress_chains(bilink_tree(tree)))
        wanted = set(packages)
        for missing in sorted(wanted - set(tree)):
            # full_deps() would die with a KeyError on these
            print('error: unknown package', missing)
        wanted &= set(tree)
        reqs = set()
        if options.show_req:
            reqs = set(k for k, v in tree.items() if wanted & v.links)
        deps = set()
        for p in wanted | reqs:
            deps.update(full_deps(p, tree))
        keep = deps | reqs
        tree2 = dict((k, v) for k, v in tree.items() if k in keep)
        return legal_links(bilink_tree(tree2))
class Frugal(Arch):
    "Arch loader pointed at the pacman-g2 configuration and database path."

    def dbpath(self):
        "super hacky"
        default = '/var/lib/pacman-g2/'
        if not os.path.isfile('/etc/pacman-g2.conf'):
            return default
        # context manager so the config file is closed on every exit path
        with open('/etc/pacman-g2.conf') as conf:
            for line in conf:
                line = line.strip()
                if not line.startswith('DBPath'):
                    continue
                return line.partition('=')[2].strip()
        return default
class Debian(object):
    "Expect lots of 'error: unknown' messages. May be due to pseudo-dependencies being filled by 'Provides:' entries."

    @staticmethod
    def _parse_depends(line):
        "'Depends: a (>= 1), b | c' -> set of bare package names"
        spec = line.partition(':')[2]
        names = set()
        for group in spec.split(','):
            for alternative in group.split('|'):
                names.add(alternative.partition('(')[0].strip())
        # set('') is the empty set, so subtract a set holding '' instead
        names.discard('')
        return names

    def load_tree(self, status_path = None):
        """New implementation not extensively tested, your mileage may vary

        Parses a dpkg style status file into a dict of Nodes.  Only
        stanzas that are marked installed and carry a name and a non-zero
        size are kept.  Sizes are multiplied by 1024.
        """
        tree = {}
        installed = False
        deps = set()
        name = ''
        size = 0
        if status_path is None:
            status_path = '/var/lib/dpkg/status'
        # only the ASCII control fields are used, so undecodable bytes in
        # other fields are replaced rather than aborting the load
        with open(status_path, 'r', encoding='utf-8', errors='replace') as data:
            for line in data:
                if line.startswith('Package:'):
                    name = line.split(':')[1].strip()
                elif line.startswith('Status'):
                    installed = ('install ok installed' in line) or ('install user installed' in line)
                elif line.startswith('Installed-Size:') or line.startswith('Size:'):
                    size = int(line.split(':')[1].strip(), 10) * 1024
                elif line.startswith('Depends:'):
                    deps = self._parse_depends(line)
                elif line == '\n':
                    if installed and name and size:
                        tree[name] = Node(name = name, links = deps, size = size)
                    # reset after every stanza, so a skipped package cannot
                    # leak its Depends into the next one
                    installed = False
                    deps = set()
                    name = ''
                    size = 0
        # the final stanza may not be followed by a blank line
        if installed and name and size:
            tree[name] = Node(name = name, links = deps, size = size)
        return tree

    def local_load(self, status_path=None):
        "tree of installed packages, chain compressed and without dangling links"
        tree = legal_links(self.load_tree(status_path))
        tree = compress_chains(bilink_tree(tree))
        return legal_links(tree)

    def repo_load(self):
        "Not implemented, always raises NotImplementedError."
        print('not implemented')
        # a bare raise with no active exception is a RuntimeError;
        # NotImplementedError is a RuntimeError subclass that says why
        raise NotImplementedError('Debian.repo_load is not implemented')
class Redhat(object):
    "RPM uses a capability/provider model. load_tree() has room for improvement"

    def load_tree(self):
        """Queries rpm for every package and returns a dict of Nodes.

        Each output line carries name, size, required capabilities and
        provided capabilities.  A package links to every package that
        provides a capability it requires.  Repeated names have their
        sizes summed.
        """
        stdout = call_stdout('rpm --query --all --queryformat "%{NAME}<(^_^)>%{SIZE}<(^_^)>[%{REQUIRENAME}, ]<(^_^)>[%{PROVIDENAME}, ]\n"')
        separator = '<(^_^)>'
        tree = {}
        requirer_lookup = defaultdict(set)    # capability -> packages needing it
        provider_lookup = defaultdict(set)    # capability -> packages offering it
        for line in stdout:
            if line == '':
                break
            name, nullo, line = line.partition(separator)
            size, nullo, line = line.partition(separator)
            size = int(size, 10)
            requires, nullo, provides = line.partition(separator)
            requires = set(capability.strip() for capability in requires.split(',')) - set([''])
            for capability in requires:
                requirer_lookup[capability].add(name)
            provides = set(capability.strip() for capability in provides.split(',')) - set([''])
            for capability in provides:
                provider_lookup[capability].add(name)
            if name in tree:
                tree[name].size += size
            else:
                tree[name] = Node(name = name, links = set(), size = size)
        for capability in set(requirer_lookup) & set(provider_lookup):
            for package in requirer_lookup[capability]:
                tree[package].links |= provider_lookup[capability]
        return tree

    def local_load(self):
        "tree of installed packages, chain compressed and without dangling links"
        tree = legal_links(self.load_tree())
        tree = compress_chains(bilink_tree(tree))
        return legal_links(tree)

    def repo_load(self):
        "Not implemented, always raises NotImplementedError."
        print('not implemented')
        # a bare raise with no active exception is a RuntimeError;
        # NotImplementedError is a RuntimeError subclass that says why
        raise NotImplementedError('Redhat.repo_load is not implemented')
class Crux(object):
    "based entirely on docs, needs to be tested"

    def find_all(self, package=None):
        """yields the first space separated field of each 'pkginfo -i' line

        *package* is accepted for backward compatibility and ignored.
        """
        # call_stdout() already returns a list of lines, there is no string to split
        for line in call_stdout('pkginfo -i'):
            if not line:
                continue
            yield line.split(' ')[0]

    def find_size(self, package):
        "output lines of 'pkgsize <package>', unparsed"
        return call_stdout('pkgsize %s' % package)

    def find_deps(self, package):
        "output lines of 'finddeps <package>', unparsed"
        return call_stdout('finddeps %s' % package)

    def local_load(self):
        "unfinished"
        tree = {}
        # NOTE(review): never populated, so the result is always an empty
        # tree.  Presumably meant to come from find_all() once the output
        # of find_size()/find_deps() is parsed - confirm before wiring up.
        all_packages = []
        for name in all_packages:
            size = self.find_size(name)
            deps = self.find_deps(name)
            tree[name] = Node(name=name, size=size, links=set(deps))
        return tree
class Gentoo(object):
    "Loader built on the output of qsize and 'emerge --depclean'."

    def get_name(self, pkg):
        """'category/name-version[-rN] ...' -> 'name'

        Takes everything after the first '/', then cuts at the last '-'
        (or at the one before it when the last '-' is followed by 'r').
        Text without any '-' is returned unchanged.
        """
        # use the stripped text; the original threw the stripped copy away
        name = pkg.strip().partition('/')[2]
        ver = name.rfind('-')
        if ver == -1:
            # no version suffix to remove, do not chop the last character
            return name
        if name[ver+1:ver+2] == 'r':
            name = name[:name.rfind('-', 0, ver)]
        else:
            name = name[:ver]
        return name

    def load_tree(self):
        """dict of Nodes from qsize (sizes) and emerge (dependencies)

        In the emerge output a line indented by two spaces names a
        package, and the lines indented by four spaces below it name what
        pulls that package in: either a set such as '@selected' or another
        package, which then links to it.
        """
        print("Getting packages size")
        tree = {}
        out = call_stdout('qsize --bytes --nocolor --all')
        for line in out:
            words = line.split()
            if not words:
                continue
            name = self.get_name(words[0])
            size = words[5]
            # keyword is 'links'; 'link' only created a stray attribute
            tree[name] = Node(name=name, size=int(size), links=set())
        print("Getting dependencies")
        out = call_stdout('emerge --pretend --verbose --depclean')
        dep = None
        for line in out:
            # The literals must be as long as the slices they are compared
            # with: four and two spaces.
            if line[:4] == '    ':
                if dep is None:
                    continue
                if line[4:5] == '@':
                    if line[4:] == '@selected':
                        # flag the package this stanza describes, not
                        # whatever name was parsed last
                        tree[dep].explicit = True
                else:
                    name = self.get_name(line)
                    tree[name].links.add(dep)
            elif line[:2] == '  ':
                dep = self.get_name(line)
            elif line[:3] == '>>>':
                break
        return tree

    def local_load(self):
        "tree of installed packages, chain compressed and without dangling links"
        tree = legal_links(self.load_tree())
        tree = compress_chains(bilink_tree(tree))
        return legal_links(tree)

    def repo_load(self):
        "Not implemented, always raises NotImplementedError."
        print('not implemented')
        # a bare raise with no active exception is a RuntimeError;
        # NotImplementedError is a RuntimeError subclass that says why
        raise NotImplementedError('Gentoo.repo_load is not implemented')
class Textfile(object):
    """ VERY ALPHA, frequent to change!
    loads a space-delimited text file
    file should be named "data", each line looks like
    node_name size link1 link2 link3 ..."""

    def load_tree(self):
        "reads ./data into a dict of Nodes, skipping blank lines"
        tree = {}
        filename = 'data'
        # context manager so the file is closed even if a line is malformed
        with open(filename) as data:
            for line in data:
                words = line.split()
                if not words:
                    # a blank line would otherwise die on words[0]
                    continue
                node = words[0]
                size = int(words[1])
                links = words[2:]
                tree[node] = Node(name=node, size=size, links=set(links))
        return tree

    def local_load(self):
        "tree from the data file, chain compressed and without dangling links"
        tree = legal_links(self.load_tree())
        tree = compress_chains(bilink_tree(tree))
        return legal_links(tree)
class Rtree(object):
    "Why? Pyrtree is insert-then-query (but mine has no delete), Rtree has crazy C deps."

    def __init__(self, parent=None, box=None, name=None):
        "Only root has no parent, only leaves have names."
        # Half an R-tree. Not height balanced! Missing delete().
        bigm = 5
        self.parent = parent
        self.name = name
        self.bigm = bigm          # insert() divides a node holding more children
        self.lilm = bigm//2       # lower bound, only consulted by adjust()
        self.children = []
        self.box = (0,0,0,0)
        self.last_hit = None      # most recent leaf yielded by a search here
        if box:
            self.box = tuple(box)

    def __lt__(self, other):
        # lets smallest_merge() compare tuples that end in a node
        return True # haaaack

    def show(self, depth=0):
        "indented text dump of this subtree, one node per line"
        head = ' ' * depth + str(self.box) + ' '
        if self.name:
            return head + self.name + '\n'
        head += str(len(self.children))
        return head + '\n' + ''.join(c.show(depth+1) for c in self.children)

    def root(self):
        "walks the parent pointers up to the node without a parent"
        node = self
        while node.parent is not None:
            node = node.parent
        return node

    def best_node(self, to_add):
        "smallest place for box to_add, does not change anything"
        return smallest_merge(self.children, to_add)

    def search(self, box=None):
        "yields matching leaf objects, no box for all leaves"
        todo = [self]
        while todo:
            node = todo.pop()
            if box and not in_box(node.box, box):
                continue
            if node.name:
                self.last_hit = node
                yield node
            todo.extend(node.children)

    def search_cache(self, box):
        "assumes consecutive searches will be near each other"
        todo = [self.root()]
        if self.last_hit:
            todo.append(self.last_hit.parent)
            todo.append(self.last_hit)
        #while todo[0].parent:
        #    todo.insert(0, todo[0].parent)
        while todo:
            node = todo.pop()
            if not in_box(node.box, box):
                continue
            if node.name:
                self.last_hit = node
                yield node
            todo.extend(node.children)

    def insert(self, box, name=None):
        "makes a new node"
        target = self.root().choose_leaf(box)
        target.children.append(Rtree(target, box, name))
        target.merge_up(box)
        if len(target.children) > target.bigm:
            target.divide_children()

    def merge_up(self, box):
        "grows this box, and the ancestors' boxes, to cover box"
        # only grows
        new_box = merge(self.box, box)
        if new_box == self.box:
            return
        self.box = new_box
        if self.parent:
            self.parent.merge_up(self.box)

    def is_leaf_node(self):
        "True when there are no children or the first child is a named leaf"
        return not self.children or bool(self.children[0].name)

    def choose_leaf(self, box):
        "descends via best_node() until is_leaf_node() holds"
        n = self
        while not n.is_leaf_node():
            n = n.best_node(box)
        return n

    def adjust(self):
        "divides an overfull node; an underfull non-root node is not supported yet"
        if len(self.children) > self.bigm:
            # there is no overflow() method; dividing the children is what
            # insert() does for an overfull node
            self.divide_children()
        if self.parent and len(self.children) < self.lilm:
            self.underflow()

    def divide_children(self):
        "regroups the children, at most two apiece, under new intermediate nodes"
        children2 = [c for c in self.children]
        self.children = []
        while children2:
            c1 = children2.pop()
            newp = Rtree(self, c1.box)
            newp.children.append(c1)
            c1.parent = newp
            self.children.append(newp)
            if not children2:
                break
            c2 = smallest_merge(children2, newp.box)
            newp.children.append(c2)
            children2.remove(c2)
            c2.parent = newp
            newp.box = merge(c1.box, c2.box)
        self.box = merge(*(c.box for c in self.children))
        self.merge_up(self.box)
        # Leaves are too small, but grow soon. Probably.

    def underflow(self):
        "Not implemented"
        # a bare raise with no active exception is a RuntimeError;
        # NotImplementedError is a RuntimeError subclass that says why
        raise NotImplementedError('Rtree.underflow is not implemented')

    def delete(self, box):
        "Not implemented."
        raise NotImplementedError('Rtree.delete is not implemented')

    def unbalance(self):
        "histogram of leaf depths"
        hist = defaultdict(int)
        stack = [[self]]
        depth = 0
        while stack:
            if not stack[-1]:
                depth -= 1
                stack.pop()
                continue
            c = stack[-1].pop()
            assert bool(c.name) ^ bool(c.children)
            if c.name:
                hist[depth] += 1
            else:
                # walk a copy: popping from c.children itself would
                # dismantle the tree being measured
                stack.append(list(c.children))
                depth += 1
        return dict(sorted(hist.items()))

    def leafiness(self):
        "histogram of branch size"
        hist = defaultdict(int)
        stack = [self]
        while stack:
            c = stack.pop()
            if c.name:
                continue
            hist[len(c.children)] += 1
            stack.extend(c.children)
        return dict(sorted(hist.items()))
def merge(*boxes):
    "new box surrounding inputs"
    # minimum of the first two coordinates, maximum of the last two
    reducers = (min, min, max, max)
    columns = zip(*boxes)
    return tuple(map(lambda reduce_fn, column: reduce_fn(column), reducers, columns))
def area(box):
    "width times height of an (x1, y1, x2, y2) box"
    # Parenthesised: without the brackets the multiplication bound tighter
    # than the subtractions and the result was x2 - x1*y2 - y1.
    return (box[2] - box[0]) * (box[3] - box[1])
def smallest_merge(nodes, box):
    "return best node from nodes"
    # ranked by area after merging with box, then by the node's own area,
    # then by the node itself
    ranked = []
    for n in nodes:
        grown = area(merge(n.box, box))
        ranked.append((grown, area(n.box), n))
    best = min(ranked)
    return best[2]
def full_deps(package, tree, reverse=False):
    "returns every package in dep tree"
    # breadth first walk over .links, or over .inverse when reverse is set
    attribute = 'links' if not reverse else 'inverse'
    seen = set()
    queue = deque([package])
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        neighbours = getattr(tree[current], attribute)
        queue.extend(neighbours - seen)
    return list(seen)
def bilink_tree(tree):
    "adds inverse from links"
    for p in tree:
        for d in tree[p].links:
            # p depends on d, so d is needed by p
            tree[d].inverse.add(p)
    return tree
def flatten(list_of_lists):
    "one level of nesting removed, always returns a list"
    return [item for sub in list_of_lists for item in sub]
def single_depends(tree, preserve_explicit=False):
    "lazily yields names with exactly one inverse link, optionally sparing explicit ones"
    def absorbable(p):
        node = tree[p]
        if len(node.inverse) != 1:
            return False
        return not (preserve_explicit and node.explicit)
    return (p for p in tree if absorbable(p))
def compress_chains(tree):
    "single depends are absorbed into parent"
    options, args = parse()
    if not options.use_compression:
        return tree
    nothing_left = object()
    while True:
        # the tree changes on every pass, so start a fresh scan each time
        s = next(single_depends(tree, options.preserve_explicit), nothing_left)
        if s is nothing_left:
            return tree
        child = tree[s]
        parent = list(child.inverse)[0]
        if s == parent:
            #print 'self loop', s
            child.links.remove(s)
            child.inverse.remove(s)
            continue
        #print 'merge', s, 'into', parent
        host = tree[parent]
        host.size += child.size
        #tree[parent].name += ' ' + tree[s].name
        for dep in child.links:
            # the child's dependencies are now needed by the parent instead
            needed_by = tree[dep].inverse
            needed_by.remove(s)
            needed_by.add(parent)
        host.links.update(child.links)
        host.links.remove(s)
        tree.pop(s)
def sum_sizes(packages, tree):
    "total size of the listed packages that are present in tree"
    total = 0
    for p in packages:
        if p in tree:
            total += tree[p].size
    return total
def shared_size(package, tree):
    "package and all deps"
    everything = full_deps(package, tree)
    return sum_sizes(everything, tree)
def biggest_packs(tree):
    "names ordered by shared_size(), largest first"
    packs = []
    for p in tree:
        packs.append((shared_size(p, tree), p))
    packs.sort(reverse=True)
    return [name for size, name in packs]
def toplevel_packs(tree):
    "set of names that nothing links to"
    tops = set()
    for p in tree:
        if not tree[p].inverse:
            tops.add(p)
    return tops
def packs_by_size(tree, pack_list):
    "list of (size, name) for pack_list, largest first"
    by_sizes = []
    for n in pack_list:
        by_sizes.append((tree[n].size, n))
    by_sizes.sort(reverse=True)
    return by_sizes
def legal_links(tree):
    "removes/reports dangling references"
    known = set(tree)
    for name, node in tree.items():
        # anything pointing outside the tree is reported, then dropped
        for attribute in ('links', 'inverse'):
            current = getattr(node, attribute)
            dangling = current - known
            if dangling:
                print('error: unknown', list(dangling), 'in', name)
                setattr(node, attribute, current - dangling)
    return tree
#print('worst shared packages:', biggest_packs(tree)[:20])
#print('most crucial packages:', biggest_packs(invert_tree(tree))[:20])
def pt_sizes(tree, min_pt=10, max_pt=100, by_area=False):
    """size in bytes -> size in points

    Sets font_pt on every node, scaled linearly so the smallest node gets
    min_pt and the largest max_pt.  With by_area the size is first divided
    by the length of the name.  When every node has the same size (a
    single node included) they all get min_pt.  Returns the same tree; an
    empty tree is returned untouched.
    """
    if not tree:
        # min()/max() of nothing would raise
        return tree

    def norm(node):
        if by_area:
            # an empty name counts as one character, avoiding a division by zero
            return node.size / max(len(node.name), 1)
        return node.size

    sizes = dict((p, norm(node)) for p, node in tree.items())
    min_s, max_s = min(sizes.values()), max(sizes.values())
    spread = max_s - min_s
    for p, node in tree.items():
        if spread:
            node.font_pt = int((max_pt - min_pt) * (sizes[p] - min_s) / spread + min_pt)
        else:
            # all sizes equal: nothing to scale against
            node.font_pt = int(min_pt)
    return tree
def prioritized(packs):
    "iter of names, sorted by priority"
    # first are the most central, with lots of links
    ranked = sorted(((len(node.all), k) for k, node in packs.items()), reverse=True)
    # but slip in anyone who's deps are met early
    plotted = set()
    for link_count, n in ranked:
        if n in plotted:
            continue
        yield n
        plotted.add(n)
        deps_met = set(k for k, node in packs.items() if node.all <= plotted)
        for n2 in deps_met - plotted:
            yield n2
        plotted.update(deps_met)
def ran_rad():
    "random angle in radians, from 0 up to one full turn"
    return tau * random()
def bbox(center, dim):
    "[x1, y1, x2, y2]: half the width each side, the full height above y and a third below"
    cx, cy = center
    width, height = dim
    half_width = width // 2
    left, right = cx - half_width, cx + half_width
    top, bottom = cy - height, cy + height // 3
    return [left, top, right, bottom]
def in_box(bboxA, bboxB):
    "True when the two boxes overlap on both axes; touching edges do not count"
    a_left, a_top, a_right, a_bottom = bboxA
    b_left, b_top, b_right, b_bottom = bboxB
    if a_left < b_left:
        x_overlap = a_right > b_left
    else:
        x_overlap = b_right > a_left
    if not x_overlap:
        return x_overlap
    if a_top < b_top:
        return a_bottom > b_top
    return b_bottom > a_top
def all_bboxes(packs):
    "list with the box of every node, in dict order"
    boxes = []
    for name in packs:
        boxes.append(packs[name].box)
    return boxes
def normalize(point, origin):
    "weighted"
    # offset from origin divided by the squared distance
    # (point == origin divides by zero)
    dx = point[0] - origin[0]
    dy = point[1] - origin[1]
    length_sq = dx**2 + dy**2
    return (dx / length_sq, dy / length_sq)
def link_pull(name, origin_n, packs):
    "average of angles of links"
    origin = packs[origin_n].center

    def norm_ps(points):
        # centers still at (0,0) or sitting on the origin are left out
        return [normalize(c, origin) for c in points if c not in [(0,0), origin]]

    def rot90(x, y):
        # quarter turn, direction picked at random
        if randint(0,1):
            return (-y, x)
        return (y, -x)

    good_links = packs[name].all
    skew_links = set(packs.keys()) - good_links
    g_centers = norm_ps(packs[l].center for l in good_links)
    s_centers = [rot90(*xy) for xy in norm_ps(packs[l].center for l in skew_links)]
    centers = g_centers + s_centers
    if not centers:
        # new branch, try to avoid existing branches
        centers = norm_ps(packs[l].center for l in list(packs.keys()))
        if not centers:
            return (0,0)
        centers = [(-x, -y) for x, y in centers]
    return [sum(axis) for axis in zip(*centers)]
def xy2rad(x, y):
    "angle of the vector (x, y) in radians; a random angle for the zero vector"
    if (x, y) != (0, 0):
        return math.atan2(y, x)
    return ran_rad()
def pol2xy(o, a, r):
    "polar offset (angle a, radius r) from origin o -> integer (x, y)"
    x = o[0] + r * cos(a)
    y = o[1] + r * sin(a)
    return int(x), int(y)
def pt2dim(name, pt):
    "integer (width, height) for name at font size pt"
    x_scale = 0.65
    y_scale = 0.85
    width = len(name) * pt * x_scale
    height = pt * y_scale
    return int(width), int(height)
def best_origin(name, pri_hist, packs):
    "returns largest sibling, or root"
    possible = list(filter(lambda n: n in packs[name].all, pri_hist))
    if possible:
        # biggest size wins, the name breaks ties
        return max(possible, key=lambda n: (packs[n].size, n))
    return pri_hist[0] # root package
# add search_hyperbola and means of switching
def search(cd, origin, heading, scale, rt):
    "linear binary search recursive closure thingy, returns radius"
    history = []

    def probe(r):
        "returns true if clear"
        cd[0] = pol2xy(origin, heading, r)
        history.append(cd[0])
        #return not any(rt.search_cache(bb1))
        return not any(rt.search(bbox(*cd)))

    # same walk as the tail-recursive version, written as a loop
    step, r = scale, scale
    while True:
        s2 = step // 2
        if probe(r - s2):
            if step < scale:
                return r - s2, history
            step, r = s2, r - s2
            continue
        if probe(r + s2):
            step = s2
        else:
            step *= 2
        r = r + step
def search_spiral(cd, origin, scale, rt, step=5, aspect=0.5):
    "returns angle, radius, history"
    # step controls speed/granularity
    # really should be a logarithmic spiral
    # spiral aspect -> final aspect... holy grail?
    history = []

    def is_clear(heading, radius):
        "returns true if clear"
        cd[0] = pol2xy(origin, heading, radius)
        history.append(cd[0])
        return not any(rt.search(bbox(*cd)))

    quarter_turn = tau/4
    angle, r = ran_rad(), scale
    while not is_clear(angle, r):
        # shift the probe sideways from its current heading, then convert
        # the new offset back to polar form
        m = angle + quarter_turn
        x = cos(angle) * r + cos(m) * scale * step / aspect
        y = sin(angle) * r + sin(m) * scale * step
        angle = xy2rad(x, y)
        r = hypot(x, y)
    return angle, r, history
def place(packs, detail=False):
    "radial placement algo, returns non-overlapping coords"
    # maybe try two different link_pulls for each placement?
    # maybe some control inversion to avoid queue checks?
    pri_cache = []

    def pri():
        # remembers everything handed out so far
        for p in prioritized(packs):
            pri_cache.append(p)
            yield p

    prii = pri()
    rt = Rtree()
    first = next(prii)
    # the first package sits at the origin
    rt.insert(bbox((0,0), packs[first].dim), first)
    yield pri_cache[0], ([(0,0)] if detail else (0,0))
    for name in prii:
        node = packs[name]
        origin_name = best_origin(name, pri_cache, packs)
        #print('placing', name, 'around', origin_name)
        origin = packs[origin_name].center
        # unplaced links and big text need more room
        unplaced = len(node.all - set(pri_cache)) + 1
        scale = max(unplaced, node.dim[1])
        #heading = xy2rad(*link_pull(name, origin_name, packs))
        #r,history = search(node.cd, origin, heading, scale, rt)
        heading, r, history = search_spiral(node.cd, origin, scale, rt)
        center = pol2xy(origin, heading, r)
        rt.insert(bbox(center, packs[name].dim), name)
        yield name, (history if detail else center)
#print(rt.show())
#print(copy.deepcopy(rt).unbalance())
#print(copy.deepcopy(rt).leafiness())
def offset_coord(c, d):
    "corrects textbox origin"
    x, y = c[0], c[1]
    half_width = d[0]//2
    return x - half_width, y #+d[1]//2
def xml_wrap(tag, inner=None, **kwargs):
    "one XML element as text; self-closing when inner is None, no escaping"
    attributes = []
    for key, value in kwargs.items():
        attributes.append('%s="%s"' % (str(key), str(value)))
    kw = ' '.join(attributes)
    if inner is not None:
        return '<%s %s>%s</%s>' % (tag, kw, inner, tag)
    return '<%s %s/>' % (tag, kw)
def control_point(p1, p2, drop=None):
    "(x, y) at the mean x of the endpoints, pushed past the larger y by dx*drop//2"
    dx = abs(p2[0] - p1[0])
    # "lower" is the endpoint with the larger y
    if p1[1] < p2[1]:
        lower, higher = p2, p1
    else:
        lower, higher = p1, p2
    if drop is None:
        drop = 0.5 + 0.5 * random()
    mid_x = (lower[0] + higher[0])//2
    return mid_x, lower[1] + dx*drop//2
def quad_spline(p1, p2):
    "boofor DSL in XML"
    # move to p1, quadratic curve via the control point, end at p2
    coords = p1 + control_point(p1, p2) + p2
    return 'M%i,%i Q%i,%i %i,%i' % coords
def svg_spline(point1, point2):
    "path element holding the quad_spline() between the two points"
    path_data = quad_spline(point1, point2)
    return xml_wrap('path', d=path_data)
def arc_path(p1, p2, c=None):
    """boofor DSL in XML

    SVG path data for the circular arc from p1 to p2 that passes through
    c.  When c is omitted it comes from control_point(p1, p2).  Collinear
    points have no circle through them and give a straight line instead.
    """
    # http://paulbourke.net/geometry/circlefrom3/
    if c is None:
        c = control_point(p1, p2)
    (x1, y1), (x2, y2), (cx, cy) = p1, p2, c
    # twice the signed area of the triangle p1, c, p2
    cross = (cx - x1) * (y2 - y1) - (cy - y1) * (x2 - x1)
    if cross == 0:
        return 'M%i,%i L%i,%i' % (x1, y1, x2, y2)
    # circumradius = product of the side lengths / (4 * triangle area)
    sides = hypot(x2 - x1, y2 - y1) * hypot(cx - x1, cy - y1) * hypot(x2 - cx, y2 - cy)
    radius = int(round(sides / (2.0 * abs(cross))))
    # an acute angle at c means c lies on the longer of the two arcs
    large = 1 if (x1 - cx) * (x2 - cx) + (y1 - cy) * (y2 - cy) > 0 else 0
    # the sign of the area picks the side of the chord the arc bulges to
    sweep = 1 if cross > 0 else 0
    # full arc syntax: rx,ry x-axis-rotation large-arc-flag,sweep-flag x,y
    return 'M%i,%i A%i,%i 0 %i,%i %i,%i' % (x1, y1, radius, radius, large, sweep, x2, y2)
def svg_arc(point1, point2):
    "path element holding the arc_path() between the two points"
    path_data = arc_path(point1, point2)
    return xml_wrap('path', d=path_data)
def svg_text(node):
    "text element for node: its name at the offset center, in its color and font size"
    x, y = offset_coord(node.center, node.dim)
    # 'font-size' is not a valid keyword name, so it travels in a dict
    extra = {'font-size': node.font_pt}
    return xml_wrap('text', node.name, x=x, y=y, fill=node.color, **extra)
def all_points(packs):
    "slightly incomplete, guestimates the splines"
    # two corners per bounding box
    corners = []
    for bb in all_bboxes(packs):
        corners.append(bb[:2])
        corners.append(bb[2:])
    # one control point per link, with a fixed drop of 0.5
    controls = []
    for p in packs:
        for l in packs[p].all:
            controls.append(control_point(packs[p].center, packs[l].center, 0.5))
    return corners + controls
def recenter(packs, points):
    "shift everything into quadrant 1"
    xs, ys = zip(*points)
    min_x, min_y = min(xs), min(ys)
    for node in packs.values():
        node.center = (node.center[0] - min_x, node.center[1] - min_y)
    return packs
def window_size(points):
    "(width, height) spanned by the points"
    columns = list(zip(*points))
    xs, ys = columns
    width = max(xs) - min(xs)
    height = max(ys) - min(ys)
    return width, height
def all_links(packs):
    "list of (center, center) pairs, one per linked pair of nodes"
    paths = []
    for pack, node in packs.items():
        start = node.center
        for l in node.all:
            # only emitted from the node whose name sorts later
            if l < pack:
                paths.append((start, packs[l].center))
    return paths
def svgify(packs):
    """Colors the nodes and writes <filename>.svg.

    Dependencies and toplevel packages get their configured colors.  When
    packages were named on the command line (and the mode/--show-req-by
    combination allows it) their dependencies, their requirers and the
    packages themselves get the three highlight colors.  Later rules
    override earlier ones.
    """
    options, args = parse()
    toplevel = toplevel_packs(packs)
    bottomlevel = set(packs) - toplevel
    highlighting = bool(args and ((options.mode != 'arch-repo') ^ options.show_req))
    needs, needed_by = set(), set()
    if highlighting:
        # sets: these are only used for membership tests below
        needs = set(flatten(full_deps(p, packs, reverse=False) for p in args))
        needed_by = set(flatten(full_deps(p, packs, reverse=True) for p in args))
    all_ps = all_points(packs)
    packs = recenter(packs, all_ps)
    width,height = window_size(all_ps)
    for pack,node in packs.items():
        # replace with a lookup structure of some sort
        # { test_fn(): 'color', ... }? [ (pack_set, 'color'), ... ]?
        # also, order sensitive
        if pack in bottomlevel:
            node.color = options.dependency
        if pack in toplevel:
            node.color = options.toplevel
        if pack in needs:
            node.color = options.highlight[1]
        if pack in needed_by:
            node.color = options.highlight[2]
        if highlighting and pack in args:
            node.color = options.highlight[0]
    texts = [svg_text(packs[p]) for p in packs]
    paths = [svg_spline(*line) for line in all_links(packs)]
    svg = xml_wrap('rect', None, x=0, y=0, width=width, height=height, style='fill:%s;' % options.background)
    svg += xml_wrap('g', '\n'.join(paths), style='stroke:%s; stroke-opacity:0.15; fill:none;' % options.link)
    svg += xml_wrap('g', '\n'.join(texts), **{'font-family':'Monospace'})
    svg = xml_wrap('svg', svg, width=width, height=height, xmlns='http://www.w3.org/2000/svg')
    svg = '<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.0//EN" "http://www.w3.org/TR/SVG/DTD/svg10.dtd">\n' + svg
    svg = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n' + svg
    filename = os.path.expanduser(options.filename)
    # The XML declaration promises UTF-8, so write UTF-8 whatever the
    # locale says, and close the file before any converter reads it.
    with open(filename + '.svg', 'w', encoding='utf-8') as svg_file:
        svg_file.write(svg)
def call_status(cmd):
    "returns exit status"
    # Output is discarded instead of piped: waiting on a child whose
    # stdout/stderr pipes are never read can block forever once a pipe
    # buffer fills up.
    devnull = subprocess.DEVNULL
    return subprocess.call(cmd, shell=True, stdout=devnull, stderr=devnull)
def call_stdout(cmd):
    "returns stdout"
    # decoded with the default codec and split into a list of lines
    finished = subprocess.run(cmd, shell=True, stdin=None, stdout=subprocess.PIPE)
    text = finished.stdout.decode()
    return text.split('\n')
def parse():
    "parses the command line, returns (options, args)"
    default_action = 'autodetect'
    parser = optparse.OptionParser(description='Produces two files, pacgraph.svg and pacgraph.png. Colors should be entered as hex values like "#ffffff". SVG named colors may also work, see http://en.wikipedia.org/wiki/Web_colors . Packages listed in the args are highlighted.')
    # shared keyword sets for the boolean switches
    on = dict(action='store_true', default=False)
    off = dict(action='store_false', default=True)

    add = parser.add_option
    add('-s', '--svg', dest='svg_only',
        help='Produce the SVG but do not attempt to rasterize it.', **on)
    add('-o', '--opt-deps', dest='opt_deps',
        help='Include optional dependencies. May produce a less compact graph.', **on)
    add('-e', '--explicits', dest='preserve_explicit',
        help='Preserve explicitly installed applications from dependency compression. May produce a less compact graph.', **on)
    add('-c', '--console', dest='console',
        help='Print summary to console, does not draw a graph. Good for very slow computers.', **on)
    add('-r', '--rip', dest='rip',
        help='Rips a copy of your installed packages to pacgraph.txt. Good for debugging.', **on)
    add('-f', '--file', dest='filename', default='pacgraph',
        help='Override default filename/location. Do not specify an extension.')
    add('--disable-palette', dest='indexed',
        help='Disables lossy palette compression.', **off)

    colors = optparse.OptionGroup(parser, "Theming Options")
    add = colors.add_option
    add('-b', '--background', dest='background', metavar='COLOR', default='#ffffff',
        help='Background color.')
    add('-l', '--link', dest='link', metavar='COLOR', default='#606060',
        help='Color of links between packages.')
    add('-t', '--top', dest='toplevel', metavar='COLOR', default='#0000ff',
        help='Color of packages which are not dependencies.')
    add('-d', '--dep', dest='dependency', metavar='COLOR', default='#6a6aa2',
        help='Color of packages which are dependencies.')
    add('-i', '--highlight', dest='highlight', metavar='COLOR COLOR COLOR', nargs=3,
        default=('#00cc00', '#ff0000', '#700060'),
        help='Color of selected package, selected dependencies, selected needed-by.')
    add('-p', '--point', dest='point_size', metavar='INT INT', type='int', nargs=2, default=(10,100),
        help='Takes two integers, for the smallest and largest font size. Default is -p 10 100.')
    parser.add_option_group(colors)

    beta = optparse.OptionGroup(parser, "Experimental Options")
    add = beta.add_option
    add('-m', '--mode', dest='mode', default=default_action,
        help='Curently supported modes are arch, arch-repo, debian, redhat, ipkg, gentoo, frugalware and frugal-repo. Default is %s.' % default_action)
    add('-n', '--no-compression', dest='use_compression',
        help='Disable all chain compression.', **off)
    add('--shared', dest='shared',
        help='Compare shared libraries.', **on)
    add('--show-req-by', dest='show_req',
        help='Includes required-by of specified packages. Only works for arch-repo.', **on)
    add('--by-area', dest='by_area',
        help='Represent sizes by the area of the name, not the pt of the name.', **on)
    parser.add_option_group(beta)

    return parser.parse_args()
def human_si(number):
    "byte count -> 'N B', 'N kB' or 'N.N MB' (megabytes are the largest unit)"
    powers = [(2**10,''), (2**20,'k'), (2**30,'M')]
    # first unit whose limit exceeds the number, else the last one
    limit, si = next((p for p in powers if p[0] > number), powers[-1])
    divisor = limit//2**10
    n2 = str(round(number / divisor))
    if si == 'M':
        n2 = '%0.1f' % round(number / float(divisor), 1)
    return ' '.join([n2, si + 'B'])
def console_dump(tree):
    "prints the total size, then one 'size name' line per listed package"
    options, args = parse()
    total = sum(node.size for node in tree.values())
    print('Total size:', human_si(total))
    tops = toplevel_packs(tree) if options.use_compression else list(tree.keys())
    if len(tops) == 1:
        # a lone entry: list every package instead
        tops = list(tree.keys())
    for size, name in packs_by_size(tree, tops):
        print(human_si(size), name)
def secondlevel_sizes(tree):
    "prints each direct dependency of a toplevel package with the size it shares"
    # this is slightly lazy and only handles the common case
    # it ignores oddly shaped dep trees
    tops = toplevel_packs(tree)
    seconds = set()
    for t in tops:
        seconds.update(tree[t].links)
    sec_info = []
    for s in seconds:
        aboves = tree[s].inverse & tops # lazy!
        size = tree[s].size + sum(tree[i].size for i in aboves)
        sec_info.append((size/len(aboves), s, size, aboves))
    # largest share per toplevel package first
    sec_info.sort(reverse=True)
    for entry in sec_info:
        s, size, aboves = entry[1], entry[2], entry[3]
        print('%s: %s ' % (s, human_si(size)))
        print('\t', ' '.join(sorted(aboves)))
def exists(app):
    "True when 'which <app>' exits with status 0"
    status = call_status('which %s' % app)
    return status == 0
def distro_detect():
    "guesses the mode from the first package tool found, None if none is"
    # todo, replace this with an /etc/issue scanner
    # (tools to look for, message, mode), tried in this order
    probes = [
        (['pacman'], 'Autodetected Arch.', 'arch'),
        (['ipkg'], 'Autodetected ipkg.', 'ipkg'),
        ('dpkg apt-get aptitude'.split(), 'Autodetected deb.', 'debian'),
        (['rpm'], 'Autodetected rpm.', 'redhat'),
        (['poldek'], 'Autodetected PLD.', 'redhat'),
        (['emerge'], 'Autodetected gentoo.', 'gentoo'),
    ]
    for tools, message, mode in probes:
        if any(map(exists, tools)):
            print (message)
            return mode
    return None
def distro_detect2():
    """prototype /etc scanner

    Looks at the files directly inside /etc whose names end in 'release'
    or 'version' and maps the part before the first '_' or '-' to a mode.
    Returns None, after a warning, when nothing matches or /etc cannot be
    listed.
    """
    is_r = lambda f: f.endswith('release') or f.endswith('version')
    # os.walk() yields nothing at all for a missing or unreadable
    # directory, so give next() a default instead of letting
    # StopIteration escape
    top = next(os.walk('/etc/'), (None, [], []))
    releases = [f for f in top[2] if is_r(f)]
    releases = [f.partition('_')[0] for f in releases]
    releases = [f.partition('-')[0] for f in releases]
    if 'arch' in releases:
        print ('Autodetected Arch')
        return 'arch'
    if 'redhat' in releases or 'fedora' in releases:
        print ('Autodetected rpm')
        return 'redhat'
    if 'debian' in releases:
        print ('Autodetected deb')
        return 'debian'
    if 'gentoo' in releases:
        print ('Autodetected gentoo')
        return 'gentoo'
    print ('Warning: Could not autodetect from /etc')
    return None
def main():
    """Command line entry point.

    Loads the package tree for the selected (or autodetected) mode, then,
    depending on the options: rips it to <filename>.txt, prints a console
    summary, prints the shared-size report, or places the nodes, writes
    <filename>.svg and tries to render <filename>.png.
    """
    arch = Arch()
    debian = Debian()
    redhat = Redhat()
    gentoo = Gentoo()
    frugal = Frugal()
    textfile = Textfile()
    options, args = parse()
    filename = os.path.expanduser(options.filename)
    if options.mode == 'autodetect':
        options.mode = distro_detect2()
        if not options.mode:
            print ('Trying fallback package manager detection')
            options.mode = distro_detect()
    print('Loading package info')
    loaders = {
        'arch': arch.local_load,
        'arch-repo': arch.repo_load,
        'debian': debian.local_load,
        'redhat': redhat.local_load,
        'gentoo': gentoo.local_load,
        'frugalware': frugal.local_load,
        'frugal-repo': frugal.repo_load,
        'textfile': textfile.local_load,
        'ipkg': lambda: debian.local_load(status_path='/usr/lib/ipkg/status'),
    }
    tree = None
    loader = loaders.get(options.mode)
    if loader is not None:
        tree = loader()
    if options.mode == 'ipkg':
        # ipkg reports bytes, not KB
        for node in tree.values():
            node.size /= 1024
    if tree is None:
        print('Could not load packages. Manually specify --mode.')
        return
    stat_line = human_si(sum(v.size for k,v in tree.items()))
    if options.rip:
        # closed (and flushed) before returning
        with open(filename + '.txt', 'w') as rip_file:
            rip_file.write(repr(tree))
        return
    if options.console:
        console_dump(tree)
        return
    if options.shared:
        secondlevel_sizes(tree)
        return
    print('Placing %i nodes' % len(tree))
    tree = pt_sizes(tree, *options.point_size)
    for n,c in place(tree):
        tree[n].center = c
    tree = recenter(tree, all_points(tree))
    tree[stat_line] = Node(name=stat_line, font_pt=sum(options.point_size)//2)
    print('Saving SVG')
    svgify(tree)
    if options.svg_only:
        return
    print('Rendering PNG')
    png_conversion = ['inkscape -D -e %(fn)s.png %(fn)s.svg',
                      'svg2png %(fn)s.svg %(fn)s.png',
                      'convert %(fn)s.svg %(fn)s.png']
    for cmd_line in png_conversion:
        app = cmd_line.split()[0]
        if not exists(app):
            continue
        if call_status(cmd_line % {'fn':filename}) != 0:
            # installed but failed: give the next converter a chance
            continue
        if options.indexed and exists('mogrify'):
            print('Indexing PNG')
            call_status('mogrify -quality 100 -type optimize -flatten +matte -colors 127 %s.png' % filename)
        return # after first success
    print('No way to convert SVG to PNG.')
    print('Inkscape, svg2png or imagemagick would be nice.')
    print('Alternatives: "pacgraph-tk" or "pacgraph --console".')
    print()
    console_dump(tree)
# run only when executed as a script, not when imported
if '__main__' == __name__:
    main()
#import cProfile; cProfile.run("main()", sort=1)