Skip to main content

Index và Tối Ưu Hiệu Suất trong SQL Server

· 3 min read

Trong bài viết này, chúng ta sẽ tìm hiểu về Index và các kỹ thuật tối ưu hiệu suất trong SQL Server.

Index là gì?

Index là cấu trúc dữ liệu giúp tăng tốc độ truy vấn dữ liệu. SQL Server hỗ trợ nhiều loại index khác nhau:

Clustered Index

-- Tạo Clustered Index
CREATE CLUSTERED INDEX IX_Orders_OrderID
ON Orders(OrderID);

Non-Clustered Index

-- Tạo Non-Clustered Index
CREATE NONCLUSTERED INDEX IX_Customers_City
ON Customers(City);

Composite Index

-- Tạo Composite Index
CREATE NONCLUSTERED INDEX IX_Orders_CustomerDate
ON Orders(CustomerID, OrderDate);

Filtered Index

-- Tạo Filtered Index
CREATE NONCLUSTERED INDEX IX_Products_Active
ON Products(ProductID, ProductName)
WHERE Discontinued = 0;

Tối ưu hiệu suất truy vấn

Sử dụng Execution Plan

-- Bật Execution Plan
SET SHOWPLAN_TEXT ON;
GO

-- Truy vấn cần phân tích
SELECT
c.CustomerName,
COUNT(o.OrderID) as OrderCount
FROM Customers c
LEFT JOIN Orders o ON c.CustomerID = o.CustomerID
GROUP BY c.CustomerName;

Tối ưu JOIN

-- Sử dụng INNER JOIN thay vì LEFT JOIN khi có thể
SELECT
c.CustomerName,
o.OrderID
FROM Customers c
INNER JOIN Orders o ON c.CustomerID = o.CustomerID;

-- Sử dụng EXISTS thay vì IN
SELECT ProductName
FROM Products p
WHERE EXISTS (
SELECT 1
FROM OrderDetails od
WHERE od.ProductID = p.ProductID
);

Tối ưu WHERE

-- Sử dụng Index Seek
SELECT ProductName
FROM Products
WHERE ProductID = 1;

-- Tránh sử dụng hàm trong WHERE
-- Không tốt
SELECT OrderID
FROM Orders
WHERE YEAR(OrderDate) = 2023;

-- Tốt hơn
SELECT OrderID
FROM Orders
WHERE OrderDate >= '2023-01-01'
AND OrderDate < '2024-01-01';

Monitoring và Maintenance

Kiểm tra Index Fragmentation

SELECT 
OBJECT_NAME(ips.OBJECT_ID) as TableName,
i.name as IndexName,
ips.avg_fragmentation_in_percent
FROM sys.dm_db_index_physical_stats(
DB_ID(), NULL, NULL, NULL, NULL) ips
JOIN sys.indexes i ON ips.object_id = i.object_id
AND ips.index_id = i.index_id
WHERE ips.avg_fragmentation_in_percent > 30;

Rebuild Index

-- Rebuild một index
ALTER INDEX IX_Orders_OrderID ON Orders REBUILD;

-- Rebuild tất cả index của một bảng
ALTER INDEX ALL ON Orders REBUILD;

Update Statistics

-- Update statistics cho một bảng
UPDATE STATISTICS Orders;

-- Update statistics cho toàn bộ database
EXEC sp_updatestats;

Best Practices

  1. Tạo index cho các cột thường xuyên tìm kiếm
  2. Tránh tạo quá nhiều index
  3. Thường xuyên bảo trì index
  4. Sử dụng Execution Plan để phân tích
  5. Tối ưu câu truy vấn

Các công cụ monitoring

Dynamic Management Views (DMVs)

-- Kiểm tra index usage
SELECT
OBJECT_NAME(i.object_id) as TableName,
i.name as IndexName,
ius.user_seeks,
ius.user_scans,
ius.user_lookups
FROM sys.dm_db_index_usage_stats ius
JOIN sys.indexes i ON ius.object_id = i.object_id
AND ius.index_id = i.index_id;

SQL Server Profiler

  • Theo dõi các truy vấn chậm
  • Phân tích deadlock
  • Kiểm tra resource usage

Kết luận

Index và tối ưu hiệu suất là những chủ đề quan trọng trong SQL Server. Hiểu và áp dụng đúng các kỹ thuật này sẽ giúp cải thiện đáng kể hiệu suất của database. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về Backup và Restore trong SQL Server.

Hệ thống quản lý rủi ro

· 4 min read

Trong bài viết này, chúng ta sẽ tìm hiểu cách xây dựng hệ thống quản lý rủi ro hiệu quả cho bot giao dịch tự động.

Hệ thống quản lý rủi ro

Đánh giá rủi ro

1. Rủi ro thị trường

class MarketRiskAnalyzer:
def __init__(self):
self.risk_metrics = {}

def calculate_var(self, returns, confidence_level=0.95):
"""Tính toán Value at Risk"""
return np.percentile(returns, (1 - confidence_level) * 100)

def calculate_expected_shortfall(self, returns, var):
"""Tính toán Expected Shortfall (CVaR)"""
return returns[returns <= var].mean()

def analyze_market_risk(self, portfolio):
# Phân tích rủi ro thị trường
returns = self.calculate_returns(portfolio)
var = self.calculate_var(returns)
es = self.calculate_expected_shortfall(returns, var)

return {
'var': var,
'expected_shortfall': es,
'volatility': returns.std()
}

2. Rủi ro tín dụng

class CreditRiskAnalyzer:
def __init__(self):
self.credit_limits = {}

def check_credit_risk(self, counterparty, amount):
"""Kiểm tra rủi ro tín dụng"""
if counterparty not in self.credit_limits:
return False

current_exposure = self.get_current_exposure(counterparty)
return current_exposure + amount <= self.credit_limits[counterparty]

def update_credit_limits(self, counterparty, new_limit):
"""Cập nhật hạn mức tín dụng"""
self.credit_limits[counterparty] = new_limit

Kiểm soát rủi ro

1. Giới hạn vị thế

class PositionLimiter:
def __init__(self, max_position_size, max_leverage):
self.max_position_size = max_position_size
self.max_leverage = max_leverage

def check_position_limit(self, symbol, size, price):
"""Kiểm tra giới hạn vị thế"""
position_value = size * price

# Kiểm tra kích thước vị thế
if position_value > self.max_position_size:
return False

# Kiểm tra đòn bẩy
leverage = position_value / self.get_account_equity()
if leverage > self.max_leverage:
return False

return True

2. Quản lý Stop Loss

class StopLossManager:
def __init__(self, max_loss_percent):
self.max_loss_percent = max_loss_percent

def calculate_stop_loss(self, entry_price, position_type):
"""Tính toán mức stop loss"""
if position_type == 'long':
return entry_price * (1 - self.max_loss_percent)
else:
return entry_price * (1 + self.max_loss_percent)

def check_stop_loss(self, current_price, stop_loss, position_type):
"""Kiểm tra điều kiện stop loss"""
if position_type == 'long':
return current_price <= stop_loss
else:
return current_price >= stop_loss

Giám sát rủi ro

1. Cảnh báo thời gian thực

class RiskMonitor:
def __init__(self):
self.risk_thresholds = {}
self.alert_channels = {}

def monitor_risk_metrics(self, metrics):
"""Giám sát các chỉ số rủi ro"""
alerts = []

for metric, value in metrics.items():
if metric in self.risk_thresholds:
threshold = self.risk_thresholds[metric]
if value > threshold:
alerts.append({
'metric': metric,
'value': value,
'threshold': threshold
})

return alerts

def send_alerts(self, alerts):
"""Gửi cảnh báo"""
for alert in alerts:
for channel, send_func in self.alert_channels.items():
send_func(alert)

2. Báo cáo rủi ro

class RiskReporter:
def __init__(self):
self.report_templates = {}

def generate_risk_report(self, risk_metrics, time_period):
"""Tạo báo cáo rủi ro"""
report = {
'timestamp': datetime.now(),
'period': time_period,
'metrics': risk_metrics,
'summary': self.summarize_risk_metrics(risk_metrics)
}

return report

def summarize_risk_metrics(self, metrics):
"""Tóm tắt các chỉ số rủi ro"""
return {
'highest_risk': max(metrics.items(), key=lambda x: x[1]),
'risk_trend': self.calculate_risk_trend(metrics)
}

Giảm thiểu rủi ro

1. Hedging

class HedgingManager:
def __init__(self):
self.hedging_instruments = {}

def calculate_hedge_ratio(self, position, hedge_instrument):
"""Tính toán tỷ lệ hedge"""
correlation = self.calculate_correlation(position, hedge_instrument)
return correlation * (position.volatility / hedge_instrument.volatility)

def execute_hedge(self, position, hedge_instrument, ratio):
"""Thực hiện hedge"""
hedge_size = position.size * ratio
return self.place_hedge_order(hedge_instrument, hedge_size)

2. Đa dạng hóa

class PortfolioDiversifier:
def __init__(self):
self.correlation_matrix = {}

def calculate_diversification_benefit(self, portfolio):
"""Tính toán lợi ích đa dạng hóa"""
portfolio_risk = self.calculate_portfolio_risk(portfolio)
individual_risks = sum(asset.risk for asset in portfolio.assets)

return 1 - (portfolio_risk / individual_risks)

def optimize_diversification(self, portfolio):
"""Tối ưu hóa đa dạng hóa"""
weights = self.calculate_optimal_weights(portfolio)
return self.rebalance_portfolio(portfolio, weights)

Best Practices

  1. Thiết lập các giới hạn rủi ro rõ ràng
  2. Thực hiện kiểm tra rủi ro thường xuyên
  3. Duy trì hệ thống cảnh báo hiệu quả
  4. Cập nhật chiến lược giảm thiểu rủi ro
  5. Lưu trữ và phân tích dữ liệu rủi ro

Kết luận

Hệ thống quản lý rủi ro là thành phần không thể thiếu trong việc vận hành bot giao dịch. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về cách xây dựng hệ thống backtesting cho chiến lược giao dịch.

Truy Vấn SQL Nâng Cao trong SQL Server

· 3 min read

Trong bài viết này, chúng ta sẽ tìm hiểu về các kỹ thuật truy vấn SQL nâng cao trong SQL Server, bao gồm JOIN, Subquery, và Common Table Expressions (CTE).

JOIN - Kết hợp dữ liệu từ nhiều bảng

INNER JOIN

SELECT Orders.OrderID, Customers.CustomerName
FROM Orders
INNER JOIN Customers ON Orders.CustomerID = Customers.CustomerID;

LEFT JOIN

SELECT Customers.CustomerName, Orders.OrderID
FROM Customers
LEFT JOIN Orders ON Customers.CustomerID = Orders.CustomerID;

RIGHT JOIN

SELECT Orders.OrderID, Customers.CustomerName
FROM Orders
RIGHT JOIN Customers ON Orders.CustomerID = Customers.CustomerID;

FULL JOIN

SELECT Customers.CustomerName, Orders.OrderID
FROM Customers
FULL JOIN Orders ON Customers.CustomerID = Orders.CustomerID;

Subquery - Truy vấn lồng nhau

Trong mệnh đề WHERE

SELECT ProductName, UnitPrice
FROM Products
WHERE UnitPrice > (
SELECT AVG(UnitPrice)
FROM Products
);

Trong mệnh đề FROM

SELECT CategoryName, AvgPrice
FROM (
SELECT CategoryID, AVG(UnitPrice) as AvgPrice
FROM Products
GROUP BY CategoryID
) AS CategoryAvg
JOIN Categories ON CategoryAvg.CategoryID = Categories.CategoryID;

Trong mệnh đề SELECT

SELECT 
ProductName,
UnitPrice,
(SELECT AVG(UnitPrice) FROM Products) as AvgPrice
FROM Products;

Common Table Expressions (CTE)

CTE cơ bản

WITH SalesCTE AS (
SELECT
ProductID,
SUM(Quantity) as TotalQuantity
FROM OrderDetails
GROUP BY ProductID
)
SELECT
p.ProductName,
s.TotalQuantity
FROM Products p
JOIN SalesCTE s ON p.ProductID = s.ProductID;

CTE đệ quy

WITH EmployeeHierarchy AS (
-- Anchor member
SELECT
EmployeeID,
ManagerID,
EmployeeName,
1 as Level
FROM Employees
WHERE ManagerID IS NULL

UNION ALL

-- Recursive member
SELECT
e.EmployeeID,
e.ManagerID,
e.EmployeeName,
eh.Level + 1
FROM Employees e
JOIN EmployeeHierarchy eh ON e.ManagerID = eh.EmployeeID
)
SELECT * FROM EmployeeHierarchy;

Window Functions

ROW_NUMBER()

SELECT 
ProductName,
UnitPrice,
ROW_NUMBER() OVER (ORDER BY UnitPrice DESC) as PriceRank
FROM Products;

RANK() và DENSE_RANK()

SELECT 
ProductName,
UnitPrice,
RANK() OVER (ORDER BY UnitPrice DESC) as PriceRank,
DENSE_RANK() OVER (ORDER BY UnitPrice DESC) as DensePriceRank
FROM Products;

LAG() và LEAD()

SELECT 
OrderID,
OrderDate,
LAG(OrderDate) OVER (ORDER BY OrderDate) as PreviousOrder,
LEAD(OrderDate) OVER (ORDER BY OrderDate) as NextOrder
FROM Orders;

Best Practices

  1. Sử dụng JOIN thay vì Subquery khi có thể
  2. Tối ưu hiệu suất với Index
  3. Tránh SELECT *
  4. Sử dụng CTE để cải thiện khả năng đọc
  5. Kiểm tra Execution Plan

Kết luận

Các kỹ thuật truy vấn SQL nâng cao giúp bạn xử lý dữ liệu phức tạp một cách hiệu quả. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về Stored Procedures và Functions trong SQL Server.

Stored Procedures và Functions trong SQL Server

· 3 min read

Trong bài viết này, chúng ta sẽ tìm hiểu về Stored Procedures và Functions trong SQL Server, hai công cụ mạnh mẽ để tổ chức và tái sử dụng code SQL.

Stored Procedures

Tạo Stored Procedure cơ bản

