Function Pipelines: Building Functional Programming Into PostgreSQL Using Custom Operators

Function Pipelines: Building Functional Programming Into PostgreSQL Using Custom Operators

Today, we are announcing function pipelines, a new capability that introduces functional programming concepts inside PostgreSQL (and SQL) using custom operators.

Function pipelines radically improve the developer ergonomics of analyzing data in PostgreSQL and SQL, by applying principles from functional programming and popular tools like Python’s Pandas and PromQL.

At Timescale our mission is to serve developers worldwide, and enable them to build exceptional data-driven products that measure everything that matters: e.g., software applications, industrial equipment, financial markets, blockchain activity, user actions, consumer behavior, machine learning models, climate change, and more.

We believe SQL is the best language for data analysis. We’ve championed the benefits of SQL for several years, even back when many were abandoning the language for custom domain-specific languages. And we were right - SQL has resurged and become the universal language for data analysis, and now many NoSQL databases are adding SQL interfaces to keep up.

But SQL is not perfect, and at times can get quite unwieldy. For example,

SELECT device_id, 
	sum(abs_delta) as volatility
FROM (
	SELECT device_id, 
		abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts))
        	as abs_delta 
	FROM measurements
	WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id; 

Pop quiz: What does this query do?

Even if you are a SQL expert, queries like this can be quite difficult to read - and even harder to express. Complex data analysis in SQL can be hard.

Function pipelines let you express that same query like this:

SELECT device_id, 
	timevector(ts, val) -> sort() -> delta() -> abs() -> sum() 
    		as volatility
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;

Now it is much clearer what this query is doing. It:

  • Gets the last day’s data from the measurements table, grouped by device_id
  • Sorts the data by the time column
  • Calculates the delta (or change) between values
  • Takes the absolute value of the delta
  • And then takes the sum of the result of the previous steps

Function pipelines improve your own coding productivity, while also making your SQL code easier for others to comprehend and maintain.

Inspired by functional programming languages, function pipelines enable you to analyze data by composing multiple functions, leading to a simpler, cleaner way of expressing complex logic in PostgreSQL.

And the best part: we built function pipelines in a way that is fully PostgreSQL compliant - we did not change any SQL syntax - meaning that any tool that speaks PostgreSQL will be able to support data analysis using function pipelines.

How did we build this? By taking advantage of the incredible extensibility of PostgreSQL, in particular: custom types, custom operators, and custom functions.

In our previous example, you can see the key elements of function pipelines:

  • Custom data types: in this case, the timevector, which is a set of (time, value) pairs.
  • Custom operator: ->, used to compose and apply function pipeline elements to the data that comes in.
  • And finally, custom functions; called pipeline elements. Pipeline elements can transform and analyze timevectors (or other data types) in a function pipeline. For this initial release, we’ve built 60 custom functions! (Full list here).

We’ll go into more detail on function pipelines in the rest of this post, but if you just want to get started as soon as possible, the easiest way to try function pipelines is through a fully managed Timescale Cloud service. Try it for free (no credit card required) for 30 days.

Function pipelines are pre-loaded on each new database service on Timescale Cloud, available immediately - so after you’ve created a new service, you’re all set to use them!

If you prefer to manage your own database instances, you can install the timescaledb_toolkit into your existing PostgreSQL installation, completely for free.

We’ve been working on this capability for a long time, but in line with our belief of “move fast but don’t break things”, we’re initially releasing function pipelines as an experimental feature - and we would absolutely love to get your feedback. You can open an issue or join a discussion thread in GitHub (And, if you like what you see, GitHub ⭐ are always welcome and appreciated too!).

We’d also like to take this opportunity to give a huge shoutout to pgx, the Rust-based framework for building PostgreSQL extensions - it handles a lot of the heavy lifting for this project. We have over 600 custom types, operators, and functions in the timescaledb_toolkit extension at this point; managing this without pgx (and the ease of use that comes from working with Rust) would be a real bear of a job.

Function pipelines: why are they useful?

