# API Integration và Database Schema

## 🔌 PHẦN 1: API INTEGRATION CHI TIẾT

### 1.1 Kiến Trúc Tổng Quan

Sơ đồ kiến trúc (văn bản thuần):

```
┌─────────────────────────────────────────────────────────────┐
│                    CLIENT (Next.js)                         │
│         ┌─────────────────┐    ┌─────────────────┐         │
│         │  React Query    │◄──►│  Zustand Store  │         │
│         │  (Caching)      │    │  (State Mgmt)   │         │
│         └────────┬────────┘    └─────────────────┘         │
│                  │                                          │
│         ┌────────▼────────┐                                │
│         │  API Client     │                                │
│         │  (Axios/Fetch)  │                                │
│         └────────┬────────┘                                │
└──────────────────┼──────────────────────────────────────────┘
                   │
┌──────────────────▼──────────────────────────────────────────┐
│                 API ROUTES (Next.js API)                    │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐           │
│  │ /api/stocks │ │/api/realtime│ │ /api/alerts │           │
│  │   (REST)    │ │ (WebSocket) │ │   (REST)    │           │
│  └──────┬──────┘ └──────┬──────┘ └──────┬──────┘           │
└─────────┼───────────────┼───────────────┼───────────────────┘
          │               │               │
    ┌─────┴─────┐   ┌─────┴─────┐   ┌─────┴─────┐
    │ MBS API   │   │ Redis     │   │ PostgreSQL│
    │ (Primary) │   │ (Cache)   │   │ (Storage) │
    └───────────┘   └───────────┘   └───────────┘
```

### 1.2 MBS WebSocket Integration

#### WebSocket Client Service

Tệp `services/mbs-websocket.ts` (TypeScript):

```typescript
// services/mbs-websocket.ts
import { createHash } from 'crypto';
import { EventEmitter } from 'events';

import { getRedisClient } from '@/lib/redis';

/** Connection settings accepted by MBSWebSocketClient. */
interface MBSConfig {
  /** Base WebSocket URL; the client appends `?token=...` to it when connecting. */
  wsUrl: string;
  /** Secret mixed into the generated connection token; a built-in placeholder is used when omitted. */
  apiKey?: string;
  /** Delay in milliseconds before a reconnect attempt (the client defaults it to 5000). */
  reconnectInterval?: number;
  /** Number of reconnect attempts before the client gives up (the client defaults it to 10). */
  maxReconnectAttempts?: number;
}

/** Normalized quote that the client builds from a `stock_update` payload. */
interface StockData {
  /** Ticker, taken from `payload.sym`. */
  symbol: string;
  /** Parsed from `payload.last`. */
  lastPrice: number;
  /** Parsed from `payload.change`. */
  change: number;
  /** Parsed from `payload.changePct`. */
  changePercent: number;
  /** Parsed as an integer from `payload.vol`. */
  volume: number;
  /** Parsed as an integer from `payload.val`. */
  value: number;
  /** Copied from `payload.bidP`; empty array when absent. */
  bidPrice: number[];
  /** Copied from `payload.bidV`; empty array when absent. */
  bidVolume: number[];
  /** Copied from `payload.askP`; empty array when absent. */
  askPrice: number[];
  /** Copied from `payload.askV`; empty array when absent. */
  askVolume: number[];
  /** Parsed from `payload.high`. */
  high: number;
  /** Parsed from `payload.low`. */
  low: number;
  /** Parsed from `payload.open`. */
  open: number;
  /** Parsed from `payload.ref`. */
  reference: number;
  /** Copied verbatim from `payload.time` (format not established here). */
  timestamp: string;
}

/**
 * Event-emitting client for the MBS real-time WebSocket feed.
 *
 * Emitted events:
 * - `connected`                    socket opened and subscriptions replayed
 * - `stockUpdate` (StockData)      normalized quote
 * - `marketStatus`, `trade`        raw payloads forwarded from the feed
 * - `error`                        socket error (emitted only when a listener exists)
 * - `maxReconnectAttemptsReached`  reconnect budget exhausted
 *
 * NOTE(review): the URL structure, token scheme and message format are
 * assumptions about the MBS feed (see the inline notes) and must be confirmed
 * against the real service.
 */
class MBSWebSocketClient extends EventEmitter {
  private ws: WebSocket | null = null;
  private config: MBSConfig;
  private reconnectAttempts = 0;
  // Symbols are stored upper-cased so subscribe/unsubscribe agree on the key.
  private subscribedSymbols: Set<string> = new Set();
  private isConnected = false;
  private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  // Set by disconnect() so a late close event cannot schedule a reconnect.
  private closedByClient = false;

  /**
   * @param config Connection settings; `reconnectInterval` defaults to 5000 ms
   *   and `maxReconnectAttempts` to 10.
   */
  constructor(config: MBSConfig) {
    super();
    this.config = {
      ...config,
      // Applied with ?? so an explicit `undefined` cannot erase the defaults.
      reconnectInterval: config.reconnectInterval ?? 5000,
      maxReconnectAttempts: config.maxReconnectAttempts ?? 10
    };
  }

  /**
   * Opens the upstream socket. Calling it while a socket is already open or
   * connecting is a no-op, so callers cannot create duplicate connections.
   */
  connect(): void {
    if (
      this.ws &&
      (this.ws.readyState === WebSocket.OPEN ||
        this.ws.readyState === WebSocket.CONNECTING)
    ) {
      return;
    }

    this.clearReconnectTimer();
    this.closedByClient = false;

    try {
      // MBS WebSocket URL structure (assumed from analysing s24.mbs.com.vn)
      const wsUrl = `${this.config.wsUrl}?token=${this.generateToken()}`;

      this.ws = new WebSocket(wsUrl);

      this.ws.onopen = this.handleOpen.bind(this);
      this.ws.onmessage = this.handleMessage.bind(this);
      this.ws.onclose = this.handleClose.bind(this);
      this.ws.onerror = this.handleError.bind(this);
    } catch (error) {
      console.error('WebSocket connection error:', error);
      this.scheduleReconnect();
    }
  }

  /** Builds the connection token: first 32 hex chars of sha256(secret + now). */
  private generateToken(): string {
    // Simulated token generation - needs reverse engineering from the MBS web app
    const timestamp = Date.now();
    const secret = this.config.apiKey || 'mbs-public-key';
    return createHash('sha256')
      .update(`${secret}${timestamp}`)
      .digest('hex')
      .substring(0, 32);
  }

  /** Marks the client connected, replays subscriptions and starts the heartbeat. */
  private handleOpen(): void {
    console.log('MBS WebSocket connected');
    this.isConnected = true;
    this.reconnectAttempts = 0;

    // Resubscribe to previous symbols
    this.subscribedSymbols.forEach(symbol => this.subscribe(symbol));

    // Start heartbeat
    this.startHeartbeat();

    this.emit('connected');
  }

  /** Parses one feed message and dispatches it by its `type` field. */
  private handleMessage(event: MessageEvent): void {
    try {
      const data = JSON.parse(event.data);

      // Handle the different message types coming from MBS
      switch (data.type) {
        case 'stock_update':
          this.handleStockUpdate(data.payload);
          break;
        case 'market_status':
          this.emit('marketStatus', data.payload);
          break;
        case 'trade_notification':
          this.emit('trade', data.payload);
          break;
        case 'heartbeat':
          // Reset heartbeat timeout
          break;
        default:
          console.log('Unknown message type:', data.type);
      }
    } catch (error) {
      console.error('Error parsing message:', error);
    }
  }

  /** Normalizes a raw `stock_update` payload, caches it and emits `stockUpdate`. */
  private handleStockUpdate(payload: any): void {
    const stockData: StockData = {
      symbol: payload.sym,
      lastPrice: parseFloat(payload.last),
      change: parseFloat(payload.change),
      changePercent: parseFloat(payload.changePct),
      volume: parseInt(payload.vol, 10),
      value: parseInt(payload.val, 10),
      bidPrice: payload.bidP || [],
      bidVolume: payload.bidV || [],
      askPrice: payload.askP || [],
      askVolume: payload.askV || [],
      high: parseFloat(payload.high),
      low: parseFloat(payload.low),
      open: parseFloat(payload.open),
      reference: parseFloat(payload.ref),
      timestamp: payload.time
    };

    // Cache to Redis. Deliberately not awaited; the catch keeps a Redis outage
    // from surfacing as an unhandled promise rejection.
    this.cacheToRedis(stockData).catch(error => {
      console.error('Failed to cache stock update:', error);
    });

    // Emit to subscribers
    this.emit('stockUpdate', stockData);
  }

  /** Stores the latest quote under `stock:<symbol>:latest` and publishes it. */
  private async cacheToRedis(data: StockData): Promise<void> {
    // Redis holds the real-time snapshot
    const redis = await getRedisClient();
    await redis.setEx(
      `stock:${data.symbol}:latest`,
      60, // 60 seconds TTL
      JSON.stringify(data)
    );

    // Publish on a Redis channel for other services
    await redis.publish('stock:updates', JSON.stringify(data));
  }

  /** Resets connection state and reconnects unless the close was intentional. */
  private handleClose(event: CloseEvent): void {
    console.log('WebSocket closed:', event.code, event.reason);
    this.isConnected = false;
    this.stopHeartbeat();

    // 1000 = normal closure; also skip when disconnect() was requested
    if (event.code !== 1000 && !this.closedByClient) {
      this.scheduleReconnect();
    }
  }

  /** Logs a socket error and forwards it to `error` listeners, if any. */
  private handleError(error: Event): void {
    console.error('WebSocket error:', error);
    // EventEmitter throws when 'error' is emitted without a listener.
    if (this.listenerCount('error') > 0) {
      this.emit('error', error);
    }
  }

  /** Schedules a single reconnect attempt until the attempt budget is spent. */
  private scheduleReconnect(): void {
    const maxAttempts = this.config.maxReconnectAttempts ?? 10;
    const delayMs = this.config.reconnectInterval ?? 5000;

    if (this.reconnectAttempts >= maxAttempts) {
      console.error('Max reconnect attempts reached');
      this.emit('maxReconnectAttemptsReached');
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting in ${delayMs}ms... (Attempt ${this.reconnectAttempts})`);

    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delayMs);
  }

  /** Cancels a pending reconnect attempt, if one is scheduled. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Subscribes to a stock symbol. While offline the symbol is only remembered
   * and is sent once the socket opens.
   */
  subscribe(symbol: string): void {
    const normalized = symbol.toUpperCase();
    this.subscribedSymbols.add(normalized);

    if (!this.isConnected || !this.ws) {
      return;
    }

    const message = {
      action: 'subscribe',
      channel: 'stock',
      symbol: normalized
    };

    this.ws.send(JSON.stringify(message));
    console.log(`Subscribed to ${symbol}`);
  }

  /**
   * Unsubscribes from a stock symbol. The symbol is always forgotten locally,
   * even while offline, so it is not re-subscribed on the next reconnect.
   */
  unsubscribe(symbol: string): void {
    const normalized = symbol.toUpperCase();
    this.subscribedSymbols.delete(normalized);

    if (!this.isConnected || !this.ws) return;

    const message = {
      action: 'unsubscribe',
      channel: 'stock',
      symbol: normalized
    };

    this.ws.send(JSON.stringify(message));
  }

  /** Subscribes to the four market indices; does nothing unless the socket is open. */
  subscribeMarketIndices(): void {
    const socket = this.ws;
    if (!socket || socket.readyState !== WebSocket.OPEN) return;

    const indices = ['VNINDEX', 'HNXINDEX', 'UPCOMINDEX', 'VN30'];
    indices.forEach(index => {
      const message = {
        action: 'subscribe',
        channel: 'index',
        symbol: index
      };
      socket.send(JSON.stringify(message));
    });
  }

  /** Sends a ping every 30 seconds while the socket is open. */
  private startHeartbeat(): void {
    this.stopHeartbeat(); // never leave two intervals running
    this.heartbeatInterval = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
      }
    }, 30000); // 30 seconds
  }

  /** Stops the heartbeat interval, if running. */
  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  /** Closes the socket with code 1000 and cancels any pending reconnect. */
  disconnect(): void {
    this.closedByClient = true;
    this.clearReconnectTimer();
    this.stopHeartbeat();
    if (this.ws) {
      this.ws.close(1000, 'Client disconnect');
      this.ws = null;
    }
    this.isConnected = false;
  }
}

