Randomly slow inserts into TimescaleDB and a memory leak

Hi,
I need your help.
I just moved to Timescale for stock data.

Previously, in PostgreSQL, I used an auto-generated ID column as the primary key, but due to TimescaleDB's requirements I need a composite key of ID and trade_date (a datetime column). I tried removing the ID column altogether, but insert time shot up to 20 s in both PostgreSQL and TimescaleDB, so the official tutorial may have a problem.

Here is how I run TimescaleDB:

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    restart: always
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "victoriasecret"
      POSTGRES_DB: fastapi
    command: -c 'max_connections=300'
    profiles: ["prod", "dev"]
    volumes:
      - timescale_2:/var/lib/postgresql/data

I need to increase max_connections, otherwise my workers hit the maximum-connections-reached error.
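
For reference, each worker process builds its own engine and pool, so the totals multiply. A minimal sketch of the engine setup (illustrative assumptions, not my exact code; the DSN values come from the compose file):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Every Celery worker process builds its own engine, so total connections
# can approach: processes x (pool_size + max_overflow). With the 4 + 22
# worker processes below, the worst case already exceeds PostgreSQL's
# default max_connections of 100.
engine = create_engine(
    "postgresql+psycopg2://postgres:victoriasecret@timescaledb:5432/fastapi",
    pool_size=5,       # SQLAlchemy's default
    max_overflow=10,   # SQLAlchemy's default
    pool_pre_ping=True,
)
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)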

Here is the hypertable. I created this table following the tutorial, partitioned on trade_date with the default chunk interval and migrate_data => true.

column_name,data_type,collation,column_default,is_nullable
BidVol10,double precision,,,YES
mck_id,integer,,,YES
trade_date,timestamp without time zone,,,NO
created_at,timestamp with time zone,,now(),YES
MatchedPrice,double precision,,,NO
OfferVol1,double precision,,,NO
OfferVol2,double precision,,,NO
FlrPrice,double precision,,,NO
OfferVol3,double precision,,,NO
BidVol1,double precision,,,NO
BidVol2,double precision,,,NO
FSellVol,double precision,,,NO
BidVol3,double precision,,,NO
OfferPrice3,double precision,,,NO
OfferPrice1,double precision,,,NO
OfferPrice2,double precision,,,NO
MatchedChange,double precision,,,NO
AvgPrice,double precision,,,NO
BidVol8,double precision,,,YES
BidVol9,double precision,,,YES
id,integer,,nextval('mck_records_id_seq'::regclass),NO
HigPrice,double precision,,,NO
TradeTime,timestamp without time zone,,,YES
MatchedVol,double precision,,,NO
BidPrice2,double precision,,,NO
BidPrice3,double precision,,,NO
FBuyVol,double precision,,,NO
CeiPrice,double precision,,,NO
BidPrice1,double precision,,,NO
LowPrice,double precision,,,NO
RefPrice,double precision,,,NO
MatchedTotalVol,double precision,,,NO
TradingSession,USER-DEFINED,,'LO'::tradingsessionenum,NO
OfferVol4,double precision,,,YES
OfferVol5,double precision,,,YES
OfferVol6,double precision,,,YES
OfferVol7,double precision,,,YES
OfferVol8,double precision,,,YES
OfferVol9,double precision,,,YES
OfferVol10,double precision,,,YES
OfferPrice4,double precision,,,YES
OfferPrice5,double precision,,,YES
OfferPrice6,double precision,,,YES
OfferPrice7,double precision,,,YES
OfferPrice8,double precision,,,YES
OfferPrice9,double precision,,,YES
OfferPrice10,double precision,,,YES
BidPrice4,double precision,,,YES
BidPrice5,double precision,,,YES
BidPrice6,double precision,,,YES
BidPrice7,double precision,,,YES
BidPrice8,double precision,,,YES
BidPrice9,double precision,,,YES
BidPrice10,double precision,,,YES
BidVol4,double precision,,,YES
BidVol5,double precision,,,YES
BidVol6,double precision,,,YES
BidVol7,double precision,,,YES
FloorCode,character varying,,,NO
AmPm,character varying,,,NO
Code,character varying,,,NO

Here are some of the inserts that are randomly slow:

I insert using standard SQLAlchemy + psycopg2; I had no issues with these inserts on plain PostgreSQL before.
After each insert I make two atomic increments (with row locking) against the count tables.

The expected volume is only 200-300 rows per second.

I also notice a weird pattern when a worker gets stuck: as can be seen in the image (in the comments), one thread is held up, all the other threads drop for a small window, and then the speed returns to normal.

I am running this on Ubuntu 24.04 in Proxmox; the VM is on an SSD. What could be the reason behind this? Please help! My boss does not like it when his lovely graphs have a bunch of gaps.

Here is the memory leak:

Here is the weird pattern:

Hi @H_N, can you please share your example in a runnable form so we can try to reproduce and debug your PoC here?

requirements.txt

fastapi==0.115.8
uvicorn==0.30.6
selenium==4.29.0
selenium_wire==5.1.0
SQLAlchemy==2.0.38
psycopg2-binary==2.9.10
setuptools==75.8.0
blinker==1.7.0
alembic==1.14.1
nameko==2.14.1
async-timeout==5.0.1
billiard==4.2.1
celery==5.4.0
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
redis==5.2.1
pydantic-settings==2.8.1
apscheduler==3.11.0
pytz==2025.1
PyJWT==2.10.1
rel==0.4.9.19
flower==2.0.1
orjson==3.10.15
gevent==24.11.1
pybloom-live==4.0.0
aiofiles==24.1.0
python-multipart==0.0.20
langchain-openai==0.3.11
langchain==0.3.21

Docker-compose:

services:
  flower:
    image: mher/flower:2.0
    command: celery --broker=redis://redis:6379/0 flower --port=5555
    profiles: ["prod", "dev"]
    ports:
      - '5555:5555'
    depends_on:
      celery:
        condition: service_healthy

  cadvisor:
      image: gcr.io/cadvisor/cadvisor:v0.51.0
      container_name: cadvisor
      restart: always
      profiles: ["prod", "dev"]
      devices:
        - /dev/kmsg
      privileged: true
      ports:
        - '8080:8080'
      volumes:
        - /:/rootfs:ro
        - /var/run:/var/run:ro
        - /sys:/sys:ro
        - /var/lib/docker/:/var/lib/docker:ro
        - /dev/disk/:/dev/disk:ro
        - /var/run/docker.sock:/var/run/docker.sock:ro
        - /etc/machine-id:/etc/machine-id:ro
        - /var/lib/dbus/machine-id:/var/lib/dbus/machine-id:ro

  node-exporter:
      image: prom/node-exporter:v1.9.0
      container_name: node-exporter
      restart: always
      profiles: ["prod", "dev"]
      volumes:
          - /proc:/host/proc:ro
          - /sys:/host/sys:ro
          - /:/rootfs:ro
      ports:
        - '9100:9100'
      command:
          - "--path.procfs=/host/proc"
          - "--path.sysfs=/host/sys"
          - "--path.rootfs=/rootfs"
          - "--collector.filesystem.ignored-mount-points='^/(sys|proc|dev|host|etc|rootfs/var/lib/docker/containers|rootfs/var/lib/docker/overlay2|rootfs/run/docker/netns|rootfs/var/lib/docker/aufs)($$|/)'"
      depends_on:
        - flower

  prometheus:
      image: prom/prometheus:v2.26.0
      container_name: prometheus
      restart: always
      profiles: ["prod", "dev"]
      ports:
        - '9090:9090'
      volumes:
          - ./prometheus.yml:/etc/prometheus/prometheus.yml
          - prometheus-data:/prometheus
      depends_on:
          - cadvisor
          - node-exporter
          - flower

  grafana:
      image: grafana/grafana:11.3.0-security-01
      container_name: grafana
      restart: always
      profiles: ["prod", "dev"]
      ports:
          - '3333:3000'
      volumes:
        - grafana-data:/var/lib/grafana
      depends_on:
          - prometheus
          - flower

  timescaledb:
    image: timescale/timescaledb:latest-pg16
    restart: always
    ports:
      - '5432:5432'
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "victoriasecret"
      POSTGRES_DB: fastapi
    command: -c 'max_connections=300'
    profiles: ["prod", "dev"]
    volumes:
      - timescale_2:/var/lib/postgresql/data

  redis:
    image: redis:7.4.1
    restart: unless-stopped
    ports:
      - "6379:6379"
    profiles: ["prod", "dev"]
    volumes:
      - redis-data:/data

  fast_api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - '8000:8000'
    volumes:
      - .:/nextjs-fastapi
    depends_on:
      - timescaledb
      - celery
      - celery-gevent
      - celery_beat
      - cadvisor
      - grafana
      - redis
    profiles: [ "prod", "dev" ]
    restart: unless-stopped

  celery:
    build:
      context: .
      dockerfile: Dockerfile
    command: celery -A services.worker_init.celery_app worker --concurrency=4 --pool=processes --loglevel=info -Q main --purge
    env_file: ".env"
    depends_on:
      - redis
      - timescaledb
    volumes:
      - .:/nextjs-fastapi
    profiles: [ "prod", "dev" ]
    healthcheck:
        test: celery -b redis://redis:6379 inspect ping -d celery@$$HOSTNAME
        interval: 30s
        timeout: 10s
        retries: 3
    restart: unless-stopped

  celery-io:
    build:
      context: .
      dockerfile: Dockerfile
    command: celery -A services.worker_init.celery_app worker --concurrency=22 --pool=processes --loglevel=info -Q io_task --purge
    env_file: ".env"
    depends_on:
      - redis
      - timescaledb
    volumes:
      - .:/nextjs-fastapi
    profiles: [ "prod", "dev" ]
    healthcheck:
        test: celery -b redis://redis:6379 inspect ping -d celery@$$HOSTNAME
        interval: 30s
        timeout: 10s
        retries: 3
    restart: unless-stopped

  celery_beat:
    build:
      context: .
      dockerfile: Dockerfile
    command: celery -A services.worker_init.celery_app beat --loglevel=info
    env_file: ".env"
    depends_on:
      - redis
      - timescaledb
      - celery
      - celery-gevent
    volumes:
      - .:/nextjs-fastapi
    profiles: [ "prod", "dev" ]
    restart: unless-stopped

Dockerfile:

FROM python:3.12-alpine

# Set the working directory inside the container
WORKDIR /nextjs-fastapi

# Copy the requirements file to the working directory
COPY requirements.txt .

# Install the Python dependencies
RUN pip install -r requirements.txt

# Copy the application code to the working directory
COPY . .

# Expose the port on which the application will run
EXPOSE 8000

# Run the FastAPI application using uvicorn server
CMD ["uvicorn", "api.index:app", "--host", "0.0.0.0", "--port", "8000"]

Write function:

@celery_app.task(queue="io_task")
def write_mck_record(mck_record: dict, date: str):
    # Parse the trade date once up front instead of re-parsing it at every use.
    trade_date = datetime.strptime(date, "%d/%m/%Y")

    db = next(get_db())
    mck_object = get_mck_by_code(db, mck_record["Code"])
    if not mck_object:
        mck_object = create_mck(db, mck=MCKSchema(code=mck_record["Code"]))

    # Replace NaN values with 0.0; skip values that are not numeric at all.
    for key, value in mck_record.items():
        try:
            if math.isnan(float(value)):
                mck_record[key] = 0.0
        except (TypeError, ValueError):
            continue

    new_mck_record = MCKRecords(
        mck_id=mck_object.id,
        MatchedPrice=float(mck_record["MatchedPrice"]),
        trade_date=trade_date,
        OfferVol1=float(mck_record["OfferVol1"]),
        OfferVol2=float(mck_record["OfferVol2"]),
        OfferVol3=float(mck_record["OfferVol3"]),
        OfferVol4=float(mck_record["OfferVol4"]),
        OfferVol5=float(mck_record["OfferVol5"]),
        OfferVol6=float(mck_record["OfferVol6"]),
        OfferVol7=float(mck_record["OfferVol7"]),
        OfferVol8=float(mck_record["OfferVol8"]),
        OfferVol9=float(mck_record["OfferVol9"]),
        OfferVol10=float(mck_record["OfferVol10"]),
        BidVol1=float(mck_record["BidVol1"]),
        BidVol2=float(mck_record["BidVol2"]),
        BidVol3=float(mck_record["BidVol3"]),
        BidVol4=float(mck_record["BidVol4"]),
        BidVol5=float(mck_record["BidVol5"]),
        BidVol6=float(mck_record["BidVol6"]),
        BidVol7=float(mck_record["BidVol7"]),
        BidVol8=float(mck_record["BidVol8"]),
        BidVol9=float(mck_record["BidVol9"]),
        BidVol10=float(mck_record["BidVol10"]),
        OfferPrice1=float(mck_record["OfferPrice1"]),
        OfferPrice2=float(mck_record["OfferPrice2"]),
        OfferPrice3=float(mck_record["OfferPrice3"]),
        OfferPrice4=float(mck_record["OfferPrice4"]),
        OfferPrice5=float(mck_record["OfferPrice5"]),
        OfferPrice6=float(mck_record["OfferPrice6"]),
        OfferPrice7=float(mck_record["OfferPrice7"]),
        OfferPrice8=float(mck_record["OfferPrice8"]),
        OfferPrice9=float(mck_record["OfferPrice9"]),
        OfferPrice10=float(mck_record["OfferPrice10"]),
        BidPrice1=float(mck_record["BidPrice1"]),
        BidPrice2=float(mck_record["BidPrice2"]),
        BidPrice3=float(mck_record["BidPrice3"]),
        BidPrice4=float(mck_record["BidPrice4"]),
        BidPrice5=float(mck_record["BidPrice5"]),
        BidPrice6=float(mck_record["BidPrice6"]),
        BidPrice7=float(mck_record["BidPrice7"]),
        BidPrice8=float(mck_record["BidPrice8"]),
        BidPrice9=float(mck_record["BidPrice9"]),
        BidPrice10=float(mck_record["BidPrice10"]),
        MatchedChange=float(mck_record["MatchedChange"]),
        AvgPrice=float(mck_record["AvgPrice"]),
        FloorCode=mck_record["FloorCode"],
        HigPrice=float(mck_record["HigPrice"]),
        TradeTime=datetime.strptime(mck_record["TradeTime"], "%H:%M:%S"),
        MatchedVol=float(mck_record["MatchedVol"]),
        Code=mck_record["Code"],
        FlrPrice=float(mck_record["FlrPrice"]),
        FSellVol=float(mck_record["FSellVol"]),
        FBuyVol=float(mck_record["FBuyVol"]),
        CeiPrice=float(mck_record["CeiPrice"]),
        LowPrice=float(mck_record["LowPrice"]),
        RefPrice=float(mck_record["RefPrice"]),
        AmPm=mck_record["AmPm"],
        MatchedTotalVol=float(mck_record["MatchedTotalVol"]),
        TradingSession=MCKRecords.TradingSessionEnum(
            mck_record.get("TradingSession", "LO")
        ),
    )
    atomic_increment_record_count_all(
        db, trade_date=trade_date, count_type=CountTypeEnum.ALL
    )
    atomic_increment_record_count_mck(
        db, mck_id=mck_object.id, trade_date=trade_date, count_type=CountTypeEnum.ALL
    )
    # Compare as a float: the raw dict value may still be a string here.
    matched_change = float(mck_record["MatchedChange"])
    if matched_change > 0:
        atomic_increment_record_count_all(
            db, trade_date=trade_date, count_type=CountTypeEnum.GREEN
        )
        atomic_increment_record_count_mck(
            db, mck_id=mck_object.id, trade_date=trade_date, count_type=CountTypeEnum.GREEN
        )
    elif matched_change < 0:
        atomic_increment_record_count_all(
            db, trade_date=trade_date, count_type=CountTypeEnum.RED
        )
        atomic_increment_record_count_mck(
            db, mck_id=mck_object.id, trade_date=trade_date, count_type=CountTypeEnum.RED
        )
    else:
        atomic_increment_record_count_all(
            db, trade_date=trade_date, count_type=CountTypeEnum.YELLOW
        )
        atomic_increment_record_count_mck(
            db, mck_id=mck_object.id, trade_date=trade_date, count_type=CountTypeEnum.YELLOW
        )
    db.add(new_mck_record)
    db.commit()
    db.refresh(new_mck_record)
    db.close()
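
One note on session handling in the task above: if anything raises before db.close(), the session is never closed explicitly and its connection lingers until garbage collection, which may contribute to connections (and memory) piling up. The standard context-manager scope avoids that; a sketch, assuming SessionLocal is the project's session factory:

from contextlib import contextmanager

from api.database.database import SessionLocal  # assumed session factory

@contextmanager
def session_scope():
    """Commit on success, roll back on error, always release the connection."""
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

# Usage: replaces the db = next(get_db()) ... db.close() pattern:
# with session_scope() as db:
#     ...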

Count functions:

def atomic_increment_record_count_all(
    db: Session, trade_date: datetime, count_type: CountTypeEnum = CountTypeEnum.ALL
):
    mck_all_count = (
        db.query(MCKRecordCountAll)
        .filter(
            MCKRecordCountAll.trade_date == trade_date,
            MCKRecordCountAll.count_type == count_type,
        )
        .with_for_update()
        .first()
    )
    if mck_all_count is not None:
        mck_all_count.count = mck_all_count.count + 1
        db.add(mck_all_count)
        db.commit()
    else:
        # Create with count=1 so this first increment is not lost.
        create_mck_record_count_all(
            db=db, trade_date=trade_date, count=1, count_type=count_type
        )


def atomic_increment_record_count_mck(
    db: Session,
    trade_date: datetime,
    mck_id: int,
    count_type: CountTypeEnum = CountTypeEnum.ALL,
):
    mck_count = (
        db.query(MCKRecordCounts)
        .filter(
            MCKRecordCounts.trade_date == trade_date,
            MCKRecordCounts.mck_id == mck_id,
            MCKRecordCounts.count_type == count_type,
        )
        .with_for_update()
        .first()
    )
    if mck_count is not None:
        mck_count.count = mck_count.count + 1
        db.add(mck_count)
        db.commit()
    else:
        # Create with count=1 so this first increment is not lost.
        create_mck_record_count(
            db=db, trade_date=trade_date, mck_id=mck_id, count=1, count_type=count_type
        )
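
For context, each increment here costs two round trips: a SELECT ... FOR UPDATE and then an UPDATE. With a unique constraint on the key columns, the read-modify-write could collapse into one atomic upsert. A sketch, not what I currently run (it assumes a unique constraint on (trade_date, count_type), which the table above does not have yet):

from sqlalchemy.dialects.postgresql import insert

def upsert_increment_count_all(db, trade_date, count_type):
    # One round trip, no explicit row lock: PostgreSQL's INSERT ... ON CONFLICT
    # either creates the counter row or bumps it atomically.
    stmt = (
        insert(MCKRecordCountAll)
        .values(trade_date=trade_date, count_type=count_type, count=1)
        .on_conflict_do_update(
            index_elements=["trade_date", "count_type"],
            set_={"count": MCKRecordCountAll.count + 1},
        )
    )
    db.execute(stmt)
    db.commit()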

Stock record table:

import enum

from sqlalchemy import (
    TIMESTAMP,
    Column,
    DateTime,
    Enum,
    Float,
    ForeignKey,
    Index,
    Integer,
    String,
    text,
)
from sqlalchemy.orm import relationship

from api.database.database import Base


class MCKRecords(Base):
    __tablename__ = "mck_records"
    __table_args__ = (
        Index("mck_date_index", "mck_id", "trade_date"),
        Index("mck_time_date_index", "mck_id", "trade_date", "TradeTime"),
    )

    class TradingSessionEnum(enum.Enum):
        LO = "LO"
        ATO = "ATO"
        ATC = "ATC"
        PT = "PT"
        C = "C"
        BREAK = "BREAK"
        HALT = "HALT"

    id = Column(Integer, primary_key=True, nullable=False)
    mck_id = Column(Integer(), ForeignKey("mck.id"))
    mck = relationship("MCK", back_populates="mck_record")

    trade_date = Column(DateTime, nullable=False, index=True)

    created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
    MatchedPrice = Column(Float, nullable=False)
    OfferVol1 = Column(Float, nullable=False)
    OfferVol2 = Column(Float, nullable=False)
    OfferVol3 = Column(Float, nullable=False)
    OfferVol4 = Column(Float, nullable=True)
    OfferVol5 = Column(Float, nullable=True)
    OfferVol6 = Column(Float, nullable=True)
    OfferVol7 = Column(Float, nullable=True)
    OfferVol8 = Column(Float, nullable=True)
    OfferVol9 = Column(Float, nullable=True)
    OfferVol10 = Column(Float, nullable=True)

    OfferPrice1 = Column(Float, nullable=False)
    OfferPrice2 = Column(Float, nullable=False)
    OfferPrice3 = Column(Float, nullable=False)
    OfferPrice4 = Column(Float, nullable=True)
    OfferPrice5 = Column(Float, nullable=True)
    OfferPrice6 = Column(Float, nullable=True)
    OfferPrice7 = Column(Float, nullable=True)
    OfferPrice8 = Column(Float, nullable=True)
    OfferPrice9 = Column(Float, nullable=True)
    OfferPrice10 = Column(Float, nullable=True)

    BidPrice1 = Column(Float, nullable=False)
    BidPrice2 = Column(Float, nullable=False)
    BidPrice3 = Column(Float, nullable=False)
    BidPrice4 = Column(Float, nullable=True)
    BidPrice5 = Column(Float, nullable=True)
    BidPrice6 = Column(Float, nullable=True)
    BidPrice7 = Column(Float, nullable=True)
    BidPrice8 = Column(Float, nullable=True)
    BidPrice9 = Column(Float, nullable=True)
    BidPrice10 = Column(Float, nullable=True)

    BidVol1 = Column(Float, nullable=False)
    BidVol2 = Column(Float, nullable=False)
    BidVol3 = Column(Float, nullable=False)
    BidVol4 = Column(Float, nullable=True)
    BidVol5 = Column(Float, nullable=True)
    BidVol6 = Column(Float, nullable=True)
    BidVol7 = Column(Float, nullable=True)
    BidVol8 = Column(Float, nullable=True)
    BidVol9 = Column(Float, nullable=True)
    BidVol10 = Column(Float, nullable=True)

    MatchedChange = Column(Float, nullable=False)
    AvgPrice = Column(Float, nullable=False)
    FloorCode = Column(String, nullable=False)
    HigPrice = Column(Float, nullable=False)
    TradeTime = Column(DateTime, index=True)
    MatchedVol = Column(Float, nullable=False)
    Code = Column(String, nullable=False)
    FBuyVol = Column(Float, nullable=False)
    CeiPrice = Column(Float, nullable=False)
    LowPrice = Column(Float, nullable=False)
    RefPrice = Column(Float, nullable=False)
    AmPm = Column(String, nullable=False)
    FlrPrice = Column(Float, nullable=False)
    MatchedTotalVol = Column(Float, nullable=False)
    FSellVol = Column(Float, nullable=False)
    TradingSession = Column(
        Enum(TradingSessionEnum),
        nullable=False,
        default=TradingSessionEnum.LO,
        server_default="LO",
    )

Count tables:

import enum

from sqlalchemy import (
    TIMESTAMP,
    Column,
    DateTime,
    Enum,
    ForeignKey,
    Integer,
    text,
)
from sqlalchemy.orm import relationship

from api.database.database import Base


class CountTypeEnum(enum.Enum):
    RED = "RED"
    GREEN = "GREEN"
    YELLOW = "YELLOW"
    ALL = "ALL"


class MCKRecordCounts(Base):
    __tablename__ = "mck_record_counts"

    id = Column(Integer, primary_key=True, nullable=False)
    mck_id = Column(Integer(), ForeignKey("mck.id"))
    mck = relationship("MCK", back_populates="mck_record_count")
    created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
    trade_date = Column(DateTime, nullable=False, index=True)
    count = Column(Integer, nullable=False)
    count_type = Column(
        Enum(CountTypeEnum),
        nullable=False,
        default=CountTypeEnum.ALL,
        server_default="ALL",
    )


class MCKRecordCountAll(Base):

    __tablename__ = "mck_record_count_all"

    id = Column(Integer, primary_key=True, nullable=False)
    created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
    trade_date = Column(DateTime, nullable=False, index=True)
    count = Column(Integer, nullable=False)
    count_type = Column(
        Enum(CountTypeEnum),
        nullable=False,
        default=CountTypeEnum.ALL,
        server_default="ALL",
    )

After setting up all of this, you need to go into TimescaleDB and create a composite primary key of id and trade_date.
Then make a hypertable from mck_records.
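
In code, that manual step looks roughly like this (a sketch; the DSN comes from the compose file above):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://postgres:victoriasecret@localhost:5432/fastapi")

# create_hypertable with migrate_data => true is best run outside a transaction.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    # A hypertable requires the partitioning column in every unique index,
    # hence the composite primary key on (id, trade_date).
    conn.execute(text("ALTER TABLE mck_records DROP CONSTRAINT IF EXISTS mck_records_pkey"))
    conn.execute(text("ALTER TABLE mck_records ADD PRIMARY KEY (id, trade_date)"))
    # migrate_data => true moves any existing rows into chunks.
    conn.execute(text(
        "SELECT create_hypertable('mck_records', 'trade_date', migrate_data => true)"
    ))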

After that you can pump a bunch of dicts into the Celery task. I don't know where the problem is, so I've included everything.

Hi @H_N, apart from the memory leak, one thing I spot is the inefficiency of the count functions and the way mck_record_count_all keeps incrementing rows in the database. Each of those UPDATEs writes a new row version, so in the end it is just generating more and more dead tuples for vacuum to clean up later.

Here’s a blog post explaining it in detail:

You can think about having continuous aggregates that store the counters and materialize them in the background.
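
As a sketch of that idea (view and policy names are illustrative; it assumes the mck_records hypertable above and a recent TimescaleDB):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://postgres:victoriasecret@localhost:5432/fastapi")

# Daily counters derived from the raw hypertable and refreshed in the
# background by TimescaleDB, instead of hand-incremented counter rows.
CREATE_CAGG = text("""
CREATE MATERIALIZED VIEW mck_daily_counts
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', trade_date) AS bucket,
    mck_id,
    sum(CASE WHEN "MatchedChange" > 0 THEN 1 ELSE 0 END) AS green,
    sum(CASE WHEN "MatchedChange" < 0 THEN 1 ELSE 0 END) AS red,
    sum(CASE WHEN "MatchedChange" = 0 THEN 1 ELSE 0 END) AS yellow,
    count(*) AS total
FROM mck_records
GROUP BY bucket, mck_id
""")

ADD_POLICY = text("""
SELECT add_continuous_aggregate_policy('mck_daily_counts',
    start_offset      => INTERVAL '3 days',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '15 minutes')
""")

# Continuous aggregates cannot be created inside a transaction block.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    conn.execute(CREATE_CAGG)
    conn.execute(ADD_POLICY)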

For the memory leak, I suggest testing more versions of TimescaleDB to confirm it only occurs on the latest one. If so, we can certainly open a bug about it.

Another idea, if you want to keep your structure as it is: count in memory and update/insert the counters only at the very end, so there is a single transaction for them. If you process the records sorted by date, the flush can happen only when a new day starts or at the end of the records.
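
Something like this, as a sketch (names assumed from your code above, and it assumes the counter rows already exist):

from collections import Counter
from datetime import datetime

def flush_day_counts(records: list[dict], date: str, db) -> None:
    """Accumulate counters in memory, then flush once per batch/day."""
    trade_date = datetime.strptime(date, "%d/%m/%Y")
    counts = Counter()
    for rec in records:
        change = float(rec["MatchedChange"])
        counts[CountTypeEnum.ALL] += 1
        if change > 0:
            counts[CountTypeEnum.GREEN] += 1
        elif change < 0:
            counts[CountTypeEnum.RED] += 1
        else:
            counts[CountTypeEnum.YELLOW] += 1
    # One UPDATE per counter row for the whole batch, in a single transaction.
    for count_type, n in counts.items():
        db.query(MCKRecordCountAll).filter(
            MCKRecordCountAll.trade_date == trade_date,
            MCKRecordCountAll.count_type == count_type,
        ).update(
            {MCKRecordCountAll.count: MCKRecordCountAll.count + n},
            synchronize_session=False,
        )
    db.commit()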


Thank you for the insight on the counters.
I think I know where I went wrong: I forgot to add a bunch of indexes when migrating to Timescale, which may be causing the delays upstream.
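
For anyone else hitting this, recreating the indexes from the model on the hypertable looks roughly like this (a sketch; the DESC ordering is an assumption based on newest-first queries):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://postgres:victoriasecret@localhost:5432/fastapi")

# CREATE INDEX on a hypertable propagates to all existing chunks.
with engine.begin() as conn:
    conn.execute(text(
        'CREATE INDEX IF NOT EXISTS mck_date_index '
        'ON mck_records (mck_id, trade_date DESC)'
    ))
    conn.execute(text(
        'CREATE INDEX IF NOT EXISTS mck_time_date_index '
        'ON mck_records (mck_id, trade_date DESC, "TradeTime" DESC)'
    ))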

Got it, thanks for sharing the details! If we can isolate the memory leak, it would probably be good to create an issue on GitHub about it.