Initial commit of document-service
This commit is contained in:
105
app/pdf.py
Normal file
105
app/pdf.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import logging
import os
from typing import Any

from pypdf import PdfReader
|
||||
|
||||
logger = logging.getLogger(__name__)


def _fields_from_acroform(reader: "PdfReader") -> dict[str, Any]:
    """Collect the named entries of the catalog's /AcroForm /Fields array.

    Only the direct entries of /Fields are inspected; /Kids are not followed.
    Returns an empty dict when the document has no /AcroForm or no /Fields.
    """
    root = reader.trailer["/Root"]
    if "/AcroForm" not in root:
        return {}
    acroform = root["/AcroForm"]
    if "/Fields" not in acroform:
        return {}

    found: dict[str, Any] = {}
    for field_ref in acroform["/Fields"]:
        # Best effort: one malformed entry must not hide the remaining fields.
        try:
            field_obj = field_ref.get_object()
            field_name = field_obj.get("/T", "")
            if field_name:
                found[field_name] = field_obj
        except Exception as exc:
            logger.warning("Error processing field: %s", exc)
    return found


def _fields_from_annotations(reader: "PdfReader") -> dict[str, Any]:
    """Collect named /Widget annotations from every page, first occurrence wins."""
    found: dict[str, Any] = {}
    for page in reader.pages:
        if "/Annots" not in page:
            continue
        for annot in page["/Annots"]:
            # Best effort: skip annotations that cannot be resolved or read.
            try:
                annot_obj = annot.get_object()
                if "/Subtype" not in annot_obj or annot_obj["/Subtype"] != "/Widget":
                    continue
                field_name = annot_obj.get("/T", "")
                if field_name and field_name not in found:
                    found[field_name] = annot_obj
            except Exception as exc:
                logger.warning("Error processing annotation: %s", exc)
    return found


def _choice_options(field_obj: Any) -> list:
    """Return the option values listed in a choice field's /Opt array.

    Plain string entries are used as-is. Array entries contribute their
    second element, or their only element when the array is malformed and
    has a single item; empty arrays are skipped.
    """
    options = []
    for entry in field_obj.get("/Opt") or []:
        if isinstance(entry, str):
            options.append(entry)
        elif len(entry) > 1:
            options.append(entry[1])
        elif entry:
            # Malformed single-element array: keep the lone value rather
            # than losing the whole field to an IndexError.
            options.append(entry[0])
    return options


def _describe_field(field_name: str, field_obj: Any) -> dict[str, Any]:
    """Build the descriptor dict returned to callers for a single field."""
    field_type = field_obj.get("/FT", "")
    # /Ch = choice field (select/dropdown); only those carry options.
    options = _choice_options(field_obj) if field_type == "/Ch" else []
    return {
        "field": field_name,
        "label": field_name.replace("_", " ").title(),
        "type": _map_field_type(field_type, field_obj),
        "required": False,
        "options": options or None,
    }


def discover_fields(pdf_path: str) -> list[dict]:
    """Introspect a PDF and return descriptors for its form fields.

    Three discovery strategies are tried in order, stopping at the first one
    that yields at least one field:

    1. ``reader.get_fields()``;
    2. the direct entries of the catalog's /AcroForm /Fields array;
    3. /Widget annotations found on the pages.

    A strategy that raises is logged and treated as having found nothing, so
    a damaged form structure degrades to the next strategy instead of failing.

    Args:
        pdf_path: Path of the PDF file, passed to ``PdfReader``.

    Returns:
        One dict per field with the keys ``field`` (field name), ``label``
        (name with underscores replaced by spaces, title-cased), ``type``
        (result of ``_map_field_type``), ``required`` (always ``False``) and
        ``options`` (list of choices for /Ch fields, otherwise ``None``).
        An empty list when no strategy finds any field. Fields whose
        descriptor cannot be built are logged and skipped.

    Raises:
        Whatever ``PdfReader(pdf_path)`` raises; opening the file is not
        guarded.
    """
    reader = PdfReader(pdf_path)

    strategies = (
        ("get_fields()", reader.get_fields),
        ("Direct AcroForm access", lambda: _fields_from_acroform(reader)),
        ("Page annotation access", lambda: _fields_from_annotations(reader)),
    )

    fields = None
    for label, strategy in strategies:
        # Broad catch is deliberate: parsing arbitrary PDFs can fail in many
        # ways and each strategy is only a best-effort attempt.
        try:
            fields = strategy()
        except Exception as exc:
            logger.warning("%s failed: %s", label, exc)
            fields = None
        if fields:
            break

    if not fields:
        return []

    result = []
    for field_name, field_obj in fields.items():
        try:
            result.append(_describe_field(field_name, field_obj))
        except Exception as exc:
            logger.warning("Error processing field %s: %s", field_name, exc)
    return result
|
||||
|
||||
def _map_field_type(ft: str, field_obj: dict) -> str:
|
||||
mapping = {
|
||||
"/Tx": "string",
|
||||
"/Btn": "boolean",
|
||||
"/Ch": "select",
|
||||
"/Sig": "string"
|
||||
}
|
||||
base = mapping.get(ft, "string")
|
||||
|
||||
# Check if it's a date field by name hint
|
||||
field_name = field_obj.get("/T", "").lower()
|
||||
if any(hint in field_name for hint in ["date", "fecha", "birth", "nacimiento"]):
|
||||
return "date"
|
||||
|
||||
return base
|
||||
Reference in New Issue
Block a user