// Lazily created module-level singleton shared by every caller.
let mbsClient: MBSWebSocketClient | null = null;

/**
 * Returns the shared MBS client, creating it on first use.
 *
 * @param config Used only when the singleton does not exist yet; ignored on
 *   later calls.
 * @returns The shared client instance.
 * @throws Error when called without `config` before the client was created
 *   (previously this returned `null` typed as a client).
 */
export function getMBSClient(config?: MBSConfig): MBSWebSocketClient {
  if (!mbsClient) {
    if (!config) {
      throw new Error('MBS client is not initialised: pass a config on the first call');
    }
    mbsClient = new MBSWebSocketClient(config);
  }
  return mbsClient;
}

export default MBSWebSocketClient;
```

#### WebSocket Server (Next.js Route Handler)

Tệp `app/api/realtime/route.ts` (TypeScript):

```typescript
// app/api/realtime/route.ts
import { NextRequest } from 'next/server';
import { getMBSClient } from '@/services/mbs-websocket';

export const dynamic = 'force-dynamic';

/**
 * Bridges one browser WebSocket to the shared MBS feed.
 *
 * Rejects non-upgrade requests with 400, then forwards every `stockUpdate`
 * and `marketStatus` event to the socket and relays the client's
 * subscribe/unsubscribe commands to the MBS client.
 */
export async function GET(request: NextRequest) {
  if (request.headers.get('upgrade') !== 'websocket') {
    return new Response('Expected websocket', { status: 400 });
  }

  // WebSocket handling relies on the Node.js runtime
  const { socket, response } = await upgradeWebSocket(request);

  const mbs = getMBSClient({
    wsUrl: process.env.MBS_WS_URL || 'wss://realtime.mbs.com.vn/ws',
    apiKey: process.env.MBS_API_KEY
  });

  // "No stockUpdate listeners yet" is the signal used to open the MBS connection
  const alreadyListening = mbs.listenerCount('stockUpdate') > 0;
  if (!alreadyListening) {
    mbs.connect();
  }

  // Push every upstream update to this client as JSON
  const forward = (data: any) => {
    socket.send(JSON.stringify(data));
  };
  const forwardedEvents = ['stockUpdate', 'marketStatus'];
  for (const eventName of forwardedEvents) {
    mbs.on(eventName, forward);
  }

  // Commands from the client (subscribe/unsubscribe)
  socket.on('message', (message: string) => {
    try {
      const { action, symbol } = JSON.parse(message);

      switch (action) {
        case 'subscribe':
          mbs.subscribe(symbol);
          break;
        case 'unsubscribe':
          mbs.unsubscribe(symbol);
          break;
      }
    } catch (error) {
      console.error('Invalid message from client:', error);
    }
  });

  // Detach the forwarders once the client goes away
  socket.on('close', () => {
    for (const eventName of forwardedEvents) {
      mbs.off(eventName, forward);
    }
  });

  return response;
}

/**
 * Helper that upgrades an HTTP request to a WebSocket connection.
 *
 * NOTE(review): placeholder. As written it only loads the `ws` module and
 * returns nothing, while the caller destructures `{ socket, response }` from
 * the result; the actual upgrade logic still has to be filled in.
 */
async function upgradeWebSocket(request: NextRequest) {
  // The implementation depends on the runtime (Node.js/Edge)
  // Use a library such as 'ws' on the Node.js runtime
  const { default: WebSocket } = await import('ws');
  
  // ... implementation details
}
```

### 1.3 Fallback APIs (VnStock & SSI)

#### VnStock Integration Service

Tệp `services/vnstock-api.ts` (TypeScript):

```typescript
// services/vnstock-api.ts
import axios, { AxiosInstance } from 'axios';

/** One OHLCV bar as returned by the historical and intraday bar methods. */
interface HistoricalData {
  /** Taken from the response item's `tradingDate`. */
  date: string;
  open: number;
  high: number;
  low: number;
  close: number;
  volume: number;
  value: number;
}

/**
 * Company summary assembled by `getCompanyProfile` from two responses:
 * descriptive fields come from the overview response, ratio fields from the
 * financial-ratio response.
 */
interface CompanyProfile {
  /** Requested ticker, upper-cased. */
  symbol: string;
  // --- from the overview response ---
  companyName: string;
  industry: string;
  sector: string;
  marketCap: number;
  sharesOutstanding: number;
  // --- from the financial-ratio response ---
  eps: number;
  pe: number;
  pb: number;
  roe: number;
  roa: number;
}

/**
 * Client for the public TCBS market-data endpoints, used as a fallback data
 * source. All methods log and rethrow on failure.
 */
class VnStockAPI {
  /** Maximum automatic retries of a single request after HTTP 429. */
  private static readonly MAX_RATE_LIMIT_RETRIES = 3;

  private client: AxiosInstance;
  private baseURL = 'https://apipubaws.tcbs.com.vn';

  constructor() {
    this.client = axios.create({
      baseURL: this.baseURL,
      timeout: 10000,
      headers: {
        'Accept': 'application/json',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
      }
    });

    // Response interceptor for rate limiting: retry a 429 after a short pause,
    // but only a bounded number of times so a persistent 429 cannot loop forever.
    this.client.interceptors.response.use(
      (response) => response,
      async (error) => {
        const config = error.config;
        if (error.response?.status === 429 && config) {
          const attempt = (config._rateLimitRetries ?? 0) + 1;
          if (attempt <= VnStockAPI.MAX_RATE_LIMIT_RETRIES) {
            config._rateLimitRetries = attempt;
            // Linear back-off: 1s, 2s, 3s
            await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
            return this.client.request(config);
          }
        }
        return Promise.reject(error);
      }
    );
  }

  /**
   * Extracts the `data` array of a response body.
   * @throws Error when the body does not have the expected `{ data: [...] }` shape.
   */
  private static rowsOf(body: any): any[] {
    if (!Array.isArray(body?.data)) {
      throw new Error('Unexpected response shape: "data" array is missing');
    }
    return body.data;
  }

  /** Maps raw bar items of a response body to HistoricalData. */
  private static toBars(body: any): HistoricalData[] {
    return VnStockAPI.rowsOf(body).map((item: any) => ({
      date: item.tradingDate,
      open: item.open,
      high: item.high,
      low: item.low,
      close: item.close,
      volume: item.volume,
      value: item.value
    }));
  }

  /**
   * Fetches historical price bars.
   *
   * @param symbol Ticker (case-insensitive).
   * @param startDate Passed through as the `from` query parameter.
   * @param endDate Passed through as the `to` query parameter.
   * @param resolution Bar size; defaults to daily.
   * @throws Rethrows any request or response-shape error after logging it.
   */
  async getHistoricalData(
    symbol: string, 
    startDate: string, 
    endDate: string,
    resolution: '1' | '5' | '15' | '30' | '60' | '1D' | '1W' | '1M' = '1D'
  ): Promise<HistoricalData[]> {
    try {
      const response = await this.client.get('/stock-insight/v1/stock/bars-long-term', {
        params: {
          ticker: symbol.toUpperCase(),
          type: resolution,
          from: startDate,
          to: endDate
        }
      });

      return VnStockAPI.toBars(response.data);
    } catch (error) {
      console.error(`Error fetching historical data for ${symbol}:`, error);
      throw error;
    }
  }

  /**
   * Fetches the company overview and financial ratios in parallel and merges
   * them into one profile.
   *
   * @throws Rethrows any request error after logging it.
   */
  async getCompanyProfile(symbol: string): Promise<CompanyProfile> {
    // Template literals are required here: with plain quotes the literal text
    // "${symbol}" was sent in the URL instead of the ticker.
    const ticker = encodeURIComponent(symbol.toUpperCase());

    try {
      const [profileRes, financialRes] = await Promise.all([
        this.client.get(`/tcanalysis/v1/ticker/${ticker}/overview`),
        this.client.get(`/tcanalysis/v1/finance/${ticker}/financialratio`)
      ]);

      const profile = profileRes.data;
      const financial = financialRes.data;

      return {
        symbol: symbol.toUpperCase(),
        companyName: profile.companyName,
        industry: profile.industry,
        sector: profile.sector,
        marketCap: profile.marketCap,
        sharesOutstanding: profile.sharesOutstanding,
        eps: financial.eps,
        pe: financial.pe,
        pb: financial.pb,
        roe: financial.roe,
        roa: financial.roa
      };
    } catch (error) {
      console.error(`Error fetching company profile for ${symbol}:`, error);
      throw error;
    }
  }

  /**
   * Fetches the list of tickers for an exchange.
   *
   * @throws Rethrows any request or response-shape error after logging it.
   */
  async getStockList(exchange: 'HOSE' | 'HNX' | 'UPCOM' | 'ALL' = 'ALL'): Promise<string[]> {
    try {
      const response = await this.client.get('/stock-insight/v1/stock/stock-recommendation', {
        params: { exchange }
      });

      return VnStockAPI.rowsOf(response.data).map((item: any) => item.ticker);
    } catch (error) {
      console.error('Error fetching stock list:', error);
      throw error;
    }
  }

  /**
   * Fetches 1-minute bars for the current day.
   *
   * The day is the UTC calendar date (derived from `toISOString`).
   * @throws Rethrows any request or response-shape error after logging it.
   */
  async getIntradayData(symbol: string): Promise<HistoricalData[]> {
    const today = new Date().toISOString().split('T')[0];

    try {
      const response = await this.client.get('/stock-insight/v1/stock/bars', {
        params: {
          ticker: symbol.toUpperCase(),
          type: '1',
          from: today,
          to: today
        }
      });

      return VnStockAPI.toBars(response.data);
    } catch (error) {
      console.error(`Error fetching intraday data for ${symbol}:`, error);
      throw error;
    }
  }
}

export const vnstockAPI = new VnStockAPI();
export default VnStockAPI;
```

#### SSI FastConnect API Integration

Tệp `services/ssi-api.ts` (TypeScript):

```typescript
// services/ssi-api.ts
import axios, { AxiosInstance } from 'axios';
import crypto from 'crypto';

/** Access token state kept in memory after authenticating. */
interface SSIToken {
  /** Sent as the `Bearer` credential on every request. */
  accessToken: string;
  /** Stored from the authentication response. */
  refreshToken: string;
  /** Token lifetime; the expiry check multiplies it by 1000, i.e. treats it as seconds. */
  expiresIn: number;
  /** `Date.now()` (milliseconds) at the moment the token was stored. */
  obtainedAt: number;
}

/**
 * Client for the SSI FastConnect market-data API.
 *
 * Authentication is handled transparently: a token is obtained before the
 * first request, refreshed one minute before it expires, and re-obtained once
 * when a request is answered with 401.
 */
class SSIFastConnectAPI {
  private client: AxiosInstance;
  private consumerID: string;
  private consumerSecret: string;
  private token: SSIToken | null = null;
  private baseURL = 'https://fc-data.ssi.com.vn';
  /** In-flight authentication shared by concurrent requests. */
  private pendingAuth: Promise<void> | null = null;

  /**
   * @param consumerID Consumer ID sent to the access-token endpoint.
   * @param consumerSecret Consumer secret sent to the access-token endpoint.
   */
  constructor(consumerID: string, consumerSecret: string) {
    this.consumerID = consumerID;
    this.consumerSecret = consumerSecret;

    this.client = axios.create({
      baseURL: this.baseURL,
      timeout: 15000,
      headers: {
        'Content-Type': 'application/json'
      }
    });

    // Request interceptor: attach a valid token to every request
    this.client.interceptors.request.use(async (config) => {
      if (!this.token || this.isTokenExpired()) {
        await this.authenticateOnce();
      }

      config.headers.Authorization = `Bearer ${this.token?.accessToken}`;
      return config;
    });

    // Response interceptor: on 401, re-authenticate and retry the request once
    this.client.interceptors.response.use(
      (response) => response,
      async (error) => {
        const originalRequest = error.config;

        if (
          error.response?.status === 401 &&
          originalRequest &&
          !originalRequest._retry
        ) {
          originalRequest._retry = true;
          await this.authenticateOnce();
          originalRequest.headers.Authorization = `Bearer ${this.token?.accessToken}`;
          return this.client(originalRequest);
        }

        return Promise.reject(error);
      }
    );
  }

