- Add workers API for pool management - Add jobs API for queue operations - Add scan-rules API for CRUD operations - Add scanner API for control and status - Add settings API for configuration management - Add system API for resource monitoring - Add filesystem API for path browsing - Add setup wizard API endpoint
352 lines
12 KiB
Python
352 lines
12 KiB
Python
"""Scan rules management API routes."""
|
|
import logging
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, HTTPException, status
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Router that owns every scan-rule endpoint declared in this module; all of
# its routes share the "/api/scan-rules" path prefix and the "scan-rules" tag.
router = APIRouter(prefix="/api/scan-rules", tags=["scan-rules"])
|
|
|
|
|
|
# === REQUEST/RESPONSE MODELS ===
|
|
|
|
class ScanRuleConditions(BaseModel):
    """Optional match conditions of a scan rule.

    Every condition defaults to ``None``, so a client may supply any subset
    of them.
    """

    # --- audio track conditions ---
    audio_language_is: Optional[str] = Field(
        default=None,
        description="Audio language must be (ISO 639-2)",
    )
    audio_language_not: Optional[str] = Field(
        default=None,
        description="Audio language must NOT be (comma-separated)",
    )
    audio_track_count_min: Optional[int] = Field(
        default=None,
        description="Minimum number of audio tracks",
    )

    # --- subtitle conditions ---
    has_embedded_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must have embedded subtitle in language",
    )
    missing_embedded_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must NOT have embedded subtitle",
    )
    missing_external_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must NOT have external .srt file",
    )

    # --- file conditions ---
    file_extension: Optional[str] = Field(
        default=None,
        description="File extensions filter (comma-separated)",
    )
|
|
|
|
|
|
class ScanRuleAction(BaseModel):
    """Action half of a scan rule.

    ``target_language`` is the only required field; the remaining fields
    fall back to the defaults declared below.
    """

    action_type: str = Field(
        default="transcribe",
        description="Action type: transcribe or translate",
    )
    # Required: declared with ``...`` so the client must always provide it.
    target_language: str = Field(
        ...,
        description="Target subtitle language (ISO 639-2)",
    )
    quality_preset: str = Field(
        default="fast",
        description="Quality preset: fast, balanced, best",
    )
    job_priority: int = Field(
        default=0,
        description="Priority for created jobs",
    )
|
|
|
|
|
|
class ScanRuleCreateRequest(BaseModel):
    """Body accepted by the create-rule endpoint.

    ``name``, ``conditions`` and ``action`` are required; ``enabled`` and
    ``priority`` have defaults.
    """

    name: str = Field(..., description="Rule name")
    enabled: bool = Field(default=True, description="Whether rule is enabled")
    priority: int = Field(
        default=0,
        description="Rule priority (higher = evaluated first)",
    )
    conditions: ScanRuleConditions
    action: ScanRuleAction

    class Config:
        # Sample payload attached to the model's JSON schema.
        json_schema_extra = {
            "example": {
                "name": "Japanese anime without Spanish subs",
                "enabled": True,
                "priority": 10,
                "conditions": {
                    "audio_language_is": "jpn",
                    "missing_embedded_subtitle_lang": "spa",
                    "missing_external_subtitle_lang": "spa",
                    "file_extension": ".mkv,.mp4",
                },
                "action": {
                    "action_type": "transcribe",
                    "target_language": "spa",
                    "quality_preset": "fast",
                    "job_priority": 5,
                },
            },
        }
|
|
|
|
|
|
class ScanRuleUpdateRequest(BaseModel):
    """Body accepted by the update-rule endpoint.

    Every field defaults to ``None`` so that a client can send only the
    parts of the rule it wants to change.
    """

    # Top-level rule attributes.
    name: Optional[str] = Field(default=None, description="Rule name")
    enabled: Optional[bool] = Field(
        default=None,
        description="Whether rule is enabled",
    )
    priority: Optional[int] = Field(default=None, description="Rule priority")

    # Nested sections; ``None`` means the section was not supplied.
    conditions: Optional[ScanRuleConditions] = None
    action: Optional[ScanRuleAction] = None
|
|
|
|
|
|
class ScanRuleResponse(BaseModel):
    """Serialized scan rule returned by the endpoints in this module.

    Instances are built from the keyword arguments produced by
    ``ScanRule.to_dict()``.
    """

    # Identity and top-level attributes.
    id: int
    name: str
    enabled: bool
    priority: int

    # Nested sections are exposed as plain dictionaries.
    conditions: dict
    action: dict

    # Timestamps are carried as strings and may be None.
    created_at: Optional[str]
    updated_at: Optional[str]
|
|
|
|
|
|
class MessageResponse(BaseModel):
    """Response body that carries a single human-readable message."""

    # Text reported back to the client.
    message: str
|
|
|
|
|
|
# === ROUTES ===
|
|
|
|
@router.get("/", response_model=List[ScanRuleResponse])
async def get_all_rules(enabled_only: bool = False):
    """
    List scan rules, highest priority first.

    Args:
        enabled_only: When true, restrict the result to enabled rules.

    Returns:
        List of scan rules ordered by priority, descending.
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as db:
        rules_query = db.query(ScanRule)
        if enabled_only:
            # A column comparison builds a SQL expression, so `== True` is
            # intentional here rather than an identity/truthiness test.
            rules_query = rules_query.filter(ScanRule.enabled == True)  # noqa: E712

        ordered_rules = rules_query.order_by(ScanRule.priority.desc()).all()

        # Serialize while the session is still open.
        payload = []
        for rule in ordered_rules:
            payload.append(ScanRuleResponse(**rule.to_dict()))
        return payload
|
|
|
|
|
|
@router.get("/{rule_id}", response_model=ScanRuleResponse)
async def get_rule(rule_id: int):
    """
    Fetch a single scan rule by its ID.

    Args:
        rule_id: ID of the rule to look up.

    Returns:
        The matching scan rule.

    Raises:
        404: No rule with that ID exists.
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as db:
        rule = db.query(ScanRule).filter(ScanRule.id == rule_id).first()

        if rule:
            return ScanRuleResponse(**rule.to_dict())

        # Nothing matched the requested ID.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Scan rule {rule_id} not found"
        )