CREATE PROCEDURE sp_GetCustomerOrders
@CustomerID int
AS
BEGIN
SELECT
o.OrderID,
o.OrderDate,
p.ProductName,
od.Quantity,
od.UnitPrice
FROM Orders o
JOIN OrderDetails od ON o.OrderID = od.OrderID
JOIN Products p ON od.ProductID = p.ProductID
WHERE o.CustomerID = @CustomerID;
END;

Stored Procedure với nhiều tham số

CREATE PROCEDURE sp_InsertOrder
@CustomerID int,
@OrderDate datetime,
@ShipAddress nvarchar(100)
AS
BEGIN
INSERT INTO Orders (CustomerID, OrderDate, ShipAddress)
VALUES (@CustomerID, @OrderDate, @ShipAddress);

SELECT SCOPE_IDENTITY() as NewOrderID;
END;

Stored Procedure với OUTPUT

CREATE PROCEDURE sp_CalculateOrderTotal
@OrderID int,
@Total decimal(18,2) OUTPUT
AS
BEGIN
SELECT @Total = SUM(Quantity * UnitPrice)
FROM OrderDetails
WHERE OrderID = @OrderID;
END;

Functions

Scalar Functions

CREATE FUNCTION fn_CalculateDiscount
(
@Price decimal(18,2),
@DiscountPercent decimal(5,2)
)
RETURNS decimal(18,2)
AS
BEGIN
DECLARE @DiscountAmount decimal(18,2);
SET @DiscountAmount = @Price * (@DiscountPercent / 100);
RETURN @Price - @DiscountAmount;
END;

Table-Valued Functions

CREATE FUNCTION fn_GetProductInventory
(
@MinQuantity int
)
RETURNS TABLE
AS
RETURN
(
SELECT
p.ProductID,
p.ProductName,
p.UnitsInStock,
c.CategoryName
FROM Products p
JOIN Categories c ON p.CategoryID = c.CategoryID
WHERE p.UnitsInStock >= @MinQuantity
);

Multi-Statement Table-Valued Functions

CREATE FUNCTION fn_GetSalesByPeriod
(
@StartDate date,
@EndDate date
)
RETURNS @SalesTable TABLE
(
ProductID int,
ProductName nvarchar(100),
TotalQuantity int,
TotalAmount decimal(18,2)
)
AS
BEGIN
INSERT INTO @SalesTable
SELECT
p.ProductID,
p.ProductName,
SUM(od.Quantity) as TotalQuantity,
SUM(od.Quantity * od.UnitPrice) as TotalAmount
FROM Products p
JOIN OrderDetails od ON p.ProductID = od.ProductID
JOIN Orders o ON od.OrderID = o.OrderID
WHERE o.OrderDate BETWEEN @StartDate AND @EndDate
GROUP BY p.ProductID, p.ProductName;

RETURN;
END;

Sử dụng Stored Procedures và Functions

Gọi Stored Procedure

-- Gọi với tham số đơn
EXEC sp_GetCustomerOrders @CustomerID = 1;

-- Gọi với OUTPUT
DECLARE @Total decimal(18,2);
EXEC sp_CalculateOrderTotal @OrderID = 1, @Total = @Total OUTPUT;
SELECT @Total as OrderTotal;

Sử dụng Functions

-- Scalar Function
SELECT
ProductName,
UnitPrice,
dbo.fn_CalculateDiscount(UnitPrice, 10) as DiscountedPrice
FROM Products;

-- Table-Valued Function
SELECT * FROM fn_GetProductInventory(10);

-- Multi-Statement Table-Valued Function
SELECT * FROM fn_GetSalesByPeriod('2023-01-01', '2023-12-31');

Best Practices

  1. Đặt tên có tiền tố (sp_ cho Stored Procedures, fn_ cho Functions)
  2. Sử dụng tham số thay vì hardcode giá trị
  3. Xử lý lỗi với TRY-CATCH
  4. Tối ưu hiệu suất
  5. Ghi chú đầy đủ

Kết luận

Stored Procedures và Functions là những công cụ quan trọng trong SQL Server, giúp tổ chức code và tăng tính tái sử dụng. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về Index và tối ưu hiệu suất trong SQL Server.

Flutter hỗ trợ iOS và Android không?

· 5 min read

Flutter có hỗ trợ phát triển ứng dụng trên cả hai nền tảng iOS và Android không?

Flutter Cross-Platform Development

Giới thiệu

Flutter là một framework phát triển ứng dụng di động mã nguồn mở được phát triển bởi Google. Ra mắt lần đầu tiên vào năm 2017, Flutter đã nhanh chóng trở thành một trong những công cụ phổ biến nhất để phát triển ứng dụng đa nền tảng.

Flutter và tính năng đa nền tảng

Để trả lời câu hỏi chính: Có, Flutter hỗ trợ đầy đủ việc phát triển ứng dụng trên cả iOS và Android từ một codebase duy nhất. Đây chính là một trong những điểm mạnh chính và cũng là lý do khiến Flutter ngày càng được ưa chuộng trong cộng đồng phát triển ứng dụng di động.

Cách Flutter hoạt động trên các nền tảng khác nhau

Flutter sử dụng cách tiếp cận độc đáo để đạt được khả năng đa nền tảng:

  1. Kiến trúc độc lập với nền tảng: Flutter không sử dụng các thành phần UI gốc của iOS hay Android. Thay vào đó, nó triển khai các widget và hệ thống render của riêng mình.

  2. Skia Graphics Engine: Flutter sử dụng engine đồ họa Skia (cũng được sử dụng bởi Chrome và Android) để vẽ mọi pixel lên màn hình, đảm bảo giao diện người dùng giống nhau trên mọi thiết bị.

  3. Dart Platform: Flutter sử dụng ngôn ngữ lập trình Dart - ngôn ngữ được tối ưu hóa cho việc phát triển ứng dụng di động và web.

Lợi ích của Flutter cho phát triển iOS và Android

1. Mã nguồn duy nhất

Với Flutter, bạn chỉ cần viết code một lần và có thể triển khai trên cả iOS và Android, giúp:

  • Giảm thời gian phát triển đến 50%
  • Dễ dàng bảo trì và cập nhật
  • Đảm bảo tính nhất quán giữa các nền tảng

2. Hiệu suất cao

Ứng dụng Flutter được biên dịch thành mã máy gốc (native code), cho phép:

  • Hiệu suất gần tương đương với ứng dụng native
  • Khởi động nhanh
  • Hoạt động mượt mà với tốc độ 60fps

3. Hot Reload

Tính năng Hot Reload của Flutter cho phép nhà phát triển thấy thay đổi ngay lập tức mà không cần khởi động lại ứng dụng, giúp:

  • Tăng tốc quá trình phát triển
  • Dễ dàng thử nghiệm và sửa lỗi
  • Cải thiện workflow giữa nhà phát triển và nhà thiết kế

So sánh Flutter với các giải pháp đa nền tảng khác

Tiêu chíFlutterReact NativeXamarin
Ngôn ngữDartJavaScriptC#
Hiệu suấtCao (biên dịch ra mã máy)Trung bìnhCao
UIWidgets độc lậpThành phần NativeThành phần Native
Cộng đồngĐang phát triển nhanhLớnỔn định
Hỗ trợ bởiGoogleFacebookMicrosoft

Các thách thức khi phát triển đa nền tảng với Flutter

Mặc dù Flutter mang lại nhiều lợi ích, nhưng vẫn có một số thách thức:

  1. Kích thước ứng dụng: Ứng dụng Flutter thường lớn hơn một chút so với ứng dụng native do phải bao gồm framework.

  2. Tích hợp API đặc thù của nền tảng: Đôi khi cần viết code đặc biệt cho mỗi nền tảng để tương tác với các API riêng biệt.

  3. Đường cong học tập của Dart: Nhà phát triển mới cần làm quen với ngôn ngữ Dart.

Ví dụ mã nguồn Flutter đơn giản

Dưới đây là một ví dụ đơn giản về ứng dụng Flutter hoạt động trên cả iOS và Android:

import 'package:flutter/material.dart';

void main() {
runApp(MyApp());
}

class MyApp extends StatelessWidget {

Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: Scaffold(
appBar: AppBar(
title: Text('Flutter trên iOS và Android'),
),
body: Center(
child: Text(
'Ứng dụng này chạy trên cả iOS và Android!',
style: TextStyle(fontSize: 20),
),
),
),
);
}
}

Kết luận

Flutter đã mang đến một giải pháp mạnh mẽ cho việc phát triển ứng dụng đa nền tảng, đặc biệt là cho iOS và Android. Với những ưu điểm vượt trội về hiệu suất, UI đồng nhất và tốc độ phát triển, Flutter đang dần thay đổi cách các nhà phát triển tiếp cận việc xây dựng ứng dụng di động.

Nếu bạn đang cân nhắc một công nghệ để phát triển ứng dụng trên cả iOS và Android, Flutter chắc chắn là một lựa chọn đáng cân nhắc, đặc biệt là khi bạn muốn có một UI đẹp và hiệu suất cao với nguồn lực phát triển hạn chế.

Tối Ưu Hóa Hiệu Suất Bot Giao Dịch

· 3 min read

Trong bài viết này, chúng ta sẽ khám phá các kỹ thuật và chiến lược để tối ưu hóa hiệu suất hoạt động của bot giao dịch tự động.

1. Xác định các nút thắt cổ chai (Bottlenecks)

1.1. Phân tích hiệu suất (Profiling)

import cProfile
import re

def my_strategy_execution():
# Code thực thi chiến lược
pass

cProfile.run('my_strategy_execution()')

1.2. Theo dõi tài nguyên hệ thống

  • CPU usage
  • Memory usage
  • Network latency

2. Tối ưu hóa mã nguồn

2.1. Cải thiện thuật toán

  • Sử dụng các cấu trúc dữ liệu hiệu quả
  • Tối ưu hóa các vòng lặp và phép tính toán
# Ví dụ: Thay vì tính toán thủ công
def calculate_sma_manual(data, window):
sma = []
for i in range(len(data)):
if i < window - 1:
sma.append(None)
else:
sma.append(sum(data[i-window+1:i+1]) / window)
return sma

# Sử dụng thư viện tối ưu
import pandas as pd

def calculate_sma_pandas(data, window):
return data.rolling(window=window).mean()

# So sánh hiệu suất
import time

data = pd.Series(range(10000))

start_time = time.time()
sma_manual = calculate_sma_manual(data, 50)
end_time = time.time()
print(f"Manual SMA time: {end_time - start_time}")

start_time = time.time()
sma_pandas = calculate_sma_pandas(data, 50)
end_time = time.time()
print(f"Pandas SMA time: {end_time - start_time}")

2.2. Sử dụng các thư viện hiệu quả

  • NumPy cho các phép tính mảng
  • Pandas cho xử lý dữ liệu
  • Tận dụng các hàm được tối ưu hóa trong thư viện

3. Tối ưu hóa cơ sở hạ tầng

3.1. Lựa chọn máy chủ và vị trí

  • Chọn máy chủ có cấu hình phù hợp
  • Đặt máy chủ gần sàn giao dịch để giảm độ trễ mạng

3.2. Tối ưu hóa kết nối mạng

  • Sử dụng kết nối internet tốc độ cao và ổn định
  • Cấu hình tường lửa và bảo mật mạng

3.3. Tối ưu hóa cơ sở dữ liệu

  • Sử dụng cơ sở dữ liệu hiệu suất cao (ví dụ: PostgreSQL, TimescaleDB)
  • Đánh chỉ mục (indexing) phù hợp cho các truy vấn dữ liệu
  • Cache dữ liệu thường xuyên sử dụng

4. Giám sát liên tục

4.1. Thiết lập hệ thống giám sát

  • Theo dõi hiệu suất bot (lợi nhuận, drawdown, v.v.)
  • Theo dõi tài nguyên hệ thống (CPU, RAM, mạng, disk)
  • Theo dõi kết nối đến sàn giao dịch

4.2. Cảnh báo và thông báo

  • Thiết lập cảnh báo khi hiệu suất giảm sút đột ngột
  • Cảnh báo khi có lỗi kết nối hoặc lỗi thực thi lệnh

5. Kiểm tra và tinh chỉnh

5.1. Backtesting với dữ liệu chất lượng cao

  • Sử dụng dữ liệu lịch sử đầy đủ và chính xác
  • Thực hiện backtesting trên nhiều khung thời gian và thị trường khác nhau

5.2. Tối ưu hóa tham số chiến lược

  • Sử dụng các kỹ thuật tối ưu hóa (ví dụ: Grid Search, Genetic Algorithms)
  • Tránh overfitting

Kết luận

Tối ưu hóa hiệu suất bot giao dịch là một quá trình liên tục đòi hỏi sự kết hợp giữa phân tích, lập trình và quản lý cơ sở hạ tầng. Bằng cách áp dụng các kỹ thuật được đề cập trong bài viết này, bạn có thể cải thiện đáng kể hiệu quả hoạt động của bot giao dịch của mình.

Trong bài viết tiếp theo, chúng ta sẽ đi sâu vào hệ thống giám sát và báo cáo cho bot giao dịch.

Lập Trình Bot Giao Dịch Tự Động

· 4 min read

Trong bài viết này, chúng ta sẽ tìm hiểu cách xây dựng và triển khai bot giao dịch tự động sử dụng Python.

Cấu trúc Bot Giao Dịch

1. Các thành phần chính

class TradingBot:
def __init__(self, api_key, api_secret):
self.api_key = api_key
self.api_secret = api_secret
self.exchange = self.connect_exchange()
self.strategy = None
self.positions = {}

def connect_exchange(self):
# Kết nối với sàn giao dịch
exchange = ccxt.binance({
'apiKey': self.api_key,
'secret': self.api_secret
})
return exchange

2. Quản lý kết nối

def handle_connection(self):
try:
# Kiểm tra kết nối
self.exchange.load_markets()
return True
except Exception as e:
logger.error(f"Lỗi kết nối: {str(e)}")
return False

Xử lý dữ liệu thị trường

1. Lấy dữ liệu realtime