  /**
   * Runs `authenticate()` at most once at a time: concurrent callers share the
   * same in-flight request instead of each requesting a new token.
   */
  private authenticateOnce(): Promise<void> {
    if (!this.pendingAuth) {
      this.pendingAuth = this.authenticate().finally(() => {
        this.pendingAuth = null;
      });
    }
    return this.pendingAuth;
  }

  /**
   * Requests a new access token and stores it.
   * @throws Rethrows the request error, or an Error when the response carries
   *   no access token.
   */
  private async authenticate(): Promise<void> {
    try {
      const response = await axios.post(`${this.baseURL}/api/v2/Market/AccessToken`, {
        consumerID: this.consumerID,
        consumerSecret: this.consumerSecret
      });

      const payload = response.data?.data;
      if (!payload?.accessToken) {
        // Without this check every request would be sent with "Bearer undefined".
        throw new Error('SSI authentication response did not contain an access token');
      }

      this.token = {
        accessToken: payload.accessToken,
        refreshToken: payload.refreshToken,
        expiresIn: payload.expiresIn,
        obtainedAt: Date.now()
      };
    } catch (error) {
      // Log only the message: the full Axios error embeds the request body,
      // which contains the consumer secret.
      console.error(
        'SSI Authentication failed:',
        error instanceof Error ? error.message : error
      );
      throw error;
    }
  }

  /** True when there is no token or it expires within the next minute. */
  private isTokenExpired(): boolean {
    if (!this.token) return true;
    const expiresAt = this.token.obtainedAt + (this.token.expiresIn * 1000);
    return Date.now() >= expiresAt - 60000; // Refresh 1 minute before expiry
  }

  /** Fetches the securities list for an exchange (default `HOSE`). */
  async getSecurities(exchange: string = 'HOSE'): Promise<any[]> {
    const response = await this.client.get('/api/v2/Market/Securities', {
      params: { exchange }
    });
    return response.data.data;
  }

  /** Fetches the market indices. */
  async getIndices(): Promise<any[]> {
    const response = await this.client.get('/api/v2/Market/Indices');
    return response.data.data;
  }

  /** Fetches quotes; the symbols are sent as one comma-separated parameter. */
  async getQuotes(symbols: string[]): Promise<any[]> {
    const response = await this.client.get('/api/v2/Market/Quotes', {
      params: {
        symbols: symbols.join(',')
      }
    });
    return response.data.data;
  }

  /**
   * Fetches one page of daily OHLC data in ascending order.
   * @returns The whole response body (not only its `data` field).
   */
  async getDailyOhlc(
    symbol: string, 
    fromDate: string, 
    toDate: string,
    pageIndex: number = 1,
    pageSize: number = 100
  ): Promise<any> {
    const response = await this.client.get('/api/v2/Market/DailyOhlc', {
      params: {
        symbol,
        fromDate,
        toDate,
        pageIndex,
        pageSize,
        ascending: true
      }
    });
    return response.data;
  }

  /**
   * Fetches one page of intraday OHLC data in ascending order.
   * @returns The whole response body (not only its `data` field).
   */
  async getIntradayOhlc(
    symbol: string,
    fromDate: string,
    toDate: string,
    resolution: '1' | '5' | '15' | '30' | '60' = '1',
    pageIndex: number = 1,
    pageSize: number = 100
  ): Promise<any> {
    const response = await this.client.get('/api/v2/Market/IntradayOhlc', {
      params: {
        symbol,
        fromDate,
        toDate,
        resolution,
        pageIndex,
        pageSize,
        ascending: true
      }
    });
    return response.data;
  }
}

export default SSIFastConnectAPI;
```

### 1.4 API Routes (Next.js App Router)

#### Stock Data API

Tệp `app/api/stocks/[symbol]/route.ts` (TypeScript):

```typescript
// app/api/stocks/[symbol]/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { vnstockAPI } from '@/services/vnstock-api';
import { getRedisClient } from '@/lib/redis';
import { prisma } from '@/lib/prisma';

// Bar resolutions accepted by vnstockAPI.getHistoricalData.
const HISTORICAL_RESOLUTIONS = ['1', '5', '15', '30', '60', '1D', '1W', '1M'] as const;

/**
 * GET /api/stocks/:symbol?type=overview|historical|intraday|profile
 *
 * Serves stock data through a 60-second Redis cache. For `historical`, the
 * optional `from`, `to` and `resolution` query parameters select the range;
 * they are part of the cache key so different ranges never share an entry.
 *
 * Responds 400 for an unknown `type` or `resolution` and 500 when fetching fails.
 */
export async function GET(
  request: NextRequest,
  { params }: { params: { symbol: string } }
) {
  const { symbol } = params;
  const searchParams = request.nextUrl.searchParams;
  const type = searchParams.get('type') || 'overview'; // overview, historical, intraday, profile

  // Resolve and validate the historical range up front: it is needed both for
  // the cache key and for the upstream call.
  let range: {
    from: string;
    to: string;
    resolution: (typeof HISTORICAL_RESOLUTIONS)[number];
  } | null = null;

  if (type === 'historical') {
    const requested = searchParams.get('resolution') || '1D';
    const resolution = HISTORICAL_RESOLUTIONS.find((candidate) => candidate === requested);
    if (!resolution) {
      return NextResponse.json({ error: 'Invalid resolution' }, { status: 400 });
    }
    range = {
      from: searchParams.get('from') || getDefaultFromDate(),
      to: searchParams.get('to') || getToday(),
      resolution
    };
  }

  try {
    const keyParts = ['stock', symbol.toUpperCase(), type];
    if (range) {
      keyParts.push(range.from, range.to, range.resolution);
    }
    const cacheKey = keyParts.join(':');
    const redis = await getRedisClient();

    // Try cache first
    const cached = await redis.get(cacheKey);
    if (cached) {
      return NextResponse.json(JSON.parse(cached), {
        headers: {
          'X-Cache': 'HIT',
          'X-Cache-TTL': '60'
        }
      });
    }

    let data;

    if (range) {
      data = await vnstockAPI.getHistoricalData(symbol, range.from, range.to, range.resolution);
    } else {
      switch (type) {
        case 'overview':
          data = await getStockOverview(symbol);
          break;
        case 'intraday':
          data = await vnstockAPI.getIntradayData(symbol);
          break;
        case 'profile':
          data = await vnstockAPI.getCompanyProfile(symbol);
          break;
        default:
          return NextResponse.json({ error: 'Invalid type' }, { status: 400 });
      }
    }

    // Cache the result
    await redis.setEx(cacheKey, 60, JSON.stringify(data));

    return NextResponse.json(data, {
      headers: {
        'X-Cache': 'MISS',
        'X-Cache-TTL': '60'
      }
    });

  } catch (error) {
    console.error(`Error fetching stock data for ${symbol}:`, error);
    return NextResponse.json(
      { error: 'Failed to fetch stock data' },
      { status: 500 }
    );
  }
}

/**
 * Builds the overview payload for a symbol from several sources: the latest
 * price, the company profile and the last 30 days of daily bars. A failing
 * profile lookup yields `null` and a failing bar lookup yields `[]`; a failing
 * price lookup rejects.
 */
async function getStockOverview(symbol: string) {
  // Start all three lookups before awaiting so they run concurrently
  const pricePromise = getLatestPrice(symbol);
  const profilePromise = vnstockAPI.getCompanyProfile(symbol).catch(() => null);
  const barsPromise = vnstockAPI
    .getHistoricalData(symbol, getDefaultFromDate(30), getToday(), '1D')
    .catch(() => []);

  const [price, profile, bars] = await Promise.all([
    pricePromise,
    profilePromise,
    barsPromise
  ]);

  return {
    symbol: symbol.toUpperCase(),
    price,
    profile,
    historical: bars.slice(-30), // most recent 30 bars
    lastUpdated: new Date().toISOString()
  };
}

/**
 * Converts a record into plain JSON-compatible data.
 *
 * `bigint` values (which `JSON.stringify` refuses to serialize) become numbers
 * when they fit safely and decimal strings otherwise; values with a `toJSON`
 * method (e.g. `Date`) are serialized through it.
 */
function toJsonSafe(record: unknown): unknown {
  const text = JSON.stringify(record, (_key, value) => {
    if (typeof value !== 'bigint') return value;
    const asNumber = Number(value);
    return Number.isSafeInteger(asNumber) ? asNumber : value.toString();
  });
  return JSON.parse(text);
}

/**
 * Returns the most recent price for a symbol: the real-time snapshot cached in
 * Redis when present, otherwise the newest database row (or `null`).
 *
 * The database row is converted to JSON-safe data because it carries `BigInt`
 * columns, which would make the later `JSON.stringify` of the response throw.
 */
async function getLatestPrice(symbol: string) {
  const ticker = symbol.toUpperCase();

  // Prefer Redis (real-time data written by the WebSocket client)
  const redis = await getRedisClient();
  const cached = await redis.get(`stock:${ticker}:latest`);

  if (cached) {
    return JSON.parse(cached);
  }

  // Fallback: query the database
  const latest = await prisma.stockPrice.findFirst({
    where: { symbol: ticker },
    orderBy: { timestamp: 'desc' }
  });

  return latest ? toJsonSafe(latest) : null;
}

/** Returns the current date as `YYYY-MM-DD`, taken from the UTC ISO timestamp. */
function getToday(): string {
  const [datePart] = new Date().toISOString().split('T');
  return datePart;
}

/**
 * Returns the date `days` days before now as `YYYY-MM-DD`.
 *
 * @param days How far to go back; defaults to 365.
 */
function getDefaultFromDate(days: number = 365): string {
  const target = new Date();
  const shiftedDayOfMonth = target.getDate() - days;
  target.setDate(shiftedDayOfMonth); // out-of-range values roll over into earlier months

  const [datePart] = target.toISOString().split('T');
  return datePart;
}
```

#### Alert API

Tệp `app/api/alerts/route.ts` (TypeScript):

```typescript
// app/api/alerts/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { getServerSession } from 'next-auth';
import { prisma } from '@/lib/prisma';
import { z } from 'zod';

/**
 * Validation schema for the body of POST /api/alerts.
 *
 * - `threshold` must be finite: an infinite value would be serialized to
 *   `null` when the alert is stored as JSON.
 * - `notificationChannels` needs at least one entry: an alert without a
 *   channel could never notify anyone.
 */
const alertSchema = z.object({
  symbol: z.string().min(1).max(10),
  conditionType: z.enum([
    'PRICE_ABOVE', 
    'PRICE_BELOW', 
    'CHANGE_PERCENT_ABOVE',
    'CHANGE_PERCENT_BELOW',
    'VOLUME_ABOVE',
    'BREAKOUT_HIGH',
    'BREAKOUT_LOW'
  ]),
  threshold: z.number().finite(),
  notificationChannels: z.array(z.enum(['web', 'email', 'telegram'])).min(1),
  expiryDate: z.string().datetime().optional()
});

/**
 * GET /api/alerts - lists the signed-in user's active alerts, newest first.
 * Responds 401 when there is no session user id.
 */
export async function GET(request: NextRequest) {
  const session = await getServerSession();
  const userId = session?.user?.id;

  if (!userId) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  const activeAlerts = await prisma.alert.findMany({
    where: { userId, isActive: true },
    orderBy: { createdAt: 'desc' }
  });

  return NextResponse.json(activeAlerts);
}

/**
 * POST /api/alerts - creates a new alert for the signed-in user.
 *
 * Responses: 201 with the created alert; 400 for a malformed JSON body, a body
 * failing validation, or when the user already has 50 active alerts; 401
 * without a session; 500 for unexpected failures.
 */
export async function POST(request: NextRequest) {
  const session = await getServerSession();

  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  // A body that is not valid JSON is a client error, not a server error.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 });
  }

  const parsed = alertSchema.safeParse(body);
  if (!parsed.success) {
    return NextResponse.json(
      { error: 'Invalid input', details: parsed.error.issues },
      { status: 400 }
    );
  }
  const validated = parsed.data;

  try {
    // Enforce the per-user limit (max 50 active alerts)
    const count = await prisma.alert.count({
      where: { userId: session.user.id, isActive: true }
    });

    if (count >= 50) {
      return NextResponse.json(
        { error: 'Alert limit reached (max 50)' },
        { status: 400 }
      );
    }

    const alert = await prisma.alert.create({
      data: {
        userId: session.user.id,
        symbol: validated.symbol.toUpperCase(),
        conditionType: validated.conditionType,
        threshold: validated.threshold,
        notificationChannels: validated.notificationChannels,
        expiryDate: validated.expiryDate ? new Date(validated.expiryDate) : null,
        isActive: true
      }
    });

    // Index the alert in Redis so the alert service can check it in real time
    await addAlertToRedis(alert);

    return NextResponse.json(alert, { status: 201 });
  } catch (error) {
    console.error('Error creating alert:', error);
    return NextResponse.json(
      { error: 'Failed to create alert' },
      { status: 500 }
    );
  }
}

