Dec 04, 2024
Posted by Junaid Ahmed
Efficiency, consistency, and agility are core concerns for application developers today. The choice of a database and its interaction with the system play vital roles in achieving these requirements.
Due to its robustness, PostgreSQL has been one of the go-to databases among developers. When integrated with the Python library asyncpg, PostgreSQL can significantly enhance application performance.
Asyncpg fully supports asynchronous programming, allowing non-blocking code execution. This capability is beneficial for workloads with many concurrent database operations, such as web applications, real-time analytics systems, or microservices architectures. With asyncpg, it's possible to service several requests at once, thus lowering latencies and boosting overall efficiency.
In this tutorial, I’ll walk you through creating applications with asyncpg and PostgreSQL. I'll start with the configuration basics and gradually build a simple app for financial data management. By the end, you will have learned how to make applications not only efficient but also scalable with asyncpg and PostgreSQL. But first, let’s describe the tools we’ll use in this tutorial.
Asyncpg is a high-performance asynchronous PostgreSQL client library for Python. It allows developers to execute database operations without blocking other code's execution. This capability is crucial in building responsive and practical applications, especially in very demanding cases with high concurrency and low latency.
Asyncpg is designed to be performant yet simple. Its architecture includes the following core components:
Asyncpg is deeply integrated with Python's asyncio event loop, which enables the non-blocking database operations that are crucial for building scalable applications. The event loop schedules the execution of async tasks so that multiple operations run concurrently, without waiting for one operation to complete before another starts.
Connection pooling is a crucial feature of asyncpg. Instead of opening and closing a new database connection for every operation, you reuse one from an existing pool. This lowers the overhead of creating connections, improving an application's performance. The connection pool administers a set of database connections, providing an efficient way to handle numerous simultaneous requests to a database.
Asyncpg provides a high-level query interface that abstracts away much of the complexity of interacting with the database. It lets developers run SQL queries and communicate with the database in an intuitive, effective manner, making database applications easier to develop and maintain.
One of the most exciting features of asyncpg is binary protocol support for PostgreSQL. This protocol achieves faster communication between the client and database server by reducing the data that needs to be transferred. Because of binary protocol support, database operation performance is significantly increased when dealing with large datasets.
Asyncpg incorporates a prepared-statement cache, which reduces the load on the database server while improving response time. By reusing the parsed and planned form of frequently executed queries, it avoids re-parsing the same statements on every call and hence improves overall application efficiency.
Error handling is a fundamental aspect of the asyncpg architecture. It supports developers with full error reporting and management, allowing them to gracefully handle errors that may arise from the database. This ensures that applications will be stable and dependable even in the face of unexpected issues.
Asyncpg adds support for asynchronously running transactional operations so that a developer can implement complex transactional flows without blocking the event loop. This is critical in applications that require correct and reliable data operations, such as those serving financial systems or real-time analytic platforms.
TimescaleDB is an open-source time-series database designed to efficiently manage large-scale time-series data. Developed as an extension to PostgreSQL, it seamlessly combines advanced time-series capabilities into applications. This ensures high performance, maximum scalability, and ease of use while building on PostgreSQL's robustness and reliability.
If you want to focus on building your application, not your database, the fully managed Timescale Cloud will give you time to focus on your priorities. It provides automated data management policies, backup and restore mechanisms, and top-notch 24/7 support, among other benefits. If you want to take it out for a spin, sign up and try it for free for 30 days.
TimescaleDB has numerous functionalities aimed primarily at enhancing time-series data management and analytics.
Now that you know more about the tools we’ll use, it’s time to get started. The code for this tutorial is available on GitHub.
To install the asyncpg library in Python, use the following code:
pip install asyncpg
You can use the following code to connect to TimescaleDB with asyncpg and fetch data in Python:
import asyncpg
import pandas as pd
import asyncio
The libraries I used to connect to the database are:

- asyncpg: a package that allows you to interact with PostgreSQL databases asynchronously.
- pandas: a library for data manipulation and exploration.
- asyncio: a library for writing concurrent code with the async and await keywords.

For the data, I used the TimescaleDB finance database, which contains the open, high, and low prices for BTC/USD. Then, you need to provide the database URL.
# Database credentials
db_url = "paste your credentials"
If you need to use your own database, you can simply create it through the TimescaleDB extension and import or insert your data. Replace the credentials with your username, password, host, port, and database name.
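For reference, asyncpg accepts a standard PostgreSQL connection URI. The values below are placeholders, not real credentials:

```python
# PostgreSQL DSN format: postgres://<user>:<password>@<host>:<port>/<database>
# All values below are placeholders -- substitute your own service credentials
db_url = "postgres://tsdbadmin:mypassword@myservice.tsdb.cloud.timescale.com:5432/tsdb"
```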
The next step is to define the asynchronous function to fetch the data. Here’s how you can do it:
async def fetch_data():
    # Connect to the database
    conn = await asyncpg.connect(db_url)
- fetch_data(): an asynchronous function that handles the database connection and data retrieval.
- conn: establishes a connection to the database using the credentials provided in db_url.

Now let’s define the SQL query to extract the data from the table:
    # Define the query to fetch data for the past 1 week
    query = """
        SELECT bucket, open, high, low, close
        FROM one_min_candle
        WHERE symbol = 'BTC/USD'
          AND bucket >= NOW() - INTERVAL '1 week'
        ORDER BY bucket DESC;
    """
The next step is to fetch and process the data.
    # Execute the query and fetch the results
    rows = await conn.fetch(query)

    # Convert the results to a pandas DataFrame
    data = pd.DataFrame(rows, columns=['bucket', 'open', 'high', 'low', 'close'])

    # Save the DataFrame to a CSV file
    data.to_csv('data.csv', index=False)
The statement rows = await conn.fetch(query)
runs the given SQL query asynchronously on the connected PostgreSQL database and retrieves the rows. The await keyword ensures the function waits for the query to finish before proceeding, with the results stored in the rows variable.
Now let’s close the connection and display the extracted data.
    # Close the connection
    await conn.close()

    # Return the DataFrame to display it in the notebook
    return data

# Using await directly in Jupyter to run the async function and display the DataFrame
display(await fetch_data())
The line await conn.close() asynchronously closes the database connection, and return data then hands the DataFrame back for further use. Lastly, display(await fetch_data()) invokes the async function fetch_data() in a Jupyter Notebook and displays the resulting DataFrame. This sequence ensures proper connection management and data presentation.
Upon executing the code, we got the following result:
Now that the data is extracted and displayed, it’s time to execute some operations. Let’s visualize the data to explore the trend.
The plot is dynamic, updating every second with the most recent information. This real-time data visualization allows us to monitor financial trends and market movements continuously. The connection to TimescaleDB ensures that we are accessing high-resolution time-series data efficiently, enabling us to provide up-to-the-minute insights and analytics.
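The plotting code itself is not shown above. As a minimal static sketch of what such a chart could look like, the hypothetical helper below uses matplotlib and assumes the DataFrame returned by fetch_data() (it does not implement the per-second live updates):

```python
import pandas as pd
import matplotlib.pyplot as plt

def plot_candles(data: pd.DataFrame):
    # Plot the close price over time; 'bucket' is the 1-minute time bucket
    data = data.sort_values("bucket")
    plt.figure(figsize=(10, 4))
    plt.plot(data["bucket"], data["close"], label="BTC/USD close")
    # Shade the high/low range around the close line
    plt.fill_between(data["bucket"], data["low"], data["high"],
                     alpha=0.2, label="high/low range")
    plt.xlabel("Time")
    plt.ylabel("Price (USD)")
    plt.legend()
    plt.tight_layout()
    plt.show()
```

A live version would typically wrap this in a loop (or use matplotlib's FuncAnimation) that re-fetches the latest rows each second and redraws the figure.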
In the previous section, we successfully connected to and fetched data from the TimescaleDB database. Now, let’s create our table and perform CRUD (create, read, update, and delete) operations to build a simple app for financial data.
First, you need to create your service in TimescaleDB and copy the connection link. Here is the code you can use to create the hypertable once you have the link:
async def create_hypertable():
    conn = await asyncpg.connect(CONNECTION)
    create_table_query = """
    CREATE TABLE IF NOT EXISTS ticks (
        time TIMESTAMPTZ NOT NULL,
        symbol TEXT NOT NULL,
        price DOUBLE PRECISION,
        volume DOUBLE PRECISION,
        PRIMARY KEY (time, symbol)
    );
    """
    await conn.execute(create_table_query)

    create_hypertable_query = "SELECT create_hypertable('ticks', 'time', if_not_exists => TRUE);"
    await conn.execute(create_hypertable_query)

    create_continuous_aggregate_query = """
    CREATE MATERIALIZED VIEW one_min_candle
    WITH (timescaledb.continuous) AS
    SELECT
        time_bucket('1 minute', time) AS bucket,
        symbol,
        first(price, time) AS open,
        max(price) AS high,
        min(price) AS low,
        last(price, time) AS close
    FROM ticks
    GROUP BY bucket, symbol
    WITH NO DATA;
    """
    await conn.execute(create_continuous_aggregate_query)
    print("Hypertable and continuous aggregate created successfully.")
    await conn.close()
The first operation is to create and insert data in the table. Below is the code that you can use to insert data in the hypertable:
async def insert_ticks(data):
    conn = await asyncpg.connect(CONNECTION)
    insert_query = """
    INSERT INTO ticks (time, symbol, price, volume)
    VALUES ($1, $2, $3, $4)
    """
    await conn.executemany(insert_query, data)
    print("Tick data inserted successfully.")
    await conn.close()
The next operation is to read the data that we have inserted earlier. Below is the code that you can use to read the data:
async def read_ticks():
    conn = await asyncpg.connect(CONNECTION)
    select_query = "SELECT * FROM ticks ORDER BY time DESC"
    rows = await conn.fetch(select_query)
    for row in rows:
        print(dict(row))
    await conn.close()
    return rows

async def read_aggregates():
    conn = await asyncpg.connect(CONNECTION)
    select_query = "SELECT * FROM one_min_candle ORDER BY bucket DESC"
    rows = await conn.fetch(select_query)
    for row in rows:
        print(dict(row))
    await conn.close()
    return rows
The code for updating the data in the table is:
async def update_tick(time, symbol, new_values):
    conn = await asyncpg.connect(CONNECTION)
    update_query = """
    UPDATE ticks
    SET price = $3, volume = $4
    WHERE time = $1 AND symbol = $2
    """
    await conn.execute(update_query, time, symbol, new_values[0], new_values[1])
    print("Tick data updated successfully.")
    await conn.close()
To delete the data from the table, you can use the following function:
async def delete_tick(time, symbol):
    conn = await asyncpg.connect(CONNECTION)
    delete_query = "DELETE FROM ticks WHERE time = $1 AND symbol = $2"
    await conn.execute(delete_query, time, symbol)
    print("Tick data deleted successfully.")
    await conn.close()
The main function handles the table's creation, data insertion, reading, updating, and eventually deletion, illustrating CRUD activities. The code for the main function is:
from datetime import datetime

async def main():
    # Create hypertable and continuous aggregate
    await create_hypertable()

    # Insert sample data
    sample_ticks = [
        (datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61057.0, 0.5),
        (datetime.strptime('2024-07-03 06:24:30+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61058.8, 0.2),
        (datetime.strptime('2024-07-03 06:23:45+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61061.4, 0.1),
        (datetime.strptime('2024-07-03 06:22:30+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61072.7, 0.3),
        (datetime.strptime('2024-07-03 06:21:15+00:00', '%Y-%m-%d %H:%M:%S%z'), 'BTC', 61026.4, 0.4)
    ]
    await insert_ticks(sample_ticks)

    # Read raw tick data
    print("Reading raw tick data:")
    await read_ticks()

    # Read aggregated candlestick data
    print("Reading aggregated candlestick data:")
    await read_aggregates()

    # Update a tick
    update_time = datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z')
    new_values = (62000.0, 0.6)
    await update_tick(update_time, 'BTC', new_values)
    print("After update:")
    await read_ticks()
    await read_aggregates()

    # Delete a tick
    delete_time = datetime.strptime('2024-07-03 06:25:00+00:00', '%Y-%m-%d %H:%M:%S%z')
    await delete_tick(delete_time, 'BTC')
    print("After delete:")
    await read_ticks()
    await read_aggregates()

await main()
Upon running the code, you will get an output similar to the one given below, demonstrating the simple application performing the CRUD operations using asyncpg with PostgreSQL.
Upon viewing the output, you'll see how to integrate asyncpg with PostgreSQL to create efficient apps. Using asyncpg's asynchronous capabilities, you can perform non-blocking CRUD operations and employ TimescaleDB’s sophisticated features like indexing and hypertables for optimal efficiency.
If you want to build a financial application with Python and supercharged PostgreSQL but using Django, check out this video:
Developing scalable and efficient applications requires following best practices and optimizing performance, both when dealing with time-series databases like TimescaleDB and when working with asynchronous programming tools like asyncpg and asyncio.
Repeatedly opening and closing database connections consumes significant resources. With pooling, an application manages a limited set of connections that can serve a potentially huge number of concurrent database queries. This makes better use of database connections and helps handle spiking loads gracefully.
Optimizing PostgreSQL queries is especially important for time-series data, which is characterized by large data volumes and frequent access.
In this tutorial, I walked you through building a financial data application using asyncpg and PostgreSQL. By now, you should feel better equipped to make your applications more efficient and scalable using these tools.
The async functionality of asyncpg perfectly complements TimescaleDB’s speed and power when managing time-series data. These technologies are essential for businesses relying on real-time analytics and IoT, both of which demand efficient time-series data management.
If this tutorial piqued your interest in Timescale, you can create a free account, and try it out today.
For more Python resources, check out this blog post on how to develop applications using psycopg3.