def get_market_data(self, symbol, timeframe='1m'):
try:
# Lấy dữ liệu OHLCV
ohlcv = self.exchange.fetch_ohlcv(
symbol,
timeframe,
limit=100
)
return pd.DataFrame(
ohlcv,
columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
)
except Exception as e:
logger.error(f"Lỗi lấy dữ liệu: {str(e)}")
return None

2. Xử lý tín hiệu

def process_signals(self, data):
# Áp dụng chiến lược
signals = self.strategy.generate_signals(data)

# Lọc tín hiệu
valid_signals = signals[signals['signal'] != 0]

return valid_signals

Quản lý giao dịch

1. Thực hiện lệnh

def execute_trade(self, symbol, side, amount):
try:
# Tạo lệnh
order = self.exchange.create_order(
symbol=symbol,
type='market',
side=side,
amount=amount
)

# Cập nhật vị thế
self.update_positions(order)

return order
except Exception as e:
logger.error(f"Lỗi thực hiện lệnh: {str(e)}")
return None

2. Quản lý vị thế

def update_positions(self, order):
symbol = order['symbol']
side = order['side']
amount = float(order['amount'])

if symbol not in self.positions:
self.positions[symbol] = 0

if side == 'buy':
self.positions[symbol] += amount
else:
self.positions[symbol] -= amount

Quản lý rủi ro

1. Kiểm tra điều kiện

def check_risk_limits(self, symbol, amount):
# Kiểm tra số dư
balance = self.exchange.fetch_balance()
available = balance['free'].get(symbol.split('/')[0], 0)

# Kiểm tra giới hạn
if amount > available:
logger.warning(f"Số dư không đủ: {amount} > {available}")
return False

return True

2. Quản lý stop loss

def manage_stop_loss(self, symbol, entry_price):
current_price = self.exchange.fetch_ticker(symbol)['last']
stop_loss = entry_price * 0.95 # 5% stop loss

if current_price <= stop_loss:
self.execute_trade(symbol, 'sell', self.positions[symbol])
logger.info(f"Đã kích hoạt stop loss: {symbol}")

Giám sát và báo cáo

1. Theo dõi hiệu suất

def monitor_performance(self):
# Tính toán lợi nhuận
total_pnl = 0
for symbol, amount in self.positions.items():
current_price = self.exchange.fetch_ticker(symbol)['last']
pnl = amount * current_price
total_pnl += pnl

return total_pnl

2. Tạo báo cáo

def generate_report(self):
report = {
'timestamp': datetime.now(),
'total_pnl': self.monitor_performance(),
'positions': self.positions,
'trades': self.trade_history
}

# Lưu báo cáo
self.save_report(report)

Triển khai và bảo trì

1. Chạy bot

def run(self):
while True:
try:
# Cập nhật dữ liệu
data = self.get_market_data('BTC/USDT')

# Xử lý tín hiệu
signals = self.process_signals(data)

# Thực hiện giao dịch
for signal in signals:
if self.check_risk_limits(signal['symbol'], signal['amount']):
self.execute_trade(
signal['symbol'],
signal['side'],
signal['amount']
)

# Quản lý rủi ro
self.manage_risk()

# Tạo báo cáo
self.generate_report()

# Đợi đến chu kỳ tiếp theo
time.sleep(60)

except Exception as e:
logger.error(f"Lỗi trong vòng lặp chính: {str(e)}")
time.sleep(60)

2. Bảo trì và cập nhật

def update_strategy(self, new_strategy):
# Cập nhật chiến lược
self.strategy = new_strategy

# Kiểm tra tính tương thích
if not self.strategy.validate():
raise ValueError("Chiến lược không hợp lệ")

Best Practices

  1. Luôn có cơ chế dừng khẩn cấp
  2. Kiểm tra kỹ lưỡng trước khi triển khai
  3. Giám sát liên tục
  4. Sao lưu dữ liệu thường xuyên
  5. Cập nhật và tối ưu hóa định kỳ

Kết luận

Lập trình bot giao dịch tự động đòi hỏi sự kết hợp giữa kiến thức về thị trường, kỹ năng lập trình và quản lý rủi ro. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về cách tối ưu hóa hiệu suất của bot giao dịch.

🛠️ Thực hành và tối ưu

· 25 min read

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python

Bắt lỗi và log chi tiết SQL Server

Giới thiệu

Khi làm việc với hệ thống phân tích dữ liệu kết hợp Python và SQL Server, việc xử lý lỗi và ghi log chi tiết là một phần không thể thiếu để đảm bảo tính ổn định và khả năng bảo trì của ứng dụng. Một hệ thống ghi log được thiết kế tốt giúp phát hiện, phân tích và khắc phục lỗi nhanh chóng, đồng thời cung cấp thông tin quý giá về hiệu suất và hành vi của hệ thống. Bài viết này sẽ hướng dẫn chi tiết các kỹ thuật bắt lỗi và thiết lập hệ thống log hiệu quả khi thao tác với SQL Server từ Python.

1. Tổng quan về xử lý lỗi và logging trong Python

1.1. Các loại lỗi thường gặp khi làm việc với SQL Server

Khi thao tác với SQL Server từ Python, chúng ta thường gặp các loại lỗi sau:

  1. Lỗi kết nối: Không thể kết nối đến máy chủ SQL Server
  2. Lỗi xác thực: Sai thông tin đăng nhập
  3. Lỗi cú pháp SQL: Lỗi trong câu lệnh SQL
  4. Lỗi thời gian chờ: Truy vấn mất quá nhiều thời gian để thực thi
  5. Lỗi ràng buộc dữ liệu: Vi phạm các ràng buộc như khóa ngoại, giá trị duy nhất, v.v
  6. Lỗi chuyển đổi kiểu dữ liệu: Không thể chuyển đổi dữ liệu giữa Python và SQL Server
  7. Lỗi tài nguyên: Hết bộ nhớ, kết nối, v.v

1.2. Hệ thống log trong Python

Python cung cấp module logging tiêu chuẩn giúp ghi log với nhiều cấp độ khác nhau:

  • DEBUG: Thông tin chi tiết, thường dùng khi gỡ lỗi
  • INFO: Xác nhận mọi thứ đang hoạt động như mong đợi
  • WARNING: Chỉ ra rằng có điều gì đó không mong muốn xảy ra, nhưng ứng dụng vẫn hoạt động
  • ERROR: Do lỗi, ứng dụng không thể thực hiện một số chức năng
  • CRITICAL: Lỗi nghiêm trọng, ứng dụng có thể không tiếp tục hoạt động

2. Thiết lập cơ bản cho logging

2.1. Thiết lập logging cơ bản

import logging

# Cấu hình cơ bản
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='sql_operations.log',
filemode='a' # Append mode
)

# Tạo logger
logger = logging.getLogger('sql_server_operations')

2.2. Cấu hình handlers đa dạng

import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

def setup_logger(name, log_file, level=logging.INFO):
"""Thiết lập logger với file và console handlers"""

# Tạo thư mục logs nếu chưa tồn tại
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)

# Tạo và cấu hình logger
logger = logging.getLogger(name)
logger.setLevel(level)

# Ngăn log trùng lặp
if logger.handlers:
return logger

# Tạo file handler sử dụng RotatingFileHandler
file_handler = RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
file_handler.setLevel(level)

# Tạo console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(level)

# Tạo formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# Thêm handlers vào logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)

return logger

# Sử dụng
sql_logger = setup_logger(
'sql_server_operations',
os.path.join('logs', 'sql_operations.log')
)

2.3. Sử dụng TimedRotatingFileHandler để phân chia log theo thời gian

def setup_timed_logger(name, log_file, level=logging.INFO):
"""Thiết lập logger với TimedRotatingFileHandler để phân chia log theo ngày"""

logger = logging.getLogger(name)
logger.setLevel(level)

if logger.handlers:
return logger

# Tạo thư mục logs nếu chưa tồn tại
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)

# Tạo file handler sử dụng TimedRotatingFileHandler
# Phân chia file log mỗi ngày, giữ lại 30 file
file_handler = TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=30
)
file_handler.setLevel(level)

# Tạo formatter bao gồm nhiều thông tin hơn
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)

return logger

# Sử dụng
detailed_logger = setup_timed_logger(
'sql_detailed_operations',
os.path.join('logs', 'sql_operations_detailed.log')
)

3. Bắt và xử lý lỗi SQL Server

3.1. Xử lý lỗi cơ bản với try-except

import pyodbc
import logging

logger = logging.getLogger('sql_server_operations')

def execute_query(conn_string, query, params=None):
"""Thực thi truy vấn SQL với xử lý lỗi cơ bản"""

conn = None
cursor = None

try:
# Thiết lập kết nối
conn = pyodbc.connect(conn_string)
cursor = conn.cursor()

# Thực thi truy vấn
logger.info(f"Thực thi truy vấn: {query[:100]}...")

if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Commit nếu là truy vấn thay đổi dữ liệu
if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')):
conn.commit()
logger.info("Đã commit thay đổi")
return cursor.rowcount # Trả về số hàng bị ảnh hưởng

# Lấy kết quả nếu là truy vấn SELECT
results = cursor.fetchall()
logger.info(f"Truy vấn trả về {len(results)} kết quả")
return results

except pyodbc.Error as e:
if conn:
conn.rollback()
logger.error(f"Lỗi SQL: {str(e)}")
raise

except Exception as e:
if conn:
conn.rollback()
logger.error(f"Lỗi không xác định: {str(e)}")
raise

finally:
# Đảm bảo đóng cursor và connection
if cursor:
cursor.close()
if conn:
conn.close()
logger.debug("Đã đóng kết nối")

3.2. Xử lý lỗi chi tiết theo từng loại lỗi

import pyodbc
import time
import logging
from functools import wraps

logger = logging.getLogger('sql_detailed_operations')

# Định nghĩa các mã lỗi SQL Server phổ biến
SQL_TIMEOUT_ERROR = '08S01' # Timeout
SQL_CONNECTION_ERROR = '08001' # Không thể kết nối
SQL_CONSTRAINT_VIOLATION = '23000' # Vi phạm ràng buộc

