Note

Last update 21/07/2021

Application: implementation of existing conceptual models

This page describes the SuperflexPy implementation of several existing conceptual hydrological models. The “translation” of a model into SuperflexPy requires the following steps:

  1. Design of a structure that reflects the original model but satisfies the requirements of SuperflexPy (e.g. does not contain mutual interaction between elements, see Unit);
  2. Extension of the framework, coding the required elements (as explained in the page Expand SuperflexPy: Build customized elements)
  3. Construction of the model structure using the elements implemented at step 2

Model M4 from Kavetski and Fenicia, WRR, 2011

M4 is a simple lumped conceptual model presented, as part of a model comparison study, in the article

Kavetski, D., and F. Fenicia (2011), Elements of a flexible approach for conceptual hydrological modeling: 2. Application and experimental insights, WaterResour.Res.,47, W11511, doi:10.1029/2011WR010748.

Design of the model structure

M4 has a simple structure that can be implemented in SuperflexPy without using connection elements. The figure shows, on the left, the structure as presented in the original M4 publication; on the right, a schematic of the SuperflexPy implementation is shown.

_images/M4.png

The upstream element, namely, the unsaturated reservoir (UR), is intended to represent runoff generation processes (e.g. separation between evaporation and runoff). It is controlled by the differential equation

\[\begin{split}& \overline{S} = \frac{S_{\textrm{UR}}}{S_{\textrm{max}}} \\ & \frac{\textrm{d}S_{\textrm{UR}}}{\textrm{d}t} = P - E_{\textrm{P}} \left( \frac{\overline{S} \left(1+m\right)}{\overline{S} + m} \right) - P \left(\overline{S}\right)^\beta \\\end{split}\]

The downstream element, namely, the fast reservoir (FR), is intended to represent runoff propagation processes (e.g. routing). It is controlled by the differential equation

\[\begin{split}& \frac{\textrm{d}S_{\textrm{FR}}}{\textrm{d}t} = P - kS_{\textrm{FR}}^\alpha \\\end{split}\]

\(S_{\textrm{UR}}\) and \(S_{\textrm{FR}}\) are the model states, \(P\) is the precipitation input flux, \(E_{\textrm{P}}\) is the potential evapotranspiration (a model input), and \(S_{\textrm{max}}\), \(m\), \(\beta\), \(k\), \(\alpha\) are the model parameters.

Element creation

We now show how to use SuperflexPy to implement the elements described in the previous section. A detailed explanation of how to use the framework to build new elements can be found in the page Expand SuperflexPy: Build customized elements.

Note that, most of the times, when implementing a model structure with SuperflexPym the elements have already been implemented in SuperflexPy and, therefore, the modeller does not need to implement them. A list of the currently implemented elements is provided in the page List of currently implemented elements.

Unsaturated reservoir

This element can be implemented by extending the class ODEsElement.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class UnsaturatedReservoir(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._fluxes_function_python]

        if approximation.architecture == 'numba':
            self._fluxes = [self._fluxes_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._fluxes_function_python]

    def set_input(self, input):

        self.input = {'P': input[0],
                      'PET': input[1]}

    def get_output(self, solve=True):

        if solve:
            self._solver_states = [self._states[self._prefix_states + 'S0']]

            self._solve_differential_equation()

            # Update the state
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [-fluxes[0][2]]

    def get_AET(self):

        try:
            S = self.state_array
        except AttributeError:
            message = '{}get_aet method has to be run after running '.format(self._error_message)
            message += 'the model using the method get_output'
            raise AttributeError(message)

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=S,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [- fluxes[0][1]]

    # PROTECTED METHODS

    @staticmethod
    def _fluxes_function_python(S, S0, ind, P, Smax, m, beta, PET, dt):

        if ind is None:
            return (
                [
                    P,
                    - PET * ((S / Smax) * (1 + m)) / ((S / Smax) + m),
                    - P * (S / Smax)**beta,
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return (
                [
                    P[ind],
                    - PET[ind] * ((S / Smax[ind]) * (1 + m[ind])) / ((S / Smax[ind]) + m[ind]),
                    - P[ind] * (S / Smax[ind])**beta[ind],
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - (Ce[ind] * PET[ind] * m[ind] * (m[ind] + 1) * Smax[ind])/((S + m[ind] * Smax[ind])**2),
                    - (P[ind] * beta[ind] / Smax[ind]) * (S / Smax[ind])**(beta[ind] - 1),
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 3), f8, f8, UniTuple(f8, 3)))(optional(f8), f8, i4, f8[:], f8[:], f8[:], f8[:], f8[:], f8[:])',
            nopython=True)
    def _fluxes_function_numba(S, S0, ind, P, Smax, m, beta, PET, dt):

        return (
            (
                P[ind],
                PET[ind] * ((S / Smax[ind]) * (1 + m[ind])) / ((S / Smax[ind]) + m[ind]),
                - P[ind] * (S / Smax[ind])**beta[ind],
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - (Ce[ind] * PET[ind] * m[ind] * (m[ind] + 1) * Smax[ind])/((S + m[ind] * Smax[ind])**2),
                - (P[ind] * beta[ind] / Smax[ind]) * (S / Smax[ind])**(beta[ind] - 1),
            )
        )

