Skip to main content

SQLAlchemy với SQL Server

· 11 min read

SQLAlchemy với SQL Server

SQLAlchemy là một thư viện ORM (Object Relational Mapper) mạnh mẽ cho Python, cung cấp một bộ công cụ toàn diện để làm việc với cơ sở dữ liệu quan hệ như SQL Server, MySQL, PostgreSQL, và nhiều hệ quản trị cơ sở dữ liệu khác. Bài viết này sẽ hướng dẫn bạn cách sử dụng SQLAlchemy để tương tác với Microsoft SQL Server, từ thiết lập kết nối ban đầu đến thực hiện các thao tác CRUD (Create, Read, Update, Delete) phức tạp.

Giới thiệu về SQLAlchemy

SQLAlchemy với SQL Server

SQLAlchemy được thiết kế với hai thành phần chính:

  1. Core: Cung cấp một SQL abstraction toolkit, cho phép tạo và thực thi các truy vấn SQL thông qua Python mà không cần viết trực tiếp các câu lệnh SQL.

  2. ORM (Object Relational Mapper): Cho phép ánh xạ các bảng cơ sở dữ liệu thành các lớp Python và thao tác với dữ liệu như làm việc với các đối tượng Python thông thường.

SQLAlchemy mang lại nhiều lợi ích khi làm việc với SQL Server:

  • Tính di động: Mã nguồn có thể dễ dàng chuyển đổi giữa các cơ sở dữ liệu khác nhau
  • Bảo mật: Tự động xử lý các vấn đề bảo mật như SQL injection
  • Hiệu suất: Tối ưu hóa truy vấn và connection pooling
  • Mức độ trừu tượng: Làm việc với dữ liệu ở mức đối tượng thay vì viết SQL thuần
  • Hỗ trợ transaction: Quản lý transaction đơn giản và hiệu quả

Cài đặt các thành phần cần thiết

Để làm việc với SQL Server qua SQLAlchemy, bạn cần cài đặt các gói sau:

# Cài đặt SQLAlchemy
pip install sqlalchemy

# Cài đặt ODBC driver cho SQL Server
pip install pyodbc

# Cài đặt SQLAlchemy-ODBC
pip install sqlalchemy-pyodbc-mssql

Nếu bạn muốn sử dụng thư viện mới hơn và được khuyến nghị:

# Thay thế cho sqlalchemy-pyodbc-mssql
pip install pymssql

# Hoặc sử dụng với SQLAlchemy 2.0
pip install sqlalchemy[mssql]

Thiết lập kết nối đến SQL Server

1. Tạo URL kết nối

SQLAlchemy sử dụng URL để kết nối đến cơ sở dữ liệu. Dưới đây là cách tạo URL kết nối đến SQL Server:

from sqlalchemy import create_engine, URL
import urllib.parse

# Cách 1: Sử dụng URL trực tiếp
connection_string = "mssql+pyodbc://username:password@server_name/database_name?driver=ODBC+Driver+17+for+SQL+Server"

# Cách 2: Sử dụng dictionary và URL.create
connection_url = URL.create(
"mssql+pyodbc",
username="username",
password="password",
host="server_name",
database="database_name",
query={
"driver": "ODBC Driver 17 for SQL Server",
"TrustServerCertificate": "yes",
"encrypt": "yes",
},
)

# Cách 3: Sử dụng pyodbc connection string
params = urllib.parse.quote_plus(
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=server_name;"
"DATABASE=database_name;"
"UID=username;"
"PWD=password;"
"TrustServerCertificate=yes;"
)

connection_url = f"mssql+pyodbc:///?odbc_connect={params}"

2. Tạo Engine

Engine là thành phần trung tâm của SQLAlchemy, đại diện cho kết nối với cơ sở dữ liệu:

from sqlalchemy import create_engine

# Tạo engine từ URL kết nối
engine = create_engine(connection_url, echo=True)

Tham số echo=True giúp hiển thị các câu lệnh SQL được tạo ra, rất hữu ích khi gỡ lỗi.

3. Kiểm tra kết nối

# Kiểm tra kết nối
try:
with engine.connect() as connection:
result = connection.execute("SELECT @@VERSION")
print(f"Kết nối thành công! Phiên bản SQL Server: {result.scalar()}")
except Exception as e:
print(f"Lỗi kết nối: {e}")

Tạo mô hình dữ liệu (ORM)

1. Định nghĩa Base và Metadata

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import relationship
import datetime

