# -*- coding: utf-8 -*-
# Copyright (c) 2020 Stephen Wasilewski, HSLU and EPFL
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
import numpy as np
from raytools import translate
from raytools.utility import pool_call
from raytraverse.lightfield import ZonalLightResult, ResultAxis, LightResult
from raytraverse.integrator.integrator import Integrator
[docs]
class SensorIntegrator(Integrator):
"""collection of sensorplanes with evaluation routines
Parameters
----------
lightplanes: Sequence[raytraverse.lightfield.SensorPlaneKD]
ptype: Sequence[str]
matching order of lightplanes, requires one for each:
- "sky": represents sky with nsrcs = skydata
- "skysun": represents sky+sun with nsrcs = skydata
- "patch": represents sun contribution as patch
- "sun": sun contribution
- "fixed": does not respond to skydata (electric lighting)
factors: Sequence[int], optional
values, for each light plane to scale contribution of eeach light plane
for example, provide (1, -1, 1) for ptype: ("skysun", "patch", "sun")
for 2-phase DDS calculation. If not give, all are set to 1
scale: float, optional
default output in lux (179)
"""
def __init__(self, *lightplanes, ptype=None, factors=None, scale=179.,
**kwargs):
# argument checking
if ptype is None:
raise ValueError("ptype is required to initialize SensorIntegrator")
if len(ptype) != len(lightplanes):
raise ValueError(f"length of ptype ({len(ptype)}) does not match "
f"# of lightplanes ({len(lightplanes)}")
ptypes = ("sky", "skysun", "patch", "sun", "fixed")
badtypes = [i for i in ptype if i not in ptypes]
if len(badtypes) > 0:
raise ValueError(f"Invalid value(s) for ptype: {badtypes} not in "
f"{ptypes}")
if factors is None:
factors = [1] * len(lightplanes)
if len(factors) != len(lightplanes):
raise ValueError(f"length of factors ({len(factors)}) does not "
f"match # of lightplanes ({len(lightplanes)}")
# make sure no sunviewengine is passed
kwargs.update(sunviewengine=None)
# set sensor specific args
self.sensors = lightplanes[0].sensors
self.factors = [f * scale for f in factors]
self.ptype = ptype
super().__init__(*lightplanes, **kwargs)
[docs]
def make_images(self, *args, **kwargs):
raise ValueError("SensorIntegrator does not make_images")
[docs]
def evaluate(self, skydata, points=None, vm=None, datainfo=False,
stol=10, minsun=1, **kwargs):
"""apply sky data and view queries to daylightplane to return metrics
parallelizes and optimizes run order.
Parameters
----------
skydata: raytraverse.sky.Skydata
points: np.array, optional
shape (N, 3), if None evaluates zone
vm: ignored
datainfo: Union[Sized[str], bool], optional
include information about source data as additional metrics. Valid
values include: ["pt_err", "pt_idx", "src_err", "src_idx"].
If True, includes all. zonal evaluation will only include src_err
and src_idx
stol: Union[float, int], optional
maximum angle (in degrees) for matching sun vectors
minsun: int, optional
if atleast these many suns are not returned based on stol, directly
query for this number of results (regardless of sun error)
Returns
-------
raytraverse.lightfield.LightResult
"""
# delegate to zonal_evaluate
if (points is None and len(self._issunplane) > 0 and
np.any(skydata.sun[:, 3] > 0)):
return self.zonal_evaluate(skydata, self.lightplanes[0].pm,
datainfo=datainfo,
stol=stol, minsun=minsun, **kwargs)
if points is None:
points, skarea = self._get_fixed_points(self.lightplanes[0].pm)
# include areas with points when available
apoints = np.hstack((points, skarea[:, None]))
else:
points = np.atleast_2d(points)
apoints = points
sensors = self.sensors
tidxs, skydatas, vecs = self._group_query(skydata, points)
oshape = (len(skydata.maskindices), len(points), len(sensors), 1)
self.scene.log(self, f"Evaluating {oshape[2]} sensors at {oshape[1]} "
f"points under {oshape[0]} skies", True)
fields = None
sidx = []
for lp, sd, ti in zip(self.lightplanes, skydatas, tidxs):
if len(ti) == len(points):
data = np.einsum("hs,pnsf->hpnf", sd, lp.data[ti])
sidx.append(np.broadcast_to(ti[None], (len(sd), len(points))).ravel())
else:
d = lp.data[ti].reshape(*oshape[0:3], sd.shape[1], oshape[3])
data = np.einsum("hs,hpnsf->hpnf", sd, d)
sidx.append(ti)
if fields is None:
fields = data
else:
fields += data
fields = fields.reshape(oshape)
sinfo, dinfo = self._sinfo(datainfo, vecs, sidx, oshape[0:2])
if sinfo is not None:
nshape = list(sinfo.shape)
nshape[2] = fields.shape[2]
sinfo = np.broadcast_to(sinfo, nshape)
fields = np.concatenate((fields, sinfo), axis=-1)
# compose axes: (skyaxis, ptaxis, viewaxis, metricaxis)
axes = (ResultAxis(skydata.rowlabel[skydata.fullmask], f"sky"),
ResultAxis(apoints, "point"),
ResultAxis(sensors, "view"),
ResultAxis(["illum"] + dinfo, "metric"))
lr = LightResult(fields, *axes, boundary=self.lightplanes[0].pm)
return lr
[docs]
def zonal_evaluate(self, skydata, pm, vm=None, datainfo=False,
stol=10, minsun=1, calcarea=True, **kwargs):
"""
Parameters
----------
see evaluate
Returns
-------
raytraverse.lightfield.ZonalLightResult
"""
# delegate back to evaluate
if len(self._issunplane) == 0:
return self.evaluate(skydata, datainfo=datainfo,
stol=stol, minsun=minsun, **kwargs)
self.scene.log(self, f"Evaluating {len(self.sensors)} sensors across "
f"zone {pm.name} under {len(skydata.maskindices)} "
f"skies", True)
(sunmask, suns, pts, sundata,
skarea) = self._zonal_group_query(skydata, pm, stol=stol,
minsun=minsun, calcarea=calcarea)
cnts = np.full(len(sunmask), len(pts))
cnts[sunmask] = [len(s) for s in sundata[0]]
tcnt = np.sum(cnts)
# seems to be sweet spot to chunk size, too large and bogged down
# too small and lots of overhead, on test this is 4x faster than single
# thread
chunks = round(tcnt/20000)
if chunks > 1:
csize = int(np.ceil(len(sunmask)/chunks))
sdi = 0
chunkrng = np.arange(0, len(sunmask), csize)
args = []
for ci in chunkrng:
slc = slice(ci, ci+csize)
sdc = np.sum(sunmask[slc])
sdlc = slice(sdi, sdi+sdc)
sdi += sdc
csundata = [j[sdlc] for j in sundata]
args.append((slc, csundata))
rs = pool_call(self._evaluate_chunks, args, skydata, sunmask, suns,
pts, skarea, desc="matrix multiplication")
fields = np.concatenate([r[0] for r in rs], axis=0)
serr = np.concatenate([r[1] for r in rs], axis=0)
tidxs = np.concatenate([r[2] for r in rs], axis=1)
areas = np.concatenate([r[3] for r in rs], axis=0)
all_vecs = np.concatenate([r[4] for r in rs], axis=0)
else:
result = self._evaluate_chunk(skydata.smtx, sunmask, suns, pts,
sundata, skarea)
fields, serr, tidxs, areas, all_vecs = result
oshape = (len(fields), len(self.sensors))
pmetrics = ['x', 'y', 'z', 'area']
if datainfo:
sinfo, dinfo = self._zonal_sinfo(serr, tidxs, oshape + (2,))
if sinfo is not None:
fields = np.concatenate((sinfo, fields), axis=-1)
pmetrics += dinfo
areas = np.broadcast_to(areas[:, None, None], oshape + (1,))
axes = [ResultAxis(skydata.rowlabel[skydata.fullmask], f"sky"),
ResultAxis([pm.name], f"zone"),
ResultAxis(self.sensors, "view"),
ResultAxis(pmetrics + ["illum"], "metric")]
strides = np.cumsum(cnts)[:-1]
fvecs = np.broadcast_to(all_vecs[:, None, 3:], oshape + (3,))
fields = np.concatenate((fvecs, areas, fields), axis=-1)
fields = np.split(fields, strides)
return ZonalLightResult(fields, *axes, boundary=pm)
def _evaluate_chunks(self, slc, csundata, skydata, sunmask, suns, pts,
skarea):
"""for parallel processing, avoids exploding memory"""
return self._evaluate_chunk(skydata.smtx[slc], sunmask[slc],
suns[slc], pts, csundata, skarea)
def _evaluate_chunk(self, dsmtx, sunmask, suns, pts, sundata, skarea):
result = self._unmask_data(dsmtx, sunmask, suns, pts, sundata, skarea)
smtx, dsns, all_vecs, sunidx, serr, areas, cnts = result
tidxs, skydatas = self._match_ragged(smtx, dsns, sunidx, all_vecs)
fields = np.zeros((len(tidxs[0]), len(self.sensors), 1))
for lp, sd, ti in zip(self.lightplanes, skydatas, tidxs):
fields += np.einsum("ps,pnsf->pnf", sd, lp.data[ti])
return fields, serr, tidxs, areas, all_vecs
def _match_ragged(self, smtx, dsns, sunidx, all_vecs):
tidxs = []
skydatas = []
if "patch" in self.ptype or "skysun" in self.ptype:
skpatch = translate.xyz2skybin(dsns[:, 0:3],
int((smtx.shape[1] - 1)**.5))
dsk = np.zeros(smtx.shape)
dsk[np.arange(len(dsk)), skpatch] = dsns[:, 4]
else:
dsk = None
for p, f, lp in zip(self.ptype, self.factors, self.lightplanes):
if p == "sky":
skydatas.append(smtx * f)
tidxs.append(lp.query(all_vecs[:, 3:])[0])
elif p == "patch":
skydatas.append(dsk * f)
tidxs.append(lp.query(all_vecs[:, 3:])[0])
elif p == "sun":
skydatas.append(dsns[:, 3:4] * f)
tidxs.append(sunidx)
elif p == "skysun":
skydatas.append((smtx + dsk) * f)
tidxs.append(lp.query(all_vecs[:, 3:])[0])
else:
skydatas.append(np.full((len(dsns), 1), f))
tidxs = np.stack(tidxs)
return tidxs, skydatas
def _group_query(self, skydata, points):
"""for sensor integration broadcasting happens in the summation, so
this function is just allocating the correct points and sky matrix
only vecs is broadcast to the actual result size"""
# query and group sun positions
suns = skydata.sun[:, None, 0:3]
pts = np.atleast_2d(points)[None, :]
vecs = np.concatenate(np.broadcast_arrays(suns, pts), -1).reshape(-1, 6)
skydatas = self._select_sky_grid(skydata)
idxs = []
for lp in self.lightplanes:
if lp.vecs.shape[1] == 6:
idxs.append(lp.query(vecs)[0])
else:
idxs.append(lp.query(points)[0])
return idxs, skydatas, vecs
def _select_sky_grid(self, skydata):
skydatas = []
for p, f, lp in zip(self.ptype, self.factors, self.lightplanes):
# broadcast skydata to match full indexing
if p == "sky":
skydatas.append(skydata.smtx * f)
elif p == "skysun":
skydatas.append(skydata.smtx_patch_sun() * f)
elif p == "patch":
skydatas.append(skydata.smtx_patch_sun(False) * f)
elif p == "sun":
skydatas.append(skydata.sun[:, 3:4] * f)
else:
skydatas.append(np.full((len(skydata.sun), 1), f))
return skydatas