Fast reservoir

This element can be implemented by extending the class ODEsElement.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class PowerReservoir(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._fluxes_function_python]  # Used by get fluxes, regardless of the architecture

        if approximation.architecture == 'numba':
            self._fluxes = [self._fluxes_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._fluxes_function_python]

    # METHODS FOR THE USER

    def set_input(self, input):

        self.input = {'P': input[0]}

    def get_output(self, solve=True):

        if solve:
            self._solver_states = [self._states[self._prefix_states + 'S0']]
            self._solve_differential_equation()

            # Update the state
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,  # I can use the python method since it is fast
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )

        return [- fluxes[0][1]]

    # PROTECTED METHODS

    @staticmethod
    def _fluxes_function_python(S, S0, ind, P, k, alpha, dt):

        if ind is None:
            return (
                [
                    P,
                    - k * S**alpha,
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return (
                [
                    P[ind],
                    - k[ind] * S**alpha[ind],
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - k[ind] * alpha[ind] * S**(alpha[ind] - 1)
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 2), f8, f8, UniTuple(f8, 2)))(optional(f8), f8, i4, f8[:], f8[:], f8[:], f8[:])',
            nopython=True)
    def _fluxes_function_numba(S, S0, ind, P, k, alpha, dt):

        return (
            (
                P[ind],
                - k[ind] * S**alpha[ind],
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - k[ind] * alpha[ind] * S**(alpha[ind] - 1)
            )
        )

Model initialization

Now that all elements are implemented, they can be combined to build the model structure. For details refer to How to build a model with SuperflexPy.

First, we initialize all elements.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
root_finder = PegasusPython()
numeric_approximator = ImplicitEulerPython(root_finder=root_finder)

ur = UnsaturatedReservoir(
    parameters={'Smax': 50.0, 'Ce': 1.0, 'm': 0.01, 'beta': 2.0},
    states={'S0': 25.0},
    approximation=numeric_approximator,
    id='UR'
)

fr = PowerReservoir(
    parameters={'k': 0.1, 'alpha': 1.0},
    states={'S0': 10.0},
    approximation=numeric_approximator,
    id='FR'
)

Next, the elements can be put together to create a Unit that reflects the structure presented in the figure.

