""" RHEAS module for generating drought products.
.. module:: drought
:synopsis: Module that contains functionality for generating drought products
.. moduleauthor:: Kostas Andreadis <kandread@jpl.nasa.gov>
"""
import numpy as np
from dateutil.relativedelta import relativedelta
import scipy.stats as stats
from datetime import date, datetime, timedelta
import pandas
import dbio
import logging
def _movingAverage(data, n):
"""Calculate the moving average from a time series."""
out = np.cumsum(data)
out[n:] = out[n:] - out[:-n]
return out[n - 1:] / n
[docs]def calcSRI(duration, model, cid):
"""Calculate Standardized Runoff Index for specified month
*duration*."""
log = logging.getLogger(__name__)
outvars = model.getOutputStruct(model.model_path + "/global.txt")
startdate = date(model.startyear + model.skipyear, model.startmonth, model.startday)
enddate = date(model.endyear, model.endmonth, model.endday)
nt = (enddate - startdate).days + 1
ndays = ((startdate + relativedelta(months=duration)) - startdate).days + 1
if duration < 1 or ndays > nt:
log.warning("Cannot calculate SRI with {0} months duration.".format(duration))
sri = np.zeros(nt)
else:
p = np.loadtxt("{0}/{1}_{2:.{4}f}_{3:.{4}f}".format(model.model_path, outvars['runoff'][0], model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, outvars['runoff'][1]]
p = pandas.Series(p, [datetime(model.startyear, model.startmonth, model.startday) + timedelta(t) for t in range(len(p))])
pm = p.rolling(duration*30).mean() # assume each month is 30 days
g1, g2, g3 = stats.gamma.fit(pm[duration*30:])
cdf = stats.gamma.cdf(pm, g1, g2, g3)
sri = stats.norm.ppf(cdf)
sri[np.isnan(sri)] = 0.0
sri[np.isinf(sri)] = 0.0
return sri
[docs]def calcSPI(duration, model, cid):
"""Calculate Standardized Precipitation Index for specified month
*duration*."""
log = logging.getLogger(__name__)
startdate = date(model.startyear + model.skipyear, model.startmonth, model.startday)
enddate = date(model.endyear, model.endmonth, model.endday)
nt = (enddate - startdate).days + 1
ndays = ((startdate + relativedelta(months=duration)) - startdate).days + 1
# tablename = "precip."+model.precip
if duration < 1 or ndays > nt:
log.warning("Cannot calculate SPI with {0} months duration.".format(duration))
spi = np.zeros(nt)
else:
p = np.loadtxt("{0}/forcings/data_{1:.{3}f}_{2:.{3}f}".format(model.model_path,
model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, 0]
p = pandas.Series(p, [datetime(model.startyear, model.startmonth, model.startday) + timedelta(t) for t in range(len(p))])
pm = p.rolling(duration*30).mean() # assume each month is 30 days
g1, g2, g3 = stats.gamma.fit(pm[duration*30:])
cdf = stats.gamma.cdf(pm, g1, g2, g3)
spi = stats.norm.ppf(cdf)
spi[np.isnan(spi)] = 0.0
spi[np.isinf(spi)] = 0.0
return spi
[docs]def calcSeverity(model, cid, varname="soil_moist"):
"""Calculate drought severity from *climatology* table stored in database."""
log = logging.getLogger(__name__)
outvars = model.getOutputStruct(model.model_path + "/global.txt")
col = outvars[varname][1]
if varname in ["soil_moist"]:
p = np.loadtxt("{0}/{1}_{2:.{4}f}_{3:.{4}f}".format(model.model_path, outvars['runoff'][0], model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, col:col+model.nlayers]
p = pandas.Series(np.sum(p, axis=1), [datetime(model.startyear, model.startmonth, model.startday) + timedelta(t) for t in range(len(p))])
else:
p = np.loadtxt("{0}/{1}_{2:.{4}f}_{3:.{4}f}".format(model.model_path, outvars['runoff'][0], model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, col]
p = pandas.Series(p, [datetime(model.startyear, model.startmonth, model.startday) + timedelta(t) for t in range(len(p))])
db = dbio.connect(model.dbname)
cur = db.cursor()
if dbio.tableExists(model.dbname, model.name, varname):
if varname in ["soil_moist"]:
lvar = ",layer"
else:
lvar = ""
if dbio.columnExists(model.dbname, model.name, varname, "ensemble"):
fsql = "with f as (select fdate{3},avg(st_value(rast,st_geomfromtext('POINT({0} {1})',4326))) as vals from {2}.{4} where st_intersects(rast,st_geomfromtext('POINT({0} {1})',4326)) group by fdate{3})".format(model.gid[cid][1], model.gid[cid][0], model.name, lvar, varname)
else:
fsql = "with f as (select fdate{3},st_value(rast,st_geomfromtext('POINT({0} {1})',4326)) as vals from {2}.{4} where st_intersects(rast,st_geomfromtext('POINT({0} {1})',4326)))".format(model.gid[cid][1], model.gid[cid][0], model.name, lvar, varname)
sql = "{0} select fdate,sum(vals) from f group by fdate".format(fsql)
cur.execute(sql)
if bool(cur.rowcount):
results = cur.fetchall()
clim = pandas.Series([r[1] for r in results], [r[0] for r in results])
else:
clim = p
else:
log.warning("Climatology table does not exist. Severity calculation will be inaccurate!")
clim = p
s = 100.0 - np.array(map(lambda v: stats.percentileofscore(clim, v), p))
return s
[docs]def calcDrySpells(model, cid, droughtfun=np.mean, duration=14, recovduration=2):
"""Calculate maps of number of dry spells during simulation period."""
# FIXME: Currently only uses precipitation to identify dry spells. Need to change it to also use soil moisture and runoff
p = np.loadtxt("{0}/forcings/data_{1:.{3}f}_{2:.{3}f}".format(model.model_path, model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, 0]
drought_thresh = droughtfun(p)
ndroughts = np.zeros(len(p))
days = 0
for i in range(recovduration-1, len(p)):
if p[i] <= drought_thresh:
days += 1
elif all(p[i-j] > drought_thresh for j in range(recovduration)):
days = 0
else:
days += 1
if days == duration:
ndroughts[i] = 1
return np.cumsum(ndroughts)
[docs]def calcSMDI(model, cid):
"""Calculate Soil Moisture Deficit Index (Narasimhan & Srinivasan, 2005)."""
log = logging.getLogger(__name__)
outvars = model.getOutputStruct(model.model_path + "/global.txt")
col = outvars['soil_moist'][1]
p = np.loadtxt("{0}/{1}_{2:.{4}f}_{3:.{4}f}".format(model.model_path, outvars['soil_moist'][0], model.gid[cid][0], model.gid[cid][1], model.grid_decimal))[:, col:col+model.nlayers]
p = pandas.Series(np.sum(p, axis=1), [datetime(model.startyear, model.startmonth, model.startday) + timedelta(t) for t in range(len(p))])
db = dbio.connect(model.dbname)
cur = db.cursor()
if dbio.tableExists(model.dbname, model.name, "soil_moist"):
if dbio.columnExists(model.dbname, model.name, "soil_moist", "ensemble"):
fsql = "with f as (select fdate,layer,avg(st_value(rast,st_geomfromtext('POINT({0} {1})',4326))) as sm from {2}.soil_moist where st_intersects(rast,st_geomfromtext('POINT({0} {1})',4326)) group by fdate,layer)".format(model.gid[cid][1], model.gid[cid][0], model.name)
else:
fsql = "with f as (select fdate,layer,st_value(rast,st_geomfromtext('POINT({0} {1})',4326)) as sm from {2}.soil_moist where st_intersects(rast,st_geomfromtext('POINT({0} {1})',4326)))".format(model.gid[cid][1], model.gid[cid][0], model.name)
sql = "{0} select fdate,sum(sm) from f group by fdate".format(fsql)
cur.execute(sql)
if bool(cur.rowcount):
results = cur.fetchall()
clim = pandas.Series([r[1] for r in results], [r[0] for r in results])
else:
clim = p
else:
log.warning("Climatology table does not exist. SMDI calculation will be inaccurate!")
clim = p
smdi = np.zeros(len(p))
MSW = clim.median()
maxSW = clim.max()
minSW = clim.min()
for i in range(7, len(smdi)):
SW = np.median(p[i-7:i+1])
if SW == MSW:
SD = (SW - MSW) / (MSW - minSW) * 100.0
else:
SD = (SW - MSW) / (maxSW - MSW) * 100.0
if i > 7:
smdi[i] = 0.5 * smdi[i-1] + SD / 50.0
else:
smdi[i] = SD / 50.0
cur.close()
db.close()
return smdi
[docs]def calc(varname, model, cid):
"""Calculate drought-related variable."""
# nt = (date(model.endyear, model.endmonth, model.endday) -
# date(model.startyear + model.skipyear, model.startmonth, model.startday)).days + 1
if varname.find("spi") == 0:
duration = int(varname[3])
output = calcSPI(duration, model, cid)
elif varname.startswith("sri"):
duration = int(varname[3])
output = calcSRI(duration, model, cid)
elif varname == "severity":
output = calcSeverity(model, cid)
elif varname == "smdi":
output = calcSMDI(model, cid)
elif varname == "dryspells":
output = calcDrySpells(model, cid)
return output