Note

Last update 21/07/2021

Note

If you build your own SuperflexPy element, we would appreciate if you share your implementation with the community (see Software organization and contribution). Please remember to update the List of currently implemented elements page to make other users aware of your implementation.

Expand SuperflexPy: Build customized elements

This page illustrates how to create customized elements using the SuperflexPy framework.

The examples include three elements:

  • Linear reservoir
  • Half-triangular lag function
  • Parameterized splitter

The customized elements presented here are relatively simple, in order to provide a clear illustration of the programming approach. To gain a deeper understanding of SuperflexPy functionalities, please familiarize with the code of existing elements (importing “path” superflexpy.implementation.elements).

Linear reservoir

This section presents the implementation of a linear reservoir element from the generic class ODEsElement.

_images/reservoir.png

The reservoir is controlled by the following differential equation

\[\frac{\textrm{d}S}{\textrm{d}t}=P-Q\]

with

\[Q=kS\]

Note that the differential equation can be solved analytically, and (if applied) the implicit Euler numerical approximation does not require iteration. However, we will still use the numerical approximator offered by SuperflexPy (see Numerical implementation) to illustrate how to proceed in a more general case where analytical solutions are not available.

The SuperflexPy framework provides the class ODEsElement, which has most of the methods required to implement the linear reservoir element. The class implementing the reservoir will inherit from ODEsElement and implement only a few methods needed to specify its behavior.

1
2
3
4
import numba as nb
from superflexpy.framework.element import ODEsElement

class LinearReservoir(ODEsElement):

The first method to implement is the class initializer __init__

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    def __init__(self, parameters, states, approximation, id):

        ODEsElement.__init__(self,
                             parameters=parameters,
                             states=states,
                             approximation=approximation,
                             id=id)

        self._fluxes_python = [self._fluxes_function_python]  # Used by get fluxes, regardless of the architecture

        if approximation.architecture == 'numba':
            self._fluxes = [self._fluxes_function_numba]
        elif approximation.architecture == 'python':
            self._fluxes = [self._fluxes_function_python]
        else:
            message = '{}The architecture ({}) of the approximation is not correct'.format(self._error_message,
                                                                                           approximation.architecture)
            raise ValueError(message)

In the context of SuperflexPy, the main purpose of the method __init__ (lines 9-16) is to deal with the numerical solver. In this case we can accept two architectures: pure Python or Numba. The option selected will control the method used to calculate the fluxes. Note that, since some methods of the approximator may require the Python implementation of the fluxes, the Python implementation must be always provided.

The second method to implement is set_input, which converts the (ordered) list of input fluxes to a dictionary that gives a name to these fluxes.

1
2
3
    def set_input(self, input):

        self.input = {'P': input[0]}

Note that the key used to identify the input flux must match the name of the corresponding variable in the flux functions.

The third method to implement is get_output, which runs the model and returns the output flux.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    def get_output(self, solve=True):

        if solve:
            self._solver_states = [self._states[self._prefix_states + 'S0']]
            self._solve_differential_equation()

            self.set_states({self._prefix_states + 'S0': self.state_array[-1, 0]})

        fluxes = self._num_app.get_fluxes(fluxes=self._fluxes_python,
                                          S=self.state_array,
                                          S0=self._solver_states,
                                          dt=self._dt,
                                          **self.input,
                                          **{k[len(self._prefix_parameters):]: self._parameters[k] for k in self._parameters},
                                          )
        return [- fluxes[0][1]]

The method receives, as input, the argument solve: if False, the state array of the reservoir will not be recalculated and the outputs will be computed based on the current state (e.g., computed in a previous run of the reservoir). This option is needed for post-run inspection, when we want to check the output of the reservoir without solving it again.

Line 4 transforms the states dictionary to an ordered list. Line 5 calls the built-in ODE solver. Line 7 updates the state of the model to the last value calculated. Lines 9-14 call the external numerical approximator to get the values of the fluxes. Note that, for this operation, the Python implementation of the fluxes method is always used because the vectorized operation executed by the method get_fluxes of the numerical approximator does not benefit from the Numba optimization.

The last methods to implement are _fluxes_function_python (pure Python) and _fluxes_function_numba (Numba optimized), which calculate the reservoir fluxes and their derivatives.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    @staticmethod
    def _fluxes_function_python(S, S0, ind, P, k, dt):

        if ind is None:
            return (
                [
                    P,
                    - k * S,
                ],
                0.0,
                S0 + P * dt
            )
        else:
            return (
                [
                    P[ind],
                    - k[ind] * S,
                ],
                0.0,
                S0 + P[ind] * dt[ind],
                [
                    0.0,
                    - k[ind]
                ]
            )

    @staticmethod
    @nb.jit('Tuple((UniTuple(f8, 2), f8, f8))(optional(f8), f8, i4, f8[:], f8[:], f8[:])',
            nopython=True)
    def _fluxes_function_numba(S, S0, ind, P, k, dt):

        return (
            (
                P[ind],
                - k[ind] * S,
            ),
            0.0,
            S0 + P[ind] * dt[ind],
            (
                0.0,
                - k[ind]
            )
        )

