\n<|grounding|>Convert the document to markdown."
tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
image.save(tmp.name, 'JPEG', quality=95)
tmp.close()
out_dir = tempfile.mkdtemp()
stdout = sys.stdout
sys.stdout = StringIO()
try:
ocr_model.infer(
tokenizer=ocr_tokenizer,
prompt=prompt,
image_file=tmp.name,
output_path=out_dir,
base_size=config["base_size"],
image_size=config["image_size"],
crop_mode=config["crop_mode"]
)
result = '\n'.join([l for l in sys.stdout.getvalue().split('\n')
if not any(s in l for s in ['image:', 'other:', 'PATCHES', '====', 'BASE:', '%|', 'torch.Size'])]).strip()
finally:
sys.stdout = stdout
try:
os.unlink(tmp.name)
except:
pass
shutil.rmtree(out_dir, ignore_errors=True)
if not result:
return "No text detected"
return clean_output(result, True, True)
def ocr_process_pdf(path, mode, page_num):
    """Run OCR on a single page of a PDF file.

    Args:
        path: Path of the PDF, passed to ``fitz.open``.
        mode: OCR mode, forwarded unchanged to ``ocr_process_image``.
        page_num: 1-based number of the page to process.

    Returns:
        Whatever ``ocr_process_image`` returns for the rendered page, or an
        "Invalid page number..." message when ``page_num`` is out of range.
    """
    doc = fitz.open(path)
    try:
        total_pages = len(doc)
        if page_num < 1 or page_num > total_pages:
            return f"Invalid page number. PDF has {total_pages} pages."
        page = doc.load_page(page_num - 1)
        # Render with a 300/72 zoom on both axes (presumably 300 DPI, given
        # PDF's 72 units per inch -- confirm against the PyMuPDF docs).
        pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72), alpha=False)
        png_bytes = pix.tobytes("png")
    finally:
        # Close the document even when rendering fails, so the handle is
        # never leaked.
        doc.close()
    img = Image.open(BytesIO(png_bytes))
    return ocr_process_image(img, mode)
def ocr_process_file(path, mode, page_num):
    """Dispatch an uploaded file to the PDF or the image OCR routine.

    Args:
        path: Path of the uploaded file; a falsy value means nothing was
            uploaded.
        mode: OCR mode, forwarded unchanged to the OCR routine.
        page_num: 1-based page number; only used for ``.pdf`` files.

    Returns:
        The OCR result, or ``"Error: Upload file"`` when ``path`` is falsy.
    """
    if not path:
        return "Error: Upload file"
    if path.lower().endswith('.pdf'):
        return ocr_process_pdf(path, mode, page_num)
    # The context manager releases the underlying file handle once OCR is
    # done instead of leaving it to the garbage collector.
    with Image.open(path) as img:
        return ocr_process_image(img, mode)
# ==================== TRANSLATION HELPERS ====================
def _wrap_words(fragment, max_words):
    """Cut *fragment* into pieces of at most *max_words* whitespace-separated words."""
    words = fragment.split()
    return [
        " ".join(words[start:start + max_words])
        for start in range(0, len(words), max_words)
    ]


def _split_long_sentence(sentence, max_words):
    """Break one over-long sentence at commas, then at word boundaries.

    Returns ``(completed, tail, tail_words)``: the finished pieces, the last
    still-open piece (possibly empty) and the word count of that piece.
    """
    completed = []
    tail = ""
    tail_words = 0
    for part in re.split(r',\s*', sentence):
        part_words = len(part.split())
        if part_words > max_words:
            # A comma-free run that is itself too long: close what we have and
            # fall back to word boundaries so the size limit still holds.
            if tail:
                completed.append(tail.strip())
            *full_pieces, tail = _wrap_words(part, max_words)
            completed.extend(full_pieces)
            tail_words = len(tail.split())
        elif tail and tail_words + part_words > max_words:
            completed.append(tail.strip())
            tail, tail_words = part, part_words
        else:
            tail = f"{tail}, {part}" if tail else part
            tail_words += part_words
    return completed, tail.strip(), tail_words


