67 lines
2.4 KiB
Python
67 lines
2.4 KiB
Python
import uuid
|
|
import magic
|
|
from fastapi import HTTPException, UploadFile
|
|
from app.config import settings
|
|
from app.enums import DocumentType
|
|
|
|
def generate_document_id() -> str:
    """Return a fresh document identifier.

    The identifier is a random (version 4) UUID rendered in its canonical
    36-character hyphenated string form.
    """
    new_id = uuid.uuid4()
    return str(new_id)
|
|
|
|
def s3_path_prefix(org_id: str, document_id: str) -> str:
    """Build the S3 key prefix under which a document's objects live.

    Args:
        org_id: Identifier of the owning organisation.
        document_id: Identifier of the document.

    Returns:
        ``documents/<org_id>/<document_id>/`` -- note the trailing slash, so
        an object name can be appended directly.
    """
    prefix = f"documents/{org_id}/{document_id}/"
    return prefix
|
|
|
|
def detect_content_type(file: UploadFile) -> str:
    """Detect the MIME type of an upload from its leading bytes.

    The underlying stream is rewound to offset 0 before sampling and again
    afterwards, so later consumers read the upload from the start.

    Args:
        file: Upload whose ``file.file`` object supports ``seek`` and
            ``read``.

    Returns:
        The MIME type string python-magic reports for the first 2048 bytes
        of the upload (fewer if the upload is shorter).
    """
    stream = file.file
    stream.seek(0)
    try:
        sample = stream.read(2048)
    finally:
        # Rewind even when the read fails, so the shared stream is never
        # left positioned mid-file for whoever touches it next.
        stream.seek(0)

    return magic.Magic(mime=True).from_buffer(sample)
|
|
|
|
def detect_document_type(filename: str, content_type: str) -> DocumentType:
    """Resolve the document type, preferring the MIME type over the filename.

    Args:
        filename: Name of the uploaded file; passed unchanged to
            ``DocumentType.from_extension`` when the MIME lookup fails.
        content_type: MIME type string passed to
            ``DocumentType.from_mime_type``.

    Returns:
        The result of the MIME-type lookup when it is truthy; otherwise
        whatever ``DocumentType.from_extension(filename)`` returns.
        NOTE(review): the no-match behaviour of ``from_extension`` is not
        visible here -- confirm against ``app.enums``.
    """
    # `or` only evaluates the extension lookup when the MIME lookup is falsy.
    return DocumentType.from_mime_type(content_type) or DocumentType.from_extension(
        filename
    )
|
|
|
|
def get_file_size_limit(document_type: DocumentType) -> int:
    """Look up the configured maximum upload size for a document type.

    Args:
        document_type: The type whose limit is wanted.

    Returns:
        The per-type value from ``settings`` when the type is one of PDF,
        DOCX, XLSX, JPG, JPEG, PNG or GIF; otherwise
        ``settings.max_file_size_default``.
    """
    per_type_caps = {
        DocumentType.PDF: settings.max_file_size_pdf,
        DocumentType.DOCX: settings.max_file_size_docx,
        DocumentType.XLSX: settings.max_file_size_xlsx,
        DocumentType.JPG: settings.max_file_size_jpg,
        DocumentType.JPEG: settings.max_file_size_jpeg,
        DocumentType.PNG: settings.max_file_size_png,
        DocumentType.GIF: settings.max_file_size_gif,
    }
    # Any type without its own entry shares the default cap.
    fallback_cap = settings.max_file_size_default
    return per_type_caps.get(document_type, fallback_cap)
|
|
|
|
def validate_file_size(file_size: int, document_type: DocumentType) -> None:
    """Reject uploads larger than the limit configured for their type.

    Args:
        file_size: Size of the upload, in the same unit as the configured
            limits.
        document_type: Type used to look up the applicable limit.

    Raises:
        HTTPException: With status 413 when ``file_size`` is strictly greater
            than the limit. A size equal to the limit is accepted.
    """
    max_size = get_file_size_limit(document_type)
    too_large = file_size > max_size
    if not too_large:
        return

    message = f"File size {file_size} exceeds maximum {max_size} for {document_type.value}"
    raise HTTPException(status_code=413, detail=message)
|
|
|
|
def document_s3_key(org_id: str, document_id: str, filename: str) -> str:
    """Build the full S3 object key for a document file.

    Args:
        org_id: Identifier of the owning organisation.
        document_id: Identifier of the document.
        filename: Object name appended to the prefix exactly as given; this
            function does not sanitize it.

    Returns:
        The prefix from ``s3_path_prefix(org_id, document_id)`` immediately
        followed by ``filename``.
    """
    prefix = s3_path_prefix(org_id, document_id)
    return f"{prefix}{filename}"
|
|
|
|
def sanitize_filename(filename: str) -> str:
    """Reduce a filename to characters that are safe in an S3 key.

    Forward and back slashes become underscores; every other character
    outside ASCII letters, digits, ``.``, ``-`` and ``_`` is dropped.

    Args:
        filename: The raw, possibly user-supplied file name.

    Returns:
        The cleaned name. It is an empty string when no character of the
        input survives the filter.
    """
    allowed = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.-_")

    # Flatten path separators first so they survive as "_" instead of being
    # silently removed by the whitelist below.
    flattened = filename.replace("/", "_")
    flattened = flattened.replace("\\", "_")

    return "".join(filter(allowed.__contains__, flattened))
|