/**
 * DELETE /api/alerts/:id - deactivates one of the signed-in user's alerts.
 *
 * The id is read from the route params and, when the route has no `id`
 * segment, from the `id` query parameter. Responds 400 without an id, 401
 * without a session and 404 when the alert does not belong to the user.
 */
export async function DELETE(
  request: NextRequest,
  { params }: { params?: { id?: string } }
) {
  const session = await getServerSession();

  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  // Never query with an undefined id: the filter would then match any alert
  // owned by the user.
  const alertId = params?.id ?? request.nextUrl.searchParams.get('id');
  if (!alertId) {
    return NextResponse.json({ error: 'Alert id is required' }, { status: 400 });
  }

  const alert = await prisma.alert.findFirst({
    where: { id: alertId, userId: session.user.id }
  });

  if (!alert) {
    return NextResponse.json({ error: 'Alert not found' }, { status: 404 });
  }

  // Soft delete: the row is kept and only flagged inactive
  await prisma.alert.update({
    where: { id: alertId },
    data: { isActive: false }
  });

  // Remove from Redis
  await removeAlertFromRedis(alertId);

  return NextResponse.json({ success: true });
}

/**
 * Indexes an alert in Redis for real-time evaluation.
 *
 * The alert is stored as a JSON member of the sorted set `alerts:<symbol>`,
 * scored with the current time, so all alerts of a symbol can be read at once.
 */
async function addAlertToRedis(alert: any) {
  const redisModule = await import('@/lib/redis');
  const redis = await redisModule.getRedisClient();

  const member = JSON.stringify({
    id: alert.id,
    userId: alert.userId,
    conditionType: alert.conditionType,
    threshold: alert.threshold,
    channels: alert.notificationChannels
  });

  await redis.zAdd(`alerts:${alert.symbol}`, { score: Date.now(), value: member });
}

/**
 * Removes an alert from the Redis index so it can no longer trigger.
 *
 * Alerts are stored as JSON members of the sorted set `alerts:<symbol>`, so
 * the symbol is looked up in the database first and every member whose `id`
 * matches is removed. Does nothing when the alert does not exist.
 */
async function removeAlertFromRedis(alertId: string) {
  const record = await prisma.alert.findUnique({
    where: { id: alertId },
    select: { symbol: true }
  });
  if (!record) return;

  const { getRedisClient } = await import('@/lib/redis');
  const redis = await getRedisClient();

  const key = `alerts:${record.symbol}`;
  const members: string[] = await redis.zRange(key, 0, -1);

  const matching = members.filter((member) => {
    try {
      return JSON.parse(member).id === alertId;
    } catch {
      return false; // ignore members that are not valid JSON
    }
  });

  if (matching.length > 0) {
    await redis.zRem(key, matching);
  }
}
```

### 1.5 Alert Processing Service

Tệp `services/alert-processor.ts` (TypeScript):

```typescript
// services/alert-processor.ts
import { getRedisClient } from '@/lib/redis';
import { prisma } from '@/lib/prisma';
import { sendNotification } from './notification';

/** Shape the alert processor expects in each JSON message on the `stock:updates` channel. */
interface StockUpdate {
  /** Ticker; selects the `alerts:<symbol>` set to evaluate. */
  symbol: string;
  /** Compared against price thresholds and recent highs/lows. */
  lastPrice: number;
  change: number;
  /** Compared against percent-change thresholds. */
  changePercent: number;
  /** Compared against volume thresholds. */
  volume: number;
  high: number;
  low: number;
  timestamp: string;
}

/**
 * Evaluates user alerts against real-time stock updates.
 *
 * Listens on the Redis channel `stock:updates`, loads the alerts stored in the
 * sorted set `alerts:<symbol>` and, for every alert whose condition holds,
 * sends a notification and deactivates the alert.
 */
export class AlertProcessor {
  private isRunning = false;

  /**
   * Starts listening for stock updates. Calling it again while running is a
   * no-op; when start-up fails the processor can be started again.
   *
   * @throws Rethrows connection/subscription errors.
   */
  async start() {
    if (this.isRunning) return;
    this.isRunning = true;

    try {
      const redis = await getRedisClient();
      // Pub/sub needs its own connection; a duplicated client starts
      // disconnected and must be connected before subscribing.
      const subscriber = redis.duplicate();
      subscriber.on('error', (error: unknown) => {
        console.error('Alert subscriber error:', error);
      });
      await subscriber.connect();

      // Subscribe to stock updates channel
      await subscriber.subscribe('stock:updates', (message: string) => {
        this.handleMessage(message);
      });
    } catch (error) {
      this.isRunning = false;
      throw error;
    }

    console.log('Alert processor started');
  }

  /** Parses one channel message and processes it; never throws or rejects. */
  private handleMessage(message: string): void {
    let update: StockUpdate;
    try {
      update = JSON.parse(message);
    } catch (error) {
      console.error('Ignoring malformed stock update:', error);
      return;
    }

    this.processAlerts(update).catch((error) => {
      console.error(`Alert processing failed for ${update.symbol}:`, error);
    });
  }

  /** Checks every stored alert of the update's symbol and triggers the matching ones. */
  private async processAlerts(update: StockUpdate) {
    const redis = await getRedisClient();

    // Load all alerts for this symbol
    const members: string[] = await redis.zRange(`alerts:${update.symbol}`, 0, -1);

    for (const member of members) {
      // One broken or failing alert must not block the remaining ones.
      try {
        const alert = JSON.parse(member);
        if (await this.checkCondition(alert, update)) {
          await this.triggerAlert(alert, update, member);
        }
      } catch (error) {
        console.error(`Failed to process alert for ${update.symbol}:`, error);
      }
    }
  }

  /**
   * Evaluates an alert's condition against an update.
   * Unknown condition types and breakouts without price history are `false`.
   */
  private async checkCondition(alert: any, update: StockUpdate): Promise<boolean> {
    switch (alert.conditionType) {
      case 'PRICE_ABOVE':
        return update.lastPrice >= alert.threshold;

      case 'PRICE_BELOW':
        return update.lastPrice <= alert.threshold;

      case 'CHANGE_PERCENT_ABOVE':
        return update.changePercent >= alert.threshold;

      case 'CHANGE_PERCENT_BELOW':
        return update.changePercent <= alert.threshold;

      case 'VOLUME_ABOVE':
        return update.volume >= alert.threshold;

      case 'BREAKOUT_HIGH': {
        // Compare against the high of the recent history.
        // NaN (no history) makes the comparison false.
        const recentHigh = await this.getRecentHigh(update.symbol, 20);
        return update.lastPrice > recentHigh;
      }

      case 'BREAKOUT_LOW': {
        const recentLow = await this.getRecentLow(update.symbol, 20);
        return update.lastPrice < recentLow;
      }

      default:
        return false;
    }
  }

  /**
   * Fires an alert: notifies the user and deactivates the alert.
   *
   * The alert is first removed from Redis as an atomic claim, so overlapping
   * updates cannot send the same alert twice. When sending fails the alert is
   * put back so a later update can retry.
   *
   * @param member Exact Redis member string of the alert; defaults to the
   *   re-serialized alert.
   */
  private async triggerAlert(
    alert: any,
    update: StockUpdate,
    member: string = JSON.stringify(alert)
  ) {
    const redis = await getRedisClient();
    const key = `alerts:${update.symbol}`;

    // Remove from Redis first; 0 removed means another update already took it
    const claimed = await redis.zRem(key, member);
    if (claimed === 0) return;

    try {
      // Send notification
      await sendNotification({
        userId: alert.userId,
        channels: alert.channels,
        title: `🚨 Cảnh báo ${update.symbol}`,
        body: `${update.symbol} đã chạm ngưỡng ${alert.conditionType} tại giá ${update.lastPrice.toLocaleString('vi-VN')}`,
        data: {
          symbol: update.symbol,
          price: update.lastPrice,
          change: update.changePercent,
          alertId: alert.id
        }
      });
    } catch (error) {
      await redis.zAdd(key, { score: Date.now(), value: member });
      throw error;
    }

    // Update the alert's state in the database
    await prisma.alert.update({
      where: { id: alert.id },
      data: {
        triggeredAt: new Date(),
        isActive: false // Disable after triggering (or keep, depending on requirements)
      }
    });
  }

  /**
   * Highest `high` among the newest `days` price rows of the last `days` days.
   * @returns `NaN` when there are no rows, so callers' comparisons are false.
   */
  private async getRecentHigh(symbol: string, days: number): Promise<number> {
    const prices = await prisma.stockPrice.findMany({
      where: {
        symbol,
        timestamp: {
          gte: new Date(Date.now() - days * 24 * 60 * 60 * 1000)
        }
      },
      orderBy: { timestamp: 'desc' },
      take: days,
      select: { high: true }
    });

    // Math.max() of nothing is -Infinity, which every price would "break".
    if (prices.length === 0) return Number.NaN;
    return Math.max(...prices.map((p: { high: unknown }) => Number(p.high)));
  }

  /**
   * Lowest `low` among the newest `days` price rows of the last `days` days.
   * @returns `NaN` when there are no rows, so callers' comparisons are false.
   */
  private async getRecentLow(symbol: string, days: number): Promise<number> {
    const prices = await prisma.stockPrice.findMany({
      where: {
        symbol,
        timestamp: {
          gte: new Date(Date.now() - days * 24 * 60 * 60 * 1000)
        }
      },
      orderBy: { timestamp: 'desc' },
      take: days,
      select: { low: true }
    });

    // Math.min() of nothing is Infinity, which every price would "break".
    if (prices.length === 0) return Number.NaN;
    return Math.min(...prices.map((p: { low: unknown }) => Number(p.low)));
  }
}

export const alertProcessor = new AlertProcessor();
```

***

## 🗄️ PHẦN 2: DATABASE SCHEMA & MIGRATIONS

### 2.1 Cấu Trúc Database

Sơ đồ các bảng trong cơ sở dữ liệu (văn bản thuần):

```
┌─────────────────────────────────────────────────────────────┐
│                    PostgreSQL + TimescaleDB                 │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────┐ │
│  │   stock_prices  │  │    trades       │  │   indices   │ │
│  │   (hypertable)  │  │   (hypertable)  │  │  (regular)  │ │
│  └─────────────────┘  └─────────────────┘  └─────────────┘ │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────┐ │
│  │     alerts      │  │  watchlists     │  │  portfolios │ │
│  │   (regular)     │  │   (regular)     │  │  (regular)  │ │
│  └─────────────────┘  └─────────────────┘  └─────────────┘ │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────┐ │
│  │  transactions   │  │    users        │  │ notifications│ │
│  │   (regular)     │  │   (regular)     │  │  (regular)  │ │
│  └─────────────────┘  └─────────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
```

### 2.2 Schema Đầy Đủ với Prisma

Tệp `prisma/schema.prisma` (Prisma):

```prisma
// prisma/schema.prisma

// Generates the Prisma Client for JavaScript/TypeScript.
generator client {
  provider = "prisma-client-js"
}

// PostgreSQL connection; the URL is read from the DATABASE_URL environment variable.
datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// ==================== USER MANAGEMENT ====================

// Application account.
// camelCase fields are mapped onto the snake_case columns created by
// migrations/001_initial_schema.sql, and the id is declared as a native UUID
// to match the column type.
model User {
  id            String    @id @default(uuid()) @db.Uuid
  email         String    @unique
  password      String    // hashed
  name          String?
  phone         String?
  telegramId    String?   @unique @map("telegram_id")
  emailVerified DateTime? @map("email_verified")
  image         String?
  role          UserRole  @default(USER)
  isActive      Boolean   @default(true) @map("is_active")
  createdAt     DateTime  @default(now()) @map("created_at")
  updatedAt     DateTime  @updatedAt @map("updated_at")

  // Relations
  alerts        Alert[]
  watchlists    Watchlist[]
  portfolios    Portfolio[]
  notifications Notification[]
  sessions      Session[]

  @@map("users")
}