def _split_long_line(line, max_words):
    """Pack the sentences of one over-long line into pieces of limited size.

    Returns ``(completed, tail)`` where ``tail`` is the last, still-open piece
    (an empty string when nothing is left over).
    """
    completed = []
    current = ""
    current_words = 0
    for sentence in re.split(r'(?<=[.!?])\s+', line):
        sentence = sentence.strip()
        if not sentence:
            continue
        sentence_words = len(sentence.split())
        if sentence_words > max_words:
            if current:
                completed.append(current)
            pieces, current, current_words = _split_long_sentence(sentence, max_words)
            completed.extend(pieces)
        elif current_words + sentence_words <= max_words:
            current = f"{current} {sentence}" if current else sentence
            current_words += sentence_words
        else:
            completed.append(current)
            current, current_words = sentence, sentence_words
    return completed, current


def split_by_sentences(text: str, max_words: int = 100):
    """Split *text* into chunks of at most *max_words* words.

    Each non-blank line is handled on its own. A line that fits is kept whole;
    a longer one is split after ``.``, ``!`` or ``?``, then at commas, and as
    a last resort at word boundaries, so that no chunk exceeds the limit.

    Args:
        text: The text to split.
        max_words: Upper bound on whitespace-separated words per chunk.
            Values below 1 are treated as 1.

    Returns:
        A list of ``(chunk_text, newline_count)`` tuples. ``newline_count`` is
        0 for the last line of the text and for every piece of a split line
        except its final one, 1 for a line followed by another line, and is
        increased by one for every blank line that follows the chunk.
    """
    max_words = max(1, max_words)
    chunks = []
    lines = text.split('\n')
    last_index = len(lines) - 1
    i = 0
    while i < len(lines):
        if not lines[i].strip():
            # Consume the whole run of blank lines and credit it to the
            # previous chunk (leading blank lines are simply dropped).
            blank_run = 0
            while i < len(lines) and not lines[i].strip():
                blank_run += 1
                i += 1
            if chunks:
                prev_text, prev_newlines = chunks[-1]
                chunks[-1] = (prev_text, prev_newlines + blank_run)
            continue

        line = lines[i].strip()
        trailing_newlines = 0 if i == last_index else 1
        i += 1

        if len(line.split()) <= max_words:
            chunks.append((line, trailing_newlines))
            continue

        completed, tail = _split_long_line(line, max_words)
        chunks.extend((piece, 0) for piece in completed)
        if tail:
            # Only the final piece of a split line carries the line break.
            chunks.append((tail, trailing_newlines))
    return chunks
@spaces.GPU
def translate_chunk(chunk_text):
    """Translate one chunk of text with the module-level ``translator``.

    The translator's model, when it exposes a ``to`` method, is first moved to
    CUDA if available and to the CPU otherwise.

    Args:
        chunk_text: Text to translate.

    Returns:
        The translator output with surrounding whitespace stripped.
    """
    target_device = "cpu"
    if torch.cuda.is_available():
        target_device = "cuda"

    model_is_movable = hasattr(translator, 'model') and hasattr(translator.model, 'to')
    if model_is_movable:
        translator.model.to(target_device)

    raw_translation = translator.translate(chunk_text, max_new_tokens=2048)
    return raw_translation.strip()
def streaming_translate(text: str):
    """Translate *text* chunk by chunk, yielding the growing result.

    The text is split with ``split_by_sentences`` and each chunk is translated
    with ``translate_chunk``. Every translated chunk is revealed one character
    at a time, with a short pause between updates.

    Args:
        text: Source text to translate.

    Yields:
        The text accumulated so far followed by a newline, or a single
        warning/error message when the input is empty or a chunk fails.
    """
    if not text or not text.strip():
        # The literal's line break is written as an escape: a single-quoted
        # string cannot span physical lines.
        yield '⚠️ Vui lòng nhập văn bản tiếng Anh để dịch.\n'
        return

    chunks = split_by_sentences(text, max_words=100)
    accumulated = ""
    for i, (chunk_text, newline_count) in enumerate(chunks):
        try:
            translated = translate_chunk(chunk_text)
            # Separate chunks on the same line with a space; after a line
            # break the next chunk starts directly.
            if accumulated and not accumulated.endswith('\n'):
                accumulated += " " + translated
            else:
                accumulated += translated

            chunk_start = len(accumulated) - len(translated)
            for end in range(chunk_start + 1, len(accumulated) + 1):
                yield f'{accumulated[:end]}\n'
                time.sleep(0.015)

            if newline_count > 0:
                # Collapse longer runs of blank lines to at most two breaks.
                accumulated += "\n" * min(newline_count, 2)
                yield f'{accumulated}\n'
        except Exception as e:
            # NOTE(review): the error message replaces the text streamed so
            # far -- confirm this is the intended behaviour.
            yield f'❌ Lỗi dịch chunk {i+1}: {str(e)}\n'
            return
