Skip to content

Latest commit

 

History

History
465 lines (356 loc) · 14.2 KB

File metadata and controls

465 lines (356 loc) · 14.2 KB

データベース自動初期化ガイド

概要

このドキュメントは、LabCodeのデータベース自動初期化機能について説明します。 FastAPI起動時にデータベースの状態をチェックし、必要に応じて自動的にテーブルを作成・マイグレーションを実行します。

背景

問題

  • データベースファイルが存在しない、または空の場合にAPIがinternal errorを返す
  • 手動でのDB初期化が必要で、開発環境セットアップが煩雑

解決策

FastAPI起動時に自動的にデータベースの状態を検証し、必要な初期化を行う仕組みを導入。


判定フローチャート

[コンテナ起動]
      ↓
[uvicorn main:app]
      ↓
[lifespan起動時処理]
      ↓
┌─────────────────────────────────┐
│ ensure_database_ready() 呼び出し │
└─────────────────────────────────┘
      ↓
┌─────────────────────────────────┐
│ 1. DBファイル存在チェック         │
│    /data/sql_app.db             │
└─────────────────────────────────┘
      ↓
  [存在しない] ──→ create_all() ──→ [完了]
      ↓
  [存在する]
      ↓
┌─────────────────────────────────┐
│ 2. ファイルサイズチェック          │
└─────────────────────────────────┘
      ↓
  [0バイト] ──→ create_all() ──→ [完了]
      ↓
  [>0バイト]
      ↓
┌─────────────────────────────────┐
│ 3. テーブル存在チェック            │
│    SELECT name FROM sqlite_master│
└─────────────────────────────────┘
      ↓
  [不足あり] ──→ create_all() ──→ カスタムマイグレーション ──→ [完了]
      ↓                                         ↑
  [全て存在]                           既存データ保持!
      ↓
┌─────────────────────────────────┐
│ 4. カスタムマイグレーション        │
│    ALTER TABLE(カラム追加等)    │
└─────────────────────────────────┘
      ↓
[正常起動]

動作モード

モード トリガー 実行内容 対象環境
自動 FastAPI起動時 create_all() + 軽量マイグレーション 開発・テスト
手動 docker exec 個別スクリプト実行 本番

実装詳細

ファイル構成

labcode-log-server/
├── app/
│   ├── main.py              # lifespan コンテキストマネージャ
│   ├── init_db.py           # DB初期化モジュール(新規)
│   ├── define_db/
│   │   ├── database.py      # DB接続設定
│   │   └── models.py        # テーブル定義
│   └── scripts/
│       ├── add_process_type_column.py  # 手動マイグレーション
│       └── migrate_ports.py            # 手動マイグレーション
└── data/
    └── sql_app.db           # データベースファイル

init_db.py

#!/usr/bin/env python3
"""
データベース自動初期化・マイグレーションモジュール

FastAPI起動時に呼び出され:
1. データベースの状態をチェック
2. 必要に応じてテーブルを自動作成(既存データは保持)
3. 必要に応じてカスタムマイグレーションを実行
"""
import os
import logging
from pathlib import Path
from sqlalchemy import text
from define_db.database import engine, SQLALCHEMY_DATABASE_URL
from define_db.models import Base

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

DB_PATH = Path("/data/sql_app.db")

REQUIRED_TABLES = [
    'users', 'projects', 'runs', 'processes',
    'operations', 'edges', 'ports', 'port_connections',
    'process_operations'
]