1
2
3
4
5
6
model = Unit(
    layers=[
        [ur],
        [fr]
    ],
    id='M4'

GR4J

GR4J is a widely used conceptual hydrological model introduced in the article

Perrin, C., Michel, C., and Andréassian, V.: Improvement of a parsimonious model for streamflow simulation, Journal of Hydrology, 279, 275-289, https://doi.org/10.1016/S0022-1694(03)00225-7, 2003.

The solution adopted here follows the “continuous state-space representation” presented in

Santos, L., Thirel, G., and Perrin, C.: Continuous state-space representation of a bucket-type rainfall-runoff model: a case study with the GR4 model using state-space GR4 (version 1.0), Geosci. Model Dev., 11, 1591-1605, 10.5194/gmd-11-1591-2018, 2018.

Design of the model structure

The figure shows, on the left, the model structure as presented in Perrin et al., 2003; on the right, the adaptation to SuperflexPy is shown.

_images/gr4j.png

The potential evaporation and the precipitation are “filtered” by an interception element, that calculates the net fluxes by setting the smallest to zero and the largest to the difference between the two fluxes.

\[\begin{split}& \textrm{if } P > E_{\textrm{POT}}: \\ & \quad P_{\textrm{NET}} = P -E_{\textrm{POT}} \\ & \quad E_{\textrm{NET}}=0 \\ & \textrm{else}: \\ & \quad P_{\textrm{NET}} = 0 \\ & \quad E_{\textrm{NET}}=E_{\textrm{POT}}-P \\\end{split}\]

This element is implemented in SuperflexPy using the “interception filter”.

After the interception filter, the SuperflexPy implementation starts to differ from the original. In the original implementation of GR4J, the precipitation is split between a part \(P_{\textrm{s}}\) that flows into the production store and the remaining part \(P_{\textrm{b}}\) that bypasses the reservoir. \(P_{\textrm{s}}\) and \(P_{\textrm{b}}\) are both functions of the state of the reservoir

\[\begin{split}& P_{\textrm{s}}=P_{\textrm{NET}}\left(1-\left(\frac{S_{\textrm{UR}}}{x_1}\right)^{\alpha}\right) \\ & P_{\textrm{b}}=P_{\textrm{NET}}\left(\frac{S_{\textrm{UR}}}{x_1}\right)^{\alpha} \\\end{split}\]

When we implement this part of the model in SuperflexPy, these two fluxes cannot be calculated before solving the reservoir (due to the representation of the Unit as a succession of layers).

To solve this problem, in the SuperflexPy implementation of GR4J, all precipitation (and not only \(P_{\textrm{s}}\)) flows into an element that incorporates the production store. This element takes care of dividing the precipitation internally, while solving the differential equation

\[\begin{split}& \frac{\textrm{d}S_{\textrm{UR}}}{\textrm{d}t} = P_{\textrm{NET}}\left(1-\left(\frac{S_{\textrm{UR}}}{x_1}\right)^{\alpha}\right) - E_{\textrm{NET}}\left(2\frac{S_{\textrm{UR}}}{x_1}-\left(\frac{S_{\textrm{UR}}}{x_1}\right)^\alpha\right)- \frac{x_1^{1-\beta}}{(\beta-1)} \nu^{\beta-1}S_{\textrm{UR}}^\beta \\\end{split}\]

where the first term is the precipitation \(P_s\), the second term is the actual evaporation, and the third term represents the output of the reservoir, which here corresponds to “percolation”.

Once the reservoir is solved (i.e. the values of \(S_{\textrm{UR}}\) that solve the discretized differential equation over a time step are found), the element outputs the sum of percolation and bypassing precipitation (\(P_b\)).

The flux is then divided between two lag functions, referred to as “unit hydrographs” and abbreviated UH: 90% of the flux goes to UH1 and 10% goes to UH2. In this part of the model structure the correspondence between the elements of GR4J and their SuperflexPy implementation is quite clear.

The output of UH1 provides the input of the routing store, which is a reservoir controlled by the differential equation

\[\begin{split}& \frac{\textrm{d}S_{\textrm{RR}}}{\textrm{d}t}=Q_{\textrm{UH1}} - \frac{x_3^{1-\gamma}}{(\gamma-1)}S_{\textrm{RR}}^\gamma- \frac{x_2}{x_3^\omega}S_{\textrm{RR}}^\omega\\\end{split}\]

where the second term is the output of the reservoir and the last is a gain/loss term (called \(Q_{\textrm{RF}}\)).

The gain/loss term \(Q_{\textrm{RF}}\), which is a function of the state \(S_{\textrm{RR}}\) of the reservoir, is also subtracted from the output of UH2. In SuperflexPy, this operation cannot be done in the same unit layer as the solution of the routing store, and instead it is done afterwards. For this reason, the SuperflexPy implementation of GR4J has an additional element (called “flux aggregator”) that uses a junction element to combine the output of the routing store, the gain/loss term, and the output of UH2. The flux aggregator then computes the outflow of the model using the equation

\[\begin{split}& Q = Q_{\textrm{RR}} + \max(0;Q_{\textrm{UH2}} - Q_{\textrm{RF}}) \\\end{split}\]

Elements creation

We now show how to use SuperflexPy to implement the elements described in the previous section. A detailed explanation of how to use the framework to build new elements can be found in the page Expand SuperflexPy: Build customized elements.

Note that, most of the times, when implementing a model structure with SuperflexPym the elements have already been implemented in SuperflexPy and, therefore, the modeller does not need to implement them. A list of the currently implemented elements is provided in the page List of currently implemented elements.

Interception

The interception filter can be implemented by extending the class BaseElement

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class InterceptionFilter(BaseElement):

    _num_upstream = 1
    _num_downstream = 1

    def set_input(self, input):

        self.input = {}
        self.input['PET'] = input[0]
        self.input['P'] = input[1]

    def get_output(self, solve=True):

        remove = np.minimum(self.input['PET'], self.input['P'])

        return [self.input['PET'] - remove, self.input['P'] - remove]

Production store

The production store is controlled by a differential equation and, therefore, can be constructed by extending the class ODEsElement

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class ProductionStore(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._flux_function_python]

        if approximation.architecture == 'numba':
            self._fluxes = [self._flux_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._flux_function_python]

    def set_input(self, input):

        self.input = {}
        self.input['PET'] = input[0]
        self.input['P'] = input[1]

    def get_output(self, solve=True):

        if solve:
            # Solve the differential equation
            self._solver_states = [self._states[self._prefix_states + 'S0']]
            self._solve_differential_equation()

            # Update the states
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )

        Pn_minus_Ps = self.input['P'] - fluxes[0][0]
        Perc = - fluxes[0][2]
        return [Pn_minus_Ps + Perc]

    def get_aet(self):

        try:
            S = self.state_array
        except AttributeError:
            message = '{}get_aet method has to be run after running '.format(self._error_message)
            message += 'the model using the method get_output'
            raise AttributeError(message)

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=S,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )

        return [- fluxes[0][1]]

    @staticmethod
    def _flux_function_python(S, S0, ind, P, x1, alpha, beta, ni, PET, dt):

        if ind is None:
            return(
                [
                    P * (1 - (S / x1)**alpha),  # Ps
                    - PET * (2 * (S / x1) - (S / x1)**alpha),  # Evaporation
                    - ((x1**(1 - beta)) / ((beta - 1))) * (ni**(beta - 1)) * (S**beta)  # Perc
                ],
                0.0,
                S0 + P * (1 - (S / x1)**alpha) * dt
            )
        else:
            return(
                [
                    P[ind] * (1 - (S / x1[ind])**alpha[ind]),  # Ps
                    - PET[ind] * (2 * (S / x1[ind]) - (S / x1[ind])**alpha[ind]),  # Evaporation
                    - ((x1[ind]**(1 - beta[ind])) / ((beta[ind] - 1))) * (ni[ind]**(beta[ind] - 1)) * (S**beta[ind])  # Perc
                ],
                0.0,
                S0 + P[ind] * (1 - (S / x1[ind])**alpha[ind]) * dt[ind],
                [
                    - (P[ind] * alpha[ind] / x1[ind]) * ((S / x1[ind])**(alpha[ind] - 1)),
                    - (PET[ind] / x1[ind]) * (2 - alpha[ind] * ((S / x1[ind])**(alpha[ind] - 1))),
                    - beta[ind] * ((x1[ind]**(1 - beta[ind])) / ((beta[ind] - 1) * dt[ind])) * (ni[ind]**(beta[ind] - 1)) * (S**(beta[ind] - 1))
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 3), f8, f8, UniTuple(f8, 3)))(optional(f8), f8, i4, f8[:], f8[:], f8[:], f8[:], f8[:], f8[:], f8[:])',
            nopython=True)
    def _flux_function_numba(S, S0, ind, P, x1, alpha, beta, ni, PET, dt):

        return(
            (
                P[ind] * (1 - (S / x1[ind])**alpha[ind]),  # Ps
                - PET[ind] * (2 * (S / x1[ind]) - (S / x1[ind])**alpha[ind]),  # Evaporation
                - ((x1[ind]**(1 - beta[ind])) / ((beta[ind] - 1))) * (ni[ind]**(beta[ind] - 1)) * (S**beta[ind])  # Perc
            ),
            0.0,
            S0 + P[ind] * (1 - (S / x1[ind])**alpha[ind]) * dt[ind],
            (
                - (P[ind] * alpha[ind] / x1[ind]) * ((S / x1[ind])**(alpha[ind] - 1)),
                - (PET[ind] / x1[ind]) * (2 - alpha[ind] * ((S / x1[ind])**(alpha[ind] - 1))),
                - beta[ind] * ((x1[ind]**(1 - beta[ind])) / ((beta[ind] - 1) * dt[ind])) * (ni[ind]**(beta[ind] - 1)) * (S**(beta[ind] - 1))
            )
        )