# Tạo base class cho các model
Base = declarative_base()

# Hoặc trong SQLAlchemy 2.0+
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
pass

2. Định nghĩa các model

# Định nghĩa model cho bảng Customer
class Customer(Base):
__tablename__ = 'customers'

id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
email = Column(String(100), unique=True)
phone = Column(String(20))
created_at = Column(DateTime, default=datetime.datetime.utcnow)

# Relationship với Order
orders = relationship("Order", back_populates="customer")

def __repr__(self):
return f"<Customer(id={self.id}, name='{self.name}', email='{self.email}')>"

# Định nghĩa model cho bảng Order
class Order(Base):
__tablename__ = 'orders'

id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey('customers.id'))
order_date = Column(DateTime, default=datetime.datetime.utcnow)
total_amount = Column(Float, nullable=False)
status = Column(String(20), default='pending')

# Relationship với Customer
customer = relationship("Customer", back_populates="orders")
# Relationship với OrderItem
items = relationship("OrderItem", back_populates="order")

def __repr__(self):
return f"<Order(id={self.id}, customer_id={self.customer_id}, total_amount={self.total_amount})>"

# Định nghĩa model cho bảng OrderItem
class OrderItem(Base):
__tablename__ = 'order_items'

id = Column(Integer, primary_key=True)
order_id = Column(Integer, ForeignKey('orders.id'))
product_name = Column(String(100), nullable=False)
quantity = Column(Integer, nullable=False)
unit_price = Column(Float, nullable=False)

# Relationship với Order
order = relationship("Order", back_populates="items")

def __repr__(self):
return f"<OrderItem(id={self.id}, order_id={self.order_id}, product_name='{self.product_name}')>"

3. Tạo bảng trong cơ sở dữ liệu

# Tạo tất cả các bảng theo mô hình đã định nghĩa
Base.metadata.create_all(engine)

Thao tác CRUD với SQLAlchemy ORM

1. Thiết lập Session

Session là cách SQLAlchemy quản lý các thao tác với cơ sở dữ liệu:

from sqlalchemy.orm import sessionmaker

# Tạo một lớp Session gắn với engine
Session = sessionmaker(bind=engine)

# Tạo một instance của Session
session = Session()

Trong SQLAlchemy 2.0+, bạn có thể sử dụng:

from sqlalchemy.orm import Session

# Sử dụng context manager
with Session(engine) as session:
# Thực hiện các thao tác với session
pass

2. Thêm dữ liệu (Create)

# Tạo một khách hàng mới
new_customer = Customer(
name="Nguyễn Văn A",
email="nguyenvana@example.com",
phone="0123456789"
)

# Thêm khách hàng vào session
session.add(new_customer)

# Hoặc thêm nhiều đối tượng cùng lúc
session.add_all([
Customer(name="Trần Thị B", email="tranthib@example.com", phone="0987654321"),
Customer(name="Lê Văn C", email="levanc@example.com", phone="0369874125")
])

# Lưu các thay đổi vào cơ sở dữ liệu
session.commit()

3. Truy vấn dữ liệu (Read)

# Truy vấn tất cả khách hàng
all_customers = session.query(Customer).all()
for customer in all_customers:
print(customer)

# Truy vấn với điều kiện
customer = session.query(Customer).filter(Customer.email == "nguyenvana@example.com").first()
print(f"Tìm thấy khách hàng: {customer.name}")

# Sử dụng các toán tử lọc phức tạp
from sqlalchemy import or_, and_

customers = session.query(Customer).filter(
or_(
Customer.name.like("Nguyễn%"),
and_(
Customer.email.like("%@example.com"),
Customer.phone.startswith("01")
)
)
).all()

# Truy vấn với join
orders_with_customers = session.query(Order, Customer).join(Customer).all()
for order, customer in orders_with_customers:
print(f"Đơn hàng {order.id} thuộc về khách hàng {customer.name}")

# Truy vấn với aggregation
from sqlalchemy import func
total_orders = session.query(func.count(Order.id)).scalar()
print(f"Tổng số đơn hàng: {total_orders}")

# Tính tổng doanh thu theo khách hàng
revenue_by_customer = session.query(
Customer.name,
func.sum(Order.total_amount).label('total_revenue')
).join(Order).group_by(Customer.name).order_by(func.sum(Order.total_amount).desc()).all()

for name, revenue in revenue_by_customer:
print(f"Khách hàng: {name}, Tổng doanh thu: {revenue}")

4. Cập nhật dữ liệu (Update)

