Files
Transcriptarr/backend/api/scan_rules.py
Dasemu 6272efbcd5 feat(api): add REST API with 45+ endpoints
- Add workers API for pool management
- Add jobs API for queue operations
- Add scan-rules API for CRUD operations
- Add scanner API for control and status
- Add settings API for configuration management
- Add system API for resource monitoring
- Add filesystem API for path browsing
- Add setup wizard API endpoint
2026-01-16 16:57:59 +01:00

352 lines
12 KiB
Python

"""Scan rules management API routes."""
import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that groups every endpoint below under the /api/scan-rules prefix
# and tags them "scan-rules" in the generated OpenAPI schema.
router = APIRouter(prefix="/api/scan-rules", tags=["scan-rules"])
# === REQUEST/RESPONSE MODELS ===
class ScanRuleConditions(BaseModel):
    """Conditions attached to a scan rule.

    Every condition is optional and defaults to ``None``.
    """

    # --- audio track conditions ---
    audio_language_is: Optional[str] = Field(
        default=None,
        description="Audio language must be (ISO 639-2)",
    )
    audio_language_not: Optional[str] = Field(
        default=None,
        description="Audio language must NOT be (comma-separated)",
    )
    audio_track_count_min: Optional[int] = Field(
        default=None,
        description="Minimum number of audio tracks",
    )

    # --- subtitle conditions ---
    has_embedded_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must have embedded subtitle in language",
    )
    missing_embedded_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must NOT have embedded subtitle",
    )
    missing_external_subtitle_lang: Optional[str] = Field(
        default=None,
        description="Must NOT have external .srt file",
    )

    # --- file conditions ---
    file_extension: Optional[str] = Field(
        default=None,
        description="File extensions filter (comma-separated)",
    )
class ScanRuleAction(BaseModel):
    """Action part of a scan rule.

    ``target_language`` is the only required field; the remaining fields
    fall back to the defaults declared below.
    """

    action_type: str = Field(
        default="transcribe",
        description="Action type: transcribe or translate",
    )
    # Required: no default is provided.
    target_language: str = Field(
        ...,
        description="Target subtitle language (ISO 639-2)",
    )
    quality_preset: str = Field(
        default="fast",
        description="Quality preset: fast, balanced, best",
    )
    job_priority: int = Field(
        default=0,
        description="Priority for created jobs",
    )
class ScanRuleCreateRequest(BaseModel):
    """Payload accepted when creating a scan rule.

    ``name``, ``conditions`` and ``action`` are required; ``enabled`` and
    ``priority`` have defaults.
    """

    name: str = Field(..., description="Rule name")
    enabled: bool = Field(default=True, description="Whether rule is enabled")
    priority: int = Field(
        default=0,
        description="Rule priority (higher = evaluated first)",
    )
    conditions: ScanRuleConditions
    action: ScanRuleAction

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "name": "Japanese anime without Spanish subs",
                "enabled": True,
                "priority": 10,
                "conditions": {
                    "audio_language_is": "jpn",
                    "missing_embedded_subtitle_lang": "spa",
                    "missing_external_subtitle_lang": "spa",
                    "file_extension": ".mkv,.mp4",
                },
                "action": {
                    "action_type": "transcribe",
                    "target_language": "spa",
                    "quality_preset": "fast",
                    "job_priority": 5,
                },
            }
        }
class ScanRuleUpdateRequest(BaseModel):
    """Payload accepted when updating a scan rule.

    Every field is optional and defaults to ``None``.
    """

    name: Optional[str] = Field(default=None, description="Rule name")
    enabled: Optional[bool] = Field(
        default=None,
        description="Whether rule is enabled",
    )
    priority: Optional[int] = Field(default=None, description="Rule priority")
    # Nested groups; each may be omitted entirely.
    conditions: Optional[ScanRuleConditions] = None
    action: Optional[ScanRuleAction] = None