// Account tiers referenced by User.role.
// NOTE(review): migrations/001_initial_schema.sql stores users.role as
// VARCHAR(20) with a CHECK constraint, not a native enum type — confirm Prisma
// can read and write it as an enum.
enum UserRole {
  USER
  PREMIUM
  ADMIN
}

// Login session token owned by a user; deleted together with the user.
// Fields are mapped onto the snake_case UUID columns of the `sessions` table.
model Session {
  id           String   @id @default(uuid()) @db.Uuid
  userId       String   @db.Uuid @map("user_id")
  token        String   @unique
  expiresAt    DateTime @map("expires_at")
  createdAt    DateTime @default(now()) @map("created_at")

  user User @relation(fields: [userId], references: [id], onDelete: Cascade)

  @@map("sessions")
}

// ==================== MARKET DATA ====================

// Stock prices over time (TimescaleDB hypertable).
// camelCase fields are mapped onto the snake_case columns created by
// migrations/001_initial_schema.sql.
model StockPrice {
  id        BigInt   @id @default(autoincrement())
  symbol    String   @db.VarChar(10)
  timestamp DateTime @db.Timestamptz(3)

  // OHLC
  open      Decimal  @db.Decimal(12, 2)
  high      Decimal  @db.Decimal(12, 2)
  low       Decimal  @db.Decimal(12, 2)
  close     Decimal  @db.Decimal(12, 2)

  // Volume & Value
  volume    BigInt
  value     BigInt

  // Bid/Ask (arrays of 10 price levels)
  bidPrice  Decimal[] @db.Decimal(12, 2) @map("bid_price")
  bidVolume BigInt[]  @map("bid_volume")
  askPrice  Decimal[] @db.Decimal(12, 2) @map("ask_price")
  askVolume BigInt[]  @map("ask_volume")

  // Additional information
  reference Decimal?  @db.Decimal(12, 2) // Reference price
  ceiling   Decimal?  @db.Decimal(12, 2) // Ceiling price
  floor     Decimal?  @db.Decimal(12, 2) // Floor price

  // Metadata
  source    String    @default("MBS") // MBS, VNDIRECT, SSI
  createdAt DateTime  @default(now()) @map("created_at")

  @@unique([symbol, timestamp])
  @@index([symbol, timestamp(sort: Desc)])
  @@index([timestamp])
  @@map("stock_prices")
}

// Trade details (tape) - TimescaleDB hypertable.
// camelCase fields are mapped onto the snake_case columns of the `trades` table.
// NOTE(review): migrations/001_initial_schema.sql stores trades.side as
// VARCHAR(4) with a CHECK constraint, not a native enum type — confirm Prisma
// can read and write it as an enum.
model Trade {
  id          BigInt   @id @default(autoincrement())
  symbol      String   @db.VarChar(10)
  timestamp   DateTime @db.Timestamptz(3)

  price       Decimal  @db.Decimal(12, 2)
  volume      BigInt
  side        TradeSide? // BUY or SELL; null when the side is unknown

  // Trade information
  matchType   String?  @db.VarChar(10) @map("match_type") // LO, MP, ATC, PLO, etc.
  orderId     String?  @db.VarChar(50) @map("order_id") // ID used for tracing

  createdAt   DateTime @default(now()) @map("created_at")

  @@index([symbol, timestamp(sort: Desc)])
  @@index([timestamp])
  @@map("trades")
}

// Side of a matched trade, used by Trade.side.
enum TradeSide {
  BUY
  SELL
}

// Market indices (VN-Index, VN30, etc.).
// camelCase fields are mapped onto the snake_case columns of `market_indices`.
model MarketIndex {
  id          BigInt   @id @default(autoincrement())
  symbol      String   @db.VarChar(20) // VNINDEX, VN30, HNXINDEX, etc.
  timestamp   DateTime @db.Timestamptz(3)

  value       Decimal  @db.Decimal(12, 2)
  change      Decimal  @db.Decimal(12, 2)
  changePercent Decimal @db.Decimal(5, 2) @map("change_percent")

  // Market breadth
  advance     Int      // Number of advancing symbols
  decline     Int      // Number of declining symbols
  unchanged   Int      // Number of unchanged symbols
  totalVolume BigInt   @map("total_volume") // Total volume
  totalValue  BigInt   @map("total_value") // Total value

  createdAt   DateTime @default(now()) @map("created_at")

  @@unique([symbol, timestamp])
  @@index([symbol, timestamp(sort: Desc)])
  @@map("market_indices")
}

// Company information.
// camelCase fields are mapped onto the snake_case columns of `companies`, and
// the id is declared as a native UUID to match the column type.
model Company {
  id                String   @id @default(uuid()) @db.Uuid
  symbol            String   @unique @db.VarChar(10)

  // Basic information
  companyName       String   @map("company_name")
  shortName         String?  @map("short_name")
  industry          String?
  sector            String?
  exchange          String   @db.VarChar(10) // HOSE, HNX, UPCOM

  // Financial information (updated periodically)
  marketCap         BigInt?  @map("market_cap")
  sharesOutstanding BigInt?  @map("shares_outstanding")
  eps               Decimal? @db.Decimal(12, 2)
  pe                Decimal? @db.Decimal(8, 2)
  pb                Decimal? @db.Decimal(8, 2)
  roe               Decimal? @db.Decimal(5, 2)
  roa               Decimal? @db.Decimal(5, 2)
  dividendYield     Decimal? @db.Decimal(5, 2) @map("dividend_yield")

  // Contact information
  website           String?
  address           String?
  employees         Int?

  // Metadata
  lastUpdated       DateTime @updatedAt @map("last_updated")
  createdAt         DateTime @default(now()) @map("created_at")

  @@map("companies")
}

// ==================== ALERT SYSTEM ====================

// User-defined price/indicator alert.
// camelCase fields are mapped onto the snake_case columns of `alerts`, and the
// id / foreign key are declared as native UUIDs to match the column types.
model Alert {
  id                   String   @id @default(uuid()) @db.Uuid
  userId               String   @db.Uuid @map("user_id")

  // Alert configuration
  symbol               String   @db.VarChar(10)
  conditionType        AlertCondition @map("condition_type")
  threshold            Decimal  @db.Decimal(12, 4)

  // Notification
  notificationChannels NotificationChannel[] @map("notification_channels")
  message              String?

  // Status
  isActive             Boolean  @default(true) @map("is_active")
  triggeredAt          DateTime? @map("triggered_at")
  triggerCount         Int      @default(0) @map("trigger_count")
  maxTriggers          Int      @default(1) @map("max_triggers") // Maximum number of triggers

  // Expiry
  expiryDate           DateTime? @map("expiry_date")

  createdAt            DateTime @default(now()) @map("created_at")
  updatedAt            DateTime @updatedAt @map("updated_at")

  // Relations
  user User @relation(fields: [userId], references: [id], onDelete: Cascade)

  @@index([userId, isActive])
  @@index([symbol, isActive])
  @@map("alerts")
}

// Conditions an alert can watch for.
// Mapped to the `alert_condition` enum type created by
// migrations/001_initial_schema.sql (Prisma would otherwise look for a type
// named "AlertCondition").
enum AlertCondition {
  PRICE_ABOVE
  PRICE_BELOW
  CHANGE_PERCENT_ABOVE
  CHANGE_PERCENT_BELOW
  VOLUME_ABOVE
  BREAKOUT_HIGH
  BREAKOUT_LOW
  RSI_ABOVE
  RSI_BELOW

  @@map("alert_condition")
}

// Delivery channels for alert notifications.
// Mapped to the `notification_channel` enum type created by
// migrations/001_initial_schema.sql.
enum NotificationChannel {
  WEB
  EMAIL
  TELEGRAM
  SMS

  @@map("notification_channel")
}

// History of alerts that have triggered.
// camelCase fields are mapped onto the snake_case columns of `alert_history`;
// id, alertId and userId are declared as native UUIDs to match the column types.
model AlertHistory {
  id        String   @id @default(uuid()) @db.Uuid
  alertId   String   @db.Uuid @map("alert_id")
  userId    String   @db.Uuid @map("user_id")

  // Data captured when the alert triggered
  symbol    String   @db.VarChar(10)
  price     Decimal  @db.Decimal(12, 2)
  value     Decimal  @db.Decimal(12, 4) // Value at trigger time

  // Notifications sent
  channels  NotificationChannel[]
  sentAt    DateTime @map("sent_at")

  createdAt DateTime @default(now()) @map("created_at")

  @@index([userId, createdAt(sort: Desc)])
  @@map("alert_history")
}

// ==================== WATCHLIST & PORTFOLIO ====================

// Named list of symbols owned by a user; names are unique per user.
// camelCase fields are mapped onto the snake_case columns of `watchlists`, and
// the id / foreign key are declared as native UUIDs to match the column types.
model Watchlist {
  id          String   @id @default(uuid()) @db.Uuid
  userId      String   @db.Uuid @map("user_id")
  name        String
  description String?
  isDefault   Boolean  @default(false) @map("is_default")
  order       Int      @default(0)
  createdAt   DateTime @default(now()) @map("created_at")
  updatedAt   DateTime @updatedAt @map("updated_at")

  // Relations
  user   User            @relation(fields: [userId], references: [id], onDelete: Cascade)
  items  WatchlistItem[]

  @@unique([userId, name])
  @@map("watchlists")
}

// One symbol inside a watchlist; a symbol appears at most once per watchlist.
// camelCase fields are mapped onto the snake_case columns of `watchlist_items`,
// and the id / foreign key are declared as native UUIDs.
model WatchlistItem {
  id          String   @id @default(uuid()) @db.Uuid
  watchlistId String   @db.Uuid @map("watchlist_id")
  symbol      String   @db.VarChar(10)
  order       Int      @default(0)
  notes       String?
  addedAt     DateTime @default(now()) @map("added_at")

  watchlist Watchlist @relation(fields: [watchlistId], references: [id], onDelete: Cascade)

  @@unique([watchlistId, symbol])
  @@map("watchlist_items")
}

// Investment portfolio.
// camelCase fields are mapped onto the snake_case columns of `portfolios`, and
// the id / foreign key are declared as native UUIDs to match the column types.
model Portfolio {
  id          String   @id @default(uuid()) @db.Uuid
  userId      String   @db.Uuid @map("user_id")
  name        String
  description String?
  currency    String   @default("VND")
  isDefault   Boolean  @default(false) @map("is_default")
  createdAt   DateTime @default(now()) @map("created_at")
  updatedAt   DateTime @updatedAt @map("updated_at")

  // Relations
  user         User          @relation(fields: [userId], references: [id], onDelete: Cascade)
  holdings     Holding[]
  transactions Transaction[]

  @@map("portfolios")
}

// Position held in a portfolio (one row per portfolio and symbol).
// camelCase fields are mapped onto the snake_case columns of `holdings`, and
// the id / foreign key are declared as native UUIDs to match the column types.
model Holding {
  id          String   @id @default(uuid()) @db.Uuid
  portfolioId String   @db.Uuid @map("portfolio_id")
  symbol      String   @db.VarChar(10)

  // Quantity
  quantity    Decimal  @db.Decimal(15, 2)
  averageCost Decimal  @db.Decimal(12, 2) @map("average_cost")

  // Mark-to-market information
  lastPrice   Decimal? @db.Decimal(12, 2) @map("last_price")
  marketValue Decimal? @db.Decimal(15, 2) @map("market_value")
  unrealizedPnl Decimal? @db.Decimal(15, 2) @map("unrealized_pnl")
  // Decimal(5, 2) tops out at 999.99, so a gain of 1000% or more could not be
  // stored; Decimal(9, 2) leaves room for it.
  unrealizedPnlPercent Decimal? @db.Decimal(9, 2) @map("unrealized_pnl_percent")

  updatedAt   DateTime @updatedAt @map("updated_at")
  createdAt   DateTime @default(now()) @map("created_at")

  portfolio Portfolio @relation(fields: [portfolioId], references: [id], onDelete: Cascade)

  @@unique([portfolioId, symbol])
  @@map("holdings")
}