|
|
|
|
|
|
@router.post("/", response_model=ScanRuleResponse, status_code=status.HTTP_201_CREATED)
async def create_rule(request: ScanRuleCreateRequest):
    """
    Create a scan rule from the request body.

    Args:
        request: Name, flags, conditions and action of the new rule.

    Returns:
        The stored rule, re-read from the session after commit.

    Raises:
        409: Another rule already uses the requested name.
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    # Short aliases for the two nested sections of the request.
    conds = request.conditions
    act = request.action

    with database.get_session() as db:
        # Rule names must be unique, so look for a clash before inserting.
        duplicate = db.query(ScanRule).filter(ScanRule.name == request.name).first()
        if duplicate:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Scan rule with name '{request.name}' already exists"
            )

        rule = ScanRule(
            name=request.name,
            enabled=request.enabled,
            priority=request.priority,
            # Conditions are stored flattened on the rule row.
            audio_language_is=conds.audio_language_is,
            audio_language_not=conds.audio_language_not,
            audio_track_count_min=conds.audio_track_count_min,
            has_embedded_subtitle_lang=conds.has_embedded_subtitle_lang,
            missing_embedded_subtitle_lang=conds.missing_embedded_subtitle_lang,
            missing_external_subtitle_lang=conds.missing_external_subtitle_lang,
            file_extension=conds.file_extension,
            # Action fields are flattened the same way.
            action_type=act.action_type,
            target_language=act.target_language,
            quality_preset=act.quality_preset,
            job_priority=act.job_priority,
        )

        db.add(rule)
        db.commit()
        # Refresh so values assigned during the commit (e.g. the ID) are loaded.
        db.refresh(rule)

        logger.info(f"Scan rule created via API: {rule.name} (ID: {rule.id})")

        return ScanRuleResponse(**rule.to_dict())
|
|
|
|
|
|
@router.put("/{rule_id}", response_model=ScanRuleResponse)
async def update_rule(rule_id: int, request: ScanRuleUpdateRequest):
    """
    Update a scan rule.

    Only the fields the client actually sent are applied. Inside the
    ``conditions`` and ``action`` sections an omitted field is left
    untouched, instead of being overwritten with the model default, and an
    explicit ``null`` on a condition clears that condition.

    Args:
        rule_id: Rule ID to update
        request: Rule update request

    Returns:
        Updated rule object

    Raises:
        404: Rule not found
        409: Name already exists
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as session:
        rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first()

        if not rule:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Check for a duplicate name. This uses the same "is not None" test
        # as the assignment below, so every name that gets written has been
        # checked first.
        if request.name is not None and request.name != rule.name:
            existing = session.query(ScanRule).filter(ScanRule.name == request.name).first()
            if existing:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Scan rule with name '{request.name}' already exists"
                )

        # Top-level fields: None means "not supplied", so leave them alone.
        for field_name in ("name", "enabled", "priority"):
            new_value = getattr(request, field_name)
            if new_value is not None:
                setattr(rule, field_name, new_value)

        # Nested sections. The condition/action model fields carry the same
        # names as the ScanRule attributes they are stored in, so the dumped
        # keys can be applied directly. exclude_unset=True keeps only the keys
        # present in the request body, so model defaults such as
        # action_type="transcribe" or job_priority=0 never overwrite stored
        # values the client did not mention.
        for section in (request.conditions, request.action):
            if section is None:
                continue
            for field_name, new_value in section.model_dump(exclude_unset=True).items():
                setattr(rule, field_name, new_value)

        session.commit()
        session.refresh(rule)

        logger.info(f"Scan rule updated via API: {rule.name} (ID: {rule.id})")

        return ScanRuleResponse(**rule.to_dict())
|
|
|
|
|
|
@router.delete("/{rule_id}", response_model=MessageResponse)
async def delete_rule(rule_id: int):
    """
    Remove a scan rule.

    Args:
        rule_id: ID of the rule to remove.

    Returns:
        Confirmation message naming the deleted rule ID.

    Raises:
        404: No rule with that ID exists.
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as db:
        rule = db.query(ScanRule).filter(ScanRule.id == rule_id).first()
        if not rule:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Capture the name before deleting so it is still available for the log line.
        rule_name = rule.name

        db.delete(rule)
        db.commit()

        logger.info(f"Scan rule deleted via API: {rule_name} (ID: {rule_id})")

        confirmation = MessageResponse(message=f"Scan rule {rule_id} deleted successfully")
        return confirmation
|
|
|
|
|
|
@router.post("/{rule_id}/toggle", response_model=ScanRuleResponse)
async def toggle_rule(rule_id: int):
    """
    Flip the enabled flag of a scan rule.

    Args:
        rule_id: ID of the rule whose flag is flipped.

    Returns:
        The rule after the change has been committed.

    Raises:
        404: No rule with that ID exists.
    """
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as db:
        rule = db.query(ScanRule).filter(ScanRule.id == rule_id).first()
        if not rule:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Invert the current state and persist it.
        rule.enabled = not rule.enabled
        db.commit()
        db.refresh(rule)

        logger.info(f"Scan rule toggled via API: {rule.name} -> {'enabled' if rule.enabled else 'disabled'}")

        return ScanRuleResponse(**rule.to_dict())
|
|
|