def retry_on_connection_error(max_attempts=3, delay=2):
"""Decorator để thử lại khi gặp lỗi kết nối"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
last_exception = None

while attempts < max_attempts:
try:
return func(*args, **kwargs)
except pyodbc.Error as e:
error_code = e.args[0] if len(e.args) > 0 else "Unknown"

# Chỉ thử lại với lỗi kết nối
if error_code in (SQL_TIMEOUT_ERROR, SQL_CONNECTION_ERROR):
attempts += 1
wait_time = delay * attempts # Tăng thời gian chờ theo số lần thử

logger.warning(
f"Lỗi kết nối (mã: {error_code}). "
f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây. "
f"Chi tiết: {str(e)}"
)

time.sleep(wait_time)
last_exception = e
else:
# Với các lỗi khác thì ném ra ngay
raise

# Nếu đã thử hết số lần mà vẫn lỗi
logger.error(f"Đã thử {max_attempts} lần nhưng vẫn thất bại: {str(last_exception)}")
raise last_exception

return wrapper
return decorator


class SQLServerError(Exception):
"""Lớp cơ sở cho các lỗi SQL Server tùy chỉnh"""
def __init__(self, message, original_error=None, query=None, params=None):
self.message = message
self.original_error = original_error
self.query = query
self.params = params
super().__init__(self.message)


class SQLConnectionError(SQLServerError):
"""Lỗi kết nối đến SQL Server"""
pass


class SQLConstraintError(SQLServerError):
"""Lỗi vi phạm ràng buộc dữ liệu"""
pass


class SQLTimeoutError(SQLServerError):
"""Lỗi timeout khi thực thi truy vấn"""
pass


class SQLSyntaxError(SQLServerError):
"""Lỗi cú pháp SQL"""
pass


@retry_on_connection_error(max_attempts=3, delay=2)
def execute_query_advanced(conn_string, query, params=None, timeout=30):
"""Thực thi truy vấn SQL với xử lý lỗi chi tiết"""

conn = None
cursor = None
start_time = time.time()

try:
# Log thông tin truy vấn
if params:
masked_params = ['***' if i > 1 else str(p)[:10] for i, p in enumerate(params)]
logger.info(f"Thực thi truy vấn với tham số: {query[:100]}... - Params: {masked_params}")
else:
logger.info(f"Thực thi truy vấn: {query[:100]}...")

# Thiết lập kết nối
conn = pyodbc.connect(conn_string, timeout=timeout)
cursor = conn.cursor()

# Thực thi truy vấn
if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Đo thời gian thực thi
execution_time = time.time() - start_time

# Xác định loại truy vấn và xử lý kết quả phù hợp
query_type = query.strip().upper().split()[0] if query.strip() else ""

if query_type in ('INSERT', 'UPDATE', 'DELETE'):
conn.commit()
affected_rows = cursor.rowcount
logger.info(f"Đã commit thay đổi. {affected_rows} hàng bị ảnh hưởng. "
f"Thời gian thực thi: {execution_time:.3f}s")
return affected_rows

elif query_type == 'SELECT':
columns = [column[0] for column in cursor.description]
results = cursor.fetchall()
row_count = len(results)

logger.info(f"Truy vấn SELECT thành công. Trả về {row_count} kết quả. "
f"Thời gian thực thi: {execution_time:.3f}s")

# Log mẫu dữ liệu (chỉ log vài hàng đầu tiên để tránh quá tải)
if row_count > 0 and logger.level <= logging.DEBUG:
sample_data = str(results[0])
if len(sample_data) > 200:
sample_data = sample_data[:200] + "..."
logger.debug(f"Mẫu dữ liệu: {sample_data}")

# Trả về kết quả dưới dạng list of dict để dễ sử dụng
return [dict(zip(columns, row)) for row in results]

else:
conn.commit()
logger.info(f"Đã thực thi truy vấn. Thời gian thực thi: {execution_time:.3f}s")
return True

except pyodbc.Error as e:
# Rollback transaction nếu có lỗi
if conn:
try:
conn.rollback()
logger.info("Đã rollback transaction")
except Exception:
pass

# Xác định mã lỗi
error_code = e.args[0] if len(e.args) > 0 else "Unknown"
error_message = str(e)

# Ghi log và ném ra exception tùy chỉnh tương ứng
if error_code == SQL_CONNECTION_ERROR:
logger.error(f"Lỗi kết nối SQL Server: {error_message}")
raise SQLConnectionError("Không thể kết nối đến SQL Server", e, query, params)

elif error_code == SQL_TIMEOUT_ERROR:
logger.error(f"Lỗi timeout SQL: {error_message}")
raise SQLTimeoutError("Truy vấn bị timeout", e, query, params)

elif error_code == SQL_CONSTRAINT_VIOLATION:
logger.error(f"Lỗi vi phạm ràng buộc dữ liệu: {error_message}")
raise SQLConstraintError("Vi phạm ràng buộc dữ liệu", e, query, params)

elif 'syntax' in error_message.lower():
logger.error(f"Lỗi cú pháp SQL: {error_message}")
raise SQLSyntaxError("Lỗi cú pháp trong truy vấn SQL", e, query, params)

else:
logger.error(f"Lỗi SQL không xác định (mã: {error_code}): {error_message}")
raise SQLServerError(f"Lỗi SQL Server: {error_message}", e, query, params)

except Exception as e:
# Xử lý các lỗi khác không phải từ SQL Server
if conn:
try:
conn.rollback()
logger.info("Đã rollback transaction")
except Exception:
pass

logger.error(f"Lỗi không xác định: {str(e)}", exc_info=True)
raise

finally:
# Đảm bảo đóng cursor và connection
if cursor:
cursor.close()
if conn:
conn.close()
logger.debug("Đã đóng kết nối DB")

4. Thiết kế hệ thống log toàn diện

4.1. Tạo lớp Logger tùy chỉnh

import logging
import os
import json
import traceback
import socket
from datetime import datetime
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler


class SQLLogger:
"""Lớp logger tùy chỉnh cho các thao tác với SQL Server"""

def __init__(self, app_name, log_dir='logs', log_level=logging.INFO):
"""Khởi tạo hệ thống log"""
self.app_name = app_name
self.log_dir = log_dir
self.log_level = log_level
self.hostname = socket.gethostname()

# Tạo thư mục logs nếu chưa tồn tại
if not os.path.exists(log_dir):
os.makedirs(log_dir)

# Thiết lập các logger
self.setup_loggers()

def setup_loggers(self):
"""Thiết lập các logger khác nhau cho từng mục đích"""

# Logger cho các thao tác SQL thông thường
self.sql_logger = self._create_logger(
'sql_operations',
os.path.join(self.log_dir, 'sql_operations.log'),
self.log_level
)

# Logger chi tiết cho lỗi
self.error_logger = self._create_logger(
'sql_errors',
os.path.join(self.log_dir, 'sql_errors.log'),
logging.ERROR
)

# Logger cho các truy vấn chậm
self.slow_query_logger = self._create_logger(
'slow_queries',
os.path.join(self.log_dir, 'slow_queries.log'),
logging.WARNING
)

def _create_logger(self, name, log_file, level):
"""Tạo logger với file handler và formatter"""

# Tạo logger với tên ứng dụng làm prefix
logger_name = f"{self.app_name}.{name}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)

# Ngăn log trùng lặp
if logger.handlers:
return logger

# Tạo file handler với rotation
file_handler = TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=30
)
file_handler.setLevel(level)

# Tạo formatter chi tiết
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(name)s - [%(hostname)s] - '
'%(pathname)s:%(lineno)d - %(message)s'
)

# Thêm thông tin hostname vào formatter
old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.hostname = self.hostname
return record

logging.setLogRecordFactory(record_factory)

file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

return logger

def log_query(self, query, params=None, duration=None, result_count=None):
"""Ghi log truy vấn SQL"""

# Tạo thông tin log
log_data = {
'query': query[:500] + ('...' if len(query) > 500 else ''),
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname
}

# Thêm params nếu có (che dấu thông tin nhạy cảm)
if params:
masked_params = []
for p in params:
if isinstance(p, str) and len(p) > 10:
masked_params.append(p[:5] + '...' + p[-2:])
else:
masked_params.append(p)
log_data['params'] = masked_params

# Thêm thời gian thực thi nếu có
if duration:
log_data['duration'] = f"{duration:.3f}s"

# Log truy vấn chậm (>1s) vào logger riêng
if duration > 1.0:
self.slow_query_logger.warning(
f"Truy vấn chậm: {log_data['query']} - "
f"Thời gian: {log_data['duration']}"
)

# Thêm số lượng kết quả nếu có
if result_count is not None:
log_data['result_count'] = result_count

# Ghi log thông thường
self.sql_logger.info(f"SQL Query: {json.dumps(log_data)}")

return log_data

def log_error(self, error, query=None, params=None, context=None):
"""Ghi log lỗi SQL với thông tin chi tiết"""

error_type = type(error).__name__
error_message = str(error)
stack_trace = traceback.format_exc()

# Tạo thông tin log
error_data = {
'error_type': error_type,
'error_message': error_message,
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname,
'stack_trace': stack_trace
}

# Thêm query nếu có
if query:
error_data['query'] = query[:500] + ('...' if len(query) > 500 else '')

# Thêm params nếu có (che dấu thông tin nhạy cảm)
if params:
masked_params = []
for p in params:
if isinstance(p, str) and len(p) > 10:
masked_params.append(p[:5] + '...' + p[-2:])
else:
masked_params.append(p)
error_data['params'] = masked_params

# Thêm context nếu có
if context:
error_data['context'] = context

# Ghi log lỗi
self.error_logger.error(f"SQL Error: {json.dumps(error_data)}")

# Đồng thời ghi log thông thường
self.sql_logger.error(
f"SQL Error: {error_type} - {error_message}"
)

return error_data

def log_transaction(self, action, affected_rows=None, duration=None):
"""Ghi log các thao tác transaction"""

log_data = {
'action': action,
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname
}

if affected_rows is not None:
log_data['affected_rows'] = affected_rows

if duration:
log_data['duration'] = f"{duration:.3f}s"

self.sql_logger.info(f"Transaction: {json.dumps(log_data)}")

return log_data

4.2. Tích hợp logger tùy chỉnh với thao tác SQL

import pyodbc
import time
from contextlib import contextmanager

class SQLServerDatabase:
"""Lớp quản lý kết nối và thao tác với SQL Server kèm logging"""

def __init__(self, conn_string, app_name="SQLApp", log_dir="logs"):
"""Khởi tạo với chuỗi kết nối và thiết lập logger"""
self.conn_string = conn_string
self.logger = SQLLogger(app_name, log_dir)

@contextmanager
def connection(self):
"""Context manager để quản lý kết nối tự động đóng"""
conn = None
try:
conn = pyodbc.connect(self.conn_string)
yield conn
except pyodbc.Error as e:
self.logger.log_error(e, context="establishing connection")
raise
finally:
if conn:
conn.close()

@contextmanager
def transaction(self):
"""Context manager để quản lý transaction"""
with self.connection() as conn:
try:
# Bắt đầu transaction
start_time = time.time()
self.logger.log_transaction("START")

yield conn

# Commit transaction nếu không có lỗi
conn.commit()
duration = time.time() - start_time
self.logger.log_transaction("COMMIT", duration=duration)

except Exception as e:
# Rollback transaction nếu có lỗi
conn.rollback()
duration = time.time() - start_time
self.logger.log_transaction("ROLLBACK", duration=duration)

# Log lỗi
self.logger.log_error(e, context="transaction")
raise

def execute_query(self, query, params=None):
"""Thực thi truy vấn và trả về kết quả"""
with self.connection() as conn:
try:
start_time = time.time()
cursor = conn.cursor()

# Thực thi truy vấn
if params:
cursor.execute(query, params)
else:
cursor.execute(query)

# Lấy kết quả nếu là truy vấn SELECT
if query.strip().upper().startswith("SELECT"):
columns = [column[0] for column in cursor.description]
results = cursor.fetchall()
result_count = len(results)

# Tính thời gian thực thi
duration = time.time() - start_time

# Log truy vấn
self.logger.log_query(
query, params, duration=duration, result_count=result_count
)

# Trả về kết quả dưới dạng list of dict
return [dict(zip(columns, row)) for row in results]
else:
# Đối với các truy vấn thay đổi dữ liệu
affected_rows = cursor.rowcount

# Tính thời gian thực thi
duration = time.time() - start_time

# Log truy vấn
self.logger.log_query(
query, params, duration=duration, result_count=affected_rows
)

# Commit thay đổi
conn.commit()

# Trả về số hàng bị ảnh hưởng
return affected_rows

except Exception as e:
# Log lỗi
self.logger.log_error(e, query, params)
raise

def execute_many(self, query, params_list):
"""Thực thi nhiều truy vấn với danh sách tham số"""
with self.transaction() as conn:
try:
start_time = time.time()
cursor = conn.cursor()

# Thực thi executemany
cursor.executemany(query, params_list)

# Tính thời gian thực thi
duration = time.time() - start_time

# Lấy số hàng bị ảnh hưởng
affected_rows = cursor.rowcount

# Log thông tin
self.logger.log_query(
query,
f"[{len(params_list)} parameter sets]",
duration=duration,
result_count=affected_rows
)

return affected_rows

except Exception as e:
# Log lỗi
self.logger.log_error(e, query, f"[{len(params_list)} parameter sets]")
raise

def bulk_insert(self, table_name, data, batch_size=1000):
"""Thực hiện bulk insert với logging chi tiết"""
if not data:
return 0

total_rows = len(data)
total_batches = (total_rows + batch_size - 1) // batch_size
total_inserted = 0
start_total_time = time.time()

self.logger.sql_logger.info(
f"Bắt đầu bulk insert vào bảng {table_name}. "
f"{total_rows} hàng, {total_batches} batch(es)"
)

try:
# Lấy tên các cột từ dữ liệu
if isinstance(data[0], dict):
columns = list(data[0].keys())
# Chuyển dữ liệu từ dict sang list
data_values = [[row[col] for col in columns] for row in data]
else:
raise ValueError("Data phải là list of dict")

# Xây dựng câu truy vấn insert
placeholders = ','.join('?' for _ in columns)
column_names = ','.join(columns)
query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})"

# Thực hiện insert theo batch
for i in range(0, total_rows, batch_size):
batch_start_time = time.time()

# Lấy một batch dữ liệu
batch = data_values[i:i+batch_size]
batch_size_actual = len(batch)

# Thực hiện insert
with self.transaction() as conn:
cursor = conn.cursor()
cursor.executemany(query, batch)
batch_affected = cursor.rowcount

# Tính thời gian và log
batch_duration = time.time() - batch_start_time
total_inserted += batch_affected

self.logger.sql_logger.info(
f"Batch {(i//batch_size)+1}/{total_batches}: "
f"Đã insert {batch_affected}/{batch_size_actual} hàng "
f"trong {batch_duration:.3f}s"
)

# Log tổng kết
total_duration = time.time() - start_total_time
self.logger.sql_logger.info(
f"Hoàn thành bulk insert vào bảng {table_name}. "
f"Tổng số: {total_inserted}/{total_rows} hàng "
f"trong {total_duration:.3f}s"
)

return total_inserted

except Exception as e:
# Log lỗi
self.logger.log_error(
e,
context=f"bulk_insert to {table_name}, {total_rows} rows"
)
raise

5. Ứng dụng thực tế

5.1. Ví dụ sử dụng lớp Database với xử lý lỗi

# Ví dụ sử dụng lớp Database để thao tác với SQL Server
conn_string = 'DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password'

# Khởi tạo đối tượng Database
db = SQLServerDatabase(conn_string, app_name="SalesAnalytics", log_dir="logs/sales")

try:
# Truy vấn đơn giản
results = db.execute_query(
"SELECT TOP 10 * FROM DuLieuBanHang WHERE NgayBan >= ?",
['2024-01-01']
)
print(f"Lấy được {len(results)} kết quả")

# Thực hiện insert với transaction
with db.transaction() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO DuLieuBanHang (NgayBan, MaSanPham, SoLuong, DoanhThu)
VALUES (?, ?, ?, ?)
""", ['2024-05-01', 'SP001', 5, 1500000])

# Có thể thực hiện nhiều lệnh trong cùng một transaction
cursor.execute("""
UPDATE TongKetDoanhThu
SET TongDoanhThu = TongDoanhThu + ?
WHERE Thang = 5 AND Nam = 2024
""", [1500000])

# Bulk insert dữ liệu
sales_data = [
{
'NgayBan': '2024-05-01',
'MaSanPham': f'SP{i:03d}',
'SoLuong': i % 10 + 1,
'DoanhThu': (i % 10 + 1) * 300000
}
for i in range(1, 101)
]

inserted = db.bulk_insert('DuLieuBanHang', sales_data, batch_size=20)
print(f"Đã insert {inserted} hàng dữ liệu")

except SQLConnectionError as e:
print(f"Lỗi kết nối: {e.message}")
# Thử kết nối lại hoặc thông báo cho người dùng

except SQLConstraintError as e:
print(f"Lỗi ràng buộc dữ liệu: {e.message}")
# Kiểm tra và sửa dữ liệu

except SQLTimeoutError as e:
print(f"Truy vấn bị timeout: {e.message}")
# Tối ưu truy vấn hoặc tăng timeout

except SQLSyntaxError as e:
print(f"Lỗi cú pháp SQL: {e.message}")
# Sửa lỗi cú pháp trong truy vấn

except SQLServerError as e:
print(f"Lỗi SQL Server: {e.message}")
# Xử lý lỗi chung từ SQL Server

except Exception as e:
print(f"Lỗi không xác định: {str(e)}")
# Ghi log và thông báo lỗi chung

5.2. Xử lý lỗi khi làm việc với pandas và SQLAlchemy

import pandas as pd
from sqlalchemy import create_engine, text
import urllib
import logging