It’s October. In the Northern hemisphere (where most of Team Timescale sits, including your authors), it is starting to get cold.

Now imagine a restaurant in New York City whose owners care about their customers and their customers’ comfort. And you are working on an IoT product designed to help small businesses like these owners minimize their heating bill while maximizing their customers happiness. So you install two thermometers, one at the front measuring the temperature right by the door, and another at the back of the restaurant.

Now, as many of you may know (if you’ve ever had to sit by the door of a restaurant in the fall or winter), when someone enters, the temperature drops - and once the door is closed, the temperature warms back up. The temperature at the back of the restaurant will vary much less than at the front, right by the door. And both of them will drop slowly down to a lower set point during non-business hours and warm back up sometime before business hours based on the setpoints on our thermostat. So overall we’ll end up with a graph that looks something like this:

A graph with time on the x axis and temperature on the y axis showing two curves. First a curve labeled back which starts low steadily rises stays relatively constant in the section labeled operating hours and then drops slowly back down. Second a curve labeled front which starts following the other, starts low, rises then it starts getting jumpy, drastically varying while the other stays constant during operating hours, then it falls back down.
A graph of the temperature at the front (near the door) and back. The back is much steadier, while the front is more volatile. Graph is for illustrative purposes only, data is fabricated. No restaurants or restaurant patrons were harmed in the making of this post.

As we can see, the temperature by the front door varies much more than at the back of the restaurant. Another way to say this is the temperature by the front door is more volatile. Now, the owners of this restaurant want to measure this because frequent temperature changes means uncomfortable customers.

In order to measure volatility, we could first subtract each point from the point before to calculate a delta. If we add this up directly, large positive and negative deltas will cancel out. But, we only care about the magnitude of the delta, not its sign - so what we really should do is take the absolute value of the delta, and then take the total sum of the previous steps.

We now have a metric that might help us measure customer comfort, and also the efficacy of different weatherproofing methods (for example, adding one of those little vestibules that acts as a windbreak).

To track this, we collect measurements from our thermometers and store them in a table:

CREATE TABLE measurements(
	device_id BIGINT,
	ts TIMESTAMPTZ,
	val DOUBLE PRECISION
);

The device_id identifies the thermostat, ts the time of reading and val the temperature.

Using the data in our measurements table, let’s look at how we calculate volatility using function pipelines.

Note: because all of the function pipeline features are still experimental, they exist in the toolkit_experimental schema. Before running any of the SQL code in this post you will need to set your search_path to include the experimental schema as we do in the example below, we won’t repeat this throughout the post so as not to distract.

set search_path to toolkit_experimental, public; --still experimental, so do this to make it easier to read

SELECT device_id, 
	timevector(ts, val) -> sort() -> delta() -> abs() -> sum() 
    	as volatility
FROM measurements
WHERE ts >= now()-'1 day'::interval
GROUP BY device_id;

And now we have the same query that we used as our example in the introduction.

In this query, the function pipeline timevector(ts, val) -> sort() -> delta() -> abs() -> sum() succinctly expresses the following operations:

  1. Create timevectors (more detail on this later) out of the ts and val columns
  2. Sort each timevector by the time column
  3. Calculate the delta (or change) between each pair in the timevector by subtracting the previous val from the current
  4. Take the absolute value of the delta
  5. Take the sum of the result from the previous steps

The FROM, WHERE and GROUP BY clauses do the rest of the work telling us:

  1. We’re getting data FROM the measurements table
  2. WHERE the ts, or timestamp column, contains values over the last day
  3. Showing one pipeline output per device_id (the GROUP BY column)

As we noted before, if you were to do this same calculation using SQL and PostgreSQL functionality, your query would look like this:

SELECT device_id, 
sum(abs_delta) as volatility
FROM (
	SELECT 
		abs(val - lag(val) OVER (PARTITION BY device_id ORDER BY ts) ) 
        	as abs_delta 
	FROM measurements
	WHERE ts >= now() - '1 day'::interval) calc_delta