// Transaction history of a portfolio.
// camelCase fields are mapped onto the snake_case columns of `transactions`,
// and the id / foreign key are declared as native UUIDs.
model Transaction {
  id          String   @id @default(uuid()) @db.Uuid
  portfolioId String   @db.Uuid @map("portfolio_id")
  symbol      String   @db.VarChar(10)

  type        TransactionType
  quantity    Decimal  @db.Decimal(15, 2)
  price       Decimal  @db.Decimal(12, 2)
  fees        Decimal  @db.Decimal(12, 2) @default(0)
  taxes       Decimal  @db.Decimal(12, 2) @default(0)
  total       Decimal  @db.Decimal(15, 2)

  date        DateTime @db.Date
  notes       String?

  createdAt   DateTime @default(now()) @map("created_at")

  portfolio Portfolio @relation(fields: [portfolioId], references: [id], onDelete: Cascade)

  @@index([portfolioId, date(sort: Desc)])
  @@map("transactions")
}

// Kinds of portfolio transaction.
// Mapped to the `transaction_type` enum type created by
// migrations/001_initial_schema.sql.
enum TransactionType {
  BUY
  SELL
  DIVIDEND
  SPLIT
  BONUS
  RIGHTS

  @@map("transaction_type")
}

// ==================== NOTIFICATIONS ====================

// Notification delivered to a user, with read tracking.
// camelCase fields are mapped onto the snake_case columns of `notifications`,
// and the id / foreign key are declared as native UUIDs.
model Notification {
  id        String   @id @default(uuid()) @db.Uuid
  userId    String   @db.Uuid @map("user_id")

  type      NotificationType
  title     String
  body      String
  data      Json?    // Additional payload

  isRead    Boolean  @default(false) @map("is_read")
  readAt    DateTime? @map("read_at")

  createdAt DateTime @default(now()) @map("created_at")

  user User @relation(fields: [userId], references: [id], onDelete: Cascade)

  @@index([userId, isRead])
  @@index([userId, createdAt(sort: Desc)])
  @@map("notifications")
}

// Categories of notification.
// Mapped to the `notification_type` enum type created by
// migrations/001_initial_schema.sql.
enum NotificationType {
  ALERT
  PRICE_UPDATE
  SYSTEM
  NEWS

  @@map("notification_type")
}

// ==================== SYSTEM ====================

// System configuration stored as key / JSON value pairs.
// camelCase fields are mapped onto the snake_case columns of `system_config`,
// and the id is declared as a native UUID to match the column type.
model SystemConfig {
  id        String   @id @default(uuid()) @db.Uuid
  key       String   @unique
  value     Json
  updatedAt DateTime @updatedAt @map("updated_at")
  createdAt DateTime @default(now()) @map("created_at")

  @@map("system_config")
}

// Error and event log, indexed by time and by (level, time).
model SystemLog {
  id        BigInt   @id @default(autoincrement())
  level     LogLevel
  service   String   @db.VarChar(50)
  message   String
  metadata  Json?
  timestamp DateTime @default(now())

  @@index([timestamp(sort: Desc)])
  @@index([level, timestamp(sort: Desc)])
  @@map("system_logs")
}

// Severity levels for SystemLog entries.
// Mapped to the `log_level` enum type created by
// migrations/001_initial_schema.sql.
enum LogLevel {
  DEBUG
  INFO
  WARN
  ERROR
  FATAL

  @@map("log_level")
}
```

### 2.3 Migration Scripts

#### Migration 001: Initial Schema

sql Copy

```sql
-- migrations/001_initial_schema.sql

-- Enable the TimescaleDB extension (needed by the create_hypertable calls,
-- continuous aggregate and retention policies later in this migration).
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- ==================== USER MANAGEMENT ====================

-- Application accounts.
-- Columns that carry a default are also NOT NULL, so an explicit NULL cannot
-- reach fields the Prisma schema declares as required.
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    password VARCHAR(255) NOT NULL, -- hashed
    name VARCHAR(255),
    phone VARCHAR(20),
    telegram_id VARCHAR(50) UNIQUE,
    email_verified TIMESTAMPTZ,
    image VARCHAR(500),
    role VARCHAR(20) NOT NULL DEFAULT 'USER' CHECK (role IN ('USER', 'PREMIUM', 'ADMIN')),
    is_active BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Login sessions; rows are removed when the owning user is deleted.
-- created_at is NOT NULL to match the required Prisma field.
CREATE TABLE sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    token VARCHAR(255) UNIQUE NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- ==================== MARKET DATA ====================

-- Price snapshots per symbol and timestamp.
-- Defaulted columns are NOT NULL so they match the required Prisma fields
-- (scalar lists and `source` / `createdAt` are non-nullable there).
CREATE TABLE stock_prices (
    id BIGSERIAL,
    symbol VARCHAR(10) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    open DECIMAL(12, 2) NOT NULL,
    high DECIMAL(12, 2) NOT NULL,
    low DECIMAL(12, 2) NOT NULL,
    close DECIMAL(12, 2) NOT NULL,
    volume BIGINT NOT NULL,
    value BIGINT NOT NULL,
    bid_price DECIMAL(12, 2)[] NOT NULL DEFAULT '{}',
    bid_volume BIGINT[] NOT NULL DEFAULT '{}',
    ask_price DECIMAL(12, 2)[] NOT NULL DEFAULT '{}',
    ask_volume BIGINT[] NOT NULL DEFAULT '{}',
    reference DECIMAL(12, 2),
    ceiling DECIMAL(12, 2),
    floor DECIMAL(12, 2),
    source VARCHAR(20) NOT NULL DEFAULT 'MBS',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- The partitioning column (timestamp) is part of the primary key.
    PRIMARY KEY (symbol, timestamp)
);

-- Convert to hypertable for time-series data
SELECT create_hypertable('stock_prices', 'timestamp',
    chunk_time_interval => INTERVAL '1 day',
    if_not_exists => TRUE
);

-- Indexes for stock_prices
CREATE INDEX idx_stock_prices_symbol_time_desc ON stock_prices (symbol, timestamp DESC);
CREATE INDEX idx_stock_prices_timestamp ON stock_prices (timestamp DESC);

-- Individual matched trades (tape).
-- created_at is NOT NULL to match the required Prisma field.
CREATE TABLE trades (
    id BIGSERIAL,
    symbol VARCHAR(10) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    price DECIMAL(12, 2) NOT NULL,
    volume BIGINT NOT NULL,
    side VARCHAR(4) CHECK (side IN ('BUY', 'SELL')), -- NULL when the side is unknown
    match_type VARCHAR(10),
    order_id VARCHAR(50),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- The partitioning column (timestamp) is part of the primary key.
    PRIMARY KEY (id, timestamp)
);

-- Convert to hypertable
SELECT create_hypertable('trades', 'timestamp',
    chunk_time_interval => INTERVAL '1 day',
    if_not_exists => TRUE
);

CREATE INDEX idx_trades_symbol_time ON trades (symbol, timestamp DESC);

-- Market index snapshots (one row per index symbol and timestamp).
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE market_indices (
    id BIGSERIAL,
    symbol VARCHAR(20) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    value DECIMAL(12, 2) NOT NULL,
    change DECIMAL(12, 2) NOT NULL,
    change_percent DECIMAL(5, 2) NOT NULL,
    advance INT NOT NULL DEFAULT 0,
    decline INT NOT NULL DEFAULT 0,
    unchanged INT NOT NULL DEFAULT 0,
    total_volume BIGINT NOT NULL DEFAULT 0,
    total_value BIGINT NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    PRIMARY KEY (symbol, timestamp)
);

-- Convert to hypertable
SELECT create_hypertable('market_indices', 'timestamp',
    chunk_time_interval => INTERVAL '1 day',
    if_not_exists => TRUE
);