# Thiết lập logger
logger = logging.getLogger('pandas_sql')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('logs/pandas_sql.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

def create_sql_engine(server, database, username, password):
"""Tạo SQLAlchemy engine với xử lý lỗi"""
try:
# Tạo chuỗi kết nối
params = urllib.parse.quote_plus(
f"DRIVER={{SQL Server}};SERVER={server};"
f"DATABASE={database};UID={username};PWD={password}"
)

# Tạo engine với cấu hình
engine = create_engine(
f"mssql+pyodbc:///?odbc_connect={params}",
pool_pre_ping=True, # Kiểm tra kết nối trước khi sử dụng
pool_recycle=3600, # Làm mới kết nối sau 1 giờ
connect_args={'timeout': 30} # Timeout kết nối là 30 giây
)

# Kiểm tra kết nối
with engine.connect() as conn:
result = conn.execute(text("SELECT 1"))
if result.scalar() == 1:
logger.info(f"Kết nối thành công đến {server}/{database}")
return engine
else:
raise Exception("Kiểm tra kết nối không thành công")

except Exception as e:
logger.error(f"Lỗi tạo kết nối SQL: {str(e)}")
raise

def read_sql_with_logging(query, engine, params=None):
"""Đọc dữ liệu từ SQL với pandas và log"""
try:
start_time = time.time()
logger.info(f"Bắt đầu đọc dữ liệu với query: {query[:200]}...")

# Thực hiện truy vấn
if params:
df = pd.read_sql(query, engine, params=params)
else:
df = pd.read_sql(query, engine)

# Log kết quả
duration = time.time() - start_time
row_count = len(df)
col_count = len(df.columns)

logger.info(
f"Hoàn thành đọc dữ liệu: {row_count} hàng × {col_count} cột "
f"trong {duration:.3f}s"
)

# Log thông tin về bộ nhớ sử dụng
memory_usage = df.memory_usage(deep=True).sum()
logger.info(f"Bộ nhớ sử dụng: {memory_usage/1024/1024:.2f} MB")

return df

except Exception as e:
logger.error(f"Lỗi đọc dữ liệu SQL: {str(e)}")
raise

def write_to_sql_with_logging(df, table_name, engine, if_exists='replace', chunksize=1000):
"""Ghi DataFrame vào SQL Server với logging"""
try:
start_time = time.time()
row_count = len(df)
logger.info(
f"Bắt đầu ghi {row_count} hàng vào bảng {table_name}, "
f"chế độ: {if_exists}, kích thước chunk: {chunksize}"
)

# Thực hiện ghi dữ liệu
df.to_sql(
table_name,
engine,
if_exists=if_exists,
chunksize=chunksize,
index=False
)

# Log kết quả
duration = time.time() - start_time
logger.info(
f"Hoàn thành ghi dữ liệu vào bảng {table_name}: "
f"{row_count} hàng trong {duration:.3f}s"
)

return True

except Exception as e:
logger.error(f"Lỗi ghi dữ liệu vào SQL: {str(e)}")
raise

5.3. Tích hợp với hệ thống giám sát

import requests
import socket
import json
import traceback
from datetime import datetime

class SQLMonitor:
"""Lớp giám sát SQL Server và gửi thông báo khi có lỗi"""

def __init__(self, webhook_url=None, email_config=None):
"""Khởi tạo với URL webhook và cấu hình email"""
self.webhook_url = webhook_url
self.email_config = email_config
self.hostname = socket.gethostname()
self.logger = logging.getLogger('sql_monitor')

# Thiết lập handler cho logger
handler = logging.FileHandler('logs/sql_monitor.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)

def log_and_alert(self, error, query=None, context=None, alert_level='warning'):
"""Ghi log lỗi và gửi thông báo"""
# Tạo thông tin lỗi
error_data = {
'timestamp': datetime.now().isoformat(),
'hostname': self.hostname,
'error_type': type(error).__name__,
'error_message': str(error),
'stack_trace': traceback.format_exc(),
'alert_level': alert_level
}

if query:
error_data['query'] = query

if context:
error_data['context'] = context

# Ghi log
if alert_level == 'critical':
self.logger.critical(f"SQL Critical Error: {json.dumps(error_data)}")
elif alert_level == 'error':
self.logger.error(f"SQL Error: {json.dumps(error_data)}")
else:
self.logger.warning(f"SQL Warning: {json.dumps(error_data)}")

# Gửi thông báo
if alert_level in ('error', 'critical'):
self._send_alert(error_data)

def _send_alert(self, error_data):
"""Gửi thông báo lỗi qua webhook và/hoặc email"""

# Gửi qua webhook (ví dụ: Slack, Teams, etc.)
if self.webhook_url:
try:
# Định dạng thông báo
message = {
'text': f"SQL Error on {error_data['hostname']}",
'attachments': [{
'title': f"{error_data['error_type']}: {error_data['error_message']}",
'text': f"Context: {error_data.get('context', 'N/A')}\n"
f"Query: {error_data.get('query', 'N/A')}\n"
f"Time: {error_data['timestamp']}",
'color': 'danger' if error_data['alert_level'] == 'critical' else 'warning'
}]
}

# Gửi request
response = requests.post(
self.webhook_url,
json=message,
timeout=5
)

if response.status_code == 200:
self.logger.info("Đã gửi thông báo lỗi qua webhook")
else:
self.logger.warning(
f"Không thể gửi thông báo qua webhook. "
f"Status code: {response.status_code}"
)

except Exception as e:
self.logger.error(f"Lỗi khi gửi thông báo webhook: {str(e)}")

# Gửi qua email
if self.email_config:
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Tạo email
msg = MIMEMultipart()
msg['From'] = self.email_config['from']
msg['To'] = self.email_config['to']
msg['Subject'] = f"SQL Error on {error_data['hostname']}: {error_data['error_type']}"

# Tạo nội dung
body = f"""
<h2>SQL Error Details</h2>
<p><strong>Time:</strong> {error_data['timestamp']}</p>
<p><strong>Host:</strong> {error_data['hostname']}</p>
<p><strong>Error Type:</strong> {error_data['error_type']}</p>
<p><strong>Error Message:</strong> {error_data['error_message']}</p>

<h3>Context</h3>
<p>{error_data.get('context', 'N/A')}</p>

<h3>Query</h3>
<pre>{error_data.get('query', 'N/A')}</pre>

<h3>Stack Trace</h3>
<pre>{error_data['stack_trace']}</pre>
"""

msg.attach(MIMEText(body, 'html'))

# Gửi email
server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port'])
server.starttls()
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()

self.logger.info("Đã gửi thông báo lỗi qua email")

except Exception as e:
self.logger.error(f"Lỗi khi gửi email thông báo: {str(e)}")

5.4. Hệ thống theo dõi hiệu suất truy vấn SQL

class SQLPerformanceTracker:
"""Lớp theo dõi hiệu suất truy vấn SQL"""

def __init__(self, log_dir='logs/performance'):
"""Khởi tạo với thư mục log"""
self.log_dir = log_dir

# Tạo thư mục nếu chưa tồn tại
if not os.path.exists(log_dir):
os.makedirs(log_dir)

# Thiết lập logger
self.logger = logging.getLogger('sql_performance')
handler = logging.FileHandler(os.path.join(log_dir, 'performance.log'))
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)

# Thống kê hiệu suất
self.stats = {
'total_queries': 0,
'total_duration': 0,
'max_duration': 0,
'slow_queries': 0,
'error_queries': 0,
'query_types': {
'SELECT': 0,
'INSERT': 0,
'UPDATE': 0,
'DELETE': 0,
'OTHER': 0
}
}

def track_query(self, query, duration, result_count=None, error=None):
"""Theo dõi một truy vấn SQL"""
try:
# Cập nhật thống kê
self.stats['total_queries'] += 1
self.stats['total_duration'] += duration
self.stats['max_duration'] = max(self.stats['max_duration'], duration)

if duration > 1.0: # Truy vấn chậm > 1 giây
self.stats['slow_queries'] += 1

if error:
self.stats['error_queries'] += 1

# Xác định loại truy vấn
first_word = query.strip().upper().split()[0] if query.strip() else "OTHER"
if first_word in self.stats['query_types']:
self.stats['query_types'][first_word] += 1
else:
self.stats['query_types']['OTHER'] += 1

# Log thông tin truy vấn
log_entry = {
'timestamp': datetime.now().isoformat(),
'query_type': first_word,
'duration': duration,
'result_count': result_count,
'has_error': error is not None,
'query_preview': query[:100] + ('...' if len(query) > 100 else '')
}

self.logger.info(f"Query Stats: {json.dumps(log_entry)}")

# Log chi tiết cho truy vấn chậm
if duration > 1.0:
slow_log_entry = {
'timestamp': datetime.now().isoformat(),
'duration': duration,
'query': query,
'result_count': result_count
}

with open(os.path.join(self.log_dir, 'slow_queries.log'), 'a') as f:
f.write(f"{json.dumps(slow_log_entry)}\n")

return log_entry

except Exception as e:
print(f"Lỗi khi theo dõi truy vấn: {str(e)}")

def get_statistics(self):
"""Lấy thống kê hiệu suất"""
stats = self.stats.copy()

# Tính thời gian trung bình
if stats['total_queries'] > 0:
stats['avg_duration'] = stats['total_duration'] / stats['total_queries']
else:
stats['avg_duration'] = 0

return stats

def generate_report(self, output_file=None):
"""Tạo báo cáo hiệu suất"""
stats = self.get_statistics()

report = f"""
SQL Performance Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
=============================================================

Summary:
--------
Total Queries: {stats['total_queries']}
Average Duration: {stats['avg_duration']:.6f} seconds
Maximum Duration: {stats['max_duration']:.6f} seconds
Slow Queries (>1s): {stats['slow_queries']} ({stats['slow_queries']/max(1, stats['total_queries'])*100:.2f}%)
Error Queries: {stats['error_queries']} ({stats['error_queries']/max(1, stats['total_queries'])*100:.2f}%)

Query Types:
------------
SELECT: {stats['query_types']['SELECT']} ({stats['query_types']['SELECT']/max(1, stats['total_queries'])*100:.2f}%)
INSERT: {stats['query_types']['INSERT']} ({stats['query_types']['INSERT']/max(1, stats['total_queries'])*100:.2f}%)
UPDATE: {stats['query_types']['UPDATE']} ({stats['query_types']['UPDATE']/max(1, stats['total_queries'])*100:.2f}%)
DELETE: {stats['query_types']['DELETE']} ({stats['query_types']['DELETE']/max(1, stats['total_queries'])*100:.2f}%)
OTHER: {stats['query_types']['OTHER']} ({stats['query_types']['OTHER']/max(1, stats['total_queries'])*100:.2f}%)
"""

if output_file:
with open(output_file, 'w') as f:
f.write(report)

return report

6. Tích hợp vào hệ thống trực tiếp

6.1. Tạo decorators cho xử lý lỗi tự động

from functools import wraps
import time
import traceback

def log_sql_operation(logger):
"""Decorator để log các thao tác SQL"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Lấy tên hàm
func_name = func.__name__
start_time = time.time()

try:
# Thực thi hàm
logger.info(f"Bắt đầu thực thi {func_name}")
result = func(*args, **kwargs)

# Tính thời gian thực thi
duration = time.time() - start_time

# Log kết quả thành công
logger.info(f"Thực thi {func_name} thành công trong {duration:.3f}s")

return result

except Exception as e:
# Tính thời gian thực thi
duration = time.time() - start_time

# Log lỗi
error_type = type(e).__name__
error_message = str(e)
stack_trace = traceback.format_exc()

logger.error(
f"Lỗi khi thực thi {func_name}: {error_type} - {error_message}\n"
f"Thời gian: {duration:.3f}s\n"
f"Stack trace: {stack_trace}"
)

# Ném lại exception
raise

return wrapper
return decorator

def retry_on_specific_errors(max_attempts=3, delay=2, error_types=(Exception,)):
"""Decorator để thử lại khi gặp lỗi cụ thể"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
attempts = 0
last_exception = None

while attempts < max_attempts:
try:
return func(*args, **kwargs)
except error_types as e:
attempts += 1
last_exception = e

# Tăng thời gian chờ theo số lần thử
wait_time = delay * attempts

if attempts < max_attempts:
logging.warning(
f"Lỗi khi thực thi {func.__name__}: {str(e)}\n"
f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây"
)
time.sleep(wait_time)
else:
logging.error(
f"Đã thử lại {max_attempts} lần nhưng vẫn thất bại: {str(e)}"
)

# Nếu đã thử hết số lần mà vẫn lỗi
raise last_exception

return wrapper
return decorator

6.2. Tích hợp với FastAPI hoặc Flask

from fastapi import FastAPI, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import logging
import time

# Thiết lập logger
logger = logging.getLogger("api_sql_operations")
logger.setLevel(logging.INFO)
handler = logging.FileHandler("logs/api_sql.log")
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)

# Khởi tạo FastAPI app
app = FastAPI(title="SQL Operations API")

# Khởi tạo đối tượng Database
db = SQLServerDatabase(
conn_string='DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password',
app_name="API_Service",
log_dir="logs/api"
)

# Model cho request
class DataQuery(BaseModel):
query: str
params: list = None

# Middleware để log request và response
@app.middleware("http")
async def log_requests(request, call_next):
start_time = time.time()

# Log request
logger.info(f"Request: {request.method} {request.url}")

try:
# Xử lý request
response = await call_next(request)

# Tính thời gian xử lý
duration = time.time() - start_time

# Log response
logger.info(f"Response: {response.status_code} in {duration:.3f}s")

return response

except Exception as e:
# Log lỗi
duration = time.time() - start_time
logger.error(f"Error: {str(e)} in {duration:.3f}s")

# Trả về lỗi 500
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"}
)

# Endpoint để thực thi truy vấn SELECT
@app.post("/query/select")
async def execute_select(query_data: DataQuery):
try:
# Log truy vấn
logger.info(f"Executing SELECT query: {query_data.query[:100]}...")

# Kiểm tra xem có phải truy vấn SELECT không
if not query_data.query.strip().upper().startswith("SELECT"):
raise HTTPException(
status_code=400,
detail="Only SELECT queries are allowed for this endpoint"
)

# Thực thi truy vấn
start_time = time.time()
results = db.execute_query(query_data.query, query_data.params)
duration = time.time() - start_time

# Log kết quả
logger.info(f"Query executed in {duration:.3f}s, returned {len(results)} rows")

return {
"success": True,
"duration": duration,
"row_count": len(results),
"results": results
}

except SQLServerError as e:
# Log lỗi
logger.error(f"SQL Error: {e.message}")

# Trả về lỗi
raise HTTPException(
status_code=400,
detail=f"SQL Error: {e.message}"
)