Unit hydrographs

The unit hydrographs are an extension of the LagElement, and can be implemented as follows

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class UnitHydrograph1(LagElement):

    def __init__(self, parameters, states, id):

        LagElement.__init__(self, parameters, states, id)

    def _build_weight(self, lag_time):

        weight = []

        for t in lag_time:
            array_length = np.ceil(t)
            w_i = []
            for i in range(int(array_length)):
                w_i.append(self._calculate_lag_area(i + 1, t)
                           - self._calculate_lag_area(i, t))
            weight.append(np.array(w_i))

        return weight

    @staticmethod
    def _calculate_lag_area(bin, len):
        if bin <= 0:
            value = 0
        elif bin < len:
            value = (bin / len)**2.5
        else:
            value = 1
        return value
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class UnitHydrograph2(LagElement):

    def __init__(self, parameters, states, id):

        LagElement.__init__(self, parameters, states, id)

    def _build_weight(self, lag_time):

        weight = []

        for t in lag_time:
            array_length = np.ceil(t)
            w_i = []
            for i in range(int(array_length)):
                w_i.append(self._calculate_lag_area(i + 1, t)
                           - self._calculate_lag_area(i, t))
            weight.append(np.array(w_i))

        return weight

    @staticmethod
    def _calculate_lag_area(bin, len):
        half_len = len / 2
        if bin <= 0:
            value = 0
        elif bin < half_len:
            value = 0.5 * (bin / half_len)**2.5
        elif bin < len:
            value = 1 - 0.5 * (2 - bin / half_len)**2.5
        else:
            value = 1
        return value