# ==================== UI HELPERS ====================
def load_image(file_path, page_num_str="1"):
    """Load a preview image for an uploaded file.

    For ``.pdf`` files the requested page is rendered; any other file is
    opened directly with ``Image.open``.

    Args:
        file_path: Path of the uploaded file; a falsy value yields ``None``.
        page_num_str: 1-based page number as text. Values that cannot be
            parsed fall back to 1, and out-of-range values are clamped to the
            first/last page.

    Returns:
        A PIL image, or ``None`` when there is no file or loading fails (the
        error is printed).
    """
    if not file_path:
        return None
    try:
        try:
            page_num = int(page_num_str)
        except (ValueError, TypeError):
            page_num = 1

        if not file_path.lower().endswith('.pdf'):
            return Image.open(file_path)

        doc = fitz.open(file_path)
        try:
            page_idx = max(0, min(page_num - 1, len(doc) - 1))
            page = doc.load_page(page_idx)
            pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72), alpha=False)
            png_bytes = pix.tobytes("png")
        finally:
            # Release the document even when rendering the page fails.
            doc.close()
        return Image.open(BytesIO(png_bytes))
    except Exception as e:
        print(f"Error loading image: {e}")
        return None
def get_pdf_page_count(file_path):
    """Return the number of pages of a PDF file.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The page count, or 1 when ``file_path`` is falsy, does not end in
        ``.pdf``, or the document cannot be read (the error is printed).
    """
    if not file_path or not file_path.lower().endswith('.pdf'):
        return 1
    try:
        doc = fitz.open(file_path)
        try:
            return len(doc)
        finally:
            # Close the handle even if reading the page count raises.
            doc.close()
    except Exception as e:
        print(f"Error reading PDF page count: {e}")
        return 1
def update_page_info(file_path):
    """Build the update for the page-number textbox after a file change.

    Args:
        file_path: Path of the uploaded file, or a falsy value when there is
            none.

    Returns:
        A ``gr.update`` carrying the new label. For a PDF the label states the
        page count. Whenever a file is present the value is reset to ``"1"``;
        without a file only the label is set.
    """
    generic_label = "Số trang (chỉ dùng cho PDF, mặc định: 1)"
    if not file_path:
        return gr.update(label=generic_label)

    new_label = generic_label
    if file_path.lower().endswith('.pdf'):
        page_count = get_pdf_page_count(file_path)
        new_label = f"Số trang (PDF có {page_count} trang, nhập 1-{page_count})"
    return gr.update(label=new_label, value="1")
# ==================== COMBINED OCR + TRANSLATION ====================
def ocr_and_translate_streaming(file_path, mode, page_num_str):
    """Run OCR on the uploaded file, then stream its translation.

    Args:
        file_path: Path of the uploaded file; a falsy value yields a warning.
        mode: OCR mode, forwarded to ``ocr_process_file``.
        page_num_str: 1-based page number as text; values that cannot be
            parsed fall back to 1.

    Yields:
        Status messages, then the progressively growing translation produced
        by ``streaming_translate``, or an error message when a stage fails.
    """
    # Line breaks inside the messages are written as escapes: a single-quoted
    # string cannot span physical lines.
    if not file_path:
        yield '⚠️ Vui lòng tải file lên trước!\n'
        return

    yield '🔍 Đang quét OCR...\n'
    try:
        try:
            page_num = int(page_num_str)
        except (ValueError, TypeError):
            page_num = 1
        markdown = ocr_process_file(file_path, mode, page_num)
        # The OCR helpers report failures as strings starting with "Error" or
        # "Invalid" rather than by raising.
        if not markdown or markdown.startswith(("Error", "Invalid")):
            yield f'❌ Lỗi OCR: {markdown}\n'
            return
    except Exception as e:
        yield f'❌ Lỗi OCR: {str(e)}\n'
        return

    yield '🦀 Đang dịch...\n'
    time.sleep(0.5)
    try:
        yield from streaming_translate(markdown)
    except Exception as e:
        yield f'❌ Lỗi dịch: {str(e)}\n'