GROUP BY device_id; 

This does the same 5 steps as the above, but is much harder to understand, because we have to use a window function and aggregate the results - but also, because aggregates are performed before window functions, we need to actually execute the window function in a subquery.

As we can see, function pipelines make it significantly easier to comprehend the overall analysis of our data. There’s no need to completely understand what’s going on in these functions just yet, but for now it’s enough to understand that we’ve essentially implemented a small functional programming language inside of PostgreSQL. You can still use all of the normal, expressive SQL you’ve come to know and love. Function pipelines just add new tools to your SQL toolbox that make it easier to work with time-series data.

Some avid SQL users might find the syntax a bit foreign at first, but for many people who work in other programming languages, especially using tools like Python’s Pandas Package, this type of successive operation on data sets will feel natural.

And again, this is still fully PostgreSQL compliant: We introduce no changes to the parser or anything that should break compatibility with PostgreSQL drivers.

How we built function pipelines without forking PostgreSQL

We built function pipelines- without modifying the parser or anything that would require a fork of PostgreSQL- by taking advantage of three of the many ways that PostgreSQL enables extensibility: custom types, custom functions, and custom operators.

  • Custom data types, starting with the timevector, which is a set of (time, value) pairs
  • A custom operator: ->, which is used to compose and apply function pipeline elements to the data that comes in.
  • Custom functions, called pipeline elements, which can transform and analyze timevectors (or other data types) in a function pipeline (with 60 functions in this initial release)

We believe that new idioms like these are exactly what PostgreSQL was meant to enable. That’s why it has supported custom types, functions and operators from its earliest days. (And is one of the many reasons why we love PostgreSQL.)

A custom data type: the timevector

A timevector is a collection of (time, value) pairs. As of now, the times must be TIMESTAMPTZs and the values must be DOUBLE PRECISION numbers. (But this may change in the future as we continue to develop this data type. If you have ideas/input, please file feature requests on GitHub explaining what you’d like!)

You can think of the timevector as something like this:

 A box with `timevector` on the outside, inside there is a table with one column labeled time containing multiple timestamps and another column labeled value containing floating point numbers.
A depiction of a timevector.

One of the first questions you might ask is: how does a timevector relate to time-series data? (If you want to know more about time-series data, we have a great blog post on that).

Let’s consider our example from above, where we were talking about a restaurant that was measuring temperatures, and we had a measurements table like so:

CREATE TABLE measurements(
	device_id BIGINT,
	ts TIMESTAMPTZ,
	val DOUBLE PRECISION
);

In this example, we can think of a single time-series dataset as all historical and future time and temperature measurements from a device.

Given this definition, we can think of a timevector as a finite subset of a time-series dataset. The larger time-series dataset may extend back into the past and it may extend into the future, but the timevector is bounded.

A box labeled time-series with a table in it. The table has one column labeled time and another labeled value like the diagram above. There is a timevector box inside the table with timestamps and values. Each column also has an arrow at the top pointing to the word past and another arrow at the bottom pointing to the word future conveying that the time-series extends into the past and future while the timevector contains a subset of the values.
A timevector is a finite subset of a time-series and contains all the (time, value) pairs in some region of the time-series.

In order to construct a timevector from the data gathered from a thermometer, we use a custom aggregate and pass in the columns we want to become our (time, value) pairs. We can use the WHERE clause to define the extent of the timevector (i.e., the limits of this subset), and the GROUP BY clause to provide identifying information about the time-series that’s represented.

Building on our example, this is how we construct a timevector for each thermometer in our dataset:

SELECT device_id, 
	timevector(ts, val)
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;

But a timevector doesn't provide much value by itself. So now, let’s also consider some complex calculations that we can apply to the timevector, starting with a custom operator used to apply these functions

A custom operator: ->


In function pipelines, the -> operator is used to apply and compose multiple functions, in an easy to write and read format.

Fundamentally, -> means: “apply the operation on the right to the inputs on the left”, or, more simply “do the next thing”.