except Exception as e:
# Log lỗi không xác định
logger.error(f"Unexpected error: {str(e)}")

# Trả về lỗi 500
raise HTTPException(
status_code=500,
detail="Internal server error"
)

Kết luận

Bắt lỗi và log chi tiết SQL Server

Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python là một phần không thể thiếu trong việc xây dựng các ứng dụng dữ liệu chuyên nghiệp, đáng tin cậy. Một hệ thống xử lý lỗi và ghi log tốt không chỉ giúp phát hiện và khắc phục sự cố nhanh chóng mà còn cung cấp thông tin quý giá để tối ưu hóa hiệu suất và độ tin cậy của hệ thống.

Các nguyên tắc quan trọng cần nhớ:

  1. Luôn xử lý lỗi một cách cụ thể: Phân loại và xử lý từng loại lỗi riêng biệt thay vì bắt tất cả các lỗi cùng một cách.
  2. Ghi log có cấu trúc và chi tiết: Bao gồm thông tin về thời gian, ngữ cảnh, truy vấn và tham số để dễ dàng phân tích.
  3. Sử dụng các cấp độ log phù hợp: DEBUG, INFO, WARNING, ERROR, CRITICAL cho từng loại thông tin khác nhau.
  4. Áp dụng log rotation: Ngăn chặn các file log quá lớn và khó quản lý.
  5. Tích hợp với hệ thống giám sát: Gửi thông báo khi có lỗi nghiêm trọng để xử lý kịp thời.
  6. Theo dõi hiệu suất truy vấn: Phát hiện và tối ưu các truy vấn chậm.

Với các kỹ thuật và công cụ được trình bày trong bài viết này, bạn có thể xây dựng một hệ thống logging và xử lý lỗi toàn diện, giúp ứng dụng của bạn trở nên ổn định, dễ bảo trì và hiệu quả hơn khi làm việc với SQL Server từ Python.

Lập Trình Bot Giao Dịch Tự Động

· 4 min read

Trong bài viết này, chúng ta sẽ tìm hiểu cách xây dựng và triển khai bot giao dịch tự động sử dụng Python.

Cấu trúc cơ bản của bot giao dịch

1. Khởi tạo bot

class TradingBot:
def __init__(self, api_key, api_secret, symbol):
self.api_key = api_key
self.api_secret = api_secret
self.symbol = symbol
self.position = None
self.orders = []

def connect(self):
# Kết nối với sàn giao dịch
self.exchange = ccxt.binance({
'apiKey': self.api_key,
'secret': self.api_secret
})

2. Quản lý kết nối

    def check_connection(self):
try:
self.exchange.fetch_balance()
return True
except Exception as e:
log_error(f"Lỗi kết nối: {e}")
return False

def reconnect(self):
max_attempts = 3
for attempt in range(max_attempts):
if self.check_connection():
return True
time.sleep(5)
return False

Xử lý dữ liệu thị trường

1. Lấy dữ liệu realtime

    def fetch_market_data(self, timeframe='1m', limit=100):
try:
ohlcv = self.exchange.fetch_ohlcv(
self.symbol,
timeframe=timeframe,
limit=limit
)
df = pd.DataFrame(
ohlcv,
columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
return df
except Exception as e:
log_error(f"Lỗi lấy dữ liệu: {e}")
return None

2. Xử lý tín hiệu

    def process_signals(self, data):
# Áp dụng chiến lược giao dịch
data = self.strategy.generate_signals(data)

# Lọc tín hiệu mới nhất
latest_signal = data['Signal'].iloc[-1]

return latest_signal

Quản lý giao dịch

1. Đặt lệnh

    def place_order(self, side, amount, price=None):
try:
if price:
order = self.exchange.create_limit_order(
self.symbol,
side,
amount,
price
)
else:
order = self.exchange.create_market_order(
self.symbol,
side,
amount
)

self.orders.append(order)
return order
except Exception as e:
log_error(f"Lỗi đặt lệnh: {e}")
return None

2. Quản lý vị thế

    def manage_position(self, signal):
if signal == 1 and not self.position: # Tín hiệu mua
amount = self.calculate_position_size()
self.place_order('buy', amount)
self.position = 'long'

elif signal == -1 and self.position == 'long': # Tín hiệu bán
amount = self.get_position_size()
self.place_order('sell', amount)
self.position = None

Quản lý rủi ro

1. Kiểm tra điều kiện thị trường

    def check_market_conditions(self):
# Kiểm tra biến động giá
volatility = self.calculate_volatility()
if volatility > self.max_volatility:
return False

# Kiểm tra thanh khoản
liquidity = self.check_liquidity()
if liquidity < self.min_liquidity:
return False

return True

2. Quản lý vốn

    def manage_capital(self):
# Tính toán vốn có sẵn
balance = self.exchange.fetch_balance()
available = balance['free'][self.base_currency]

# Kiểm tra giới hạn rủi ro
if available < self.min_capital:
return False

return True

Giám sát và báo cáo

1. Theo dõi hiệu suất

    def monitor_performance(self):
# Tính toán các chỉ số
returns = self.calculate_returns()
sharpe = self.calculate_sharpe_ratio()
drawdown = self.calculate_drawdown()

# Lưu kết quả
self.performance_metrics = {
'returns': returns,
'sharpe_ratio': sharpe,
'max_drawdown': drawdown
}

2. Gửi báo cáo

    def send_report(self):
# Tạo báo cáo
report = {
'timestamp': datetime.now(),
'performance': self.performance_metrics,
'positions': self.get_positions(),
'orders': self.get_recent_orders()
}

# Gửi qua email hoặc API
self.notification_service.send(report)

Triển khai và vận hành

1. Chạy bot

    def run(self):
while True:
try:
# Kiểm tra kết nối
if not self.check_connection():
self.reconnect()
continue

# Lấy và xử lý dữ liệu
data = self.fetch_market_data()
signal = self.process_signals(data)

# Kiểm tra điều kiện thị trường
if self.check_market_conditions():
# Quản lý vị thế
self.manage_position(signal)

# Giám sát hiệu suất
self.monitor_performance()

# Gửi báo cáo định kỳ
if self.should_send_report():
self.send_report()

time.sleep(self.interval)

except Exception as e:
log_error(f"Lỗi trong vòng lặp chính: {e}")
time.sleep(60)

Best Practices

  1. Luôn có cơ chế dừng khẩn cấp
  2. Kiểm tra kỹ lưỡng trước khi triển khai
  3. Giám sát liên tục
  4. Sao lưu dữ liệu thường xuyên
  5. Cập nhật và bảo trì định kỳ

Kết luận

Lập trình bot giao dịch tự động đòi hỏi sự kết hợp giữa kiến thức về thị trường, kỹ năng lập trình và quản lý rủi ro. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về cách tối ưu hóa hiệu suất của bot giao dịch.

Flutter Firebase Authentication

· 21 min read

Flutter Firebase Authentication

Hướng dẫn tích hợp Firebase Authentication vào ứng dụng Flutter để quản lý người dùng

Flutter Firebase Authentication

Firebase Authentication là một giải pháp xác thực người dùng mạnh mẽ và an toàn được phát triển bởi Google. Nó cung cấp các phương thức xác thực phổ biến như email/mật khẩu, xác thực qua mạng xã hội (Google, Facebook, Twitter,...), xác thực qua số điện thoại và nhiều phương thức khác. Trong bài viết này, chúng ta sẽ tìm hiểu cách tích hợp Firebase Authentication vào ứng dụng Flutter để xây dựng hệ thống quản lý người dùng hoàn chỉnh.

Mục lục

  1. Thiết lập dự án Firebase
  2. Cài đặt các gói phụ thuộc cần thiết
  3. Cấu hình Firebase cho ứng dụng Flutter
  4. Xây dựng các màn hình xác thực
  5. Xác thực với Email và Mật khẩu
  6. Xác thực với Google
  7. Xác thực với Facebook
  8. Xác thực với Số điện thoại
  9. Quản lý trạng thái người dùng
  10. Phân quyền và Bảo mật
  11. Các kỹ thuật nâng cao
  12. Xử lý lỗi
  13. Kết luận

1. Thiết lập dự án Firebase

Bước 1: Tạo dự án Firebase

  1. Truy cập Firebase Console
  2. Nhấp vào "Thêm dự án" (hoặc "Add project")
  3. Nhập tên dự án, ví dụ: "Flutter Auth Demo"
  4. Tùy chọn bật Google Analytics cho dự án (khuyến nghị)
  5. Nhấp "Tiếp tục" và làm theo các bước để hoàn thành quá trình tạo dự án

Bước 2: Bật Firebase Authentication

  1. Trong Firebase Console, chọn dự án vừa tạo
  2. Từ menu bên trái, chọn "Authentication"
  3. Nhấp vào tab "Sign-in method"
  4. Bật các phương thức xác thực mà bạn muốn sử dụng (ví dụ: Email/Password, Google, Facebook, Phone)
  5. Cấu hình thêm các thông tin cần thiết cho từng phương thức

Flutter Firebase Authentication

2. Cài đặt các gói phụ thuộc cần thiết

Mở file pubspec.yaml của dự án Flutter và thêm các gói sau:

dependencies:
flutter:
sdk: flutter
firebase_core: ^2.13.0
firebase_auth: ^4.6.1
google_sign_in: ^6.1.0
flutter_facebook_auth: ^5.0.11
provider: ^6.0.5

Chạy lệnh sau để cài đặt các gói:

flutter pub get

3. Cấu hình Firebase cho ứng dụng Flutter

Cấu hình cho Android

Bước 1: Đăng ký ứng dụng Android

  1. Trong Firebase Console, chọn dự án của bạn
  2. Nhấp vào biểu tượng Android để thêm ứng dụng Android
  3. Nhập ID gói của ứng dụng Android (ví dụ: com.example.flutter_auth_demo)
  4. (Tùy chọn) Nhập nickname cho ứng dụng
  5. Đăng ký ứng dụng

Bước 2: Tải xuống file cấu hình

  1. Tải xuống file google-services.json
  2. Đặt file này vào thư mục android/app của dự án Flutter

Bước 3: Cập nhật Gradle

Mở file android/build.gradle và thêm:

buildscript {
dependencies {
// ... other dependencies
classpath 'com.google.gms:google-services:4.3.15'
}
}

Mở file android/app/build.gradle và thêm:

// Bottom of the file
apply plugin: 'com.google.gms.google-services'

Cấu hình cho iOS

Bước 1: Đăng ký ứng dụng iOS

  1. Trong Firebase Console, chọn dự án của bạn
  2. Nhấp vào biểu tượng iOS để thêm ứng dụng iOS
  3. Nhập ID gói của ứng dụng iOS (ví dụ: com.example.flutterAuthDemo)
  4. (Tùy chọn) Nhập nickname cho ứng dụng
  5. Đăng ký ứng dụng

Bước 2: Tải xuống file cấu hình

  1. Tải xuống file GoogleService-Info.plist
  2. Sử dụng Xcode, thêm file này vào thư mục Runner của dự án (đảm bảo chọn "Copy items if needed")

Bước 3: Cập nhật Podfile

Mở file ios/Podfile và thêm:

platform :ios, '12.0'

4. Xây dựng các màn hình xác thực

Chúng ta sẽ xây dựng các màn hình xác thực cơ bản cho ứng dụng:

Màn hình đăng nhập (login_screen.dart)

import 'package:flutter/material.dart';
import 'package:firebase_auth/firebase_auth.dart';
import '../services/auth_service.dart';

class LoginScreen extends StatefulWidget {

_LoginScreenState createState() => _LoginScreenState();
}

class _LoginScreenState extends State<LoginScreen> {
final AuthService _authService = AuthService();
final _formKey = GlobalKey<FormState>();

String _email = '';
String _password = '';
String _error = '';
bool _isLoading = false;

void _signInWithEmailAndPassword() async {
if (_formKey.currentState!.validate()) {
setState(() {
_isLoading = true;
_error = '';
});

try {
await _authService.signInWithEmailAndPassword(_email, _password);
// Đăng nhập thành công, NavigationService sẽ điều hướng người dùng
} catch (e) {
setState(() {
_error = _handleFirebaseAuthError(e);
_isLoading = false;
});
}
}
}

String _handleFirebaseAuthError(dynamic e) {
if (e is FirebaseAuthException) {
switch (e.code) {
case 'user-not-found':
return 'Không tìm thấy tài khoản với email này.';
case 'wrong-password':
return 'Sai mật khẩu.';
case 'invalid-email':
return 'Email không hợp lệ.';
case 'user-disabled':
return 'Tài khoản đã bị vô hiệu hóa.';
default:
return 'Đăng nhập thất bại: ${e.message}';
}
}
return 'Đã xảy ra lỗi không xác định.';
}


Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Đăng nhập'),
),
body: Center(
child: SingleChildScrollView(
padding: EdgeInsets.all(16.0),
child: Form(
key: _formKey,
child: Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [
Image.asset(
'assets/images/logo.png',
height: 100,
),
SizedBox(height: 48.0),
TextFormField(
decoration: InputDecoration(
labelText: 'Email',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.email),
),
keyboardType: TextInputType.emailAddress,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập email';
}
return null;
},
onChanged: (value) {
_email = value.trim();
},
),
SizedBox(height: 16.0),
TextFormField(
decoration: InputDecoration(
labelText: 'Mật khẩu',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.lock),
),
obscureText: true,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập mật khẩu';
}
return null;
},
onChanged: (value) {
_password = value;
},
),
SizedBox(height: 24.0),
if (_error.isNotEmpty)
Padding(
padding: EdgeInsets.only(bottom: 16.0),
child: Text(
_error,
style: TextStyle(color: Colors.red),
textAlign: TextAlign.center,
),
),
ElevatedButton(
onPressed: _isLoading ? null : _signInWithEmailAndPassword,
child: _isLoading
? CircularProgressIndicator(color: Colors.white)
: Text('ĐĂNG NHẬP'),
style: ElevatedButton.styleFrom(
padding: EdgeInsets.symmetric(vertical: 16.0),
),
),
SizedBox(height: 16.0),
OutlinedButton.icon(
icon: Image.asset(
'assets/images/google_logo.png',
height: 24.0,
),
label: Text('Đăng nhập với Google'),
onPressed: _isLoading
? null
: () async {
setState(() {
_isLoading = true;
_error = '';
});
try {
await _authService.signInWithGoogle();
// Đăng nhập thành công
} catch (e) {
setState(() {
_error = 'Đăng nhập Google thất bại: $e';
_isLoading = false;
});
}
},
style: OutlinedButton.styleFrom(
padding: EdgeInsets.symmetric(vertical: 12.0),
),
),
SizedBox(height: 16.0),
TextButton(
child: Text('Chưa có tài khoản? Đăng ký ngay'),
onPressed: () {
Navigator.pushNamed(context, '/register');
},
),
],
),
),
),
),
);
}
}

