Source code for assimilation

""" RHEAS module for assimilation

.. module:: assimilation
   :synopsis: Definition of the assimilation module

.. moduleauthor:: Kostas Andreadis <kandread@jpl.nasa.gov>

"""

import kalman
from datetime import date
import numpy as np
from collections import OrderedDict
from scipy.spatial.distance import cdist
from functools import partial
import re
import dbio
import logging


[docs]def observationDates(obsnames, dbname, startyear, startmonth, startday, endyear, endmonth, endday, update): """Return dates when observation *obsname* is available during the simulation period.""" if update is not None and isinstance(update, str): if update.find("week") >= 0: update = 7 elif update.find("month") >= 0: update = 30 else: update = -1 else: update = 1 dates = [] db = dbio.connect(dbname) cur = db.cursor() for name in obsnames: name = name.lower().strip() obsmod = __import__("datasets." + name, fromlist=[name]) obsobj = getattr(obsmod, name.capitalize()) obs = obsobj() sql = "select distinct(fdate) from {0} where fdate>=date '{1}-{2}-{3}' and fdate<=date '{4}-{5}-{6}'".format( obs.tablename, startyear, startmonth, startday, endyear, endmonth, endday) cur.execute(sql) results = cur.fetchall() for ri, r in enumerate(results): if not r[0] in dates: if isinstance(update, date) and r[0] is update: dates.append(r[0]) elif isinstance(update, int): if (ri > 0 and (r[0] - dates[-1]).days >= update) or ri < 1: dates.append(r[0]) else: dates.append(r[0]) dates.sort() for dt in [date(startyear, startmonth, startday), date(endyear, endmonth, endday)]: if dt in dates: # remove first and last day of simulation since it will not impact # results saved dates.remove(dt) cur.close() db.close() return dates
[docs]def assimilate(options, dt, models, method="letkf"): """Assimilate multiple observations into the VIC model.""" log = logging.getLogger(__name__) obsnames = options['vic']['observations'].split(",") X = OrderedDict() Xlat = OrderedDict() Xlon = OrderedDict() Xgid = OrderedDict() HX = OrderedDict() Y = OrderedDict() Ylat = OrderedDict() Ylon = OrderedDict() for name in obsnames: name = name.lower().strip() # dynamically load observation module and get data obsmod = __import__("datasets." + name, fromlist=[name]) obsobj = getattr(obsmod, name.capitalize()) # check whether user has set uncertainty parameters for observation if 'observations' in options and name in options['observations']: try: sname = re.split(" |,", options['observations']['name]'])[0].lower() params = map(float, re.split(" |,", options['observations']['name]'])[1:]) smod = __import__("scipy.stats", fromlist=[sname]) sdist = getattr(smod, sname) except: log.warning("No distribution {0} available for dataset {1}, falling back to default.".format(sname, name)) else: rvs = partial(sdist.rvs, *params) obs = obsobj(rvs) else: obs = obsobj() data, lat, lon = obs.get(dt, models) if data is not None: if obs.obsvar not in Y: Y[obs.obsvar] = data Ylat[obs.obsvar] = lat[:, 0] Ylon[obs.obsvar] = lon[:, 0] data, lat, lon, gid = obs.x(dt, models) for s in obs.statevar: if s not in X: X[s] = data[s] Xlat[s] = lat[:, 0] Xlon[s] = lon[:, 0] Xgid[s] = gid[:, 0] data, _, _ = obs.hx(models, dt) if obs.obsvar not in HX: HX[obs.obsvar] = data if bool(X): x = np.vstack((X[k] for k in X)) hx = np.vstack((HX[k] for k in HX)) y = np.vstack((Y[k] for k in Y)) xlat = np.vstack((Xlat[k] for k in Xlat)) xlon = np.vstack((Xlon[k] for k in Xlon)) ylat = np.vstack((Ylat[k] for k in Ylat)) ylon = np.vstack((Ylon[k] for k in Ylon)) dists = cdist(np.vstack((xlat, xlon)).T, np.vstack((ylat, ylon)).T) kfobj = getattr(kalman, method.upper()) E = obs.E(models.nens) kf = kfobj(x, hx, y, E) kf.analysis(dists) i = 0 for k in X: for j in range(i, X[k].shape[0] + i): X[k][j - i, :] = kf.Aa[j, :] i += X[k].shape[0] return X, Xlat, Xlon, Xgid