We created a general-purpose operator for this because we think that too many operators meaning different things can get very confusing and difficult to read.

One thing that you’ll notice about the pipeline elements is that the arguments are in an unusual place in a statement like:

SELECT device_id, 
 	timevector(ts, val) -> sort() -> delta() -> abs() -> sum() 
    	as volatility
FROM measurements
WHERE ts >= now() - '1 day'::interval
GROUP BY device_id;

It appears (from the semantics) that the timevector(ts, val) is an argument to sort(), the resulting timevector is an argument to delta() and so on.

The thing is that sort() (and the others) are regular function calls; they can’t see anything outside of their parentheses and don’t know about anything to their left in the statement; so we need a way to get the timevector into the sort() (and the rest of the pipeline).

The way we solved this is by taking advantage of one of the same fundamental computing insights that functional programming languages use: code and data are really the same thing.

Each of our functions returns a special type that describes the function and its arguments. We call these types pipeline elements (more later).

The -> operator then performs one of two different types of actions depending on the types on its right and left sides.  It can either:

  1. Apply a pipeline element to the left hand argument - perform the function described by the pipeline element on the incoming data type directly.
  2. Compose pipeline elements into a combined element that can be applied at some point in the future (this is an optimization that allows us to apply multiple elements in a “nested” manner so that we don’t perform multiple unnecessary passes).

The operator determines the action to perform based on its left and right arguments.

Let’s look at our timevector from before: timevector(ts, val) -> sort() -> delta() -> abs() -> sum(). If you remember from before, I noted that this function pipeline performs the following steps:

  1. Create timevectors out of the ts and val columns
  2. Sort it by the time column
  3. Calculate the delta (or change) between each pair in the timevector by subtracting the previous val from the current
  4. Take the absolute value of the delta
  5. Take the sum of the result from the previous steps

And logically, at each step, we can think of the timevector being materialized and passed to the next step in the pipeline.

However, while this will produce a correct result, it’s not the most efficient way to compute this. Instead, it would be more efficient to compute as much as possible in a single pass over the data.

In order to do this, we allow not only the apply operation, but also the compose operation. Once we’ve composed a pipeline into a logically equivalent higher order pipeline with all of the elements we can choose the most efficient way to execute it internally. (Importantly, even if we have to perform each step sequentially, we don’t need to materialize it and pass it between each step in the pipeline so it has significantly less overhead even without other optimization).

Custom functions: pipeline elements

Now let’s discuss the third, and final, key piece that makes up function pipelines: custom functions, or as we call them, pipeline elements.

We have implemented over 60 individual pipeline elements, which fall into 4 categories (with a few subcategories):

timevector transforms

These elements take in a timevector and produce a timevector. They are the easiest to compose, as they produce the same type.

Example pipeline:

SELECT device_id, 
	timevector(ts, val) 
    	-> sort() 
        -> delta() 
        -> map($$ ($value^3 + $value^2 + $value * 2) $$) 
        -> lttb(100) 
FROM measurements

Organized by sub-category:

Unary mathematical

Simple mathematical functions applied to the value in each point in a timevector. (Docs link)

Element Description
abs() Computes the absolute value of each value
cbrt() Computes the cube root of each value
ceil() Computes the first integer greater than or equal to each value
floor() Computes the first integer less than or equal to each value
ln() Computes the natural logarithm of each value
log10() Computes the base 10 logarithm of each value
round() Computes the closest integer to each value
sign() Computes +/-1 for each positive/negative value
sqrt() Computes the square root for each value
trunc() Computes only the integer portion of each value

Binary mathematical

Simple mathematical functions with a scalar input applied to the value in each point in a timevector. (Docs link)

Element Description
add(N) Computes each value plus N
div(N) Computes each value divided by N
logn(N) Computes the logarithm base N of each value
mod(N) Computes the remainder when each number is divided by N
mul(N) Computes each value multiplied by N
power(N) Computes each value taken to the N power
sub(N) Computes each value less N

Compound transforms

