Source code for raytraverse.sampler.isamplersuns

# -*- coding: utf-8 -*-
# Copyright (c) 2020 Stephen Wasilewski, HSLU and EPFL
# =======================================================================
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# =======================================================================
import numpy as np
from clasp.script_tools import try_mkdir

from raytraverse.mapper import MaskedPlanMapper
from raytraverse.sampler import ISamplerArea, Sensor
from raytraverse.sampler.samplersuns import SamplerSuns
from raytraverse.lightfield import SunSensorPlaneKD
from raytools.utility import pool_call


[docs] class ISamplerSuns(SamplerSuns): """wavelet based sun position sampling class Parameters ---------- scene: raytraverse.scene.Scene scene class containing geometry and formatter compatible with engine engine: raytraverse.sampler.Sensor with initialized renderer instance (with scene loaded, no sources) accuracy: float, optional parameter to set threshold at sampling level relative to final level threshold (smaller number will increase sampling, default is 1.0) nlev: int, optional number of levels to sample jitter: bool, optional jitter samples ptkwargs: dict, optional kwargs for raytraveerse.sampler.SunSamplerPt initialization areakwargs: dict, optional kwargs for raytravrse.sampler.SamplerArea initialization metricset: iterable, optional subset of samplerarea.metric set to use for sun detail calculation. """ def __init__(self, scene, engine, accuracy=1.0, nlev=3, jitter=True, areakwargs=None, t0=.05, t1=.125): super(SamplerSuns, self).__init__(scene, engine, accuracy, stype=f'sunpositions', t0=t0, t1=t1) if areakwargs is None: areakwargs = {} areakwargs.update(samplerlevel=self._slevel + 1) self._areakwargs = dict(featurefunc=np.max, edgemode=0.0) self._areakwargs.update(areakwargs) self.nlev = nlev self.jitter = jitter # initialize runtime variables: #: extra variables since sampler also needs to track each areasampler self._areadraws = None self._areadetail = None self._areaweights = None #: raytraverse.mapper.SkyMapper (set in self.run()) self._skymapper = None #: raytraverse.mapper.PlanMapper (set in self.run()) self._areamapper = None self._mask = slice(None) self._candidates = None self.slices = [] self._recovery_data = None
[docs] def get_existing_run(self, skymapper, areamapper): raise ValueError("get_existing_run not supported for ISamplerArea")
[docs] def run(self, skymapper, areamapper, **kwargs): """adaptively sample sun positions for an area (also adaptively sampled) Parameters ---------- skymapper: raytraverse.mapper.SkyMapper the mapping for drawing suns areamapper: raytraverse.mapper.PlanMapper the mapping for drawing points kwargs: passed to self.run() Returns ------- raytraverse.lightlplane.LightPlaneKD """ # reset/initialize runtime properties self._skymapper = skymapper self._areamapper = areamapper self._areaweights = None try_mkdir(f"{self.scene.outdir}/{areamapper.name}") levels = self.sampling_scheme(skymapper) super(SamplerSuns, self).run(skymapper, areamapper.name, levels, **kwargs) src = f"{self.engine.name}_{self._skymapper.name}_sun" return SunSensorPlaneKD(self.scene, self.idxvecs(), self._areamapper, src)
def _run_sample(self, idx, vecs, level_desc): if self.engine.nproc is None or self.engine.nproc > 1: cap = 1 else: cap = None sk = dict(dirs=self.engine.dirs, offsets=self.engine.offsets, name=self.engine.name, sunview=self.engine.sunview) stype = f"{self._skymapper.name}_sun" lums = pool_call(_sample_sun, list(zip(range(*idx), vecs, self._areadraws)), self.scene, self.engine.engine, self._areamapper, stype=stype, desc=level_desc, cap=cap, pbar=self.scene.dolog, sensorkwargs=sk, areakwargs=self._areakwargs) return np.array(lums) def _normed_weights(self): nmin = np.min(self._areaweights) norm = np.max(self._areaweights) - nmin if norm > 0: nweights = (self._areaweights - nmin)/norm else: nweights = self._areaweights return nweights def _dump_vecs(self, vecs): if self.vecs is None: self.vecs = vecs v0 = 0 else: self.vecs = np.concatenate((self.vecs, vecs)) v0 = self.slices[-1].stop self.slices.append(slice(v0, v0 + len(vecs))) vfile = (f"{self.scene.outdir}/{self._areamapper.name}/" f"{self.engine.name}_{self._skymapper.name}_{self.stype}.tsv") # file format: level idx sx sy sz np.savetxt(vfile, self.idxvecs(), ("%d", "%d", "%.4f", "%.4f", "%.4f")) def _plot_weights(self, level, vm, name, suffix=".hdr", fisheye=True, **kwargs): if self._areaweights is not None: normw = self._normed_weights() for m, w in enumerate(normw): outp = (f"{self.scene.outdir}_{name}_{self.stype}_{m}" f"_{level:02d}{suffix}") self._plot_dist(np.max(w, axis=(0, 1)), vm, outp, fisheye)
def _sample_sun(suni, sunpos, adraws, scene, engine, mapper, stype="sun", sensorkwargs=None, areakwargs=None): """this function is for calling with a process pool, by declaring the sun sampler point after the child process is forked the call to engine.load_source happens on an isolated memory instance, allowing for concurrency on different scenes, despite the singleton/ global namespace issues of the cRtrace instance.""" if sensorkwargs is None: sensorkwargs = {} if areakwargs is None: areakwargs = None ambfile = f"{scene.outdir}/{stype}_{suni:04d}.amb" engine.load_solar_source(scene, sunpos, ambfile) sensor = Sensor(engine, **sensorkwargs) areasampler = ISamplerArea(scene, sensor, stype=f"{stype}_{suni:04d}", **areakwargs) if adraws is not None: amapper = MaskedPlanMapper(mapper, adraws, 0) else: amapper = mapper # amapper = mapper lf = areasampler.run(amapper, log=False) shp = (*areasampler.levels[-1], areasampler.features) # build weights based on final sampling candidates = amapper.point_grid_uv(jitter=False, level=areasampler.nlev - 1, masked=False) mask = amapper.in_view_uv(candidates, False) wvecs = amapper.uv2xyz(candidates) idx, d = lf.query(wvecs) if areasampler.lum.ndim == 4: slum = areasampler.weightfunc(areasampler.lum, axis=2) else: slum = areasampler.lum weights = slum[idx].reshape(shp) weights = np.moveaxis(weights, 2, 0) for w in weights: w.flat[np.logical_not(mask)] = -1 return weights