#!/usr/bin/env python
"""
table_catalog module - TableCatalog implementation
TableCatalog class scans and caches the layout of the
/tablets
directory structure into a fast bitmap+list data structure pickled in
/tablet_tree.pkl file. These are used by Table.get_cells() routine to
substantially speed up the cell scan.
On very large tables (e.g., ps1_det), this class accellerates the get_cells()
~ 40x (6 seconds instead of 240).
The main acceleration data structures are a 2D (W x W) numpy array of
indices (stored in bmaps[self._pix.level]), and a 1D ndarray of (mjd, next)
pairs into which the indices in the image refer to (leaves). Leaves is
(logically) a singly linked list of all temporal cells within the spatial
cell, with the root of the list being pointed to from bmap. above is
2**lev, where lev is the bhpix level of pixelization of the table.
To look up whether a particular static cell exists and what it
contains do:
- Compute the (i,j) indices of the cell in bhpix projection,
where (i, j) range from [0, W)
- If bmap[i, j] == 0, the cell is unpopulated
- If bmap[i, j] != 0, its value is an index, 'offs' into leaves.
- leaves[offs] is a (mjd, next) tuple, with mjd being the
temporal coordinate of the cell.
- The absolute value of next is the offset to the next entry in
leaves that is a temporal cell within this static cell (i.e., offs
+= abs(leaves[offs]['next']) will position you to the next cell). Its
sign is negative if the cell contain only neighbor
caches; otherwise it's positive. This allows us to quickly cull
cache-only cells from queries that don't care about it.
A special value abs(next) = table_catalog.END_MARKER denotes there are no
more temporal cells.
- For further acceleration, there is a series of images of lower
resolution (W/2 x W/2), (W/4 x W/4), ... (1 x 1), stored in a
dict() named bmaps. These can be used to quickly rule out there
are no populated cells in a larger partitioning of space. E.g., if
pixel (0, 0) is zero in the (2 x 2) map, it means there are no
populated cells with x < 0, y < 0 bhpix coordinates (and the
search of that entire subtree can be avoided). TableCatalog
makes use of these 'mipmaps' to accelerate get_cells().
TODO:
- the catalog should be build/maintained whenever the database is
modified. Modifications to Table._create_tablet and Table.append
should do the trick.
- Note however that there needs to be a synchronization
mechanism if multiple processes are operating on the same
table (we probably need a server)
"""
import logging
import cPickle, os, glob
import tables
import pool2
import numpy as np
import bounds as bn
import bhpix
import utils
from pixelization import Pixelization
from interval import intervalset
from collections import defaultdict
from itertools import izip
logger = logging.getLogger('lsd.table_catalog')
END_MARKER=0x7FFFFFFF
def _add_bounds(outcells, cell_id, xybounds, tbounds):
# cells is a dictionary of cell_id -> dict objects,
# where each dict object is another dictionary of xybounds -> tbounds
#
# This function adds the (xybounds, tbounds) boundaries
# to the dictionary, for a given cell_id, correctly merging it
# with any bounds that already exist
if tbounds is None:
outcells[cell_id][xybounds] = None # Cover all times (no temporal bounds)
elif xybounds not in outcells[cell_id]: # New entry
outcells[cell_id][xybounds] = tbounds
elif outcells[cell_id][xybounds] is not None: # Merge tbounds with existing entry
outcells[cell_id][xybounds] |= tbounds
def _get_cells_kernel(ij, lev, cc, bounds):
i, j = ij
cells = defaultdict(dict)
for bounds_xy, bounds_t in bounds:
cc._get_cells_recursive(cells, bounds_xy, bounds_t, i, j, lev, bhpix.pix_size(lev))
yield cells
def _scan_recursive_kernel(xy, lev, cc):
x, y = xy
w = bhpix.width(cc._pix.level)
bmap = np.zeros((w, w), dtype=object)
cc._scan_recursive(bmap, lev, x, y)
yield bmap
def iter_siblings(a, at):
# Iterate through a linked list
offs = 0
while offs != END_MARKER:
at += offs
row = tuple(a[at])
yield row
offs = abs(a['next'][at])
def get_snapshot_path(table_path, snapid):
if snapid == 0:
return table_path
else:
return os.path.join(table_path, 'snapshots', snapid)
def isnapshots(table_path, return_path=False, first=None, last=None, no_first=False):
""" Returns an iterable of committed snapshots, in no particular order """
# Adjust boundaries
if last is None:
last = "z"*80 # No snapshot ID should be lexicographically greater than this
# pre v0.5.0 tables compatibility:
if first in [0, None]:
path = os.path.join(table_path, 'tablets')
if os.path.isdir(path):
yield (0 if not return_path else (0, table_path))
# enumerate snapshots
for path in glob.iglob('%s/snapshots/*/.committed' % (table_path)):
t = path.split('/')[-2]
if first <= t and t <= last:
if no_first and first == t:
continue
yield (t if not return_path else (t, os.path.dirname(path)))
class TableCatalog:
_bmaps = None
_leaves = None
_pix = None
#################
def _get_cells_recursive(self, outcells, bounds_xy, bounds_t, i = 0, j = 0, lev = 0, dx = 2.):
""" Helper for get_cells(). See documentation of
get_cells() for usage
"""
w2 = 1 << (lev-1)
x, y = (i - w2 + 0.5)*dx, (j - w2 + 0.5)*dx
# Check for nonzero overlap
box = self._pix._cell_bounds_xy(x, y, dx)
bounds_xy = bounds_xy & box
if not bounds_xy:
return
# Check if the cell directory exists (give up if it doesn't)
bmap = self._bmaps[lev]
offs = bmap[i, j]
if not offs:
return
# If re reached the bottom of the hierarchy
if offs > 1:
# Get the cell_ids for leaf cells matching pattern
xybounds = None if(bounds_xy.area() == box.area()) else bounds_xy
next = 0
while next != END_MARKER:
(t, _, _, next) = self._leaves[offs]
has_data = next > 0
next = abs(next)
if next != END_MARKER: # Not really necessary, but numpy warns of overflow otherwise.
offs += next
if not has_data and not self._include_cached:
continue
if t != self._pix.t0:
# Cut on the time component
tival = intervalset((t, t+self._pix.dt))
tolap = bounds_t & tival
if len(tolap):
(l, r) = tolap[-1] # Get the right-most interval component
if l == r == t+self._pix.dt: # Is it a single point?
tolap = intervalset(*tolap[:-1]) # Since objects in this cell have time in [t, t+dt), remove the t+dt point
if len(tolap) == 0: # No overlap between the intervals -- skip this cell
continue;
# Return None if the cell is fully contained in the requested interval
tbounds = None if tival == tolap else tolap
else:
# Static cell
tbounds = bounds_t
assert next == END_MARKER, "There can be only one static cell (x,y,t=%s,%s,%s)" % (x, y, t)
# Compute cell ID
cell_id = (x, y, t)
# Add to output
_add_bounds(outcells, cell_id, xybounds, tbounds)
else:
# Recursively subdivide the four subpixels
for (di, dj) in [(0, 0), (0, 1), (1, 0), (1, 1)]:
i2 = i*2 + di
j2 = j*2 + dj
self._get_cells_recursive(outcells, bounds_xy & box, bounds_t, i2, j2, lev+1, dx/2)
def get_cells(self, bounds=None, return_bounds=False, include_cached=True):
""" Return a list of (cell_id, bounds) tuples completely
covering the requested bounds.
bounds must be a list of (Polygon, intervalset) tuples.
Output is a list of (cell_id, xybounds, tbounds) tuples,
unless return_bounds=False when the output is just a
list of cell_ids.
"""
self._include_cached = include_cached
# Special case of bounds=None (all sky)
if bounds == None:
bounds = [(bn.ALLSKY, intervalset((-np.inf, np.inf)))]
# Find all existing cells satisfying the bounds
cells = defaultdict(dict)
if False:
# Single-threaded implementation
for bounds_xy, bounds_t in bounds:
self._get_cells_recursive(cells, bounds_xy, bounds_t, 0, 0, 1, 1.)
self._get_cells_recursive(cells, bounds_xy, bounds_t, 0, 1, 1, 1.)
self._get_cells_recursive(cells, bounds_xy, bounds_t, 1, 0, 1, 1.)
self._get_cells_recursive(cells, bounds_xy, bounds_t, 1, 1, 1, 1.)
else:
# Multi-process implementation (appears to be as good or better than single thread in
# nearly all cases of interest)
pool = pool2.Pool()
lev = min(4, self._pix.level)
ij = np.indices((2**lev,2**lev)).reshape(2, -1).T # List of i,j coordinates
for cells_ in pool.imap_unordered(ij, _get_cells_kernel, (lev, self, bounds), progress_callback=pool2.progress_pass):
for cell_id, b in cells_.iteritems():
for xyb, tb in b.iteritems():
_add_bounds(cells, cell_id, xyb, tb)
del pool
if len(cells):
# Transform (x, y, t) tuples to cell_ids
xyt = np.array(cells.keys()).transpose()
cell_ids = self._pix._cell_id_for_xyt(xyt[0], xyt[1], xyt[2])
# Reorder cells to be a dict of cell: [(poly, time), (poly, time)] entries
cells = dict(( (cell_id, v.items()) for (cell_id, (k, v)) in izip(cell_ids, cells.iteritems()) ))
if not return_bounds:
return cells.keys()
else:
return cells
def get_cells_in_snapshot(self, snapid, include_cached=True):
""" Return a list of cells that are physically stored in snapshot snapid """
keep = self._leaves['snapid'] == snapid
if not include_cached:
keep &= self._leaves['next'] > 0
cells = self._leaves['cell_id'][keep]
return cells
def snapshot_of_cell(self, cell_id):
try:
return self._cell_to_snapshot[cell_id]
except KeyError:
raise LookupError()
#################
def _get_temporal_siblings(self, path, pattern):
""" Given a cell_id, get all sibling temporal cells (including the static
sky cell) that exist in it.
"""
pattern = "%s/*/%s" % (path, pattern)
for fn in glob.iglob(pattern):
# parse out the time, construct cell ID
(kind, _) = fn.split('/')[-2:]
t = self._pix.t0 if kind == 'static' else float(kind[1:])
yield t, fn
def _scan_recursive(self, bmap, lev=0, x=0., y=0.):
# Check if the cell directory exists in any allowable snapshot
# Snapshots are ordered newest-to-oldest, so we can avoid touching the
# siblings of older snapshots if they exists in newer ones
dx = bhpix.pix_size(lev)
# .../snapshots/XXXXXX/tablets/-0.5+0.5/..../
paths = [ (snapid, '%s/tablets/%s' % (snapshot_path, self._pix._path_to_cell_base_xy(x, y, lev))) for (snapid, snapshot_path) in self.__snapshots ]
paths = [ (snapid, path) for (snapid, path) in paths if os.path.isdir(path) ] # Keep only those that exist
if not len(paths): # Dead end
return
if lev != self._pix.level:
# Recursively subdivide the four subpixels
dx = dx / 2
for d in np.array([(0.5, 0.5), (-0.5, 0.5), (-0.5, -0.5), (0.5, -0.5)]):
(x2, y2) = (x, y) + dx*d
self._scan_recursive(bmap, lev+1, x2, y2)
else:
# Leaf
w2 = bhpix.width(lev) // 2
i, j = (x // dx + w2, y // dx + w2)
# Collect the data we have, and add them to the set of cells that exist
siblings = dict()
for snapid, path in paths:
for tcell, fn in self._get_temporal_siblings(path, self.__pattern):
if tcell not in siblings: # Add only if there's no newer version
# check if there are any non-cached data in here
try:
with tables.openFile(fn) as fp:
has_data = len(fp.root.main.table) > 0
except tables.exceptions.NoSuchNodeError:
has_data = False
siblings[tcell] = snapid, has_data
# Add any relevant pre-existing data
offs = self._bmaps[self._pix.level][i, j]
if offs != 0:
for mjd, snapid, _, next in iter_siblings(self._leaves, offs):
if mjd not in siblings:
siblings[mjd] = snapid, next > 0
# Add this list to bitmap
assert bmap[i, j] == 0
bmap[i, j] = [ (tcell, snapid, self._pix._cell_id_for_xyt(x, y, tcell), has_data) for (tcell, (snapid, has_data)) in siblings.iteritems() ]
def _update(self, table_path, snapid):
# Find what we already have loaded
prevsnap = np.max(self._leaves['snapid']) if len(self._leaves) > 2 else None
assert prevsnap <= snapid, "Cannot update a catalog to an older snapshot"
## Enumerate all existing snapshots older or equal to snapid, and newer than prevsnap, and sort them, newest first
self.__snapshots = sorted(isnapshots(table_path, first=prevsnap, last=snapid, no_first=True, return_path=True), reverse=True)
# Add the snapid snapshot by hand, if not in the list of committed snapshots
# (because it is permissible to update to a yet-uncommitted snapshot)
if len(self.__snapshots) == 0 or self.__snapshots[0][0] != snapid:
self.__snapshots.insert(0, (snapid, get_snapshot_path(table_path, snapid)))
# Recursively scan, in parallel
w = bhpix.width(self._pix.level)
bmap = np.zeros((w, w), dtype=object)
lev = min(4, self._pix.level)
dx = bhpix.pix_size(lev)
i, j = np.indices((2**lev,2**lev)).reshape(2, -1) # List of i,j coordinates
w2 = 1 << (lev-1)
x, y = (i - w2 + 0.5)*dx, (j - w2 + 0.5)*dx
pool = pool2.Pool()
for bmap2 in pool.imap_unordered(zip(x, y), _scan_recursive_kernel, (lev, self)):
assert not np.any((bmap != 0) & (bmap2 != 0))
mask = bmap2 != 0
bmap[mask] = bmap2[mask]
del pool
# Add data about cells that were not touched by this update
bmap_cur = self._bmaps[self._pix.level]
mask_cur = (bmap_cur != 0) & (bmap == 0)
lists_cur = [ [ (mjd, snapid, cell_id, next > 0) for (mjd, snapid, cell_id, next) in iter_siblings(self._leaves, offs) ] for offs in bmap_cur[mask_cur] ]
try:
bmap[mask_cur] = lists_cur
except ValueError:
# Workaround for a numpy 1.6.0 bug (http://projects.scipy.org/numpy/ticket/1870)
coords = np.mgrid[0:w,0:w][:,mask_cur].T # List of coordinates corresponding to True entries in mask_cur
for (ii, jj), vv in izip(coords, lists_cur):
bmap[ii, jj] = vv
# Repack the temporal siblings to a single numpy array, emulating a linked list
lists = bmap[bmap != 0]
llens = np.fromiter( (len(l) for l in lists), dtype=np.int32 )
leaves = np.empty(np.sum(llens)+2, dtype=[('mjd', 'f4'), ('snapid', object), ('cell_id', 'u8'), ('next', 'i4')])
leaves[:2] = [(np.inf, 0, 0, END_MARKER)]*2 # We start with two dummy entries, so that offs=0 and 1 are invalid and can take other meanings.
seen = dict()
at = 2
for l in lists:
last_i = len(l) - 1
for (i, (mjd, snapid, cell_id, has_data)) in enumerate(l):
# Make equal strings refer to the same string object
try:
snapid = seen[snapid]
except KeyError:
seen[snapid] = snapid
next = 1 if has_data else -1
if i == last_i:
next *= END_MARKER
leaves[at] = (mjd, snapid, cell_id, next)
at += 1
# Construct bmap that has offsets to head of the linked list of siblings
offs = np.zeros(len(llens), dtype=np.int64)
offs[1:] = np.cumsum(llens)[:-1]
offs += 2 # Pointers to beginnings of individual lists
assert np.all(abs(leaves['next'][offs-1]) == END_MARKER)
obmap = np.zeros(bmap.shape, dtype=np.int32)
obmap[bmap != 0] = offs
# Recompute mipmaps
bmaps = self._compute_mipmaps(obmap)
return bmaps, leaves
def _compute_mipmaps(self, bmap):
# Create mip-maps
bmaps = {self._pix.level: bmap} # Mip-maps of bmap with True if the cell has data in its leaves, and False otherwise
for lev in xrange(self._pix.level-1, -1, -1):
w = bhpix.width(lev)
m0 = np.zeros((w, w), dtype=np.int32)
m1 = bmaps[lev+1]
for (i, j) in [(0, 0), (0, 1), (1, 0), (1, 1)]:
m0[:,:] |= m1[i::2, j::2]
m0 = m0 != 0
bmaps[lev] = m0
return bmaps
def _rebuild_internal_state(self):
self._cell_to_snapshot = dict(v for v in izip(self._leaves['cell_id'][2:], self._leaves['snapid'][2:]))
assert len(self._cell_to_snapshot) == len(self._leaves)-2, (len(self._cell_to_snapshot), len(self._leaves))
def update(self, table_path, pattern, snapid):
self.__pattern = pattern
self._bmaps, self._leaves = self._update(table_path, snapid)
self._rebuild_internal_state()
def save(self, fn):
dir = os.path.dirname(os.path.normpath(fn))
if dir != '':
utils.mkdir_p(dir)
cPickle.dump((self._bmaps, self._leaves, self._pix), file(fn, mode='w'), -1)
def load(self, fn):
self._bmaps, self._leaves, self._pix = cPickle.load(file(fn))
self._rebuild_internal_state()
def clear(self):
# Initialize an empty table
w = bhpix.width(self._pix.level)
self._bmaps = self._compute_mipmaps(np.zeros((w, w), dtype=int))
self._leaves = np.empty(2, dtype=[('mjd', 'f4'), ('snapid', object), ('cell_id', 'u8'), ('next', 'i4')])
self._leaves[:2] = [(np.inf, 0, 0, END_MARKER)]*2
self._rebuild_internal_state()
def __init__(self, fn=None, pix=None, snapid=None):
if fn is not None:
assert pix is None
self.load(fn)
else:
assert pix is not None
assert fn is None
self._pix = pix
self.clear()
def __eq__(self, b):
# Compare Pixelization objects
if self._pix != b._pix:
return False
# Compare low-resolution (boolean) mipmaps
for i in xrange(self._pix.level):
if np.all(self._bmaps[i] != b._bmaps[i]):
return False
# Compare the overall layout of the base bitmap
bmap1 = self._bmaps[self._pix.level]
bmap2 = b._bmaps[b._pix.level]
if not np.all( (bmap1 > 1) == (bmap2 > 1) ):
return False
# Compare the temporal siblings in each bitmap
for offs1, offs2 in izip(bmap1[bmap1 > 1], bmap2[bmap2 > 1]):
list1 = sorted((mjd, snap_id, cell_id) for (mjd, snap_id, cell_id, _) in iter_siblings(self._leaves, offs1))
list2 = sorted((mjd, snap_id, cell_id) for (mjd, snap_id, cell_id, _) in iter_siblings(b._leaves, offs2))
if list1 != list2:
return False
# Compare _leaves, all columns but 'next'
if self._leaves.dtype != b._leaves.dtype:
return False
names = [ name for name in self._leaves.dtype.names if name != 'next' ]
s1 = np.sort(self._leaves, order=names)
s2 = np.sort(b._leaves, order=names)
for name in names:
if not np.all(s1[name] == s2[name]):
return False
# Compare the signs of the 'next' column (== has_data)
if not np.all((s1['next'] > 0) == (s2['next'] > 0)):
return False
# The two objects are identical
return True
def __ne__(self, b):
return not (self == b)
def check_table_catalog(table_path, pattern, snapid=None):
""" Verify the consistency of the table catalog
Load the table catalog given by snapid (or the lates one,
if none is given), and compare it to a catalog reconstructed
from scratch.
"""
# Load existing catalog
snapshots = dict(isnapshots(table_path, return_path=True))
if snapid is None:
snapid = max(snapshots.keys())
fn = os.path.join(snapshots[snapid], 'catalog.pkl')
cc1 = TableCatalog(fn=fn)
# Construct one from scratch
cc2 = TableCatalog(pix=cc1._pix)
cc2.update(table_path, pattern, snapid)
assert cc1 == cc2
if __name__ == '__main__':
tpath = '/n/pan/mjuric/lsd_test5/ps1_det'
#check_table_catalog(tpath, 'ps1_det.astrometry.h5'); exit()
if False:
import sys
pix = Pixelization(6, 54335, 1)
cc = TableCatalog(pix=pix)
if True:
snaps = sorted(isnapshots(tpath))
for i, snapid in enumerate(snaps):
print >>sys.stderr, snapid,
if i+1 == len(snaps) or (i % 2 == 0):
cc.update(tpath, 'ps1_det.astrometry.h5', snapid)
else:
print >>sys.stderr, "skipping."
else:
cc.load('catalog.pkl')
cc.update(tpath, 'ps1_det.astrometry.h5', '20110617094228.164285')
###cc.update(tpath, 'ps1_det.astrometry.h5', '20110617084754.101965')
###cc.update(tpath, 'ps1_det.astrometry.h5', '20110617094228.164285')
cc.save('catalog.pkl')
#cc2 = TableCatalog(fn='/n/pan/mjuric/lsd_test5/ps1_det/snapshots/20110617084754.101965/catalog.pkl')
cc2 = TableCatalog(fn='/n/pan/mjuric/lsd_test5/ps1_det/snapshots/20110617094228.164285/catalog.pkl')
assert cc == cc2
print "OK"
else:
cc1 = TableCatalog(fn='catalog.pkl')
#cc2 = TableCatalog(fn='/n/pan/mjuric/lsd_test5/ps1_det/snapshots/20110617084754.101965/catalog.pkl')
cc2 = TableCatalog(fn='/n/pan/mjuric/lsd_test5/ps1_det/snapshots/20110617094228.164285/catalog.pkl')
assert cc1 == cc2
print "OK"