Transforms involving multiple points inside of a timevector. (Docs link)

Element Description
delta() Subtracts each value from the previous`
fill_to(interval, fill_method) Fills gaps larger than interval with points at interval from the previous using fill_method
lttb(resolution) Downsamples a timevector using the largest triangle three buckets algorithm at `resolution, requires sorted input.
sort() Sorts the timevector by the time column ascending

Lambda Elements

These elements use lambda expressions, which allows the user to write small functions to be evaluated over each point in a timevector.
Lambda expressions can return a DOUBLE PRECISION value like $$ $value^2 + $value + 3 $$. They can return a BOOL like $$ $time > ‘2020-01-01’t $$ . They can also return a (time, value) pair like $$ ($time + ‘1 day’i, sin($value) * 4)$$. (Docs link)

You can apply them using the elements below:

Element Description
filter(lambda (bool) ) Removes points from the timevector where the lambda expression evaluates to false
map(lambda (value) ) Applies the lambda expression to all the values in the timevector
map(lambda (time, value) ) Applies the lambda expression to all the times and values in the timevector

timevector finalizers

These elements end the timevector portion of a pipeline, they can either help with output or  produce an aggregate over the entire timevector. They are an optimization barrier to composition as they (usually) produce types other than timevector.

Example pipelines:

SELECT device_id, 
	timevector(ts, val) -> sort() -> delta() -> unnest()
FROM measurements
SELECT device_id, 
	timevector(ts, val) -> sort() -> delta() -> time_weight()
FROM measurements

Finalizer pipeline elements organized by sub-category:

timevector output

These elements help with output, and can produce a set of (time, value) pairs or a Note: this is an area where we’d love further feedback, are there particular data formats that would be especially useful for, say graphing that we can add? File an issue in our GitHub!

Element Description
unnest( ) Produces a set of (time, value) pairs. You can wrap and expand as a composite type to produce separate columns (pipe -> unnest()).*
materialize() Materializes a timevector to pass to an application or other operation directly, blocks any optimizations that would materialize it lazily.

timevector aggregates

Aggregate all the points in a timevector to produce a single value as a result. (Docs link)

Element Description
average() Computes the average of the values in the timevector
counter_agg() Computes the counter_agg aggregate over the times and values in the timevector
stats_agg() Computes a range of statistical aggregates and returns a 1DStatsAgg over the values in the timevector
sum() Computes the sum of the values in the timevector
num_vals() Counts the points in the timevector

Aggregate accessors and mutators

These function pipeline elements act like the accessors that I described in our previous post on aggregates. You can use them to get a value from the aggregate part of a function pipeline like so:

SELECT device_id, 
	timevector(ts, val) -> sort() -> delta() -> stats_agg() -> variance() 
FROM measurements

But these don’t just work on timevectors - they also work on a normally produced aggregate as well.

When used instead of normal function accessors and mutators they can make the syntax more clear by getting rid of nested functions like:

SELECT approx_percentile(0.5, percentile_agg(val)) 
FROM measurements

Instead, we can use the arrow accessor to convey the same thing:

SELECT percentile_agg(val) -> approx_percentile(0.5) 
FROM measurements

By aggregate family:

Counter Aggregates

Counter aggregates deal with resetting counters, (and were stabilized in our 1.3 release this week!). Counters are a common type of metric in the application performance monitoring and metrics world. All values have resets accounted for. These elements must have a CounterSummary to their left when used in a pipeline, from a counter_agg() aggregate or pipeline element. (Docs link)