Routing store

The routing store is an ODEsElement

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class RoutingStore(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._flux_function_python]

        if approximation.architecture == 'numba':
            self._fluxes = [self._flux_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._flux_function_python]

    def set_input(self, input):

        self.input = {}
        self.input['P'] = input[0]

    def get_output(self, solve=True):

        if solve:
            # Solve the differential equation
            self._solver_states = [self._states[self._prefix_states + 'S0']]
            self._solve_differential_equation()

            # Update the states
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )

        Qr = - fluxes[0][1]
        F = -fluxes[0][2]

        return [Qr, F]

    @staticmethod
    def _flux_function_python(S, S0, ind, P, x2, x3, gamma, omega, dt):

        if ind is None:
            return(
                [
                    P,  # P
                    - ((x3**(1 - gamma)) / ((gamma - 1))) * (S**gamma),  # Qr
                    - (x2 * (S / x3)**omega),  # F
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return(
                [
                    P[ind],  # P
                    - ((x3[ind]**(1 - gamma[ind])) / ((gamma[ind] - 1))) * (S**gamma[ind]),  # Qr
                    - (x2[ind] * (S / x3[ind])**omega[ind]),  # F
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - ((x3[ind]**(1 - gamma[ind])) / ((gamma[ind] - 1) * dt[ind])) * (S**(gamma[ind] - 1)) * gamma[ind],
                    - (omega[ind] * x2[ind] * ((S / x3[ind])**(omega[ind] - 1)))
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 3), f8, f8, UniTuple(f8, 3)))(optional(f8), f8, i4, f8[:], f8[:], f8[:], f8[:], f8[:], f8[:])',
            nopython=True)
    def _flux_function_numba(S, S0, ind, P, x2, x3, gamma, omega, dt):

        return(
            (
                P[ind],  # P
                - ((x3[ind]**(1 - gamma[ind])) / ((gamma[ind] - 1))) * (S**gamma[ind]),  # Qr
                - (x2[ind] * (S / x3[ind])**omega[ind]),  # F
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - ((x3[ind]**(1 - gamma[ind])) / ((gamma[ind] - 1) * dt[ind])) * (S**(gamma[ind] - 1)) * gamma[ind],
                - (omega[ind] * x2[ind] * ((S / x3[ind])**(omega[ind] - 1)))
            )
        )