# ============================================
# カスタムマイグレーション定義
# ============================================
MIGRATIONS = [
    {
        "version": "001",
        "description": "Ensure storage_mode column in runs",
        "check": "SELECT 1 FROM pragma_table_info('runs') WHERE name='storage_mode'",
        "sql": "ALTER TABLE runs ADD COLUMN storage_mode VARCHAR(10)"
    },
    {
        "version": "002",
        "description": "Ensure process_type column in processes",
        "check": "SELECT 1 FROM pragma_table_info('processes') WHERE name='process_type'",
        "sql": "ALTER TABLE processes ADD COLUMN process_type VARCHAR(256)"
    },
    {
        "version": "003",
        "description": "Ensure display_visible column in runs",
        "check": "SELECT 1 FROM pragma_table_info('runs') WHERE name='display_visible'",
        "sql": "ALTER TABLE runs ADD COLUMN display_visible BOOLEAN DEFAULT 1 NOT NULL"
    },
]


def check_database_file() -> dict:
    """データベースファイルの状態をチェック"""
    result = {
        'exists': False,
        'size': 0,
        'is_empty': True,
        'is_readable': False
    }

    if DB_PATH.exists():
        result['exists'] = True
        result['size'] = DB_PATH.stat().st_size
        result['is_empty'] = result['size'] == 0

        try:
            with open(DB_PATH, 'rb') as f:
                f.read(16)
            result['is_readable'] = True
        except (IOError, PermissionError):
            result['is_readable'] = False

    return result