Element Description
counter_zero_time() The time at which the counter value is predicted to have been zero based on the least squares fit of the points input to the CounterSummary(x intercept)
corr() The correlation coefficient of the least squares fit line of the adjusted counter value.
delta() Computes the last - first value of the counter
extapolated_delta(method) Computes the delta extrapolated using the provided method to bounds of range. Bounds must have been provided in the aggregate or a with_bounds call
idelta_left() / idelta_right() Computes the instantaneous difference between the second and first points (left) or last and next-to-last points (right)
intercept() The y-intercept of the least squares fit line of the adjusted counter value.
irate_left() / irate_right() Computes the instantaneous rate of change between the second and first points (left) or last and next-to-last points (right)
num_changes() Number of times the counter changed values.
num_elements() Number of items - any with the exact same time will have been counted only once.
num_changes() Number of times the counter reset.
slope() The slope of the least squares fit line of the adjusted counter value.
with_bounds(range) Applies bounds using the range (a TSTZRANGE) to the CounterSummary if they weren’t provided in the aggregation step

Percentile Approximation

These aggregate accessors deal with percentile approximation. For now we’ve only implemented them for percentile_agg and uddsketch based aggregates. We have not yet implemented them for tdigest. (Docs link)

Element Description
approx_percentile(p) The approximate value at percentile p
approx_percentile_rank(v) The approximate percentile a value v would fall in
error() The maximum relative error guaranteed by the approximation
mean() The exact average of the input values.
num_vals() The number of input values

Statistical aggregates

These aggregate accessors add support for common statistical aggregates (and were stabilized in our 1.3 release this week!). These allow you to compute and rollup() common statistical aggregates like average, stddev and more advanced ones like skewness as well as 2-dimensional aggregates like slope and covariance. Because there are both 1D and 2D versions of these, the accessors can have multiple forms, for instance, average() calculates the average on a 1D aggregate while average_y() & average_x() do so on each dimension of a 2D aggregate. (Docs link)

Element Description
average() / average_y() / average_x() The average of the values.
corr() The correlation coefficient of the least squares fit line.
covariance(method) The covariance of the values using either population or sample method.
determination_coeff() The determination coefficient (aka R squared) of the values.
kurtosis(method) / kurtosis_y(method) / kurtosis_x(method) The kurtosis (4th moment) of the values using either population or sample method.
intercept() The intercept of the least squares fit line.
num_vals() The number of (non-null) values seen.
sum() / sum_x() / sum_y() The sum of the values seen.
skewness(method) / skewness_y(method) / skewness_x(method) The skewness (3rd moment) of the values using either population or sample method.
slope() The slope of the least squares fit line.
stddev(method) / stddev_y(method) / stddev_x(method) The standard deviation of the values using either population or sample method.
variance(method) / variance_y(method) / variance_x(method) The variance of the values using either population or sample method.
x_intercept() The x intercept of the least squares fit line.

Time Weighted Averages

(More info)
The average() accessor may be called on the output of a time_weight() like so:

SELECT time_weight('Linear', ts, val) -> average()  FROM measurements;

Approximate Count Distinct (Hyperloglog)

This is an approximation for distinct counts that was stabilized in our 1.3 release!(Docs link) The distinct_count() accessor may be called on the output of a hyperloglog() like so:

SELECT hyperloglog(device_id) -> distinct_count() FROM measurements;

Next steps

We hope this post helped you understand how function pipelines leverage PostgreSQL extensibility to offer functional programming concepts in a way that is fully PostgreSQL compliant. And how function pipelines can improve the ergonomics of your code making it easier to write, read, and maintain.

You can try function pipelines today with a fully-managed Timescale Cloud service (no credit card required, free for 30 days). Function pipelines are available now on every new database service on Timescale Cloud, so after you’ve created a new service, you’re all set to use them!

If you prefer to manage your own database instances, you can download and install the timescaledb_toolkit extension on GitHub for free, after which you’ll be able to use function pipelines.

We love building in public. You can view our upcoming roadmap on GitHub for a list of proposed features, as well as features we’re currently implementing and those that are available to use today. We also welcome feedback from the community (it helps us prioritize the features users really want). To contribute feedback, comment on an open issue or in a discussion thread in GitHub.

And if you want to hear more about function pipelines or meet some of the folks who helped build it, be sure to join us for our first Timescale Community Day on October 28, 2021!

Ingest and query in milliseconds, even at terabyte scale.
This post was written by
17 min read
Always Be Launching
Contributors

Related posts