Flux aggregator

The flux aggregator can be implemented by extending a BaseElement

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class FluxAggregator(BaseElement):

    _num_downstream = 1
    _num_upstream = 1

    def set_input(self, input):

        self.input = {}
        self.input['Qr'] = input[0]
        self.input['F'] = input[1]
        self.input['Q2_out'] = input[2]

    def get_output(self, solve=True):

        return [self.input['Qr']
                + np.maximum(0, self.input['Q2_out'] - self.input['F'])]

Model initialization

Now that all elements are implemented, we can combine them to build the model structure. For details refer to How to build a model with SuperflexPy.

First, we initialize all elements.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
x1, x2, x3, x4 = (50.0, 0.1, 20.0, 3.5)

root_finder = PegasusPython()  # Use the default parameters
numerical_approximation = ImplicitEulerPython(root_finder)

interception_filter = InterceptionFilter(id='ir')

production_store = ProductionStore(parameters={'x1': x1, 'alpha': 2.0,
                                               'beta': 5.0, 'ni': 4/9},
                                   states={'S0': 10.0},
                                   approximation=numerical_approximation,
                                   id='ps')

splitter = Splitter(weight=[[0.9], [0.1]],
                    direction=[[0], [0]],
                    id='spl')

unit_hydrograph_1 = UnitHydrograph1(parameters={'lag-time': x4},
                                    states={'lag': None},
                                    id='uh1')

unit_hydrograph_2 = UnitHydrograph2(parameters={'lag-time': 2*x4},
                                    states={'lag': None},
                                    id='uh2')

routing_store = RoutingStore(parameters={'x2': x2, 'x3': x3,
                                         'gamma': 5.0, 'omega': 3.5},
                             states={'S0': 10.0},
                             approximation=numerical_approximation,
                             id='rs')

transparent = Transparent(id='tr')

junction = Junction(direction=[[0, None],  # First output
                               [1, None],  # Second output
                               [None, 0]], # Third output
                    id='jun')

flux_aggregator = FluxAggregator(id='fa')

The elements are then put together to define a Unit that reflects the GR4J structure presented in the figure.

1
2
3
4
5
6
7
8
model = Unit(layers=[[interception_filter],
                     [production_store],
                     [splitter],
                     [unit_hydrograph_1, unit_hydrograph_2],
                     [routing_store, transparent],
                     [junction],
                     [flux_aggregator]],
             id='model')

HYMOD

HYMOD is another widely used conceptual hydrological model. It was first published in

Boyle, D. P. (2001). Multicriteria calibration of hydrologic models, The University of Arizona. Link

The solution proposed here follows the model structure presented in

Wagener, T., Boyle, D. P., Lees, M. J., Wheater, H. S., Gupta, H. V., and Sorooshian, S.: A framework for development and application of hydrological models, Hydrol. Earth Syst. Sci., 5, 13–26, https://doi.org/10.5194/hess-5-13-2001, 2001.

Design of the structure

_images/hymod.png

HYMOD comprises three groups of reservoirs intended to represent, respectively, the upper zone (soil dynamics), channel routing (surface runoff), and lower zone (subsurface flow).

As can be seen in the figure, the original structure of HYMOD already meets the design constrains of SuperflexPy (it does not contains feedbacks between elements). Therefore the SuperflexPy implementation of HYMOD is more straightforward than for GR4J.

The upper zone is a reservoir intended to represent streamflow generation processes and evaporation. It is controlled by the differential equation