Màn hình đăng ký (register_screen.dart)

import 'package:flutter/material.dart';
import 'package:firebase_auth/firebase_auth.dart';
import '../services/auth_service.dart';

class RegisterScreen extends StatefulWidget {

_RegisterScreenState createState() => _RegisterScreenState();
}

class _RegisterScreenState extends State<RegisterScreen> {
final AuthService _authService = AuthService();
final _formKey = GlobalKey<FormState>();

String _email = '';
String _password = '';
String _confirmPassword = '';
String _error = '';
bool _isLoading = false;

void _registerWithEmailAndPassword() async {
if (_formKey.currentState!.validate()) {
setState(() {
_isLoading = true;
_error = '';
});

try {
await _authService.registerWithEmailAndPassword(_email, _password);
// Đăng ký thành công, NavigationService sẽ điều hướng người dùng
} catch (e) {
setState(() {
_error = _handleFirebaseAuthError(e);
_isLoading = false;
});
}
}
}

String _handleFirebaseAuthError(dynamic e) {
if (e is FirebaseAuthException) {
switch (e.code) {
case 'email-already-in-use':
return 'Email này đã được sử dụng.';
case 'invalid-email':
return 'Email không hợp lệ.';
case 'operation-not-allowed':
return 'Đăng ký với email và mật khẩu chưa được bật.';
case 'weak-password':
return 'Mật khẩu quá yếu.';
default:
return 'Đăng ký thất bại: ${e.message}';
}
}
return 'Đã xảy ra lỗi không xác định.';
}


Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Đăng ký'),
),
body: Center(
child: SingleChildScrollView(
padding: EdgeInsets.all(16.0),
child: Form(
key: _formKey,
child: Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [
Image.asset(
'assets/images/logo.png',
height: 100,
),
SizedBox(height: 48.0),
TextFormField(
decoration: InputDecoration(
labelText: 'Email',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.email),
),
keyboardType: TextInputType.emailAddress,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập email';
}
return null;
},
onChanged: (value) {
_email = value.trim();
},
),
SizedBox(height: 16.0),
TextFormField(
decoration: InputDecoration(
labelText: 'Mật khẩu',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.lock),
),
obscureText: true,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập mật khẩu';
}
if (value.length < 6) {
return 'Mật khẩu phải có ít nhất 6 ký tự';
}
return null;
},
onChanged: (value) {
_password = value;
},
),
SizedBox(height: 16.0),
TextFormField(
decoration: InputDecoration(
labelText: 'Xác nhận mật khẩu',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.lock_outline),
),
obscureText: true,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng xác nhận mật khẩu';
}
if (value != _password) {
return 'Mật khẩu không khớp';
}
return null;
},
onChanged: (value) {
_confirmPassword = value;
},
),
SizedBox(height: 24.0),
if (_error.isNotEmpty)
Padding(
padding: EdgeInsets.only(bottom: 16.0),
child: Text(
_error,
style: TextStyle(color: Colors.red),
textAlign: TextAlign.center,
),
),
ElevatedButton(
onPressed: _isLoading ? null : _registerWithEmailAndPassword,
child: _isLoading
? CircularProgressIndicator(color: Colors.white)
: Text('ĐĂNG KÝ'),
style: ElevatedButton.styleFrom(
padding: EdgeInsets.symmetric(vertical: 16.0),
),
),
SizedBox(height: 16.0),
TextButton(
child: Text('Đã có tài khoản? Đăng nhập ngay'),
onPressed: () {
Navigator.pop(context);
},
),
],
),
),
),
),
);
}
}

5. Xác thực với Email và Mật khẩu

Tạo một service class để xử lý xác thực (lib/services/auth_service.dart):

import 'package:firebase_auth/firebase_auth.dart';

class AuthService {
final FirebaseAuth _auth = FirebaseAuth.instance;

// Lấy thông tin người dùng hiện tại
User? get currentUser => _auth.currentUser;

// Stream thông tin người dùng (để lắng nghe trạng thái đăng nhập)
Stream<User?> get authStateChanges => _auth.authStateChanges();

// Đăng nhập với email và mật khẩu
Future<UserCredential> signInWithEmailAndPassword(String email, String password) async {
try {
return await _auth.signInWithEmailAndPassword(
email: email,
password: password,
);
} catch (e) {
rethrow;
}
}

// Đăng ký với email và mật khẩu
Future<UserCredential> registerWithEmailAndPassword(String email, String password) async {
try {
return await _auth.createUserWithEmailAndPassword(
email: email,
password: password,
);
} catch (e) {
rethrow;
}
}

// Đăng xuất
Future<void> signOut() async {
await _auth.signOut();
}

// Quên mật khẩu
Future<void> resetPassword(String email) async {
await _auth.sendPasswordResetEmail(email: email);
}
}

6. Xác thực với Google

Bổ sung phương thức đăng nhập với Google vào AuthService:

import 'package:firebase_auth/firebase_auth.dart';
import 'package:google_sign_in/google_sign_in.dart';

class AuthService {
// ... (Các phương thức đã có)

// Đăng nhập với Google
Future<UserCredential> signInWithGoogle() async {
try {
// Bắt đầu quá trình đăng nhập Google
final GoogleSignInAccount? googleUser = await GoogleSignIn().signIn();

// Nếu người dùng hủy đăng nhập
if (googleUser == null) {
throw FirebaseAuthException(
code: 'sign_in_canceled',
message: 'Đăng nhập bị hủy bởi người dùng',
);
}

// Lấy thông tin xác thực từ request
final GoogleSignInAuthentication googleAuth = await googleUser.authentication;

// Tạo credential mới
final credential = GoogleAuthProvider.credential(
accessToken: googleAuth.accessToken,
idToken: googleAuth.idToken,
);

// Đăng nhập vào Firebase với credential
return await FirebaseAuth.instance.signInWithCredential(credential);
} catch (e) {
rethrow;
}
}
}

Cấu hình Google Sign-In cho Android

Thêm vào file android/app/build.gradle:

android {
defaultConfig {
// ...

// Thêm dòng này
multiDexEnabled true
}
}

Cấu hình Google Sign-In cho iOS

Thêm vào file ios/Runner/Info.plist:

<key>CFBundleURLTypes</key>
<array>
<dict>
<key>CFBundleTypeRole</key>
<string>Editor</string>
<key>CFBundleURLSchemes</key>
<array>
<!-- Thay YOUR_REVERSED_CLIENT_ID bằng giá trị từ GoogleService-Info.plist -->
<string>YOUR_REVERSED_CLIENT_ID</string>
</array>
</dict>
</array>

7. Xác thực với Facebook

Bổ sung phương thức đăng nhập với Facebook vào AuthService:

import 'package:firebase_auth/firebase_auth.dart';
import 'package:flutter_facebook_auth/flutter_facebook_auth.dart';

class AuthService {
// ... (Các phương thức đã có)

// Đăng nhập với Facebook
Future<UserCredential> signInWithFacebook() async {
try {
// Bắt đầu quá trình đăng nhập Facebook
final LoginResult result = await FacebookAuth.instance.login();

// Kiểm tra kết quả
if (result.status != LoginStatus.success) {
throw FirebaseAuthException(
code: 'facebook_login_failed',
message: 'Đăng nhập Facebook thất bại: ${result.message}',
);
}

// Tạo credential từ token
final OAuthCredential credential = FacebookAuthProvider.credential(result.accessToken!.token);

// Đăng nhập vào Firebase với credential
return await FirebaseAuth.instance.signInWithCredential(credential);
} catch (e) {
rethrow;
}
}
}

Cấu hình Facebook Login cho Android

Thêm vào file android/app/src/main/res/values/strings.xml (tạo nếu chưa có):

<?xml version="1.0" encoding="utf-8"?>
<resources>
<string name="facebook_app_id">YOUR_FACEBOOK_APP_ID</string>
<string name="fb_login_protocol_scheme">fbYOUR_FACEBOOK_APP_ID</string>
<string name="facebook_client_token">YOUR_FACEBOOK_CLIENT_TOKEN</string>
</resources>

Thêm vào file android/app/src/main/AndroidManifest.xml:

<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.example.your_app">
<!-- ... -->
<application>
<!-- ... -->

<meta-data
android:name="com.facebook.sdk.ApplicationId"
android:value="@string/facebook_app_id"/>
<meta-data
android:name="com.facebook.sdk.ClientToken"
android:value="@string/facebook_client_token"/>

<activity
android:name="com.facebook.FacebookActivity"
android:configChanges="keyboard|keyboardHidden|screenLayout|screenSize|orientation"
android:label="@string/app_name" />
<activity
android:name="com.facebook.CustomTabActivity"
android:exported="true">
<intent-filter>
<action android:name="android.intent.action.VIEW" />
<category android:name="android.intent.category.DEFAULT" />
<category android:name="android.intent.category.BROWSABLE" />
<data android:scheme="@string/fb_login_protocol_scheme" />
</intent-filter>
</activity>

</application>
</manifest>

Cấu hình Facebook Login cho iOS

Thêm vào file ios/Runner/Info.plist:

<key>CFBundleURLTypes</key>
<array>
<!-- ... Các URL types khác -->
<dict>
<key>CFBundleURLSchemes</key>
<array>
<string>fbYOUR_FACEBOOK_APP_ID</string>
</array>
</dict>
</array>
<key>FacebookAppID</key>
<string>YOUR_FACEBOOK_APP_ID</string>
<key>FacebookClientToken</key>
<string>YOUR_FACEBOOK_CLIENT_TOKEN</string>
<key>FacebookDisplayName</key>
<string>YOUR_APP_NAME</string>
<key>LSApplicationQueriesSchemes</key>
<array>
<string>fbapi</string>
<string>fb-messenger-share-api</string>
<string>fbauth2</string>
<string>fbshareextension</string>
</array>

8. Xác thực với Số điện thoại

Bổ sung phương thức đăng nhập với số điện thoại vào AuthService:

class AuthService {
// ... (Các phương thức đã có)

// Xác thực số điện thoại - Bước 1: Gửi OTP
Future<void> verifyPhoneNumber({
required String phoneNumber,
required Function(PhoneAuthCredential) verificationCompleted,
required Function(FirebaseAuthException) verificationFailed,
required Function(String, int?) codeSent,
required Function(String) codeAutoRetrievalTimeout,
}) async {
await _auth.verifyPhoneNumber(
phoneNumber: phoneNumber,
verificationCompleted: verificationCompleted,
verificationFailed: verificationFailed,
codeSent: codeSent,
codeAutoRetrievalTimeout: codeAutoRetrievalTimeout,
timeout: Duration(seconds: 60),
);
}

// Xác thực số điện thoại - Bước 2: Xác nhận OTP
Future<UserCredential> signInWithPhoneCredential(PhoneAuthCredential credential) async {
try {
return await _auth.signInWithCredential(credential);
} catch (e) {
rethrow;
}
}
}

Tạo màn hình xác thực số điện thoại (phone_auth_screen.dart):

import 'package:flutter/material.dart';
import 'package:firebase_auth/firebase_auth.dart';
import '../services/auth_service.dart';

class PhoneAuthScreen extends StatefulWidget {

_PhoneAuthScreenState createState() => _PhoneAuthScreenState();
}

class _PhoneAuthScreenState extends State<PhoneAuthScreen> {
final AuthService _authService = AuthService();
final _formKey = GlobalKey<FormState>();

String _phoneNumber = '';
String _smsCode = '';
String _verificationId = '';
String _error = '';
bool _isLoading = false;
bool _codeSent = false;

void _verifyPhoneNumber() async {
if (_formKey.currentState!.validate()) {
setState(() {
_isLoading = true;
_error = '';
});

try {
await _authService.verifyPhoneNumber(
phoneNumber: _phoneNumber,
verificationCompleted: (PhoneAuthCredential credential) async {
// Tự động xác thực (Android)
try {
await _authService.signInWithPhoneCredential(credential);
// Đăng nhập thành công
} catch (e) {
setState(() {
_error = 'Xác thực thất bại: $e';
_isLoading = false;
});
}
},
verificationFailed: (FirebaseAuthException e) {
setState(() {
_error = 'Xác thực thất bại: ${e.message}';
_isLoading = false;
});
},
codeSent: (String verificationId, int? resendToken) {
setState(() {
_verificationId = verificationId;
_codeSent = true;
_isLoading = false;
});
},
codeAutoRetrievalTimeout: (String verificationId) {
_verificationId = verificationId;
},
);
} catch (e) {
setState(() {
_error = 'Lỗi gửi mã: $e';
_isLoading = false;
});
}
}
}

void _signInWithSmsCode() async {
if (_formKey.currentState!.validate()) {
setState(() {
_isLoading = true;
_error = '';
});

try {
PhoneAuthCredential credential = PhoneAuthProvider.credential(
verificationId: _verificationId,
smsCode: _smsCode,
);

await _authService.signInWithPhoneCredential(credential);
// Đăng nhập thành công
} catch (e) {
setState(() {
_error = 'Xác thực thất bại: $e';
_isLoading = false;
});
}
}
}


Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Đăng nhập với Số điện thoại'),
),
body: Center(
child: SingleChildScrollView(
padding: EdgeInsets.all(16.0),
child: Form(
key: _formKey,
child: Column(
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [
Image.asset(
'assets/images/logo.png',
height: 100,
),
SizedBox(height: 48.0),
if (!_codeSent) ...[
TextFormField(
decoration: InputDecoration(
labelText: 'Số điện thoại (định dạng: +84...)',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.phone),
),
keyboardType: TextInputType.phone,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập số điện thoại';
}
return null;
},
onChanged: (value) {
_phoneNumber = value.trim();
},
),
] else ...[
TextFormField(
decoration: InputDecoration(
labelText: 'Mã xác thực (OTP)',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.sms),
),
keyboardType: TextInputType.number,
validator: (value) {
if (value == null || value.isEmpty) {
return 'Vui lòng nhập mã xác thực';
}
return null;
},
onChanged: (value) {
_smsCode = value.trim();
},
),
],
SizedBox(height: 24.0),
if (_error.isNotEmpty)
Padding(
padding: EdgeInsets.only(bottom: 16.0),
child: Text(
_error,
style: TextStyle(color: Colors.red),
textAlign: TextAlign.center,
),
),
ElevatedButton(
onPressed: _isLoading
? null
: (_codeSent ? _signInWithSmsCode : _verifyPhoneNumber),
child: _isLoading
? CircularProgressIndicator(color: Colors.white)
: Text(_codeSent ? 'XÁC THỰC' : 'GỬI MÃ XÁC THỰC'),
style: ElevatedButton.styleFrom(
padding: EdgeInsets.symmetric(vertical: 16.0),
),
),
if (_codeSent) ...[
SizedBox(height: 16.0),
TextButton(
child: Text('Gửi lại mã'),
onPressed: _isLoading ? null : _verifyPhoneNumber,
),
],
],
),
),
),
),
);
}
}

