requirements.txt
fastapi==0.115.8
uvicorn==0.30.6
selenium==4.29.0
selenium_wire==5.1.0
SQLAlchemy==2.0.38
psycopg2-binary==2.9.10
setuptools==75.8.0
blinker==1.7.0
alembic==1.14.1
nameko==2.14.1
async-timeout==5.0.1
billiard==4.2.1
celery==5.4.0
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
redis==5.2.1
pydantic-settings==2.8.1
apscheduler==3.11.0
pytz==2025.1
PyJWT==2.10.1
rel==0.4.9.19
flower==2.0.1
orjson==3.10.15
gevent==24.11.1
pybloom-live==4.0.0
aiofiles==24.1.0
python-multipart==0.0.20
langchain-openai==0.3.11
langchain==0.3.21
Docker-compose:
services:
flower:
image: mher/flower:2.0
command: celery --broker=redis://redis:6379/0 flower --port=5555
profiles: ["prod", "dev"]
ports:
- '5555:5555'
depends_on:
celery:
condition : service_healthy
cadvisor:
image: gcr.io/cadvisor/cadvisor:v0.51.0
container_name: cadvisor
restart: always
profiles: ["prod", "dev"]
devices:
- /dev/kmsg
privileged: true
ports:
- '8080:8080'
volumes:
- /:/rootfs:ro
- /var/run:/var/run:ro
- /sys:/sys:ro
- /var/lib/docker/:/var/lib/docker:ro
- /dev/disk/:/dev/disk:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
- /etc/machine-id:/etc/machine-id:ro
- /var/lib/dbus/machine-id:/var/lib/dbus/machine-id:ro
node-exporter:
image: prom/node-exporter:v1.9.0
container_name: node-exporter
restart: always
profiles: ["prod", "dev"]
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
ports:
- '9100:9100'
command:
- "--path.procfs=/host/proc"
- "--path.sysfs=/host/sys"
- "--path.rootfs=/rootfs"
- "--collector.filesystem.ignored-mount-points='^/(sys|proc|dev|host|etc|rootfs/var/lib/docker/containers|rootfs/var/lib/docker/overlay2|rootfs/run/docker/netns|rootfs/var/lib/docker/aufs)($$|/)'"
depends_on:
- flower
prometheus:
image: prom/prometheus:v2.26.0
container_name: prometheus
restart: always
profiles: ["prod", "dev"]
ports:
- '9090:9090'
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
depends_on:
- cadvisor
- node-exporter
- flower
grafana:
image: grafana/grafana:11.3.0-security-01
container_name: grafana
restart: always
profiles: ["prod", "dev"]
ports:
- '3333:3000'
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- prometheus
- flower
timescaledb:
image: timescale/timescaledb:latest-pg16
restart: always
ports:
- '5432:5432'
environment:
POSTGRES_USER: "postgres"
POSTGRES_PASSWORD: "victoriasecret"
POSTGRES_DB: fastapi
command: -c 'max_connections=300'
profiles: ["prod", "dev"]
volumes:
- timescale_2:/var/lib/postgresql/data
redis:
image: redis:7.4.1
restart: unless-stopped
ports:
- "6379:6379"
profiles: ["prod", "dev"]
volumes:
- redis-data:/data
fast_api:
build:
context: .
dockerfile: Dockerfile
ports:
- '8000:8000'
volumes:
- .:/nextjs-fastapi
depends_on:
- timescaledb
- celery
- celery-gevent
- celery_beat
- cadvisor
- grafana
- redis
profiles: [ "prod", "dev" ]
restart: unless-stopped
celery:
build:
context: .
dockerfile: Dockerfile
command: celery -A services.worker_init.celery_app worker --concurrency=4 --pool=processes --loglevel=info -Q main --purge
env_file: ".env"
depends_on:
- redis
- timescaledb
volumes:
- .:/nextjs-fastapi
profiles: [ "prod", "dev" ]
healthcheck:
test: celery -b redis://redis:6379 inspect ping -d celery@$$HOSTNAME
interval: 30s
timeout: 10s
retries: 3
restart: unless-stopped
celery-io:
build:
context: .
dockerfile: Dockerfile
command: celery -A services.worker_init.celery_app worker --concurrency=22 --pool=processes --loglevel=info -Q io_task --purge
env_file: ".env"
depends_on:
- redis
- timescaledb
volumes:
- .:/nextjs-fastapi
profiles: [ "prod", "dev" ]
healthcheck:
test: celery -b redis://redis:6379 inspect ping -d celery@$$HOSTNAME
interval: 30s
timeout: 10s
retries: 3
restart: unless-stopped
celery_beat:
build:
context: .
dockerfile: Dockerfile
command: celery -A services.worker_init.celery_app beat --loglevel=info
env_file: ".env"
depends_on:
- redis
- timescaledb
- celery
- celery-gevent
volumes:
- .:/nextjs-fastapi
profiles: [ "prod", "dev" ]
restart: unless-stopped
Dockerfile:
FROM python:3.12-alpine
# Set the working directory inside the container
WORKDIR /nextjs-fastapi
# Copy the requirements file to the working directory
COPY requirements.txt .
# Install the Python dependencies
RUN pip install -r requirements.txt
# Copy the application code to the working directory
COPY . .
# Expose the port on which the application will run
EXPOSE 8000
# Run the FastAPI application using uvicorn server
CMD ["uvicorn", "api.index:app", "--host", "0.0.0.0", "--port", "8000"]
Write function
@celery_app.task(queue="io_task")
def write_mck_record(mck_record: dict, date: str):
db = next(get_db())
mck_object = get_mck_by_code(db, mck_record["Code"])
if not mck_object:
mck_object = create_mck(db, mck=MCKSchema(code=mck_record["Code"]))
for key, value in mck_record.items():
try:
if math.isnan(float(value)):
mck_record[key] = 0.0
except ValueError:
continue
new_mck_record = MCKRecords(
mck_id=mck_object.id,
MatchedPrice=float(mck_record["MatchedPrice"]),
trade_date=datetime.strptime(date, "%d/%m/%Y"),
OfferVol1=float(mck_record["OfferVol1"]),
OfferVol2=float(mck_record["OfferVol2"]),
OfferVol3=float(mck_record["OfferVol3"]),
OfferVol4=float(mck_record["OfferVol4"]),
OfferVol5=float(mck_record["OfferVol5"]),
OfferVol6=float(mck_record["OfferVol6"]),
OfferVol7=float(mck_record["OfferVol7"]),
OfferVol8=float(mck_record["OfferVol8"]),
OfferVol9=float(mck_record["OfferVol9"]),
OfferVol10=float(mck_record["OfferVol10"]),
BidVol1=float(mck_record["BidVol1"]),
BidVol2=float(mck_record["BidVol2"]),
BidVol3=float(mck_record["BidVol3"]),
BidVol4=float(mck_record["BidVol4"]),
BidVol5=float(mck_record["BidVol5"]),
BidVol6=float(mck_record["BidVol6"]),
BidVol7=float(mck_record["BidVol7"]),
BidVol8=float(mck_record["BidVol8"]),
BidVol9=float(mck_record["BidVol9"]),
BidVol10=float(mck_record["BidVol10"]),
OfferPrice1=float(mck_record["OfferPrice1"]),
OfferPrice2=float(mck_record["OfferPrice2"]),
OfferPrice3=float(mck_record["OfferPrice3"]),
OfferPrice4=float(mck_record["OfferPrice4"]),
OfferPrice5=float(mck_record["OfferPrice5"]),
OfferPrice6=float(mck_record["OfferPrice6"]),
OfferPrice7=float(mck_record["OfferPrice7"]),
OfferPrice8=float(mck_record["OfferPrice8"]),
OfferPrice9=float(mck_record["OfferPrice9"]),
OfferPrice10=float(mck_record["OfferPrice10"]),
BidPrice1=float(mck_record["BidPrice1"]),
BidPrice2=float(mck_record["BidPrice2"]),
BidPrice3=float(mck_record["BidPrice3"]),
BidPrice4=float(mck_record["BidPrice4"]),
BidPrice5=float(mck_record["BidPrice5"]),
BidPrice6=float(mck_record["BidPrice6"]),
BidPrice7=float(mck_record["BidPrice7"]),
BidPrice8=float(mck_record["BidPrice8"]),
BidPrice9=float(mck_record["BidPrice9"]),
BidPrice10=float(mck_record["BidPrice10"]),
MatchedChange=float(mck_record["MatchedChange"]),
AvgPrice=float(mck_record["AvgPrice"]),
FloorCode=mck_record["FloorCode"],
HigPrice=float(mck_record["HigPrice"]),
TradeTime=datetime.strptime(mck_record["TradeTime"], "%H:%M:%S"),
MatchedVol=float(mck_record["MatchedVol"]),
Code=mck_record["Code"],
FlrPrice=float(mck_record["FlrPrice"]),
FSellVol=float(mck_record["FSellVol"]),
FBuyVol=float(mck_record["FBuyVol"]),
CeiPrice=float(mck_record["CeiPrice"]),
LowPrice=float(mck_record["LowPrice"]),
RefPrice=float(mck_record["RefPrice"]),
AmPm=mck_record["AmPm"],
MatchedTotalVol=float(mck_record["MatchedTotalVol"]),
TradingSession=MCKRecords.TradingSessionEnum(
mck_record.get("TradingSession", "LO")
),
)
atomic_increment_record_count_all(
db, trade_date=datetime.strptime(date, "%d/%m/%Y"), count_type=CountTypeEnum.ALL
)
atomic_increment_record_count_mck(
db,
mck_id=mck_object.id,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.ALL,
)
if mck_record["MatchedChange"] > 0:
atomic_increment_record_count_all(
db,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.GREEN,
)
atomic_increment_record_count_mck(
db,
mck_id=mck_object.id,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.GREEN,
)
elif mck_record["MatchedChange"] < 0:
atomic_increment_record_count_all(
db,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.RED,
)
atomic_increment_record_count_mck(
db,
mck_id=mck_object.id,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.RED,
)
elif mck_record["MatchedChange"] == 0:
atomic_increment_record_count_all(
db,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.YELLOW,
)
atomic_increment_record_count_mck(
db,
mck_id=mck_object.id,
trade_date=datetime.strptime(date, "%d/%m/%Y"),
count_type=CountTypeEnum.YELLOW,
)
db.add(new_mck_record)
db.commit()
db.refresh(new_mck_record)
db.close()
Count Function:
def atomic_increment_record_count_all(
db: Session, trade_date: datetime, count_type: CountTypeEnum = CountTypeEnum.ALL
):
mck_all_count = (
db.query(MCKRecordCountAll)
.filter(
MCKRecordCountAll.trade_date == trade_date,
MCKRecordCountAll.count_type == count_type,
)
.with_for_update()
.first()
)
if mck_all_count is not None:
mck_all_count.count = mck_all_count.count + 1
db.add(mck_all_count)
db.commit()
else:
create_mck_record_count_all(
db=db, trade_date=trade_date, count=0, count_type=count_type
)
def atomic_increment_record_count_mck(
db: Session,
trade_date: datetime,
mck_id: int,
count_type: CountTypeEnum = CountTypeEnum.ALL,
):
mck_count = (
db.query(MCKRecordCounts)
.filter(
MCKRecordCounts.trade_date == trade_date,
MCKRecordCounts.mck_id == mck_id,
MCKRecordCounts.count_type == count_type,
)
.with_for_update()
.first()
)
if mck_count is not None:
mck_count.count = mck_count.count + 1
db.add(mck_count)
db.commit()
else:
create_mck_record_count(
db=db, trade_date=trade_date, mck_id=mck_id, count=0, count_type=count_type
)
Stock record table:
import enum
from sqlalchemy import (
TIMESTAMP,
Column,
DateTime,
Enum,
Float,
ForeignKey,
Index,
Integer,
String,
text,
)
from sqlalchemy.orm import relationship
from api.database.database import Base
class MCKRecords(Base):
__tablename__ = "mck_records"
__table_args__ = (
Index("mck_date_index", "mck_id", "trade_date"),
Index("mck_time_date_index", "mck_id", "trade_date", "TradeTime"),
)
class TradingSessionEnum(enum.Enum):
LO = "LO"
ATO = "ATO"
ATC = "ATC"
PT = "PT"
C = "C"
BREAK = "BREAK"
HALT = "HALT"
id = Column(Integer, primary_key=True, nullable=False)
mck_id = Column(Integer(), ForeignKey("mck.id"))
mck = relationship("MCK", back_populates="mck_record")
trade_date = Column(DateTime, nullable=False, index=True)
created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
MatchedPrice = Column(Float, nullable=False)
OfferVol1 = Column(Float, nullable=False)
OfferVol2 = Column(Float, nullable=False)
OfferVol3 = Column(Float, nullable=False)
OfferVol4 = Column(Float, nullable=True)
OfferVol5 = Column(Float, nullable=True)
OfferVol6 = Column(Float, nullable=True)
OfferVol7 = Column(Float, nullable=True)
OfferVol8 = Column(Float, nullable=True)
OfferVol9 = Column(Float, nullable=True)
OfferVol10 = Column(Float, nullable=True)
OfferPrice1 = Column(Float, nullable=False)
OfferPrice2 = Column(Float, nullable=False)
OfferPrice3 = Column(Float, nullable=False)
OfferPrice4 = Column(Float, nullable=True)
OfferPrice5 = Column(Float, nullable=True)
OfferPrice6 = Column(Float, nullable=True)
OfferPrice7 = Column(Float, nullable=True)
OfferPrice8 = Column(Float, nullable=True)
OfferPrice9 = Column(Float, nullable=True)
OfferPrice10 = Column(Float, nullable=True)
BidPrice1 = Column(Float, nullable=False)
BidPrice2 = Column(Float, nullable=False)
BidPrice3 = Column(Float, nullable=False)
BidPrice4 = Column(Float, nullable=True)
BidPrice5 = Column(Float, nullable=True)
BidPrice6 = Column(Float, nullable=True)
BidPrice7 = Column(Float, nullable=True)
BidPrice8 = Column(Float, nullable=True)
BidPrice9 = Column(Float, nullable=True)
BidPrice10 = Column(Float, nullable=True)
BidVol1 = Column(Float, nullable=False)
BidVol2 = Column(Float, nullable=False)
BidVol3 = Column(Float, nullable=False)
BidVol4 = Column(Float, nullable=True)
BidVol5 = Column(Float, nullable=True)
BidVol6 = Column(Float, nullable=True)
BidVol7 = Column(Float, nullable=True)
BidVol8 = Column(Float, nullable=True)
BidVol9 = Column(Float, nullable=True)
BidVol10 = Column(Float, nullable=True)
MatchedChange = Column(Float, nullable=False)
AvgPrice = Column(Float, nullable=False)
FloorCode = Column(String, nullable=False)
HigPrice = Column(Float, nullable=False)
TradeTime = Column(DateTime, index=True)
MatchedVol = Column(Float, nullable=False)
Code = Column(String, nullable=False)
FBuyVol = Column(Float, nullable=False)
CeiPrice = Column(Float, nullable=False)
LowPrice = Column(Float, nullable=False)
RefPrice = Column(Float, nullable=False)
AmPm = Column(String, nullable=False)
FlrPrice = Column(Float, nullable=False)
MatchedTotalVol = Column(Float, nullable=False)
FSellVol = Column(Float, nullable=False)
TradingSession = Column(
Enum(TradingSessionEnum),
nullable=False,
default=TradingSessionEnum.LO,
server_default="LO",
)
Count tables
import enum
from sqlalchemy import (
TIMESTAMP,
Column,
DateTime,
Enum,
Float,
ForeignKey,
Index,
Integer,
String,
text,
)
from sqlalchemy.orm import relationship
from api.database.database import Base
class CountTypeEnum(enum.Enum):
RED = "RED"
GREEN = "GREEN"
YELLOW = "YELLOW"
ALL = "ALL"
class MCKRecordCounts(Base):
__tablename__ = "mck_record_counts"
id = Column(Integer, primary_key=True, nullable=False)
mck_id = Column(Integer(), ForeignKey("mck.id"))
mck = relationship("MCK", back_populates="mck_record_count")
created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
trade_date = Column(DateTime, nullable=False, index=True)
count = Column(Integer, nullable=False)
count_type = Column(
Enum(CountTypeEnum),
nullable=False,
default=CountTypeEnum.ALL,
server_default="ALL",
)
class MCKRecordCountAll(Base):
__tablename__ = "mck_record_count_all"
id = Column(Integer, primary_key=True, nullable=False)
created_at = Column(TIMESTAMP(timezone=True), server_default=text("now()"))
trade_date = Column(DateTime, nullable=False, index=True)
count = Column(Integer, nullable=False)
count_type = Column(
Enum(CountTypeEnum),
nullable=False,
default=CountTypeEnum.ALL,
server_default="ALL",
)
After setting up all of these, you need to go into timescaledb to create a composite primary key of ID and trade_date.
Then make a Hypertable from mck_records.
After that you can pump a bunch of dict to the celery function. I dont know where the problem is so I add all the stuff.