# ==================== GRADIO INTERFACE ====================
def load_default_example():
    """Prepare the bundled example for display when the UI starts.

    Returns:
        ``(path, image)`` for ``images/example1.png``, where ``path`` is a
        copy under ``/tmp`` when the copy succeeds and the original path
        otherwise; ``(None, None)`` when the example file does not exist.
    """
    bundled_path = "images/example1.png"
    if os.path.exists(bundled_path):
        served_path = "/tmp/example1.png"
        try:
            shutil.copy(bundled_path, served_path)
        except Exception:
            # Copy failed: serve the bundled file directly.
            served_path = bundled_path
        preview = Image.open(served_path)
        return served_path, preview
    # Example is missing: leave both components empty.
    return None, None
# Build the Gradio UI: inputs in the left column, streamed translation on the
# right, then the examples and license accordions.
# NOTE(review): nesting was reconstructed from statement order; confirm that
# the two accordions belong at the Blocks level rather than inside the Row.
with gr.Blocks(theme=gr.themes.Soft(), title="MedCrab Translation") as demo:
    gr.Markdown("""
🦀 MedCrab Translation
Quét PDF Y khoa → Dịch trực tiếp sang tiếng Việt (Streaming)
Model: MedCrab-1.5B
""")
    with gr.Row():
        # Input column: upload, preview, page number, OCR mode, action button.
        with gr.Column(scale=1):
            gr.Markdown("### 📤 Tải file lên")
            file_in = gr.File(label="PDF hoặc Hình ảnh", file_types=["image", ".pdf"], type="filepath")
            input_img = gr.Image(label="Xem trước", type="pil", height=300)
            page_input = gr.Textbox(label="Số trang (chỉ dùng cho PDF, mặc định: 1)", value="1", placeholder="Nhập số trang...")
            mode = gr.Dropdown(list(MODEL_CONFIGS.keys()), value="Crab", label="Chế độ OCR")
            gr.Markdown("### 🦀 Quét và Dịch")
            process_btn = gr.Button("🚀 Quét OCR + Dịch tiếng Việt", variant="primary", size="lg")
        # Output column: receives the values yielded by the streaming pipeline.
        with gr.Column(scale=2):
            gr.Markdown("### 📄 Kết quả dịch tiếng Việt (Streaming)")
            translation_output = gr.HTML(label="", value="")
    with gr.Accordion("📚 Ví dụ mẫu", open=True):
        gr.Markdown("**Thử ngay với các ví dụ có sẵn:**")
        # Each example row supplies (file, OCR mode, page number).
        gr.Examples(
            examples=[
                ["images/example1.png", "Crab", "1"],
                ["images/example2.png", "Crab", "1"],
            ],
            inputs=[file_in, mode, page_input],
            outputs=[translation_output],
            fn=ocr_and_translate_streaming,
            cache_examples=False,
            label="Nhấp vào ví dụ để thử"
        )
    with gr.Accordion("⚖️ Giấy phép & Liên hệ", open=False):
        gr.Markdown("""
**Giấy phép:** CC BY-NC 4.0
""")
    # Events
    # A new file refreshes the preview and resets the page-number textbox.
    file_in.change(load_image, [file_in, page_input], [input_img])
    file_in.change(update_page_info, [file_in], [page_input])
    # Changing the page number re-renders the preview.
    page_input.change(load_image, [file_in, page_input], [input_img])
    process_btn.click(ocr_and_translate_streaming, [file_in, mode, page_input], [translation_output])
    # Load default example into both file_in (filepath) and input_img (PIL) when UI starts
    demo.load(
        load_default_example,
        inputs=None,
        outputs=[file_in, input_img]
    )
if __name__ == "__main__":
    # Script entry point: announce start-up, then enable the request queue
    # (at most 20 pending requests) and launch the app.
    print("🚀 Starting MedCrab Translation on Hugging Face Spaces...")
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()