Source code for raytraverse.lightfield.sunsplanekd

# -*- coding: utf-8 -*-
# Copyright (c) 2020 Stephen Wasilewski, HSLU and EPFL
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
import numpy as np
from scipy.spatial import cKDTree, distance_matrix

from raytraverse.lightfield.sets import MultiLightPointSet
from raytraverse.lightfield.lightfield import LightField
from raytraverse.translate import theta2chord, chord2theta
from raytraverse.utility import pool_call


[docs]class SunsPlaneKD(LightField): """collection of lightplanes with KDtree structure for sun position query """ @property def vecs(self): """indexing vectors (sx, sy, sz, px, py, pz)""" return self._vecs @vecs.setter def vecs(self, pt): if pt is None: pt = f"{self.scene.outdir}/{self.pm.name}/{self.src}positions.tsv" pts, idx, samplelevel = self._load_vecs(pt) s_pts, s_lev, s_idx, nmz = self._load_points(idx, pts, samplelevel) self._normalization = nmz self._vecs = s_pts self._samplelevel = s_lev self.data = s_idx self._kd = None self._suns = pts self._sunkd = None self.omega = None def _load_points(self, idx, pts, samplelevel): """loop over source point files to build indices""" s_pts = [] s_idx = [] s_lev = [] pdist = [] # fall back to guesstimate of level 0 in case component planes # do not have levels indexed l0 = self.pm.point_grid(False).shape[0] haslevels = np.sum(samplelevel) > 0 if haslevels: # calculate sun sampling resolution for weighting query vecs sund = self._estimate_grid_size(pts[samplelevel == 0]) else: sund = self._estimate_grid_size(pts[:45]) for i, pt, sl in zip(idx, pts, samplelevel): spt, sidx, slev = self._load_point(f"{self.src}_{i:04d}") # only the unmasked level 0 suns will provide a good # estimate for level0 point distances if haslevels and sl == 0: if np.sum(slev) > 0: s = spt[slev == 0] else: s = spt[:l0] pdist.append(self._estimate_grid_size(s)) s_pts.append(np.hstack((np.broadcast_to(pt[None], spt.shape), spt))) s_idx.append(np.stack((np.broadcast_to([i], sidx.shape), sidx)).T) s_lev.append(np.stack((np.broadcast_to([sl], slev.shape), slev)).T) if haslevels: normalization = sund/np.average(pdist) else: pdist = self.pm.ptres * (np.sqrt(2)/2+.5) normalization = sund/pdist s_pts = np.concatenate(s_pts) s_lev = np.concatenate(s_lev) s_idx = np.concatenate(s_idx) return s_pts, s_lev, s_idx, normalization def _load_point(self, source): ptf = f"{self.scene.outdir}/{self.pm.name}/{source}_points.tsv" return self._load_vecs(ptf) @staticmethod def _estimate_grid_size(v): dm = distance_matrix(v, v) cm = np.ma.MaskedArray(dm, np.eye(dm.shape[0])) return np.average(np.min(cm, axis=0).data) @property def suns(self): return self._suns @property def data(self): """LightPlaneSet""" return self._data @data.setter def data(self, idx): self._data = MultiLightPointSet(self.scene, self.vecs, idx, self.src, self.pm.name) @property def kd(self): """kdtree for spatial queries built on demand""" if self._kd is None: weighted_vecs = np.copy(self.vecs) weighted_vecs[:, 3:] *= self._normalization self._kd = cKDTree(weighted_vecs) return self._kd @property def sunkd(self): """kdtree for sun position queries built on demand""" if self._sunkd is None: self._sunkd = cKDTree(self.suns) return self._sunkd
[docs] def query(self, vecs): """return the index and distance of the nearest vec to each of vecs Parameters ---------- vecs: np.array shape (N, 6) vectors to query. Returns ------- i: np.array integer indices of closest ray to each query d: np.array distance from query to point, positional distance is normalized by the average chord-length between level 0 sun samples divided by the average distance between level 0 pt samples. """ weighted_vecs = np.copy(np.atleast_2d(vecs)).astype(float) weighted_vecs[:, 3:] *= self._normalization d, i = self.kd.query(weighted_vecs) return i, d
[docs] def query_by_sun(self, sunvec, fixed_points=None, stol=10, minsun=1): """for finding vectors across zone, sun vector based query Parameters ---------- sunvec: Sequence sun direction vector (normalized, xyz) fixed_points: Sequence, optional 2d array like, shape (N, 3) of additional fixed points to return use for example with a matching sky query. Note that if point filter is to large not all of these points are necessarily returned. stol: Union[float, int], optional maximum angle (in degrees) for matching sun vectors minsun: int, optional if atleast these many suns are not returned based on stol, directly query for this number of results (regardless of sun error) Returns ------- vecs: np.array shape (N, 6) final vectors, because of fixed_points, this may not match exactly with self.vecs[i] so this array mus be used in further processing i: np.array integer indices of the closest rays to each query d: np.array angle (in degrees) between queried sunvec and returned index """ return _query_by_sun(sunvec, self.sunkd, self.vecs, self.data.idx[:, 0], fixed_points=fixed_points, stol=stol, minsun=minsun, normalization=self._normalization)
[docs] def query_by_suns(self, sunvecs, fixed_points=None, stol=10, minsun=1): """parallel processing call to query_by_sun for 2d array of sunvecs Parameters ---------- sunvecs: np.array shape (N, 3) sun direction vectors (normalized, xyz) fixed_points: Sequence, optional 2d array like, shape (N, 3) of additional fixed points to return use for example with a matching sky query. Note that if point filter is to large not all of these points are necessarily returned. stol: Union[float, int], optional maximum angle (in degrees) for matching sun vectors minsun: int, optional if atleast these many suns are not returned based on stol, directly query for this number of results (regardless of sun error) Returns ------- vecs: list list of np.array, one for each sunvec (see query_by_sun) idx: list list of np.array, one for each sunvec (see query_by_sun) d: list list of np.array, one for each sunvec (see query_by_sun) """ result = pool_call(_query_by_sun, sunvecs, self.sunkd, self.vecs, self.data.idx[:, 0], expandarg=False, desc="finding SunPlane points", pbar=self.scene.dolog, fixed_points=fixed_points, stol=stol, minsun=minsun, normalization=self._normalization) vecs, idx, d = zip(*result) return vecs, idx, d
def _query_by_sun(sunvec, sunkd, svecs, sidx, fixed_points=None, stol=10, minsun=1, normalization=1.0): """for finding vectors across zone, sun vector based query""" # first find all points with suns within tolerance cstol = theta2chord(stol*np.pi/180) i = sunkd.query_ball_point(np.ravel(sunvec), cstol) if len(i) < minsun: d, i = sunkd.query(sunvec, minsun) if minsun == 1: i = [i] # filter points corresponding to these suns sunfilt = np.isin(sidx, i) vecs = svecs[sunfilt] idx = np.argwhere(sunfilt).ravel() # prepend fixed_points (using closest values) if fixed_points is not None: fixed_points = np.atleast_2d(fixed_points) pkd = cKDTree(vecs[:, 3:]) d, i = pkd.query(fixed_points) fvecs = np.hstack((vecs[i, 0:3], fixed_points)) vecs = np.concatenate((fvecs, vecs), axis=0) idx = np.concatenate((idx[i], idx)) # calculate sun errors sund = np.linalg.norm(vecs[:, 0:3] - np.atleast_2d(sunvec), axis=1) cpts = np.copy(vecs[:, 3:]) # add sun error penalty cpts[:, 2] += sund/normalization ckd = cKDTree(cpts) # reassign original vectors based on penalized query d, i = ckd.query(vecs[:, 3:]) # eliminate unused vectors i = np.unique(i) d = chord2theta(sund[i]) * 180/np.pi return vecs[i], idx[i], d