class ScanRuleResponse(BaseModel):
    """Scan rule as returned by the endpoints of this router.

    Instances are built from ``ScanRule.to_dict()``, so the keys of that
    dict must line up with the fields declared here.
    """

    # Identity and top-level settings
    id: int
    name: str
    enabled: bool
    priority: int

    # Conditions and action are returned as plain dicts.
    conditions: dict
    action: dict

    # Timestamps as strings; either may be null.
    created_at: Optional[str]
    updated_at: Optional[str]
class MessageResponse(BaseModel):
    """Response body carrying a single human-readable message."""

    # Text reported back to the client (used by the delete endpoint).
    message: str
# === ROUTES ===
@router.get("/", response_model=List[ScanRuleResponse])
async def get_all_rules(enabled_only: bool = False):
    """
    List scan rules, highest priority first.

    Args:
        enabled_only: When true, only rules whose ``enabled`` flag is set
            are returned

    Returns:
        List of scan rules (ordered by priority DESC)
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as session:
        rule_query = session.query(ScanRule)
        if enabled_only:
            # SQLAlchemy column comparison: ``== True`` builds a SQL
            # expression, it is not a Python boolean test.
            rule_query = rule_query.filter(ScanRule.enabled == True)  # noqa: E712

        by_priority = rule_query.order_by(ScanRule.priority.desc())
        # Serialize while the session is still open.
        return [ScanRuleResponse(**row.to_dict()) for row in by_priority.all()]
@router.get("/{rule_id}", response_model=ScanRuleResponse)
async def get_rule(rule_id: int):
    """
    Fetch a single scan rule by its ID.

    Args:
        rule_id: ID of the rule to look up

    Returns:
        The matching scan rule

    Raises:
        404: No rule with this ID exists
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as session:
        by_id = session.query(ScanRule).filter(ScanRule.id == rule_id)
        found = by_id.first()
        if found:
            return ScanRuleResponse(**found.to_dict())

        # Nothing matched the requested ID.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Scan rule {rule_id} not found"
        )