def check_tables() -> dict:
    """データベース内のテーブル存在をチェック"""
    result = {
        'existing_tables': [],
        'missing_tables': [],
        'all_present': False
    }

    try:
        with engine.connect() as conn:
            query = text("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in conn.execute(query)]
            result['existing_tables'] = tables
            result['missing_tables'] = [t for t in REQUIRED_TABLES if t not in tables]
            result['all_present'] = len(result['missing_tables']) == 0
    except Exception as e:
        logger.warning(f"テーブルチェック中にエラー: {e}")
        result['missing_tables'] = REQUIRED_TABLES

    return result


def create_tables():
    """
    全テーブルを作成

    ⚠️ 重要: create_all()は既存テーブルのデータを削除しない
    既存テーブルはスキップされ、新規テーブルのみ作成される
    """
    logger.info("テーブル作成を開始(既存テーブルはスキップ)...")
    Base.metadata.create_all(engine)
    logger.info("テーブル作成完了")


def run_custom_migrations():
    """
    カスタムマイグレーションを実行

    既存テーブルへのカラム追加など、create_all()で対応できない
    スキーマ変更を実行する。既存データは保持される。
    """
    logger.info("カスタムマイグレーションをチェック...")

    with engine.connect() as conn:
        applied_count = 0
        skipped_count = 0

        for migration in MIGRATIONS:
            version = migration["version"]
            description = migration["description"]

            try:
                result = conn.execute(text(migration["check"]))
                if result.fetchone():
                    logger.debug(f"Migration {version} already applied: {description}")
                    skipped_count += 1
                    continue
            except Exception:
                skipped_count += 1
                continue

            logger.info(f"Applying migration {version}: {description}")
            try:
                conn.execute(text(migration["sql"]))
                conn.commit()
                logger.info(f"Migration {version} completed")
                applied_count += 1
            except Exception as e:
                logger.error(f"Migration {version} failed: {e}")

        if applied_count > 0:
            logger.info(f"マイグレーション完了: {applied_count}件適用, {skipped_count}件スキップ")
        else:
            logger.info(f"マイグレーション: 全て適用済み ({skipped_count}件)")


def ensure_database_ready() -> dict:
    """
    データベースが使用可能な状態であることを保証

    Returns:
        dict: 実行結果サマリー
    """
    summary = {
        'action': None,
        'file_status': None,
        'table_status': None,
        'migrations_run': False,
        'success': False
    }

    # Step 1: ファイル状態チェック
    file_status = check_database_file()
    summary['file_status'] = file_status

    need_create = False

    if not file_status['exists']:
        logger.info(f"[DB Init] データベースファイルが存在しません: {DB_PATH}")
        need_create = True
        summary['action'] = 'create_new'

    elif file_status['is_empty']:
        logger.warning(f"[DB Init] データベースファイルが空です (0 bytes)")
        need_create = True
        summary['action'] = 'initialize_empty'

    if need_create:
        logger.info("[DB Init] テーブルを作成します...")
        create_tables()

        table_status = check_tables()
        summary['table_status'] = table_status

        if table_status['all_present']:
            logger.info("[DB Init] 初期化完了")
            summary['success'] = True
        return summary

    # Step 2: テーブル存在チェック
    table_status = check_tables()
    summary['table_status'] = table_status

    if not table_status['all_present']:
        missing = ', '.join(table_status['missing_tables'])
        logger.info(f"[DB Init] 不足テーブル: {missing}")
        logger.info("[DB Init] 不足テーブルを作成します(既存データは保持)...")
        summary['action'] = 'create_missing'

        create_tables()

        table_status = check_tables()
        summary['table_status'] = table_status

    # Step 3: カスタムマイグレーション
    run_custom_migrations()
    summary['migrations_run'] = True

    if summary['action'] is None:
        summary['action'] = 'none'

    summary['success'] = True
    logger.info(f"[DB Init] データベース準備完了 (テーブル数: {len(table_status['existing_tables'])})")

    return summary


if __name__ == "__main__":
    result = ensure_database_ready()
    print(f"\n=== 実行結果 ===")
    print(f"アクション: {result['action']}")
    print(f"成功: {result['success']}")
    if result['table_status']:
        print(f"テーブル数: {len(result['table_status']['existing_tables'])}")

main.py への統合

from fastapi import FastAPI
from contextlib import asynccontextmanager
import logging

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPIライフサイクル管理

    起動時: データベース初期化チェック
    終了時: リソースクリーンアップ
    """
    # === 起動時処理 ===
    logger.info("=== FastAPI Starting ===")

    # DB初期化
    from init_db import ensure_database_ready
    result = ensure_database_ready()

    if not result['success'] and result['action'] != 'none':
        logger.error("データベース初期化に失敗しました")

    logger.info("=== FastAPI Ready ===")

    yield  # アプリケーション実行

    # === 終了時処理 ===
    logger.info("=== FastAPI Shutting Down ===")


# FastAPIアプリ作成(lifespanを指定)
app = FastAPI(lifespan=lifespan)

冪等性

すべての初期化処理は冪等性があり、何度実行しても安全です。

処理 動作
create_all() 既存テーブルはスキップ
カスタムマイグレーション check クエリで適用済みを確認後スキップ

新しいマイグレーションの追加

Step 1: MIGRATIONS リストに追加

MIGRATIONS = [
    # ... 既存のマイグレーション ...
    {
        "version": "004",  # 連番
        "description": "Add new_column to some_table",
        "check": "SELECT 1 FROM pragma_table_info('some_table') WHERE name='new_column'",
        "sql": "ALTER TABLE some_table ADD COLUMN new_column VARCHAR(256)"
    },
]

Step 2: コンテナ再起動

docker compose restart log_server

Step 3: ログ確認

docker logs labcode_log_server | grep "Migration"

トラブルシューティング

DBファイルが0バイトになる

原因: コンテナ異常終了、ディスク容量不足

解決策:

# バックアップから復元
docker compose stop log_server
sudo cp data/sql_app.db.bak data/sql_app.db
sudo chown $(whoami):$(whoami) data/sql_app.db
docker compose start log_server

マイグレーションが失敗する

原因: SQLite固有の制限(カラム削除不可等)

解決策: 複雑なスキーマ変更は個別スクリプト(scripts/)で対応


Alembic不採用の理由

本プロジェクトではAlembicを採用しませんでした。

理由 説明
既存アプローチとの整合性 create_all() + 個別スクリプト方式が既に実績あり
SQLite環境では過剰 Alembicの強みは複数環境でのスキーマ同期
学習コスト チーム全員がAlembicを習得する必要
ロールバック需要の低さ 開発環境ではDBリセットの方が早い

関連ドキュメント


作成日

2025-12-24

変更履歴

  • v1.0: 初版