9. Quản lý trạng thái người dùng

Sử dụng Provider để quản lý trạng thái người dùng trong ứng dụng (lib/providers/auth_provider.dart):

import 'package:flutter/material.dart';
import 'package:firebase_auth/firebase_auth.dart';
import '../services/auth_service.dart';

class AuthProvider with ChangeNotifier {
final AuthService _authService = AuthService();
User? _user;
bool _isLoading = false;

AuthProvider() {
_init();
}

void _init() {
_authService.authStateChanges.listen((User? user) {
_user = user;
notifyListeners();
});
}

User? get user => _user;
bool get isAuthenticated => _user != null;
bool get isLoading => _isLoading;

void setLoading(bool value) {
_isLoading = value;
notifyListeners();
}

Future<void> signInWithEmailAndPassword(String email, String password) async {
try {
setLoading(true);
await _authService.signInWithEmailAndPassword(email, password);
} finally {
setLoading(false);
}
}

Future<void> registerWithEmailAndPassword(String email, String password) async {
try {
setLoading(true);
await _authService.registerWithEmailAndPassword(email, password);
} finally {
setLoading(false);
}
}

Future<void> signInWithGoogle() async {
try {
setLoading(true);
await _authService.signInWithGoogle();
} finally {
setLoading(false);
}
}

Future<void> signInWithFacebook() async {
try {
setLoading(true);
await _authService.signInWithFacebook();
} finally {
setLoading(false);
}
}

Future<void> signOut() async {
try {
setLoading(true);
await _authService.signOut();
} finally {
setLoading(false);
}
}
}

Cập nhật main.dart để sử dụng Provider:

import 'package:flutter/material.dart';
import 'package:firebase_core/firebase_core.dart';
import 'package:provider/provider.dart';
import 'providers/auth_provider.dart';
import 'screens/login_screen.dart';
import 'screens/register_screen.dart';
import 'screens/home_screen.dart';
import 'screens/phone_auth_screen.dart';

void main() async {
WidgetsFlutterBinding.ensureInitialized();
await Firebase.initializeApp();
runApp(MyApp());
}

class MyApp extends StatelessWidget {

Widget build(BuildContext context) {
return MultiProvider(
providers: [
ChangeNotifierProvider(create: (_) => AuthProvider()),
],
child: Consumer<AuthProvider>(
builder: (context, authProvider, _) {
return MaterialApp(
title: 'Flutter Firebase Auth',
theme: ThemeData(
primarySwatch: Colors.blue,
visualDensity: VisualDensity.adaptivePlatformDensity,
),
initialRoute: authProvider.isAuthenticated ? '/home' : '/login',
routes: {
'/login': (context) => LoginScreen(),
'/register': (context) => RegisterScreen(),
'/home': (context) => HomeScreen(),
'/phone': (context) => PhoneAuthScreen(),
},
);
},
),
);
}
}

10. Phân quyền và Bảo mật

Để bảo vệ các tuyến đường chỉ cho người dùng đã xác thực, tạo một auth_guard.dart:

import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import '../providers/auth_provider.dart';

class AuthGuard extends StatelessWidget {
final Widget child;
final String redirectRoute;

const AuthGuard({
Key? key,
required this.child,
this.redirectRoute = '/login',
}) : super(key: key);


Widget build(BuildContext context) {
return Consumer<AuthProvider>(
builder: (context, authProvider, _) {
if (authProvider.isAuthenticated) {
return child;
} else {
// Điều hướng đến trang đăng nhập sau khi render
WidgetsBinding.instance.addPostFrameCallback((_) {
Navigator.of(context).pushReplacementNamed(redirectRoute);
});

// Trả về một widget đơn giản trong khi chờ điều hướng
return Scaffold(
body: Center(
child: CircularProgressIndicator(),
),
);
}
},
);
}
}

Sử dụng trong tuyến đường cần bảo vệ:

// Trong home_screen.dart

Widget build(BuildContext context) {
return AuthGuard(
child: Scaffold(
appBar: AppBar(
title: Text('Trang chủ'),
actions: [
IconButton(
icon: Icon(Icons.exit_to_app),
onPressed: () {
Provider.of<AuthProvider>(context, listen: false).signOut();
},
),
],
),
body: Center(
child: Text('Đã đăng nhập thành công!'),
),
),
);
}

11. Các kỹ thuật nâng cao

11.1. Quản lý thông tin người dùng với Firestore

Tạo một service để quản lý thông tin người dùng trong Firestore (user_service.dart):

import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:firebase_auth/firebase_auth.dart';

class UserService {
final FirebaseFirestore _firestore = FirebaseFirestore.instance;
final FirebaseAuth _auth = FirebaseAuth.instance;

Future<void> createUserProfile(User user, {Map<String, dynamic>? additionalData}) async {
final Map<String, dynamic> userData = {
'email': user.email,
'displayName': user.displayName,
'photoURL': user.photoURL,
'phoneNumber': user.phoneNumber,
'lastLogin': FieldValue.serverTimestamp(),
'createdAt': FieldValue.serverTimestamp(),
...?additionalData,
};

await _firestore.collection('users').doc(user.uid).set(userData, SetOptions(merge: true));
}

Future<DocumentSnapshot> getUserProfile(String userId) async {
return await _firestore.collection('users').doc(userId).get();
}

Future<void> updateUserProfile(String userId, Map<String, dynamic> data) async {
await _firestore.collection('users').doc(userId).update(data);
}

Stream<DocumentSnapshot> getUserProfileStream(String userId) {
return _firestore.collection('users').doc(userId).snapshots();
}

Future<void> updateCurrentUserProfile(Map<String, dynamic> data) async {
final user = _auth.currentUser;
if (user != null) {
await updateUserProfile(user.uid, data);
} else {
throw Exception('Không có người dùng đang đăng nhập');
}
}
}

11.2. Xử lý phiên đăng nhập

Bổ sung chức năng xác thực lại sau một khoảng thời gian nhất định:

class AuthService {
// ... (Các phương thức đã có)

// Xác thực lại người dùng hiện tại
Future<void> reauthenticateUser(String password) async {
try {
User? user = _auth.currentUser;
if (user == null || user.email == null) {
throw Exception('Không thể xác thực lại. Không có thông tin người dùng.');
}

AuthCredential credential = EmailAuthProvider.credential(
email: user.email!,
password: password,
);

await user.reauthenticateWithCredential(credential);
} catch (e) {
rethrow;
}
}

// Thay đổi mật khẩu (yêu cầu xác thực lại trước)
Future<void> changePassword(String newPassword) async {
try {
User? user = _auth.currentUser;
if (user == null) {
throw Exception('Không có người dùng đang đăng nhập');
}

await user.updatePassword(newPassword);
} catch (e) {
rethrow;
}
}

// Kiểm tra thời gian đăng nhập gần nhất
Future<bool> isSessionValid(int maxAgeMinutes) async {
try {
User? user = _auth.currentUser;
if (user == null) {
return false;
}

// Làm mới token người dùng để cập nhật metadata
await user.getIdToken(true);

// Lấy metadata
final metadata = await user.metadata;
final lastSignInTime = metadata.lastSignInTime;

if (lastSignInTime == null) {
return false;
}

final now = DateTime.now();
final diff = now.difference(lastSignInTime).inMinutes;

return diff <= maxAgeMinutes;
} catch (e) {
return false;
}
}
}

11.3. Xác thực hai yếu tố (2FA)

class AuthService {
// ... (Các phương thức đã có)

// Bật xác thực hai yếu tố
Future<String> enableTwoFactorAuth() async {
try {
User? user = _auth.currentUser;
if (user == null) {
throw Exception('Không có người dùng đang đăng nhập');
}

// Tạo URI để cấu hình ứng dụng xác thực (như Google Authenticator)
final appName = 'YourAppName';
final email = user.email ?? user.uid;
final secret = _generateSecret(); // Implement this method

final uri = 'otpauth://totp/$appName:$email?secret=$secret&issuer=$appName';

// Lưu secret vào Firestore (User document)
await FirebaseFirestore.instance.collection('users').doc(user.uid).update({
'twoFactorEnabled': true,
'twoFactorSecret': secret,
});

return uri;
} catch (e) {
rethrow;
}
}

// Xác minh mã OTP
Future<bool> verifyOtp(String otp) async {
try {
User? user = _auth.currentUser;
if (user == null) {
throw Exception('Không có người dùng đang đăng nhập');
}

// Lấy secret từ Firestore
final doc = await FirebaseFirestore.instance.collection('users').doc(user.uid).get();
final secret = doc.data()?['twoFactorSecret'];

if (secret == null) {
throw Exception('Xác thực hai yếu tố chưa được bật');
}

// Xác minh OTP
return _verifyOtp(secret, otp); // Implement this method
} catch (e) {
rethrow;
}
}
}

12. Xử lý lỗi

Tạo một helper class để xử lý các lỗi liên quan đến Firebase Auth (auth_error_handler.dart):

class AuthErrorHandler {
static String handleAuthError(dynamic error) {
if (error is FirebaseAuthException) {
switch (error.code) {
// Email/Password Auth Errors
case 'email-already-in-use':
return 'Email này đã được sử dụng bởi một tài khoản khác.';
case 'invalid-email':
return 'Email không hợp lệ.';
case 'user-disabled':
return 'Tài khoản này đã bị vô hiệu hóa.';
case 'user-not-found':
return 'Không tìm thấy tài khoản với email này.';
case 'wrong-password':
return 'Sai mật khẩu.';
case 'weak-password':
return 'Mật khẩu quá yếu.';
case 'operation-not-allowed':
return 'Phương thức đăng nhập này không được cho phép.';

// Phone Auth Errors
case 'invalid-phone-number':
return 'Số điện thoại không hợp lệ.';
case 'missing-phone-number':
return 'Vui lòng nhập số điện thoại.';
case 'quota-exceeded':
return 'Đã vượt quá giới hạn SMS, vui lòng thử lại sau.';
case 'session-expired':
return 'Phiên xác thực đã hết hạn, vui lòng gửi lại mã.';
case 'invalid-verification-code':
return 'Mã xác thực không hợp lệ.';

// Google/Facebook Auth Errors
case 'account-exists-with-different-credential':
return 'Đã tồn tại tài khoản với email này nhưng sử dụng phương thức đăng nhập khác.';
case 'invalid-credential':
return 'Thông tin xác thực không hợp lệ.';
case 'operation-not-supported-in-this-environment':
return 'Phương thức xác thực này không được hỗ trợ trên nền tảng hiện tại.';
case 'popup-blocked':
return 'Cửa sổ đăng nhập bị chặn. Vui lòng cho phép cửa sổ bật lên và thử lại.';
case 'popup-closed-by-user':
return 'Cửa sổ đăng nhập đã bị đóng trước khi hoàn tất quá trình xác thực.';

// General Auth Errors
case 'network-request-failed':
return 'Lỗi kết nối mạng. Vui lòng kiểm tra kết nối của bạn và thử lại.';
case 'too-many-requests':
return 'Quá nhiều yêu cầu không thành công. Vui lòng thử lại sau.';
case 'internal-error':
return 'Đã xảy ra lỗi nội bộ. Vui lòng thử lại sau.';

default:
return 'Đã xảy ra lỗi: ${error.message}';
}
}

return 'Đã xảy ra lỗi không xác định.';
}
}

13. Kết luận

Flutter Firebase Authentication

Firebase Authentication cung cấp một giải pháp xác thực mạnh mẽ và linh hoạt cho ứng dụng Flutter của bạn. Việc tích hợp Firebase Authentication giúp bạn:

  1. Tiết kiệm thời gian phát triển: Không cần xây dựng hệ thống xác thực từ đầu
  2. Bảo mật cao: Firebase cung cấp các cơ chế bảo mật tiên tiến
  3. Hỗ trợ nhiều phương thức xác thực: Email/mật khẩu, Google, Facebook, số điện thoại,...
  4. Dễ dàng mở rộng: Liên kết với Firebase Firestore để lưu trữ thông tin người dùng
  5. Quản lý người dùng đơn giản: Giao diện Firebase Console thân thiện

Bằng cách tích hợp Firebase Authentication vào ứng dụng Flutter, bạn đã xây dựng một hệ thống xác thực toàn diện, bảo mật, và dễ dàng mở rộng. Hệ thống này có thể đáp ứng nhu cầu của các ứng dụng từ đơn giản đến phức tạp, giúp bạn tập trung vào phát triển các tính năng cốt lõi của ứng dụng.

Hãy luôn cập nhật các gói Firebase mới nhất để đảm bảo ứng dụng của bạn luôn nhận được các cải tiến và bản vá bảo mật mới nhất.

Tài nguyên học tập thêm:

Chúc bạn thành công trong việc xây dựng hệ thống xác thực cho ứng dụng Flutter!