_fluxes_function_python and _fluxes_function_numba are both private static methods.

Their inputs are: the state used to compute the fluxes (S), the initial state (S0), the index to use in the arrays (ind), the input fluxes (P), and the parameters (k). All inputs are arrays and ind is used, when solving for a single time step, to indicate the time step to look for.

The output is a tuple containing three elements:

  • tuple with the computed values of the fluxes; positive sign for incoming fluxes (e.g. precipitation, P), negative sign for outgoing fluxes (e.g. streamflow, - k * S);
  • lower bound for the search of the state;
  • upper bound for the search of the state;
  • tuple with the computed values of the derivatives of fluxes w.r.t. the state S; we maintain the convention of positive sign for incoming fluxes (e.g. derivative of the precipitation, 0), negative sign for outgoing fluxes (e.g. derivative of the streamflow, - k);

The implementation for the Numba solver differs from the pure Python implementation in two aspects:

  • the usage of the Numba decorator that defines the types of input variables (lines 28-29)
  • the method works only for a single time step and not for the vectorized solution. For the vectorized solution the Python implementation (with Numpy) is considered sufficient, and hence a Numba implementation is not pursued.

Half-triangular lag function

_images/lag.png

The half-triangular lag function grows linearly until \(t_{\textrm{lag}}\) and then drops to zero, as shown in the schematic. The slope \(\alpha\) is determined from the constraint that the total area of the triangle must be equal to 1.

\[\begin{split}& f_{\textrm{lag}}=\alpha t & \quad \textrm{for }t\leq t_{\textrm{lag}}\\ & f_{\textrm{lag}}=0 & \quad \textrm{for }t>t_{\textrm{lag}}\end{split}\]

SuperflexPy provides the class LagElement that contains most of the functionalities needed to calculate the output of a lag function. The class implementing a customized lag function will inherit from LagElement, and implement only the methods needed to compute the weight array.

1
2
3
import numpy as np

class TriangularLag(LagElement):

The only method requiring implementation is the private method used to calculate the weight array.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    def _build_weight(self, lag_time):

        weight = []

        for t in lag_time:
            array_length = np.ceil(t)
            w_i = []

            for i in range(int(array_length)):
                w_i.append(self._calculate_lag_area(i + 1, t)
                           - self._calculate_lag_area(i, t))

            weight.append(np.array(w_i))

        return weight

The method _build_weight makes use of a secondary private static method

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    @staticmethod
    def _calculate_lag_area(bin, len):

        if bin <= 0:
            value = 0
        elif bin < len:
            value = (bin / len)**2
        else:
            value = 1

        return value

This method returns the area \(A_i\) of the red triangle in the figure, which has base \(t_i\) (bin). The method _build_weight uses this function to calculate the weight array \(W_i\), as the difference between \(A_i\) and \(A_{i-1}\).

Note that the method _build_weight can be implemented using other approaches, e.g., without using auxiliary methods.

Parameterized splitter

A splitter is an element that takes the flux from an upstream element and distributes it to feed multiple downstream elements. The element is controlled by parameters that define the portions of the flux that go into specific elements.

The simple case that we consider here has a single input flux that is split to two downstream elements. In this case, the splitter requires only one parameter \(\alpha_{\textrm{split}}\). The fluxes to the downstream elements are

\[\begin{split}& Q_1^{\textrm{out}} = \alpha_{\textrm{split}} Q^{\textrm{in}} \\ & Q_2^{\textrm{out}} = \left(1-\alpha_{\textrm{split}}\right) Q^{\textrm{in}}\end{split}\]

SuperflexPy provides the class ParameterizedElement, which can be extended to implement all elements that are controlled by parameters but do not have a state. The class implementing the parameterized splitter will inherit from ParameterizedElement and implement only the methods required for the new functionality.

1
2
3
from superflexpy.framework.element import ParameterizedElement

class ParameterizedSingleFluxSplitter(ParameterizedElement):

First, we define two private attributes that specify the number of upstream and downstream elements of the splitter. This information is used by the unit when constructing the model structure.

1
2
    _num_downstream = 2
    _num_upstream = 1

We then define the method that receives the inputs, and the method that calculates the outputs.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    def set_input(self, input):

        self.input = {'Q_in': input[0]}

    def get_output(self, solve=True):

        split_par = self._parameters[self._prefix_parameters + 'split-par']

        return [
            self.input['Q_in'] * split_par,
            self.input['Q_in'] * (1 - split_par)
        ]

The two methods have the same structure as the ones implemented as part of the earlier Linear reservoir example. Note that, in this case, the argument solve of the method get_output is not used, but is still required to maintain a consistent interface.