\[\begin{split}& \overline{S} = \frac{S_{\textrm{UR}}}{S_{\textrm{max}}} \\ & \frac{\textrm{d}S_{\textrm{UR}}}{\textrm{d}t} = P - E - P \left(1 - \left(1-\overline{S}\right)^\beta\right) \\\end{split}\]

where the first term is the precipitation input, the second term is the actual evaporation (which is equal to the potential evaporation as long as there is sufficient storage in the reservoir), and the third term is the outflow from the reservoir.

The outflow from the reservoir is then split between the channel routing (3 reservoirs) and the lower zone (1 reservoir). All these elements are represented by linear reservoirs controlled by the differential equation

\[\begin{split}& \frac{\textrm{d}S}{\textrm{d}t} = P - kS \\\end{split}\]

where the first term is the input (here, the outflow from the upstream element) and the second term represents the outflow from the reservoir.

Channel routing and lower zone differ from each other in the number of reservoirs used (3 in the first case and 1 in the second), and in the value of the parameter \(k\), which controls the outflow rate. Based on intended model operation, \(k\) should have a larger value for channel routing because this element is intended to represent faster processes.

The outputs of these two flowpaths are collected by a junction, which generates the final model output.

Comparing the two panels in the figure, the only difference is the presence of the two transparent elements that are needed to fill the “gaps” in the SuperflexPy structure (see Unit).

Elements creation

We now show how to use SuperflexPy to implement the elements described in the previous section. A detailed explanation of how to use the framework to build new elements can be found in the page Expand SuperflexPy: Build customized elements.

Note that, most of the times, when implementing a model structure with SuperflexPym the elements have already been implemented in SuperflexPy and, therefore, the modeller does not need to implement them. A list of the currently implemented elements is provided in the page List of currently implemented elements.

Upper zone

The code used to simulate the upper zone present a change in the equation used to calculate the actual evaporation. In the original version (Wagener et al., 1) the equation is “described” in the text

The actual evapotranspiration is equal to the potential value if sufficient soil moisture is available; otherwise it is equal to the available soil moisture content.

which translates to the equation

\[\begin{split}& \textrm{if } S > 0: \\ & \quad E = E_{\textrm{POT}} \\ & \textrm{else}: \\ & \quad E=0 \\\end{split}\]

Note that this solution is not smooth and the resulting sharp threshold can cause problematic discontinuities in the model behavior. A smooth version of this equation is given by

\[\begin{split}& \overline{S} = \frac{S_{\textrm{UR}}}{S_{\textrm{max}}} \\ & E=E_{\textrm{POT}}\left( \frac{\overline{S}(1+m)}{\overline{S}+m} \right)\\\end{split}\]

The upper zone reservoir can be implemented by extending the class ODEsElement.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class UpperZone(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._fluxes_function_python]

        if approximation.architecture == 'numba':
            self._fluxes = [self._fluxes_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._fluxes_function_python]

    def set_input(self, input):

        self.input = {'P': input[0],
                      'PET': input[1]}

    def get_output(self, solve=True):

        if solve:
            self._solver_states = [self._states[self._prefix_states + 'S0']]

            self._solve_differential_equation()

            # Update the state
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [-fluxes[0][2]]

    def get_AET(self):

        try:
            S = self.state_array
        except AttributeError:
            message = '{}get_aet method has to be run after running '.format(self._error_message)
            message += 'the model using the method get_output'
            raise AttributeError(message)

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=S,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [- fluxes[0][1]]

    # PROTECTED METHODS

    @staticmethod
    def _fluxes_function_python(S, S0, ind, P, Smax, m, beta, PET, dt):

        if ind is None:
            return (
                [
                    P,
                    - PET * ((S / Smax) * (1 + m)) / ((S / Smax) + m),
                    - P * (1 - (1 - (S / Smax))**beta),
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return (
                [
                    P[ind],
                    - PET[ind] * ((S / Smax[ind]) * (1 + m[ind])) / ((S / Smax[ind]) + m[ind]),
                    - P[ind] * (1 - (1 - (S / Smax[ind]))**beta[ind]),
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - PET[ind] * m[ind] * Smax[ind] * (1 + m[ind]) / ((S + Smax[ind] * m[ind])**2),
                    - P[ind] * beta[ind] * ((1 - (S / Smax[ind]))**(beta[ind] - 1)) / Smax[ind],
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 3), f8, f8, UniTuple(f8, 3)))(optional(f8), f8, i4, f8[:], f8[:], f8[:], f8[:], f8[:], f8[:])',
            nopython=True)
    def _fluxes_function_numba(S, S0, ind, P, Smax, m, beta, PET, dt):

        return (
            (
                P[ind],
                - PET[ind] * ((S / Smax[ind]) * (1 + m[ind])) / ((S / Smax[ind]) + m[ind]),
                - P[ind] * (1 - (1 - (S / Smax[ind]))**beta[ind]),
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - PET[ind] * m[ind] * Smax[ind] * (1 + m[ind]) / ((S + Smax[ind] * m[ind])**2),
                - P[ind] * beta[ind] * ((1 - (S / Smax[ind]))**(beta[ind] - 1)) / Smax[ind],
            )
        )