@router.post("/", response_model=ScanRuleResponse, status_code=status.HTTP_201_CREATED)
async def create_rule(request: ScanRuleCreateRequest):
    """
    Create a scan rule from the request payload.

    The nested ``conditions`` and ``action`` objects are flattened onto
    the columns of the ``ScanRule`` row.

    Args:
        request: Rule creation request

    Returns:
        The newly created rule

    Raises:
        400: Invalid data
        409: Rule with same name already exists
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    wanted = request.conditions
    todo = request.action

    with database.get_session() as session:
        # Rule names must be unique: refuse the request if one is taken.
        name_taken = (
            session.query(ScanRule)
            .filter(ScanRule.name == request.name)
            .first()
        )
        if name_taken:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Scan rule with name '{request.name}' already exists"
            )

        new_rule = ScanRule(
            name=request.name,
            enabled=request.enabled,
            priority=request.priority,
            # Conditions
            audio_language_is=wanted.audio_language_is,
            audio_language_not=wanted.audio_language_not,
            audio_track_count_min=wanted.audio_track_count_min,
            has_embedded_subtitle_lang=wanted.has_embedded_subtitle_lang,
            missing_embedded_subtitle_lang=wanted.missing_embedded_subtitle_lang,
            missing_external_subtitle_lang=wanted.missing_external_subtitle_lang,
            file_extension=wanted.file_extension,
            # Action
            action_type=todo.action_type,
            target_language=todo.target_language,
            quality_preset=todo.quality_preset,
            job_priority=todo.job_priority,
        )

        session.add(new_rule)
        session.commit()
        # Reload the row so database-populated attributes are available.
        session.refresh(new_rule)

        logger.info(f"Scan rule created via API: {new_rule.name} (ID: {new_rule.id})")
        return ScanRuleResponse(**new_rule.to_dict())
@router.put("/{rule_id}", response_model=ScanRuleResponse)
async def update_rule(rule_id: int, request: ScanRuleUpdateRequest):
    """
    Update a scan rule.

    Only values supplied by the client are written to the rule:

    - Top-level fields and conditions whose value is ``null`` are left
      unchanged (so a condition cannot be cleared through this endpoint).
    - Action fields that are omitted from the ``action`` object keep their
      stored value instead of being reset to the model defaults.

    Args:
        rule_id: Rule ID to update
        request: Rule update request

    Returns:
        Updated rule object

    Raises:
        404: Rule not found
        409: Name already exists
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    # Request field names match the ScanRule attribute names one-to-one.
    top_level_fields = ("name", "enabled", "priority")
    condition_fields = (
        "audio_language_is",
        "audio_language_not",
        "audio_track_count_min",
        "has_embedded_subtitle_lang",
        "missing_embedded_subtitle_lang",
        "missing_external_subtitle_lang",
        "file_extension",
    )
    action_fields = (
        "action_type",
        "target_language",
        "quality_preset",
        "job_priority",
    )

    with database.get_session() as session:
        rule = session.query(ScanRule).filter(ScanRule.id == rule_id).first()
        if not rule:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Check for duplicate name. Compare against None (not truthiness) so
        # the guard covers exactly the cases in which the name is assigned
        # below, including an empty string.
        if request.name is not None and request.name != rule.name:
            existing = session.query(ScanRule).filter(ScanRule.name == request.name).first()
            if existing:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Scan rule with name '{request.name}' already exists"
                )

        # Update top-level fields: None means "not provided".
        for field_name in top_level_fields:
            value = getattr(request, field_name)
            if value is not None:
                setattr(rule, field_name, value)

        # Update conditions: None means "leave the stored value alone".
        if request.conditions is not None:
            for field_name in condition_fields:
                value = getattr(request.conditions, field_name)
                if value is not None:
                    setattr(rule, field_name, value)

        # Update action. ScanRuleAction fields carry non-None defaults, so a
        # None check cannot tell "omitted" from "sent"; consult the set of
        # fields the client actually provided instead.
        if request.action is not None:
            supplied = getattr(request.action, "model_fields_set", None)
            if supplied is None:
                # Pydantic v1 spelling of the same information.
                supplied = request.action.__fields_set__
            for field_name in action_fields:
                if field_name in supplied:
                    setattr(rule, field_name, getattr(request.action, field_name))

        session.commit()
        # Reload the row so database-populated attributes are current.
        session.refresh(rule)

        logger.info(f"Scan rule updated via API: {rule.name} (ID: {rule.id})")
        return ScanRuleResponse(**rule.to_dict())
@router.delete("/{rule_id}", response_model=MessageResponse)
async def delete_rule(rule_id: int):
    """
    Remove a scan rule.

    Args:
        rule_id: ID of the rule to remove

    Returns:
        Confirmation message

    Raises:
        404: No rule with this ID exists
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as session:
        by_id = session.query(ScanRule).filter(ScanRule.id == rule_id)
        doomed = by_id.first()
        if not doomed:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Capture the name before deleting; it is needed for the log line.
        rule_name = doomed.name
        session.delete(doomed)
        session.commit()

        logger.info(f"Scan rule deleted via API: {rule_name} (ID: {rule_id})")
        return MessageResponse(message=f"Scan rule {rule_id} deleted successfully")
@router.post("/{rule_id}/toggle", response_model=ScanRuleResponse)
async def toggle_rule(rule_id: int):
    """
    Flip the ``enabled`` flag of a scan rule.

    Args:
        rule_id: ID of the rule to toggle

    Returns:
        The rule after the flag has been flipped

    Raises:
        404: No rule with this ID exists
    """
    # Imported at call time rather than at module import time.
    from backend.core.database import database
    from backend.scanning.models import ScanRule

    with database.get_session() as session:
        by_id = session.query(ScanRule).filter(ScanRule.id == rule_id)
        rule = by_id.first()
        if not rule:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Scan rule {rule_id} not found"
            )

        # Invert the current state and persist it.
        rule.enabled = not rule.enabled
        session.commit()
        session.refresh(rule)

        logger.info(f"Scan rule toggled via API: {rule.name} -> {'enabled' if rule.enabled else 'disabled'}")
        return ScanRuleResponse(**rule.to_dict())