# Function Pipelines: Building Functional Programming Into PostgreSQL Using Custom Operators

**Today, we are announcing function pipelines, a new capability that introduces functional programming concepts inside PostgreSQL (and SQL) using custom operators.**

*Function pipelines* radically improve the developer ergonomics of analyzing data in PostgreSQL and SQL, by applying principles from functional programming and popular tools like Python’s Pandas and PromQL.

At Timescale our mission is to serve developers worldwide, and enable them to build exceptional data-driven products that measure everything that matters: e.g., software applications, industrial equipment, financial markets, blockchain activity, user actions, consumer behavior, machine learning models, climate change, and more.

We believe SQL is the best language for data analysis. We’ve championed the benefits of SQL for several years, even back when many were abandoning the language for custom domain-specific languages. And we were right - SQL has resurged and become the universal language for data analysis, and now many NoSQL databases are adding SQL interfaces to keep up.

But SQL is not perfect, and at times can get quite unwieldy. For example,

```
SELECT device_id,
sum(abs_delta) as volatility
FROM (
SELECT device_id,
abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts))
as abs_delta
FROM measurements
WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id;
```

*Pop quiz: What does this query do?*

Even if you are a SQL expert, queries like this can be quite difficult to read - and even harder to express. Complex data analysis in SQL can be hard.

*Function pipelines* let you express that same query like this:

```
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum()
as volatility
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;
```

Now it is much clearer what this query is doing. It:

- Gets the last day’s data from the measurements table, grouped by `device_id`
- Sorts the data by the time column
- Calculates the delta (or change) between values
- Takes the absolute value of the delta
- And then takes the sum of the result of the previous steps

**Function pipelines improve your own coding productivity, while also making your SQL code easier for others to comprehend and maintain.**

Inspired by functional programming languages, function pipelines enable you to analyze data by composing multiple functions, leading to a simpler, cleaner way of expressing complex logic in PostgreSQL.

And the best part: we built function pipelines in a way that is fully PostgreSQL compliant - we did not change any SQL syntax - meaning that any tool that speaks PostgreSQL will be able to support data analysis using function pipelines.

How did we build this? By taking advantage of the incredible extensibility of PostgreSQL, in particular: custom types, custom operators, and custom functions.

In our previous example, you can see the key elements of function pipelines:

- **Custom data types**: in this case, the `timevector`, which is a set of `(time, value)` pairs.
- **Custom operator**: `->`, used to *compose* and *apply* function pipeline elements to the data that comes in.
- And finally, **custom functions**, called *pipeline elements*. Pipeline elements can transform and analyze `timevector`s (or other data types) in a function pipeline. For this initial release, we’ve built *60 custom functions!* **(Full list here)**.

We’ll go into more detail on function pipelines in the rest of this post, but if you just want to get started as soon as possible, **the easiest way to try function pipelines is through a fully managed Timescale Cloud service**. Try it for free (no credit card required) for 30 days.

Function pipelines are pre-loaded on each new database service on Timescale Cloud, available immediately - so after you’ve created a new service, you’re all set to use them!

If you prefer to manage your own database instances, you can install the `timescaledb_toolkit` extension into your existing PostgreSQL installation, completely for free.

We’ve been working on this capability for a long time, but in line with our belief of “move fast but don’t break things”, we’re initially releasing function pipelines as an experimental feature - and we would absolutely love to **get your feedback**. You can open an issue or join a discussion thread in GitHub (And, if you like what you see, GitHub ⭐ are always welcome and appreciated too!).

*We’d also like to take this opportunity to give a huge shoutout to pgx, the Rust-based framework for building PostgreSQL extensions - it handles a lot of the heavy lifting for this project. We have over 600 custom types, operators, and functions in the timescaledb_toolkit extension at this point; managing this without pgx (and the ease of use that comes from working with Rust) would be a real bear of a job.*

## Function pipelines: why are they useful?

It’s October. In the Northern hemisphere (where most of Team Timescale sits, including your authors), it is starting to get cold.

Now imagine a restaurant in New York City whose owners care about their customers and their customers’ comfort. And you are working on an IoT product designed to help small businesses like these owners minimize their heating bill while maximizing their customers’ happiness. So you install two thermometers, one at the front measuring the temperature right by the door, and another at the back of the restaurant.

Now, as many of you may know (if you’ve ever had to sit by the door of a restaurant in the fall or winter), when someone enters, the temperature drops - and once the door is closed, the temperature warms back up. The temperature at the back of the restaurant will vary much less than at the front, right by the door. And both of them will drop slowly down to a lower set point during non-business hours and warm back up sometime before business hours based on the setpoints on our thermostat. So overall we’ll end up with a graph that looks something like this:

As we can see, the temperature by the front door varies much more than at the back of the restaurant. Another way to say this is the temperature by the front door is more *volatile*. Now, the owners of this restaurant want to measure this because frequent temperature changes mean uncomfortable customers.

In order to measure volatility, we could first subtract the previous point’s value from each point to calculate a delta. If we add these deltas up directly, large positive and negative deltas will cancel out. But we only care about the magnitude of the delta, not its sign - so what we really should do is take the absolute value of each delta, and then sum those absolute values.
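To make the arithmetic concrete, here is a minimal Python sketch of the metric described above. The readings are made-up numbers and the function name `volatility` is only for illustration; this is not how the extension computes anything internally.

```python
def volatility(values):
    """Sum of absolute differences between consecutive readings."""
    # delta: current reading minus the previous one
    deltas = [curr - prev for prev, curr in zip(values, values[1:])]
    # abs + sum: only the magnitude of each change counts
    return sum(abs(d) for d in deltas)


# Hypothetical temperature readings, already ordered by time
front_door = [20.0, 17.0, 20.0, 16.5, 20.0]
back_room = [20.0, 19.5, 20.0, 19.5, 20.0]

# Summing the signed deltas lets drops and recoveries cancel each other out
signed_total = sum(c - p for p, c in zip(front_door, front_door[1:]))

print(signed_total)            # net change only
print(volatility(front_door))  # total movement at the front door
print(volatility(back_room))   # total movement at the back
```

With these sample readings the signed total hides all of the movement, while the sum of absolute deltas separates the drafty front door from the steadier back room.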

We now have a metric that might help us measure customer comfort, and also the efficacy of different weatherproofing methods (for example, adding one of those little vestibules that acts as a windbreak).

To track this, we collect measurements from our thermometers and store them in a table:

```
CREATE TABLE measurements(
device_id BIGINT,
ts TIMESTAMPTZ,
val DOUBLE PRECISION
);
```

The `device_id` identifies the thermometer, `ts` the time of the reading, and `val` the temperature.

Using the data in our measurements table, let’s look at how we calculate volatility using function pipelines.

*Note: because all of the function pipeline features are still experimental, they exist in the toolkit_experimental schema. Before running any of the SQL code in this post, you will need to set your search_path to include the experimental schema, as we do in the example below. We won’t repeat this throughout the post so as not to distract.*

```
set search_path to toolkit_experimental, public; --still experimental, so do this to make it easier to read
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum()
as volatility
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;
```

And now we have the same query that we used as our example in the introduction.

In this query, the function pipeline `timevector(ts, val) -> sort() -> delta() -> abs() -> sum()` succinctly expresses the following operations:

- Create `timevector`s (more detail on this later) out of the `ts` and `val` columns
- Sort each `timevector` by the time column
- Calculate the delta (or change) between each pair in the `timevector` by subtracting the previous `val` from the current
- Take the absolute value of the delta
- Take the sum of the result from the previous steps

The `FROM`, `WHERE` and `GROUP BY` clauses do the rest of the work, telling us:

- We’re getting data *FROM* the `measurements` table
- *WHERE* the `ts`, or timestamp column, contains values over the last day
- Showing one pipeline output per `device_id` (the `GROUP BY` column)

As we noted before, if you were to do this same calculation using SQL and PostgreSQL functionality, your query would look like this:

```
SELECT device_id,
sum(abs_delta) as volatility
FROM (
SELECT device_id,
abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts) )
as abs_delta
FROM measurements
WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id;
```

This does the same 5 steps as the above, but is much harder to understand, because we have to use a window function and aggregate the results - but also, because aggregates are performed before window functions, we need to actually execute the window function in a subquery.

As we can see, function pipelines make it significantly easier to comprehend the overall analysis of our data. There’s no need to completely understand what’s going on in these functions just yet, but for now it’s enough to understand that we’ve essentially implemented a small functional programming language inside of PostgreSQL. You can still use all of the normal, expressive SQL you’ve come to know and love. Function pipelines just add new tools to your SQL toolbox that make it easier to work with time-series data.

Some avid SQL users might find the syntax a bit foreign at first, but for many people who work in other programming languages, especially using tools like Python’s Pandas Package, this type of successive operation on data sets will feel natural.

And again, this is still fully PostgreSQL compliant: We introduce no changes to the parser or anything that should break compatibility with PostgreSQL drivers.

## How we built function pipelines without forking PostgreSQL

We built function pipelines - without modifying the parser or anything that would require a fork of PostgreSQL - by taking advantage of three of the many ways that PostgreSQL enables extensibility: custom types, custom functions, and custom operators.

- **Custom data types**, starting with the `timevector`, which is a set of `(time, value)` pairs
- **A custom operator**: `->`, which is used to *compose* and *apply* function pipeline elements to the data that comes in
- **Custom functions**, called *pipeline elements*, which can transform and analyze `timevector`s (or other data types) in a function pipeline (with 60 functions in this initial release)

We believe that new idioms like these are exactly what PostgreSQL was *meant to enable*. That’s why it has supported custom types, functions and operators from its earliest days. (And it is one of the many reasons why we love PostgreSQL.)

## A custom data type: the `timevector`

A `timevector` is a collection of `(time, value)` pairs. As of now, the times must be `TIMESTAMPTZ`s and the values must be `DOUBLE PRECISION` numbers. (But this may change in the future as we continue to develop this data type. If you have ideas/input, please file feature requests on GitHub explaining what you’d like!)

You can think of the `timevector` as something like this:

One of the first questions you might ask is: how does a `timevector` relate to time-series data? (If you want to know more about time-series data, we have a great blog post on that).

Let’s consider our example from above, where we were talking about a restaurant that was measuring temperatures, and we had a `measurements` table like so:

```
CREATE TABLE measurements(
device_id BIGINT,
ts TIMESTAMPTZ,
val DOUBLE PRECISION
);
```

In this example, we can think of a single time-series dataset as all historical and future time and temperature measurements from a device.

Given this definition, we can think of a `timevector` as a **finite subset of a time-series dataset**. The larger time-series dataset may extend back into the past and it may extend into the future, but the `timevector` is bounded.

In order to construct a `timevector` from the data gathered from a thermometer, we use a custom aggregate and pass in the columns we want to become our `(time, value)` pairs. We can use the `WHERE` clause to define the extent of the `timevector` (i.e., the limits of this subset), and the `GROUP BY` clause to provide identifying information about the time-series that’s represented.

Building on our example, this is how we construct a `timevector` for each thermometer in our dataset:

```
SELECT device_id,
timevector(ts, val)
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;
```

But a `timevector` doesn't provide much value by itself. So now, let’s also consider some complex calculations that we can apply to the `timevector`, starting with a custom operator used to apply these functions.

## A custom operator: `->`

In function pipelines, the `->` operator is used to apply and compose multiple functions, in an easy to write and read format.

Fundamentally, `->` means: “apply the operation on the right to the inputs on the left”, or, more simply “do the next thing”.

We created a general-purpose operator for this because we think that too many operators meaning different things can get very confusing and difficult to read.

One thing that you’ll notice about the pipeline elements is that the arguments are in an unusual place in a statement like:

```
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> abs() -> sum()
as volatility
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;
```

It *appears* (from the semantics) that the `timevector(ts, val)` is an argument to `sort()`, the resulting `timevector` is an argument to `delta()`, and so on.

The thing is that `sort()` (and the others) are regular function calls; they can’t see anything outside of their parentheses and don’t know about anything to their left in the statement, so we need a way to get the `timevector` into the `sort()` (and the rest of the pipeline).

The way we solved this is by taking advantage of one of the same fundamental computing insights that functional programming languages use: *code and data are really the same thing*.

Each of our functions returns a special type that describes the function and its arguments. We call these types *pipeline elements* (more later).

The `->` operator then performs one of two different types of actions depending on the types on its right and left sides. It can either:

- *Apply* a pipeline element to the left hand argument - perform the function described by the pipeline element on the incoming data type directly.
- *Compose* pipeline elements into a combined element that can be applied at some point in the future (this is an optimization that allows us to apply multiple elements in a “nested” manner so that we don’t perform multiple unnecessary passes).

The operator determines the action to perform based on its left and right arguments.
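As a rough model of these two behaviors, here is a small Python sketch. Python has no `->` operator, so the sketch borrows `>>`. The class names `TimeVector` and `Element` and the helper `abs_()` are hypothetical, chosen only for this illustration; they say nothing about how the extension is implemented internally.

```python
class TimeVector:
    """Data: a list of (time, value) pairs."""

    def __init__(self, points):
        self.points = list(points)

    def __rshift__(self, element):
        # data >> element: *apply* the element to the data on the left
        if isinstance(element, Element):
            return element.apply(self)
        return NotImplemented


class Element:
    """Code held as data: an ordered list of named steps to run later."""

    def __init__(self, *steps):
        self.steps = steps  # each step is a (name, function) pair

    def __rshift__(self, other):
        # element >> element: *compose* both into one combined element
        if isinstance(other, Element):
            return Element(*self.steps, *other.steps)
        return NotImplemented

    def apply(self, tv):
        points = tv.points
        for _name, fn in self.steps:
            points = fn(points)
        return TimeVector(points)


def sort():
    return Element(("sort", lambda pts: sorted(pts, key=lambda p: p[0])))


def delta():
    # current value minus previous value, stamped with the current time
    return Element(("delta", lambda pts: [
        (t1, v1 - v0) for (_, v0), (t1, v1) in zip(pts, pts[1:])
    ]))


def abs_():
    return Element(("abs", lambda pts: [(t, abs(v)) for t, v in pts]))


tv = TimeVector([(3, 2.0), (1, 1.0), (2, 4.0)])  # hypothetical data

applied = tv >> sort() >> delta() >> abs_()   # apply, apply, apply
composed = sort() >> delta() >> abs_()        # compose first, no data involved yet
print(applied.points)
print((tv >> composed).points)                # apply the combined element once
```

Both expressions give the same points. The difference is that `composed` exists as a value before any data is seen, which is what gives an implementation room to decide how to run the combined steps.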

Let’s look at our pipeline from before: `timevector(ts, val) -> sort() -> delta() -> abs() -> sum()`. If you remember, I noted that this function pipeline performs the following steps:

- Create `timevector`s out of the `ts` and `val` columns
- Sort each `timevector` by the time column
- Calculate the delta (or change) between each pair in the `timevector` by subtracting the previous `val` from the current
- Take the absolute value of the delta
- Take the sum of the result from the previous steps

And logically, at each step, we can think of the `timevector` being materialized and passed to the next step in the pipeline.

However, while this will produce a correct result, it’s not the most efficient way to compute this. Instead, it would be more efficient to compute as much as possible in a single pass over the data.

In order to do this, we allow not only the *apply* operation, but also the *compose* operation. Once we’ve composed a pipeline into a logically equivalent higher order pipeline with all of the elements we can choose the most efficient way to execute it internally. (Importantly, even if we have to perform each step sequentially, we don’t need to *materialize* it and pass it between each step in the pipeline so it has significantly less overhead even without other optimization).
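The difference between materializing every step and running a composed pipeline in fewer passes can be sketched in Python. Both function names are hypothetical and the data is made up; the point is only the shape of the computation, not the extension’s actual execution strategy.

```python
def materialized_volatility(points):
    """Run each step in turn, building a new list every time."""
    ordered = sorted(points, key=lambda p: p[0])            # sort()
    deltas = [(t1, v1 - v0)                                 # delta()
              for (_, v0), (t1, v1) in zip(ordered, ordered[1:])]
    magnitudes = [(t, abs(v)) for t, v in deltas]           # abs()
    return sum(v for _, v in magnitudes)                    # sum()


def fused_volatility(points):
    """Same result, but delta, abs and sum share one loop with no intermediate lists."""
    total = 0.0
    previous = None
    # Sorting still needs every point; the remaining steps are fused.
    for _, value in sorted(points, key=lambda p: p[0]):
        if previous is not None:
            total += abs(value - previous)
        previous = value
    return total


points = [(3, 2.0), (1, 1.0), (2, 4.0)]  # hypothetical (time, value) pairs
print(materialized_volatility(points), fused_volatility(points))
```

The fused version allocates nothing between `delta`, `abs` and `sum`, which is the kind of saving that composing elements makes possible.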

## Custom functions: pipeline elements

Now let’s discuss the third, and final, key piece that makes up function pipelines: custom functions, or as we call them, *pipeline elements*.

We have implemented over 60 individual pipeline elements, which fall into 4 categories (with a few subcategories):

### `timevector` transforms

These elements take in a `timevector` and produce a `timevector`. They are the easiest to compose, as they produce the same type.

Example pipeline:

```
SELECT device_id,
timevector(ts, val)
-> sort()
-> delta()
-> map($$ ($value^3 + $value^2 + $value * 2) $$)
-> lttb(100)
FROM measurements
GROUP BY device_id;
```

Organized by sub-category:

#### Unary mathematical

Simple mathematical functions applied to the value in each point in a `timevector`. (Docs link)

| Element | Description |
|---|---|
| `abs()` | Computes the absolute value of each value |
| `cbrt()` | Computes the cube root of each value |
| `ceil()` | Computes the first integer greater than or equal to each value |
| `floor()` | Computes the first integer less than or equal to each value |
| `ln()` | Computes the natural logarithm of each value |
| `log10()` | Computes the base 10 logarithm of each value |
| `round()` | Computes the closest integer to each value |
| `sign()` | Computes +/-1 for each positive/negative value |
| `sqrt()` | Computes the square root for each value |
| `trunc()` | Computes only the integer portion of each value |

#### Binary mathematical

Simple mathematical functions with a scalar input applied to the value in each point in a `timevector`. (Docs link)

| Element | Description |
|---|---|
| `add(N)` | Computes each value plus `N` |
| `div(N)` | Computes each value divided by `N` |
| `logn(N)` | Computes the logarithm base `N` of each value |
| `mod(N)` | Computes the remainder when each number is divided by `N` |
| `mul(N)` | Computes each value multiplied by `N` |
| `power(N)` | Computes each value taken to the `N` power |
| `sub(N)` | Computes each value less `N` |

#### Compound transforms

Transforms involving multiple points inside of a `timevector`. (Docs link)

| Element | Description |
|---|---|
| `delta()` | Subtracts the previous value from each value |
| `fill_to(interval, fill_method)` | Fills gaps larger than `interval` with points at `interval` from the previous using `fill_method` |
| `lttb(resolution)` | Downsamples a `timevector` using the largest triangle three buckets algorithm at `resolution`; requires sorted input |
| `sort()` | Sorts the `timevector` by the `time` column ascending |

#### Lambda Elements

These elements use lambda expressions, which allow the user to write small functions to be evaluated over each point in a `timevector`.

Lambda expressions can return a `DOUBLE PRECISION` value like `$$ $value^2 + $value + 3 $$`. They can return a `BOOL` like `$$ $time > '2020-01-01't $$`. They can also return a `(time, value)` pair like `$$ ($time + '1 day'i, sin($value) * 4)$$`. (Docs link)

You can apply them using the elements below:

| Element | Description |
|---|---|
| `filter(lambda (bool))` | Removes points from the `timevector` where the lambda expression evaluates to `false` |
| `map(lambda (value))` | Applies the lambda expression to all the values in the `timevector` |
| `map(lambda (time, value))` | Applies the lambda expression to all the times and values in the `timevector` |
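For readers more used to general-purpose languages, the three lambda shapes behave roughly like the Python sketch below. The helper names `filter_points`, `map_values` and `map_points` are invented for this illustration, and the sample points are made up; the Python lambdas simply mirror the three example expressions above.

```python
import math
from datetime import datetime, timedelta, timezone


def filter_points(points, predicate):
    """Keep only the points for which the predicate is true."""
    return [(t, v) for t, v in points if predicate(t, v)]


def map_values(points, fn):
    """Replace each value, leaving the times untouched."""
    return [(t, fn(v)) for t, v in points]


def map_points(points, fn):
    """Replace each (time, value) pair with the pair returned by fn."""
    return [fn(t, v) for t, v in points]


utc = timezone.utc
points = [  # hypothetical data
    (datetime(2019, 12, 31, tzinfo=utc), 1.0),
    (datetime(2020, 1, 2, tzinfo=utc), 2.0),
]
cutoff = datetime(2020, 1, 1, tzinfo=utc)

# BOOL lambda: $time > '2020-01-01't
recent = filter_points(points, lambda t, v: t > cutoff)
# DOUBLE PRECISION lambda: $value^2 + $value + 3
squared = map_values(points, lambda v: v**2 + v + 3)
# (time, value) lambda: ($time + '1 day'i, sin($value) * 4)
shifted = map_points(points, lambda t, v: (t + timedelta(days=1), math.sin(v) * 4))

print(recent)
print(squared)
print(shifted)
```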

### `timevector` finalizers

These elements end the `timevector` portion of a pipeline. They can either help with output or produce an aggregate over the entire `timevector`. They are an optimization barrier to composition as they (usually) produce types other than `timevector`.

Example pipelines:

```
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> unnest()
FROM measurements
GROUP BY device_id;
```

```
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> time_weight()
FROM measurements
GROUP BY device_id;
```

Finalizer pipeline elements organized by sub-category:

#### `timevector` output

These elements help with output, and can produce a set of `(time, value)` pairs or a materialized `timevector`.

Note: this is an area where we’d love further feedback. Are there particular data formats that would be especially useful for, say, graphing that we can add? File an issue in our GitHub!

| Element | Description |
|---|---|
| `unnest()` | Produces a set of `(time, value)` pairs. You can wrap and expand as a composite type to produce separate columns `(pipe -> unnest()).*` |
| `materialize()` | Materializes a `timevector` to pass to an application or other operation directly, blocks any optimizations that would materialize it lazily. |

#### `timevector` aggregates

Aggregate all the points in a `timevector` to produce a single value as a result. (Docs link)

| Element | Description |
|---|---|
| `average()` | Computes the average of the values in the `timevector` |
| `counter_agg()` | Computes the `counter_agg` aggregate over the times and values in the `timevector` |
| `stats_agg()` | Computes a range of statistical aggregates and returns a `1DStatsAgg` over the values in the `timevector` |
| `sum()` | Computes the sum of the values in the `timevector` |
| `num_vals()` | Counts the points in the `timevector` |

## Aggregate accessors and mutators

These function pipeline elements act like the accessors that I described in our previous post on aggregates. You can use them to get a value from the aggregate part of a function pipeline like so:

```
SELECT device_id,
timevector(ts, val) -> sort() -> delta() -> stats_agg() -> variance()
FROM measurements
GROUP BY device_id;
```

But these don’t *just* work on `timevector`s - they also work on a normally produced aggregate as well.

When used instead of normal function accessors and mutators they can make the syntax more clear by getting rid of nested functions like:

```
SELECT approx_percentile(0.5, percentile_agg(val))
FROM measurements
```

Instead, we can use the arrow accessor to convey the same thing:

```
SELECT percentile_agg(val) -> approx_percentile(0.5)
FROM measurements
```

By aggregate family:

#### Counter Aggregates

Counter aggregates deal with resetting counters (and were stabilized in our 1.3 release this week!). Counters are a common type of metric in the application performance monitoring and metrics world. All values have resets accounted for. These elements must have a `CounterSummary` to their left when used in a pipeline, from a `counter_agg()` aggregate or pipeline element. (Docs link)

| Element | Description |
|---|---|
| `counter_zero_time()` | The time at which the counter value is predicted to have been zero based on the least squares fit of the points input to the `CounterSummary` (x intercept) |
| `corr()` | The correlation coefficient of the least squares fit line of the adjusted counter value. |
| `delta()` | Computes the last - first value of the counter |
| `extrapolated_delta(method)` | Computes the delta extrapolated using the provided method to bounds of range. Bounds must have been provided in the aggregate or a `with_bounds` call |
| `idelta_left()` / `idelta_right()` | Computes the instantaneous difference between the second and first points (left) or last and next-to-last points (right) |
| `intercept()` | The y-intercept of the least squares fit line of the adjusted counter value. |
| `irate_left()` / `irate_right()` | Computes the instantaneous rate of change between the second and first points (left) or last and next-to-last points (right) |
| `num_changes()` | Number of times the counter changed values. |
| `num_elements()` | Number of items - any with the exact same time will have been counted only once. |
| `num_resets()` | Number of times the counter reset. |
| `slope()` | The slope of the least squares fit line of the adjusted counter value. |
| `with_bounds(range)` | Applies bounds using the `range` (a `TSTZRANGE`) to the `CounterSummary` if they weren’t provided in the aggregation step |
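To give a feel for what “resets accounted for” can mean, here is a minimal Python sketch. It assumes the common convention that a drop in the raw counter value signals a reset to zero, so everything counted before the drop is carried forward as an offset. The function name `adjust_for_resets` and the sample values are hypothetical, and the extension may handle edge cases differently.

```python
def adjust_for_resets(values):
    """Return (adjusted values, number of resets) for time-ordered raw counter readings.

    Assumption: a reading lower than the previous one means the counter
    restarted from zero, so the previous reading is added to a running offset.
    """
    adjusted = []
    offset = 0.0
    resets = 0
    previous = None
    for value in values:
        if previous is not None and value < previous:
            offset += previous
            resets += 1
        adjusted.append(value + offset)
        previous = value
    return adjusted, resets


raw = [10.0, 20.0, 5.0, 15.0]  # hypothetical readings; the counter restarts after 20
adjusted, resets = adjust_for_resets(raw)

print(adjusted)                    # monotonically non-decreasing series
print(adjusted[-1] - adjusted[0])  # reset-aware "last - first"
print(raw[-1] - raw[0])            # naive delta on raw values
print(resets)
```

On this sample the naive delta reports an increase of 5, while the reset-aware delta reports 25, which is the amount the counter actually advanced.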

#### Percentile Approximation

These aggregate accessors deal with percentile approximation. For now we’ve only implemented them for `percentile_agg` and `uddsketch` based aggregates. We have not yet implemented them for `tdigest`. (Docs link)

| Element | Description |
|---|---|
| `approx_percentile(p)` | The approximate value at percentile `p` |
| `approx_percentile_rank(v)` | The approximate percentile a value `v` would fall in |
| `error()` | The maximum relative error guaranteed by the approximation |
| `mean()` | The exact average of the input values. |
| `num_vals()` | The number of input values |

#### Statistical aggregates

These aggregate accessors add support for common statistical aggregates (and were stabilized in our 1.3 release this week!). These allow you to compute and `rollup()` common statistical aggregates like `average` and `stddev`, more advanced ones like `skewness`, as well as 2-dimensional aggregates like `slope` and `covariance`. Because there are both 1D and 2D versions of these, the accessors can have multiple forms. For instance, `average()` calculates the average on a 1D aggregate while `average_y()` and `average_x()` do so on each dimension of a 2D aggregate. (Docs link)

| Element | Description |
|---|---|
| `average() / average_y() / average_x()` | The average of the values. |
| `corr()` | The correlation coefficient of the least squares fit line. |
| `covariance(method)` | The covariance of the values using either `population` or `sample` method. |
| `determination_coeff()` | The determination coefficient (aka R squared) of the values. |
| `kurtosis(method) / kurtosis_y(method) / kurtosis_x(method)` | The kurtosis (4th moment) of the values using either `population` or `sample` method. |
| `intercept()` | The intercept of the least squares fit line. |
| `num_vals()` | The number of (non-null) values seen. |
| `sum() / sum_x() / sum_y()` | The sum of the values seen. |
| `skewness(method) / skewness_y(method) / skewness_x(method)` | The skewness (3rd moment) of the values using either `population` or `sample` method. |
| `slope()` | The slope of the least squares fit line. |
| `stddev(method) / stddev_y(method) / stddev_x(method)` | The standard deviation of the values using either `population` or `sample` method. |
| `variance(method) / variance_y(method) / variance_x(method)` | The variance of the values using either `population` or `sample` method. |
| `x_intercept()` | The x intercept of the least squares fit line. |
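Several accessors above take a `population` or `sample` method. As a reminder of what that distinction means in general statistics (this is standard math, not a statement about the extension’s internals), the short Python sketch below uses the standard library’s `statistics` module on made-up values.

```python
import statistics

values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]  # hypothetical readings

# Population: treat the values as the whole population, divide by n
population_variance = statistics.pvariance(values)
population_stddev = statistics.pstdev(values)

# Sample: treat the values as a sample, divide by n - 1
sample_variance = statistics.variance(values)
sample_stddev = statistics.stdev(values)

print(population_variance, sample_variance)
print(population_stddev, sample_stddev)
```

The sample figures are always slightly larger than the population figures for the same data, and the gap shrinks as the number of values grows.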

#### Time Weighted Averages

(More info)

The `average()` accessor may be called on the output of a `time_weight()` like so:

```
SELECT time_weight('Linear', ts, val) -> average() FROM measurements;
```
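For intuition, a time-weighted average weights each value by how long it was in effect, rather than counting every reading equally. The Python sketch below assumes that the `'Linear'` method means linear interpolation between consecutive points (the trapezoid rule); the function name and the sample points are hypothetical, and times are plain seconds to keep the arithmetic visible.

```python
def time_weighted_average_linear(points):
    """Trapezoid-rule average of (time, value) pairs over their full time span."""
    ordered = sorted(points, key=lambda p: p[0])
    if len(ordered) < 2:
        raise ValueError("need at least two points to span an interval")
    area = 0.0
    for (t0, v0), (t1, v1) in zip(ordered, ordered[1:]):
        # Area under the straight line joining the two readings
        area += (v0 + v1) / 2 * (t1 - t0)
    return area / (ordered[-1][0] - ordered[0][0])


# Hypothetical readings: two close together, then a long quiet stretch
points = [(0, 10.0), (10, 20.0), (40, 20.0)]

print(time_weighted_average_linear(points))            # weights by elapsed time
print(sum(v for _, v in points) / len(points))         # plain average of the readings
```

Because the value sat at 20 for most of the interval, the time-weighted result lands closer to 20 than the plain average does.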

#### Approximate Count Distinct (Hyperloglog)

This is an approximation for distinct counts that was stabilized in our 1.3 release! (Docs link) The `distinct_count()` accessor may be called on the output of a `hyperloglog()` like so:

```
SELECT hyperloglog(device_id) -> distinct_count() FROM measurements;
```

## Next steps

We hope this post helped you understand how function pipelines leverage PostgreSQL extensibility to offer functional programming concepts in a way that is fully PostgreSQL compliant, and how they can improve the ergonomics of your code, making it easier to write, read, and maintain.

**You can try function pipelines today** with a fully-managed Timescale Cloud service (no credit card required, free for 30 days). Function pipelines are available now on every new database service on Timescale Cloud, so after you’ve created a new service, you’re all set to use them!

If you prefer to manage your own database instances, you can download and install the timescaledb_toolkit extension on GitHub for free, after which you’ll be able to use function pipelines.

We love building in public. You can view our upcoming roadmap on GitHub for a list of proposed features, as well as features we’re currently implementing and those that are available to use today. We also welcome feedback from the community (it helps us prioritize the features users really want). To contribute feedback, comment on an open issue or in a discussion thread in GitHub.

And if you want to hear more about function pipelines or meet some of the folks who helped build it, be sure to join us for our first Timescale Community Day on October 28, 2021!