# Cập nhật thông tin một khách hàng
customer = session.query(Customer).filter(Customer.email == "nguyenvana@example.com").first()
if customer:
customer.phone = "0123123123"
session.commit()
print(f"Đã cập nhật số điện thoại của khách hàng {customer.name}")

# Cập nhật hàng loạt
affected_rows = session.query(Order).filter(Order.status == "pending").update(
{"status": "processing"},
synchronize_session=False
)
session.commit()
print(f"Đã cập nhật {affected_rows} đơn hàng từ 'pending' sang 'processing'")

5. Xóa dữ liệu (Delete)

# Xóa một đơn hàng cụ thể
order = session.query(Order).filter(Order.id == 1).first()
if order:
session.delete(order)
session.commit()
print("Đã xóa đơn hàng")

# Xóa hàng loạt
deleted_count = session.query(OrderItem).filter(OrderItem.unit_price < 10000).delete(
synchronize_session=False
)
session.commit()
print(f"Đã xóa {deleted_count} sản phẩm có giá dưới 10.000")

Xử lý transaction và lỗi

1. Sử dụng transaction

# Sử dụng context manager để quản lý transaction
from sqlalchemy.orm import Session

try:
with Session(engine) as session:
# Thêm khách hàng mới
new_customer = Customer(name="Khách hàng mới", email="khachhang@example.com")
session.add(new_customer)

# Thêm đơn hàng cho khách hàng này
new_order = Order(customer=new_customer, total_amount=150000, status="new")
session.add(new_order)

# Thêm các mặt hàng trong đơn hàng
session.add_all([
OrderItem(order=new_order, product_name="Sản phẩm A", quantity=2, unit_price=50000),
OrderItem(order=new_order, product_name="Sản phẩm B", quantity=1, unit_price=50000)
])

# Lưu tất cả các thay đổi (tự động commit khi kết thúc block with)
session.commit()
print("Đã thêm khách hàng và đơn hàng thành công")
except Exception as e:
# Transaction sẽ tự động rollback khi có lỗi
print(f"Lỗi: {e}")
# Không cần gọi session.rollback() vì context manager sẽ tự động xử lý

2. Xử lý commit và rollback thủ công

# Xử lý transaction thủ công
session = Session()
try:
# Tạo một khách hàng mới
new_customer = Customer(name="Khách hàng thủ công", email="manual@example.com")
session.add(new_customer)

# Thêm đơn hàng
new_order = Order(customer=new_customer, total_amount=200000)
session.add(new_order)

# Commit transaction
session.commit()
print("Transaction thành công")

except Exception as e:
# Rollback khi có lỗi
session.rollback()
print(f"Transaction thất bại: {e}")

finally:
# Đóng session
session.close()

Các tính năng nâng cao của SQLAlchemy

1. Sử dụng SQLAlchemy Core (Expression Language)

Ngoài ORM, bạn có thể sử dụng SQLAlchemy Core để làm việc với cú pháp biểu thức gần hơn với SQL:

from sqlalchemy import Table, MetaData, select, join

# Tạo metadata
metadata = MetaData()

# Định nghĩa bảng theo cách thủ công
customers = Table('customers', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(100)),
Column('email', String(100)),
Column('phone', String(20))
)

orders = Table('orders', metadata,
Column('id', Integer, primary_key=True),
Column('customer_id', Integer, ForeignKey('customers.id')),
Column('total_amount', Float),
Column('status', String(20))
)

# Tạo một truy vấn select
query = select(customers.c.name, orders.c.total_amount).select_from(
join(customers, orders, customers.c.id == orders.c.customer_id)
).where(
orders.c.status == 'completed'
)

# Thực thi truy vấn
with engine.connect() as conn:
result = conn.execute(query)
for row in result:
print(f"Khách hàng: {row.name}, Giá trị đơn hàng: {row.total_amount}")

2. Tạo index và constraints

from sqlalchemy import Index, UniqueConstraint

# Thêm Index và UniqueConstraint vào model
class Product(Base):
__tablename__ = 'products'

id = Column(Integer, primary_key=True)
sku = Column(String(50), nullable=False)
name = Column(String(100), nullable=False)
price = Column(Float, nullable=False)
category = Column(String(50))

# Thêm uniqueness constraint
__table_args__ = (
UniqueConstraint('sku', name='uq_product_sku'),
# Thêm index cho tìm kiếm nhanh theo tên sản phẩm
Index('idx_product_name', 'name'),
# Thêm index cho category và price
Index('idx_product_category_price', 'category', 'price')
)