-- Listed companies, keyed by ticker symbol.
-- last_updated / created_at are NOT NULL to match the required Prisma fields.
CREATE TABLE companies (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    symbol VARCHAR(10) UNIQUE NOT NULL,
    company_name VARCHAR(255) NOT NULL,
    short_name VARCHAR(100),
    industry VARCHAR(100),
    sector VARCHAR(100),
    exchange VARCHAR(10) NOT NULL CHECK (exchange IN ('HOSE', 'HNX', 'UPCOM')),
    market_cap BIGINT,
    shares_outstanding BIGINT,
    eps DECIMAL(12, 2),
    pe DECIMAL(8, 2),
    pb DECIMAL(8, 2),
    roe DECIMAL(5, 2),
    roa DECIMAL(5, 2),
    dividend_yield DECIMAL(5, 2),
    website VARCHAR(255),
    address TEXT,
    employees INT,
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- ==================== ALERT SYSTEM ====================

-- Condition kinds an alert can evaluate (type of alerts.condition_type).
CREATE TYPE alert_condition AS ENUM (
    'PRICE_ABOVE', 'PRICE_BELOW', 
    'CHANGE_PERCENT_ABOVE', 'CHANGE_PERCENT_BELOW',
    'VOLUME_ABOVE', 'BREAKOUT_HIGH', 'BREAKOUT_LOW',
    'RSI_ABOVE', 'RSI_BELOW'
);

-- Delivery channels (element type of the notification channel arrays below).
CREATE TYPE notification_channel AS ENUM ('WEB', 'EMAIL', 'TELEGRAM', 'SMS');

-- User-defined alerts; rows are removed when the owning user is deleted.
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE alerts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    symbol VARCHAR(10) NOT NULL,
    condition_type alert_condition NOT NULL,
    threshold DECIMAL(12, 4) NOT NULL,
    notification_channels notification_channel[] NOT NULL DEFAULT '{}',
    message TEXT,
    is_active BOOLEAN NOT NULL DEFAULT true,
    triggered_at TIMESTAMPTZ,
    trigger_count INT NOT NULL DEFAULT 0,
    max_triggers INT NOT NULL DEFAULT 1, -- maximum number of triggers
    expiry_date TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_alerts_user_active ON alerts(user_id, is_active);
CREATE INDEX idx_alerts_symbol_active ON alerts(symbol, is_active);

-- Record of triggered alerts. alert_id / user_id carry no foreign keys, so
-- rows are not removed by the cascades on alerts or users.
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE alert_history (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    alert_id UUID NOT NULL,
    user_id UUID NOT NULL,
    symbol VARCHAR(10) NOT NULL,
    price DECIMAL(12, 2) NOT NULL,
    value DECIMAL(12, 4) NOT NULL, -- value at trigger time
    channels notification_channel[] NOT NULL DEFAULT '{}',
    sent_at TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_alert_history_user ON alert_history(user_id, created_at DESC);

-- ==================== WATCHLIST & PORTFOLIO ====================

-- Named watchlists; a name is unique per user.
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE watchlists (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    is_default BOOLEAN NOT NULL DEFAULT false,
    "order" INT NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE(user_id, name)
);

-- Symbols inside a watchlist; a symbol appears at most once per watchlist.
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE watchlist_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    watchlist_id UUID NOT NULL REFERENCES watchlists(id) ON DELETE CASCADE,
    symbol VARCHAR(10) NOT NULL,
    "order" INT NOT NULL DEFAULT 0,
    notes TEXT,
    added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE(watchlist_id, symbol)
);

-- Investment portfolios; rows are removed when the owning user is deleted.
-- Defaulted columns are NOT NULL to match the required Prisma fields.
CREATE TABLE portfolios (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    currency VARCHAR(3) NOT NULL DEFAULT 'VND',
    is_default BOOLEAN NOT NULL DEFAULT false,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Kinds of portfolio transaction (type of transactions.type).
CREATE TYPE transaction_type AS ENUM ('BUY', 'SELL', 'DIVIDEND', 'SPLIT', 'BONUS', 'RIGHTS');

-- Positions held per portfolio and symbol.
-- Timestamps are NOT NULL to match the required Prisma fields.
CREATE TABLE holdings (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    portfolio_id UUID NOT NULL REFERENCES portfolios(id) ON DELETE CASCADE,
    symbol VARCHAR(10) NOT NULL,
    quantity DECIMAL(15, 2) NOT NULL,
    average_cost DECIMAL(12, 2) NOT NULL,
    last_price DECIMAL(12, 2),
    market_value DECIMAL(15, 2),
    unrealized_pnl DECIMAL(15, 2),
    -- DECIMAL(5, 2) tops out at 999.99; a gain of 1000% or more would raise a
    -- numeric overflow and abort the write, so allow a wider range.
    unrealized_pnl_percent DECIMAL(9, 2),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    UNIQUE(portfolio_id, symbol)
);

-- Portfolio transaction history.
-- fees / taxes / created_at are NOT NULL to match the required Prisma fields.
CREATE TABLE transactions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    portfolio_id UUID NOT NULL REFERENCES portfolios(id) ON DELETE CASCADE,
    symbol VARCHAR(10) NOT NULL,
    type transaction_type NOT NULL,
    quantity DECIMAL(15, 2) NOT NULL,
    price DECIMAL(12, 2) NOT NULL,
    fees DECIMAL(12, 2) NOT NULL DEFAULT 0,
    taxes DECIMAL(12, 2) NOT NULL DEFAULT 0,
    total DECIMAL(15, 2) NOT NULL,
    date DATE NOT NULL,
    notes TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_transactions_portfolio_date ON transactions(portfolio_id, date DESC);

-- ==================== NOTIFICATIONS ====================

-- Categories of notification (type of notifications.type).
CREATE TYPE notification_type AS ENUM ('ALERT', 'PRICE_UPDATE', 'SYSTEM', 'NEWS');

-- Notifications delivered to a user, with read tracking.
-- is_read / created_at are NOT NULL to match the required Prisma fields.
CREATE TABLE notifications (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    type notification_type NOT NULL,
    title VARCHAR(255) NOT NULL,
    body TEXT NOT NULL,
    data JSONB, -- additional payload
    is_read BOOLEAN NOT NULL DEFAULT false,
    read_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_notifications_user_unread ON notifications(user_id, is_read);
CREATE INDEX idx_notifications_user_created ON notifications(user_id, created_at DESC);

-- ==================== SYSTEM ====================

-- System configuration stored as key / JSONB value pairs.
-- Timestamps are NOT NULL to match the required Prisma fields.
CREATE TABLE system_config (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    key VARCHAR(100) UNIQUE NOT NULL,
    value JSONB NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Severity levels for system log entries.
CREATE TYPE log_level AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL');

-- Error and event log.
-- timestamp is NOT NULL to match the required Prisma field.
CREATE TABLE system_logs (
    id BIGSERIAL PRIMARY KEY,
    level log_level NOT NULL,
    service VARCHAR(50) NOT NULL,
    message TEXT NOT NULL,
    metadata JSONB,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_system_logs_timestamp ON system_logs(timestamp DESC);
CREATE INDEX idx_system_logs_level_timestamp ON system_logs(level, timestamp DESC);

-- Continuous aggregate: hourly OHLCV bars per symbol for fast queries.
-- Created WITH NO DATA so the statement can also run inside a transaction;
-- the refresh policy below materializes the buckets.
CREATE MATERIALIZED VIEW stock_prices_1h
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    symbol,
    first(open, timestamp) as open,
    max(high) as high,
    min(low) as low,
    last(close, timestamp) as close,
    sum(volume) as volume,
    sum(value) as value
FROM stock_prices
GROUP BY bucket, symbol
WITH NO DATA;

-- Without a refresh policy the view is never updated after creation.
-- Every hour, re-materialize the buckets between one day and one hour old.
SELECT add_continuous_aggregate_policy('stock_prices_1h',
    start_offset => INTERVAL '1 day',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour'
);

-- Retention policy (raw price data is kept for 1 year and raw trades for
-- 6 months; aggregates are meant to be kept longer)
SELECT add_retention_policy('stock_prices', INTERVAL '1 year');
SELECT add_retention_policy('trades', INTERVAL '6 months');
```

#### Migration 002: Add Functions & Triggers

sql Copy

```sql
-- migrations/002_functions_triggers.sql

-- Trigger function that stamps updated_at with the current time on the row
-- being written.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$$;

-- Apply to the tables that need updated_at maintained on every UPDATE
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_alerts_updated_at BEFORE UPDATE ON alerts
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_watchlists_updated_at BEFORE UPDATE ON watchlists
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_portfolios_updated_at BEFORE UPDATE ON portfolios
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_holdings_updated_at BEFORE UPDATE ON holdings
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Trigger function that derives market value and unrealized P&L for a holding
-- from quantity, average_cost and last_price.
--   * last_price present : market_value and unrealized_pnl are recomputed;
--     unrealized_pnl_percent is recomputed when average_cost > 0 and cleared
--     otherwise (a percentage has no meaning without a positive cost basis).
--   * last_price NULL    : all three derived columns are cleared, so values
--     computed from an earlier price are not kept.
-- NOTE(review): the percentage is assigned into holdings.unrealized_pnl_percent;
-- a value outside that column's precision raises a numeric overflow.
CREATE OR REPLACE FUNCTION calculate_holding_pnl()
RETURNS TRIGGER AS $$
BEGIN
    IF NEW.last_price IS NOT NULL THEN
        NEW.market_value := NEW.quantity * NEW.last_price;
        NEW.unrealized_pnl := (NEW.last_price - NEW.average_cost) * NEW.quantity;

        IF NEW.average_cost > 0 THEN
            NEW.unrealized_pnl_percent := ((NEW.last_price - NEW.average_cost) / NEW.average_cost) * 100;
        ELSE
            NEW.unrealized_pnl_percent := NULL;
        END IF;
    ELSE
        NEW.market_value := NULL;
        NEW.unrealized_pnl := NULL;
        NEW.unrealized_pnl_percent := NULL;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Recompute the derived P&L columns on every insert or update of a holding.
CREATE TRIGGER trigger_calculate_holding_pnl
    BEFORE INSERT OR UPDATE ON holdings
    FOR EACH ROW
    EXECUTE FUNCTION calculate_holding_pnl();

-- Trigger function that computes a transaction's total as
-- quantity * price + fees + taxes, overwriting any supplied total.
-- fees / taxes are treated as 0 when NULL; without that a single NULL would
-- turn the whole sum into NULL.
CREATE OR REPLACE FUNCTION calculate_transaction_total()
RETURNS TRIGGER AS $$
BEGIN
    NEW.total := (NEW.quantity * NEW.price) + COALESCE(NEW.fees, 0) + COALESCE(NEW.taxes, 0);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fill in transactions.total before each new row is stored.
CREATE TRIGGER trigger_calculate_transaction_total
    BEFORE INSERT ON transactions
    FOR EACH ROW
    EXECUTE FUNCTION calculate_transaction_total();

-- Trigger function that keeps holdings in sync with newly inserted transactions.
--   BUY  : adds the quantity and recomputes the quantity-weighted average cost,
--          or creates the holding when none exists.
--   SELL : reduces the quantity (average cost unchanged); the holding is
--          deleted when the sale covers the whole position or more. A SELL
--          without a matching holding is ignored.
--   Other transaction types do not touch holdings.
-- The existing holding row is read FOR UPDATE so that concurrent transactions
-- on the same position are applied one after another instead of overwriting
-- each other, and the first BUY uses ON CONFLICT so two concurrent first buys
-- merge instead of failing on the (portfolio_id, symbol) unique constraint.
CREATE OR REPLACE FUNCTION update_holding_on_transaction()
RETURNS TRIGGER AS $$
DECLARE
    existing_holding_id UUID;
    current_qty DECIMAL(15,2);
    current_avg DECIMAL(12,2);
BEGIN
    -- Find and lock the current holding
    SELECT id, quantity, average_cost
    INTO existing_holding_id, current_qty, current_avg
    FROM holdings
    WHERE portfolio_id = NEW.portfolio_id AND symbol = NEW.symbol
    FOR UPDATE;

    IF NEW.type = 'BUY' THEN
        IF existing_holding_id IS NOT NULL THEN
            -- Update the existing holding
            UPDATE holdings SET
                quantity = current_qty + NEW.quantity,
                average_cost = ((current_qty * current_avg) + (NEW.quantity * NEW.price)) / (current_qty + NEW.quantity),
                updated_at = NOW()
            WHERE id = existing_holding_id;
        ELSE
            -- Create a new holding, merging with a row inserted concurrently
            INSERT INTO holdings (portfolio_id, symbol, quantity, average_cost)
            VALUES (NEW.portfolio_id, NEW.symbol, NEW.quantity, NEW.price)
            ON CONFLICT (portfolio_id, symbol) DO UPDATE SET
                quantity = holdings.quantity + EXCLUDED.quantity,
                average_cost = ((holdings.quantity * holdings.average_cost) + (EXCLUDED.quantity * EXCLUDED.average_cost))
                               / (holdings.quantity + EXCLUDED.quantity),
                updated_at = NOW();
        END IF;

    ELSIF NEW.type = 'SELL' THEN
        IF existing_holding_id IS NOT NULL THEN
            IF current_qty <= NEW.quantity THEN
                -- Delete the holding when the position is sold out
                DELETE FROM holdings WHERE id = existing_holding_id;
            ELSE
                -- Reduce the quantity (average cost unchanged)
                UPDATE holdings SET
                    quantity = current_qty - NEW.quantity,
                    updated_at = NOW()
                WHERE id = existing_holding_id;
            END IF;
        END IF;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Apply each newly inserted transaction to the matching holding.
CREATE TRIGGER trigger_update_holding_on_transaction
    AFTER INSERT ON transactions
    FOR EACH ROW
    EXECUTE FUNCTION update_holding_on_transaction();

-- Returns the top movers, computed from the most recent price row of each
-- company.
--   p_exchange : restrict to one exchange; NULL means all exchanges.
--   p_limit    : maximum number of rows returned.
--   p_order_by : 'change_percent' (absolute value), 'volume' or 'value';
--                any other value leaves the order unspecified.
-- The latest row is looked up per symbol. Filtering on the single newest
-- timestamp of the whole table drops every symbol whose last tick carries a
-- different timestamp.
-- change / change_percent are NULL when the reference price is missing, and
-- change_percent is also NULL when it is zero (NULLIF avoids a division by
-- zero); such rows sort last.
CREATE OR REPLACE FUNCTION get_top_movers(
    p_exchange VARCHAR(10) DEFAULT NULL,
    p_limit INT DEFAULT 10,
    p_order_by VARCHAR(20) DEFAULT 'change_percent' -- 'change_percent', 'volume', 'value'
)
RETURNS TABLE (
    symbol VARCHAR(10),
    company_name VARCHAR(255),
    last_price DECIMAL(12,2),
    change DECIMAL(12,2),
    change_percent DECIMAL(5,2),
    volume BIGINT,
    value BIGINT,
    market_cap BIGINT
) AS $$
BEGIN
    -- Every column reference is table-qualified: the RETURNS TABLE names
    -- (symbol, volume, value, ...) are also PL/pgSQL variables in this body.
    RETURN QUERY
    SELECT
        c.symbol,
        c.company_name,
        latest.close AS last_price,
        latest.close - latest.reference AS change,
        ROUND((latest.close - latest.reference) / NULLIF(latest.reference, 0) * 100, 2) AS change_percent,
        latest.volume,
        latest.value,
        c.market_cap
    FROM companies c
    CROSS JOIN LATERAL (
        SELECT sp.close, sp.reference, sp.volume, sp.value
        FROM stock_prices sp
        WHERE sp.symbol = c.symbol
        ORDER BY sp.timestamp DESC
        LIMIT 1
    ) latest
    WHERE (p_exchange IS NULL OR c.exchange = p_exchange)
    ORDER BY
        CASE p_order_by
            WHEN 'change_percent' THEN ABS((latest.close - latest.reference) / NULLIF(latest.reference, 0) * 100)
            WHEN 'volume' THEN latest.volume
            WHEN 'value' THEN latest.value
        END DESC NULLS LAST
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;

-- Returns one row per exchange (HOSE, HNX, UPCOM) built from the most recent
-- snapshot of that exchange's index (VNINDEX, HNXINDEX, UPCOMINDEX).
-- The latest snapshot is looked up per index. Filtering on the single newest
-- timestamp of the whole table drops every index whose last row carries a
-- different timestamp.
-- The exchange code is cast to VARCHAR(10): an untyped string result is
-- `text`, which RETURN QUERY rejects against the declared varchar column.
CREATE OR REPLACE FUNCTION get_market_overview()
RETURNS TABLE (
    exchange VARCHAR(10),
    index_value DECIMAL(12,2),
    index_change DECIMAL(12,2),
    index_change_percent DECIMAL(5,2),
    advance INT,
    decline INT,
    unchanged INT,
    total_volume BIGINT,
    total_value BIGINT
) AS $$
BEGIN
    -- Every column reference is table-qualified: the RETURNS TABLE names
    -- (exchange, advance, ...) are also PL/pgSQL variables in this body.
    RETURN QUERY
    SELECT
        m.exchange_code::VARCHAR(10) AS exchange,
        latest.value AS index_value,
        latest.change AS index_change,
        latest.change_percent AS index_change_percent,
        latest.advance,
        latest.decline,
        latest.unchanged,
        latest.total_volume,
        latest.total_value
    FROM (VALUES
        ('VNINDEX', 'HOSE'),
        ('HNXINDEX', 'HNX'),
        ('UPCOMINDEX', 'UPCOM')
    ) AS m(index_symbol, exchange_code)
    CROSS JOIN LATERAL (
        SELECT mi.*
        FROM market_indices mi
        WHERE mi.symbol = m.index_symbol
        ORDER BY mi.timestamp DESC
        LIMIT 1
    ) latest;
END;
$$ LANGUAGE plpgsql;
```

#### Migration 003: Seed Data

sql Copy

```sql
-- migrations/003_seed_data.sql

-- Insert sample companies (top VN30 stocks).
-- Techcombank's ticker is TCB (it was seeded as TCH).
-- ON CONFLICT makes the seed safe to run again: existing symbols are left
-- untouched.
INSERT INTO companies (symbol, company_name, short_name, industry, sector, exchange) VALUES
('VNM', 'Vinamilk', 'Vinamilk', 'Thực phẩm', 'Tiêu dùng', 'HOSE'),
('VIC', 'Vingroup', 'Vingroup', 'Bất động sản', 'Bất động sản', 'HOSE'),
('HPG', 'Hòa Phát', 'Hòa Phát', 'Thép', 'Vật liệu', 'HOSE'),
('MWG', 'Thế Giới Di Động', 'TGDD', 'Bán lẻ', 'Tiêu dùng', 'HOSE'),
('FPT', 'FPT Corporation', 'FPT', 'Công nghệ', 'Công nghệ', 'HOSE'),
('VCB', 'Vietcombank', 'Vietcombank', 'Ngân hàng', 'Tài chính', 'HOSE'),
('VHM', 'Vinhomes', 'Vinhomes', 'Bất động sản', 'Bất động sản', 'HOSE'),
('GAS', 'PV Gas', 'PV Gas', 'Dầu khí', 'Năng lượng', 'HOSE'),
('MSN', 'Masan Group', 'Masan', 'Tiêu dùng', 'Tiêu dùng', 'HOSE'),
('TCB', 'Techcombank', 'Techcombank', 'Ngân hàng', 'Tài chính', 'HOSE')
ON CONFLICT (symbol) DO NOTHING;

-- Insert the default system configuration.
-- ON CONFLICT makes the seed safe to run again and keeps any value that was
-- changed after the first run.
INSERT INTO system_config (key, value) VALUES
('market_hours', '{"open": "09:00", "close": "15:00", "lunch_start": "11:30", "lunch_end": "13:00"}'::jsonb),
('data_sources', '{"primary": "MBS", "fallbacks": ["VNDIRECT", "SSI"]}'::jsonb),
('alert_limits', '{"free": 5, "premium": 50}'::jsonb),
('websocket_config', '{"reconnect_interval": 5000, "max_reconnect": 10}'::jsonb)
ON CONFLICT (key) DO NOTHING;

-- Insert sample data for testing: simulates one trading day for VNM as a
-- random walk, one row every 5 minutes from 09:00 to 15:00 inclusive.
DO $$
DECLARE
    v_date DATE := CURRENT_DATE;
    v_time TIME;
    v_base_price DECIMAL(12,2) := 78500;
    v_price DECIMAL(12,2);
BEGIN
    -- 73 steps of 5 minutes: step 0 is 09:00, step 72 is 15:00.
    FOR v_step IN 0..72 LOOP
        v_time := TIME '09:00:00' + v_step * INTERVAL '5 minutes';

        -- Random walk: move up to 500 either side of the previous price.
        v_price := v_base_price + (random() - 0.5) * 1000;

        INSERT INTO stock_prices (
            symbol, timestamp, open, high, low, close,
            volume, value, bid_price, bid_volume, ask_price, ask_volume,
            reference, ceiling, floor
        ) VALUES (
            'VNM',
            v_date + v_time,
            v_price - 50,
            v_price + 100,
            v_price - 100,
            v_price,
            (random() * 100000)::BIGINT,
            (random() * 1000000000)::BIGINT,
            ARRAY[v_price-100, v_price-200, v_price-300],
            ARRAY[(random()*1000)::BIGINT, (random()*1000)::BIGINT, (random()*1000)::BIGINT],
            ARRAY[v_price+100, v_price+200, v_price+300],
            ARRAY[(random()*1000)::BIGINT, (random()*1000)::BIGINT, (random()*1000)::BIGINT],
            78500,
            86300,
            70700
        );

        -- The next step starts from the price just written.
        v_base_price := v_price;
    END LOOP;
END $$;
```

### 2.4 Docker Compose Setup

yaml Copy

```yaml
# docker-compose.yml
version: '3.8'

services:
  # PostgreSQL with TimescaleDB (PostgreSQL 15 image).
  postgres:
    image: timescale/timescaledb:latest-pg15
    container_name: mbs_tracker_db
    environment:
      POSTGRES_USER: mbs_user
      # Falls back to a hard-coded password when DB_PASSWORD is not set.
      POSTGRES_PASSWORD: ${DB_PASSWORD:-secure_password_123}
      POSTGRES_DB: mbs_tracker
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      # The migration scripts are mounted as the image's init-script directory.
      # NOTE(review): this bind mount replaces whatever the image ships in
      # /docker-entrypoint-initdb.d — confirm none of its own scripts are needed.
      - ./migrations:/docker-entrypoint-initdb.d
    # Server settings passed on the postgres command line.
    command: >
      postgres 
      -c shared_preload_libraries=timescaledb
      -c max_connections=200
      -c shared_buffers=2GB
      -c effective_cache_size=6GB
      -c maintenance_work_mem=512MB
      -c checkpoint_completion_target=0.9
      -c wal_buffers=16MB
      -c default_statistics_target=100
      -c random_page_cost=1.1
      -c effective_io_concurrency=200
      -c work_mem=5242kB
      -c min_wal_size=1GB
      -c max_wal_size=4GB
    # Healthy once pg_isready accepts connections for mbs_user on mbs_tracker.
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U mbs_user -d mbs_tracker"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis 7: append-only file enabled, memory capped at 256mb with
  # allkeys-lru eviction.
  redis:
    container_name: mbs_tracker_redis
    image: redis:7-alpine
    command:
      - redis-server
      - --appendonly
      - "yes"
      - --maxmemory
      - 256mb
      - --maxmemory-policy
      - allkeys-lru
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # Healthy once redis-cli ping succeeds.
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  # Optional: Grafana for monitoring
  # Published on host port 3001 by default (override with GRAFANA_PORT):
  # host port 3000 is the default port of the Next.js app this stack serves,
  # so binding Grafana to it would make the two collide.
  grafana:
    image: grafana/grafana:latest
    container_name: mbs_tracker_grafana
    ports:
      - "${GRAFANA_PORT:-3001}:3000"
    environment:
      # Falls back to "admin" when GRAFANA_PASSWORD is not set.
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
    volumes:
      - grafana_data:/var/lib/grafana
    depends_on:
      - postgres

# Named volumes mounted by the postgres, redis and grafana services.
volumes:
  postgres_data:
  redis_data:
  grafana_data:
```

### 2.5 Database Connection & Client Setup

TypeScript Copy

```typescript
// lib/prisma.ts
import { PrismaClient } from '@prisma/client';

/** `globalThis` typed as a holder for a previously created Prisma client. */
const globalForPrisma = globalThis as unknown as {
  prisma: PrismaClient | undefined;
};

/** Builds a client that logs queries, errors and warnings in development and only errors otherwise. */
function createPrismaClient(): PrismaClient {
  const isDevelopment = process.env.NODE_ENV === 'development';

  return new PrismaClient({
    log: isDevelopment ? ['query', 'error', 'warn'] : ['error'],
  });
}

/** Shared Prisma client: the instance stored on `globalThis` if there is one, otherwise a new one. */
export const prisma = globalForPrisma.prisma ?? createPrismaClient();

// Outside production, remember the client on globalThis so that a later
// evaluation of this module picks it up instead of creating another one.
if (process.env.NODE_ENV !== 'production') {
  globalForPrisma.prisma = prisma;
}

/**
 * Prisma client extended with slow-query logging: every `findMany`, on any
 * model, is timed and a warning is printed when it takes longer than one
 * second. The query and its result are passed through unchanged.
 */
export const prismaExtended = prisma.$extends({
  query: {
    $allModels: {
      async findMany({ model, operation, args, query }) {
        const startedAt = Date.now();
        const rows = await query(args);
        const elapsedMs = Date.now() - startedAt;

        const isSlow = elapsedMs > 1000;
        if (isSlow) {
          console.warn(`Slow query detected: ${model}.${operation} took ${elapsedMs}ms`);
        }

        return rows;
      },
    },
  },
});
```

TypeScript Copy

```typescript
// lib/redis.ts
import { createClient, RedisClientType } from 'redis';

// Connected client, set only after `connect()` has resolved.
let redisClient: RedisClientType | null = null;
// Connection attempt currently in flight, shared by concurrent callers.
let redisConnecting: Promise<RedisClientType> | null = null;

/**
 * Returns the shared Redis client, connecting on first use.
 *
 * Callers that arrive while the connection is still being established wait
 * for that same attempt, so nobody receives a client that is not connected
 * yet. A failed attempt is not cached: the next call tries again.
 *
 * @returns The connected client.
 * @throws Whatever `connect()` rejects with when the connection cannot be established.
 */
export async function getRedisClient(): Promise<RedisClientType> {
  if (redisClient) {
    return redisClient;
  }

  if (!redisConnecting) {
    redisConnecting = connectRedis().finally(() => {
      redisConnecting = null;
    });
  }

  return redisConnecting;
}

/** Creates a client, wires its event logging, connects it and publishes it as the shared client. */
async function connectRedis(): Promise<RedisClientType> {
  const client: RedisClientType = createClient({
    url: process.env.REDIS_URL || 'redis://localhost:6379',
    socket: {
      // Back off 100ms per attempt, capped at 3s; give up after 10 retries.
      reconnectStrategy: (retries: number) => {
        if (retries > 10) {
          console.error('Max redis reconnection attempts reached');
          return new Error('Max retries');
        }
        return Math.min(retries * 100, 3000);
      },
    },
  });

  client.on('error', (err: unknown) => console.error('Redis Client Error', err));
  client.on('connect', () => console.log('Redis Client Connected'));

  await client.connect();

  redisClient = client;
  return client;
}

// Helper functions for common operations

/**
 * Reads the latest cached snapshot for `symbol` (looked up in upper case).
 *
 * @returns The JSON-parsed value, or `null` when nothing is cached.
 */
export async function getCachedStock(symbol: string) {
  const redis = await getRedisClient();
  const cacheKey = `stock:${symbol.toUpperCase()}:latest`;
  const raw = await redis.get(cacheKey);

  if (!raw) {
    return null;
  }
  return JSON.parse(raw);
}

/**
 * Caches `data` as JSON under the latest-snapshot key for `symbol`
 * (upper-cased), expiring after `ttl` seconds (60 by default).
 */
export async function setCachedStock(symbol: string, data: any, ttl: number = 60) {
  const redis = await getRedisClient();
  const cacheKey = `stock:${symbol.toUpperCase()}:latest`;
  const payload = JSON.stringify(data);

  await redis.setEx(cacheKey, ttl, payload);
}

/** Publishes `data`, serialized as JSON, on the `stock:updates` channel. */
export async function publishStockUpdate(data: any) {
  const redis = await getRedisClient();
  const payload = JSON.stringify(data);

  await redis.publish('stock:updates', payload);
}
```

***

Đây là toàn bộ spec kỹ thuật chi tiết cho hệ thống. Bạn cần tôi giải thích thêm phần nào hoặc triển khai thêm tính năng nào không?