Channel routing and lower zone

The elements representing channel routing and lower zone are all linear reservoirs that can be implemented by extending the class ODEsElement.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class LinearReservoir(ODEsElement):

    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._fluxes_function_python]  # Used by get fluxes, regardless of the architecture

        if approximation.architecture == 'numba':
            self._fluxes = [self._fluxes_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._fluxes_function_python]

    # METHODS FOR THE USER

    def set_input(self, input):

        self.input = {'P': input[0]}

    def get_output(self, solve=True):

        if solve:
            self._solver_states = [self._states[self._prefix_states + 'S0']]
            self._solve_differential_equation()

            # Update the state
            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,  # I can use the python method since it is fast
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [- fluxes[0][1]]

    # PROTECTED METHODS

    @staticmethod
    def _fluxes_function_python(S, S0, ind, P, k, dt):

        if ind is None:
            return (
                [
                    P,
                    - k * S,
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return (
                [
                    P[ind],
                    - k[ind] * S,
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - k[ind]
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 2), f8, f8, UniTuple(f8, 2)))(optional(f8), f8, i4, f8[:], f8[:], f8[:])',
            nopython=True)
    def _fluxes_function_numba(S, S0, ind, P, k, dt):

        return (
            (
                P[ind],
                - k[ind] * S,
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - k[ind]
            )
        )

Model initialization

Now that all elements are implemented, we can combine them to build the HYMOD model structure. For details refer to How to build a model with SuperflexPy.

First, we initialize all elements.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
root_finder = PegasusPython()  # Use the default parameters
numerical_approximation = ImplicitEulerPython(root_finder)

upper_zone = UpperZone(parameters={'Smax': 50.0, 'm': 0.01, 'beta': 2.0},
                       states={'S0': 10.0},
                       approximation=numerical_approximation,
                       id='uz')

splitter = Splitter(weight=[[0.6], [0.4]],
                    direction=[[0], [0]],
                    id='spl')

channel_routing_1 = LinearReservoir(parameters={'k': 0.1},
                                    states={'S0': 10.0},
                                    approximation=numerical_approximation,
                                    id='cr1')

channel_routing_2 = LinearReservoir(parameters={'k': 0.1},
                                    states={'S0': 10.0},
                                    approximation=numerical_approximation,
                                    id='cr2')

channel_routing_3 = LinearReservoir(parameters={'k': 0.1},
                                    states={'S0': 10.0},
                                    approximation=numerical_approximation,
                                    id='cr3')

lower_zone = LinearReservoir(parameters={'k': 0.1},
                             states={'S0': 10.0},
                             approximation=numerical_approximation,
                             id='lz')

transparent_1 = Transparent(id='tr1')

transparent_2 = Transparent(id='tr2')

junction = Junction(direction=[[0, 0]],  # First output
                    id='jun')

The elements are now combined to define a Unit that reflects the structure shown in the figure.

1
2
3
4
5
6
7
model = Unit(layers=[[upper_zone],
                     [splitter],
                     [channel_routing_1, lower_zone],
                     [channel_routing_2, transparent_1],
                     [channel_routing_3, transparent_2],
                     [junction]],
             id='model')