3. Sử dụng lazy loading và eager loading

# Lazy loading - mặc định
customer = session.query(Customer).filter(Customer.id == 1).first()
# Các đơn hàng sẽ được load khi truy cập
for order in customer.orders: # Thêm truy vấn SQL được thực hiện ở đây
print(order)

# Eager loading với joinedload
from sqlalchemy.orm import joinedload

# Load khách hàng và đơn hàng cùng lúc
customer = session.query(Customer).options(
joinedload(Customer.orders)
).filter(Customer.id == 1).first()

# Không có thêm truy vấn SQL khi truy cập
for order in customer.orders:
print(order)

# Eager loading với nesting (lồng nhau)
customer = session.query(Customer).options(
joinedload(Customer.orders).joinedload(Order.items)
).filter(Customer.id == 1).first()

# Tất cả dữ liệu đã được load, không cần thêm truy vấn
for order in customer.orders:
for item in order.items:
print(item)

4. Sử dụng events

from sqlalchemy import event

# Event trước khi insert
@event.listens_for(Customer, 'before_insert')
def before_customer_insert(mapper, connection, customer):
print(f"Chuẩn bị thêm khách hàng: {customer.name}")
# Có thể thêm logic xử lý ở đây, ví dụ: chuẩn hóa email
customer.email = customer.email.lower()

# Event sau khi insert
@event.listens_for(Customer, 'after_insert')
def after_customer_insert(mapper, connection, customer):
print(f"Đã thêm khách hàng: {customer.name} với ID = {customer.id}")

Các thực hành tốt nhất và mẹo khi sử dụng SQLAlchemy với SQL Server

1. Sử dụng connection pooling

# Cấu hình pool khi tạo engine
engine = create_engine(
connection_url,
pool_size=10, # Số kết nối tối đa trong pool
max_overflow=20, # Số kết nối có thể tạo thêm khi pool đầy
pool_timeout=30, # Thời gian chờ kết nối (giây)
pool_recycle=1800 # Thời gian tái sử dụng kết nối (giây)
)

2. Sử dụng bulk operations cho hiệu suất cao

# Thêm hàng loạt dữ liệu hiệu quả
products = [
Product(sku=f"PRD-{i}", name=f"Sản phẩm {i}", price=10000 + i * 1000, category="Electronics")
for i in range(1, 1001)
]

# Sử dụng bulk_save_objects thay vì add_all
session.bulk_save_objects(products)
session.commit()

3. Quản lý migration với Alembic

Alembic là công cụ migration được phát triển bởi tác giả của SQLAlchemy:

# Cài đặt Alembic
pip install alembic

# Khởi tạo Alembic
alembic init migrations

Trong file env.py của Alembic, cấu hình metadata:

from sqlalchemy import engine_from_config, pool
from models import Base # Import Base từ module models của bạn

# Thiết lập target metadata
target_metadata = Base.metadata

Tạo migration và áp dụng:

# Tạo migration script
alembic revision --autogenerate -m "Create initial tables"

# Áp dụng migration
alembic upgrade head

4. Sử dụng stored procedures

from sqlalchemy import text

# Gọi stored procedure
with engine.connect() as conn:
result = conn.execute(
text("EXEC GetCustomerOrders :customer_id"),
{"customer_id": 1}
)
for row in result:
print(row)

Kết luận

SQLAlchemy cung cấp một cách mạnh mẽ và linh hoạt để làm việc với SQL Server từ Python. Bằng cách sử dụng ORM, bạn có thể tập trung vào logic nghiệp vụ của ứng dụng thay vì viết các câu lệnh SQL thủ công. Tuy nhiên, SQLAlchemy cũng rất linh hoạt, cho phép bạn viết truy vấn SQL thuần khi cần thiết.

Ngoài ra, SQLAlchemy còn có nhiều tính năng nâng cao như relationship, eager loading, connection pooling, và event listeners, giúp bạn xây dựng các ứng dụng có hiệu suất cao và dễ bảo trì.

Khi làm việc với SQL Server, hãy nhớ cấu hình các tham số kết nối phù hợp và sử dụng các trình điều khiển như pyodbc hoặc pymssql cho hiệu suất tốt nhất.


Bạn đã có kinh nghiệm sử dụng SQLAlchemy chưa? Thư viện ORM này đã giúp ích như thế nào trong các dự án của bạn? Hãy chia sẻ trong phần bình luận nhé!