This commit is contained in:
2025-09-27 22:57:59 +08:00
parent 8a458ff0a4
commit ed26244cdb
12585 changed files with 1914308 additions and 3474 deletions

View File

@@ -0,0 +1,3 @@
"""EAN-13 条形码识别应用包初始化。"""

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,21 @@
import os
import platform
from typing import Any, Dict
import yaml
def load_config(config_path: str = "config/config.yaml") -> Dict[str, Any]:
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
# 动态选择中文字体
sys_name = platform.system().lower()
if sys_name.startswith("win"):
config.setdefault("font", {})["selected"] = config["font"].get("windows")
elif sys_name.startswith("darwin") or sys_name.startswith("mac"):
config.setdefault("font", {})["selected"] = config["font"].get("macos")
else:
config.setdefault("font", {})["selected"] = config["font"].get("linux")
return config

View File

@@ -0,0 +1,204 @@
from typing import List, Optional, Tuple
import numpy as np
# EAN-13 编码表L/G/R 模式),每个数字对应 4 个模块7 宽度)内的宽窄模式
# 采用 7 位宽度单元1 表示黑0 表示白。此处用字符串仅做查表,不做模拟。
L_CODES = {
"0": "0001101",
"1": "0011001",
"2": "0010011",
"3": "0111101",
"4": "0100011",
"5": "0110001",
"6": "0101111",
"7": "0111011",
"8": "0110111",
"9": "0001011",
}
G_CODES = {
"0": "0100111",
"1": "0110011",
"2": "0011011",
"3": "0100001",
"4": "0011101",
"5": "0111001",
"6": "0000101",
"7": "0010001",
"8": "0001001",
"9": "0010111",
}
R_CODES = {
"0": "1110010",
"1": "1100110",
"2": "1101100",
"3": "1000010",
"4": "1011100",
"5": "1001110",
"6": "1010000",
"7": "1000100",
"8": "1001000",
"9": "1110100",
}
# 左侧 6 位的奇偶模式用来编码首位数字
LEADING_PARITY_TO_FIRST = {
"LLLLLL": "0",
"LLGLGG": "1",
"LLGGLG": "2",
"LLGGGL": "3",
"LGLLGG": "4",
"LGGLLG": "5",
"LGGGLL": "6",
"LGLGLG": "7",
"LGLGGL": "8",
"LGGLGL": "9",
}
def _normalize_run_lengths(line: np.ndarray, total_modules: int) -> Tuple[np.ndarray, List[int]]:
# 将行强度阈值化为黑白,再统计 run-length然后按照总模块数归一化为 95 个模块
# 使用中位数作为阈值以抵抗亮度变化
threshold = np.median(line)
binary = (line < threshold).astype(np.uint8) # 黑为 1
# run-length 编码
values = binary.tolist()
runs: List[int] = []
last = values[0]
length = 1
for v in values[1:]:
if v == last:
length += 1
else:
runs.append(length)
last = v
length = 1
runs.append(length)
# 放缩为 total_modules 模块
total_pixels = float(sum(runs))
if total_pixels <= 0:
return binary, runs
scale = total_modules / total_pixels
scaled = [max(1, int(round(r * scale))) for r in runs]
# 对齐长度
diff = total_modules - sum(scaled)
if diff != 0:
# 简单补偿到首个 run
scaled[0] = max(1, scaled[0] + diff)
# 展开为模块级二进制
expanded = []
color = binary[0] # 起始颜色
for r in scaled:
expanded.extend([color] * r)
color = 1 - color
return np.array(expanded[:total_modules], dtype=np.uint8), scaled
def _find_guards(bits: np.ndarray, tol: float) -> Optional[Tuple[int, int, int, int]]:
# 守卫位模式:左 101中 01010右 101
# 以模块位寻找
s = ''.join('1' if b else '0' for b in bits.tolist())
# 直接匹配应对理想情况,否则滑窗匹配
# 找左 101
left_pos = s.find('101')
if left_pos == -1:
return None
# 找中间 01010需位于左与右之间
mid_pos = s.find('01010', left_pos + 3)
if mid_pos == -1:
return None
# 找右 101
right_pos = s.find('101', mid_pos + 5)
if right_pos == -1:
return None
return left_pos, mid_pos, right_pos, right_pos + 3
def _bits_to_digit(bits: np.ndarray, tables: List[Tuple[str, dict]]) -> Optional[Tuple[str, str]]:
pattern = ''.join('1' if b else '0' for b in bits.tolist())
for parity, table in tables:
for d, code in table.items():
if pattern == code:
return d, parity
return None
def decode_ean13_from_row(bits_row: np.ndarray) -> Optional[str]:
# 输入为 0/1 模块位数组,长度应为 95
if bits_row.size != 95:
return None
guards = _find_guards(bits_row, tol=0.35)
if not guards:
return None
left_start, mid_start, right_start, right_end = guards
# 划分区域
left_data = bits_row[left_start + 3 : mid_start]
right_data = bits_row[mid_start + 5 : right_start]
# 左右各 6 个数字,每个 7 位
if left_data.size != 6 * 7 or right_data.size != 6 * 7:
return None
digits_left: List[str] = []
parity_seq: List[str] = []
for i in range(6):
seg = left_data[i * 7 : (i + 1) * 7]
ret = _bits_to_digit(seg, [("L", L_CODES), ("G", G_CODES)])
if not ret:
return None
d, parity = ret
digits_left.append(d)
parity_seq.append(parity)
parity_str = ''.join(parity_seq)
first_digit = LEADING_PARITY_TO_FIRST.get(parity_str)
if first_digit is None:
return None
digits_right: List[str] = []
for i in range(6):
seg = right_data[i * 7 : (i + 1) * 7]
ret = _bits_to_digit(seg, [("R", R_CODES)])
if not ret:
return None
d, _ = ret
digits_right.append(d)
code_12 = first_digit + ''.join(digits_left) + ''.join(digits_right[:-1])
check_digit = int(digits_right[-1])
# 校验位计算
s = 0
for idx, ch in enumerate(code_12):
v = int(ch)
if (idx + 1) % 2 == 0:
s += v * 3
else:
s += v
calc = (10 - (s % 10)) % 10
if calc != check_digit:
return None
return code_12 + str(check_digit)
def sample_and_decode(warped_gray: np.ndarray, sample_rows: List[float], total_modules: int) -> Optional[str]:
h, w = warped_gray.shape[:2]
results: List[str] = []
for r in sample_rows:
row_y = min(h - 1, max(0, int(round(h * r))))
line = warped_gray[row_y, :].astype(np.float32)
# 直方图均衡增强对比
line_eq = line
# 归一化为 0..255
if line_eq.max() > line_eq.min():
line_eq = (line_eq - line_eq.min()) / (line_eq.max() - line_eq.min()) * 255.0
bits, _ = _normalize_run_lengths(line_eq, total_modules)
if bits.size != total_modules:
continue
code = decode_ean13_from_row(bits)
if code:
results.append(code)
if not results:
return None
# 取众数
vals, counts = np.unique(np.array(results), return_counts=True)
return vals[int(np.argmax(counts))]

View File

@@ -0,0 +1,84 @@
from typing import List, Optional, Tuple
import cv2
import numpy as np
def resize_keep_aspect(image: np.ndarray, target_width: int) -> np.ndarray:
if target_width <= 0:
return image
h, w = image.shape[:2]
if w == target_width:
return image
scale = target_width / float(w)
new_size = (target_width, int(round(h * scale)))
return cv2.resize(image, new_size, interpolation=cv2.INTER_AREA)
def enhance_barcode_stripes(gray: np.ndarray, gaussian_ksize: int, sobel_ksize: int) -> np.ndarray:
g = gray
if gaussian_ksize and gaussian_ksize > 1:
g = cv2.GaussianBlur(g, (gaussian_ksize, gaussian_ksize), 0)
# 使用水平 Sobel 捕捉垂直边缘(条纹)
grad_x = cv2.Sobel(g, cv2.CV_32F, 1, 0, ksize=sobel_ksize)
grad_x = cv2.convertScaleAbs(grad_x)
return grad_x
def binarize_image(img: np.ndarray, method: str = "otsu") -> np.ndarray:
if method == "adaptive":
return cv2.adaptiveThreshold(
img, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 31, 10
)
# 默认 OTSU
_, th = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)
return th
def morph_close(img: np.ndarray, kernel_size: int) -> np.ndarray:
if kernel_size <= 1:
return img
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_size, kernel_size))
closed = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
return closed
def find_barcode_roi(binary: np.ndarray, original_shape: Tuple[int, int], min_area_ratio: float, min_wh_ratio: float) -> Optional[Tuple[np.ndarray, Tuple[int, int, int, int]]]:
h, w = original_shape
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
image_area = h * w
candidates: List[Tuple[float, Tuple[int, int, int, int]]] = []
for cnt in contours:
x, y, cw, ch = cv2.boundingRect(cnt)
area = cw * ch
if area / image_area < min_area_ratio:
continue
wh_ratio = cw / float(ch + 1e-6)
if wh_ratio < min_wh_ratio:
continue
candidates.append((area, (x, y, cw, ch)))
if not candidates:
return None
candidates.sort(key=lambda t: t[0], reverse=True)
_, bbox = candidates[0]
x, y, cw, ch = bbox
roi = binary[y : y + ch, x : x + cw]
return roi, bbox
def warp_barcode_region(gray: np.ndarray, bbox: Tuple[int, int, int, int], target_height: int, crop_bottom_ratio: float = 0.0) -> np.ndarray:
x, y, cw, ch = bbox
crop = gray[y : y + ch, x : x + cw]
# 去除底部数字区域干扰
if 0 < crop_bottom_ratio < 1:
hb = int(round(ch * (1.0 - crop_bottom_ratio)))
hb = max(10, min(ch, hb))
crop = crop[:hb, :]
if target_height <= 0:
return crop
scale = target_height / float(ch)
target_width = int(round(cw * scale))
warped = cv2.resize(crop, (target_width, target_height), interpolation=cv2.INTER_CUBIC)
return warped

View File

@@ -0,0 +1,39 @@
from typing import Optional
import os
import logging
import numpy as np
import cv2
def read_image_bgr(path: str) -> Optional[np.ndarray]:
"""读取图片为 BGR兼容中文/非 ASCII 路径)。
优先使用 np.fromfile + cv2.imdecode 规避 Windows 路径编码问题,
若失败再回退到 cv2.imread。
"""
logger = logging.getLogger(__name__)
if not path:
logger.warning("read_image_bgr 收到空路径")
return None
# 优先使用 fromfile 方案,处理中文路径
try:
data = np.fromfile(path, dtype=np.uint8)
if data.size > 0:
img = cv2.imdecode(data, cv2.IMREAD_COLOR)
if img is not None:
logger.debug("read_image_bgr 使用 fromfile 解码成功: %s", path)
return img
except Exception as e:
logger.exception("read_image_bgr fromfile 失败: %s", e)
# 回退到 imread
try:
img = cv2.imread(path, cv2.IMREAD_COLOR)
if img is None:
logger.warning("read_image_bgr imread 返回 None: %s", path)
return img
except Exception as e:
logger.exception("read_image_bgr imread 异常: %s", e)
return None

View File

@@ -0,0 +1,56 @@
import logging
import os
from logging.handlers import RotatingFileHandler
from typing import Any, Dict
def setup_logging(config: Dict[str, Any]) -> None:
"""根据配置初始化日志。
- 控制台输出
- 可选文件轮转输出(默认启用)
- 不重复添加 handler幂等
"""
debug_cfg = (config or {}).get("debug", {})
log_level_name = str(debug_cfg.get("log_level", "INFO")).upper()
log_to_file = bool(debug_cfg.get("log_to_file", True))
out_dir = debug_cfg.get("out_dir", "debug_out")
file_name = debug_cfg.get("file_name", "txm.log")
max_bytes = int(debug_cfg.get("max_bytes", 10 * 1024 * 1024))
backup_count = int(debug_cfg.get("backup_count", 5))
try:
level = getattr(logging, log_level_name)
except Exception:
level = logging.INFO
root_logger = logging.getLogger()
if root_logger.handlers:
# 已经初始化过
root_logger.setLevel(level)
return
fmt = (
"%(asctime)s.%(msecs)03d | %(levelname)s | %(name)s | "
"%(message)s"
)
datefmt = "%Y-%m-%d %H:%M:%S"
root_logger.setLevel(level)
console = logging.StreamHandler()
console.setLevel(level)
console.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
root_logger.addHandler(console)
if log_to_file:
os.makedirs(out_dir, exist_ok=True)
file_path = os.path.join(out_dir, file_name)
file_handler = RotatingFileHandler(
file_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
)
file_handler.setLevel(level)
file_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
root_logger.addHandler(file_handler)

139
backend/txm/app/pipeline.py Normal file
View File

@@ -0,0 +1,139 @@
from typing import Optional, List, Dict
import cv2
import logging
from .config_loader import load_config
from .image_processing import (
binarize_image,
enhance_barcode_stripes,
find_barcode_roi,
morph_close,
resize_keep_aspect,
warp_barcode_region,
)
from .ean13_decoder import sample_and_decode
from .io_utils import read_image_bgr
from .pyzbar_engine import decode_with_pyzbar
class EAN13Recognizer:
def __init__(self, config_path: str = "config/config.yaml") -> None:
self.logger = logging.getLogger(self.__class__.__name__)
self.config = load_config(config_path)
self.logger.debug("配置加载完成: preprocess=%s, roi=%s, decoder=%s", self.config.get("preprocess"), self.config.get("roi"), self.config.get("decoder"))
def recognize_from_path(self, image_path: str) -> Optional[str]:
image = read_image_bgr(image_path)
if image is None:
self.logger.warning("读取图像失败: path=%s", image_path)
return None
return self.recognize_from_image(image)
def _recognize_with_pyzbar(self, img) -> Optional[str]:
decoder_cfg = self.config["decoder"]
pyz_res = decode_with_pyzbar(
img,
try_invert=decoder_cfg.get("try_invert", True),
rotations=decoder_cfg.get("rotations", [0, 90, 180, 270]),
)
if pyz_res:
self.logger.debug("pyzbar 识别到 %d 条结果", len(pyz_res))
for r in pyz_res:
if r.get("type") in ("EAN13", "EAN-13") and r.get("code"):
self.logger.info("pyzbar 命中 EAN13: %s", r.get("code"))
return r.get("code")
return None
def recognize_from_image(self, image) -> Optional[str]:
cfg = self.config
preprocess_cfg = cfg["preprocess"]
roi_cfg = cfg["roi"]
decoder_cfg = cfg["decoder"]
img = resize_keep_aspect(image, preprocess_cfg.get("resize_width", 0))
self.logger.debug("输入尺寸=%s, 预处理后尺寸=%s", getattr(image, 'shape', None), getattr(img, 'shape', None))
# 先按引擎优先级尝试 Pyzbar
engine_order: List[str] = decoder_cfg.get("engine_order", ["pyzbar", "ean13"])
if "pyzbar" in engine_order:
code = self._recognize_with_pyzbar(img)
if code:
return code
if "ean13" not in engine_order:
return None
# 回退到自研 EAN-13 解码
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
grad = enhance_barcode_stripes(
gray,
gaussian_ksize=preprocess_cfg.get("gaussian_blur_ksize", 3),
sobel_ksize=preprocess_cfg.get("sobel_ksize", 3),
)
bin_img = binarize_image(grad, preprocess_cfg.get("binarize", "otsu"))
closed = morph_close(bin_img, preprocess_cfg.get("close_kernel", 21))
roi_result = find_barcode_roi(
closed,
original_shape=gray.shape[:2],
min_area_ratio=roi_cfg.get("min_area_ratio", 0.01),
min_wh_ratio=roi_cfg.get("min_wh_ratio", 2.0),
)
if not roi_result:
self.logger.debug("未找到条码 ROI")
return None
_, bbox = roi_result
self.logger.debug("ROI bbox=%s", bbox)
warped = warp_barcode_region(
gray,
bbox,
roi_cfg.get("warp_target_height", 120),
crop_bottom_ratio=roi_cfg.get("crop_bottom_ratio", 0.0),
)
self.logger.debug("透视矫正后尺寸=%s", getattr(warped, 'shape', None))
code = sample_and_decode(
warped,
decoder_cfg.get("sample_rows", [0.5]),
decoder_cfg.get("total_modules", 95),
)
if code:
self.logger.info("自研 EAN13 解码成功: %s", code)
else:
self.logger.debug("自研 EAN13 解码失败")
return code
def recognize_any_from_image(self, image) -> Dict[str, object]:
cfg = self.config
decoder_cfg = cfg["decoder"]
img = resize_keep_aspect(image, cfg["preprocess"].get("resize_width", 0))
# Pyzbar 全量识别
pyz_res = decode_with_pyzbar(
img,
try_invert=decoder_cfg.get("try_invert", True),
rotations=decoder_cfg.get("rotations", [0, 90, 180, 270]),
)
self.logger.debug("pyzbar 返回 %d 条结果", len(pyz_res) if isinstance(pyz_res, list) else 0)
ean13 = ""
for r in pyz_res:
if r.get("type") in ("EAN13", "EAN-13") and r.get("code"):
ean13 = r.get("code")
break
others = [r for r in pyz_res if r.get("type") not in ("EAN13", "EAN-13")]
if not ean13:
e = self.recognize_from_image(img)
if e:
ean13 = e
if ean13:
self.logger.info("recognize_any 命中 EAN13: %s, others=%d", ean13, len(others))
else:
self.logger.debug("recognize_any 未命中 EAN13, others=%d", len(others))
return {"ean13": ean13, "others": others}
def recognize_any_from_path(self, image_path: str) -> Dict[str, object]:
image = read_image_bgr(image_path)
if image is None:
return {"ean13": "", "others": []}
return self.recognize_any_from_image(image)

View File

@@ -0,0 +1,82 @@
from typing import Dict, List, Tuple
import cv2
import numpy as np
import logging
from pyzbar.pyzbar import decode, ZBarSymbol
def _prepare_images(gray: np.ndarray, try_invert: bool, rotations: List[int]) -> List[Tuple[np.ndarray, int, bool]]:
# 返回 (图像, 旋转角度, 是否反色)
images: List[Tuple[np.ndarray, int, bool]] = []
for ang in rotations:
if ang % 360 == 0:
rot = gray
elif ang % 360 == 90:
rot = cv2.rotate(gray, cv2.ROTATE_90_CLOCKWISE)
elif ang % 360 == 180:
rot = cv2.rotate(gray, cv2.ROTATE_180)
elif ang % 360 == 270:
rot = cv2.rotate(gray, cv2.ROTATE_90_COUNTERCLOCKWISE)
else:
# 任意角度插值旋转
h, w = gray.shape[:2]
M = cv2.getRotationMatrix2D((w / 2, h / 2), ang, 1.0)
rot = cv2.warpAffine(gray, M, (w, h), flags=cv2.INTER_LINEAR)
images.append((rot, ang, False))
if try_invert:
images.append((cv2.bitwise_not(rot), ang, True))
return images
def _collect_supported_symbols() -> List[ZBarSymbol]:
names = [
"EAN13",
"EAN8",
"UPCA",
"UPCE",
"CODE128",
"CODE39",
"QRCODE",
# 兼容不同版本:有的叫 ITF有的叫 I25
"ITF",
"I25",
]
symbols: List[ZBarSymbol] = []
for n in names:
if hasattr(ZBarSymbol, n):
symbols.append(getattr(ZBarSymbol, n))
# 若列表为空,退回由 zbar 自行决定的默认集合
return symbols
def decode_with_pyzbar(image_bgr: np.ndarray, try_invert: bool, rotations: List[int]) -> List[Dict[str, str]]:
logger = logging.getLogger("pyzbar_engine")
# 输入 BGR转灰度
gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
results: List[Dict[str, str]] = []
symbols = _collect_supported_symbols()
logger.debug("调用 pyzbar: symbols=%d, rotations=%s, try_invert=%s", len(symbols) if symbols else 0, rotations, try_invert)
for img, ang, inv in _prepare_images(gray, try_invert=try_invert, rotations=rotations):
# pyzbar 要求 8bit 图像
decoded = decode(img, symbols=symbols or None)
for obj in decoded:
data = obj.data.decode("utf-8", errors="ignore")
typ = obj.type
results.append({"type": typ, "code": data})
if results:
# 若当前设置已识别出内容,继续下一个旋转场景以收集更多,但不必反复
# 这里不提前返回,以便尽量收集多结果
pass
# 去重(按 type+code
uniq = []
seen = set()
for r in results:
key = (r["type"], r["code"]) if isinstance(r, dict) else (None, None)
if key not in seen:
seen.add(key)
uniq.append(r)
logger.debug("pyzbar 返回结果数: %d", len(uniq))
return uniq

View File

@@ -0,0 +1,2 @@

Binary file not shown.

View File

@@ -0,0 +1,103 @@
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import uvicorn
import io
import time
import logging
import numpy as np
import cv2
from ..config_loader import load_config
from ..pipeline import EAN13Recognizer
from ..logging_utils import setup_logging
config = load_config()
setup_logging(config)
logger = logging.getLogger(__name__)
app = FastAPI(title="条形码识别端口 API", version="1.0.0")
recognizer = EAN13Recognizer()
@app.post("/recognize/ean13")
async def recognize_ean13(file: UploadFile = File(...)):
# 上传大小简易校验
t0 = time.time()
content = await file.read()
max_bytes = int(config["app"]["server"]["max_upload_mb"]) * 1024 * 1024
if len(content) > max_bytes:
logger.warning("/recognize/ean13 上传超限: size=%d, limit=%d", len(content), max_bytes)
raise HTTPException(status_code=413, detail="文件过大")
# 读取为图像
data = np.frombuffer(content, dtype=np.uint8)
img = cv2.imdecode(data, cv2.IMREAD_COLOR)
if img is None:
logger.error("/recognize/ean13 解码为图像失败, filename=%s, size=%d", getattr(file, 'filename', ''), len(content))
raise HTTPException(status_code=400, detail="无法解析为图像")
logger.debug("/recognize/ean13 收到图像: shape=%s, dtype=%s", getattr(img, 'shape', None), getattr(img, 'dtype', None))
merged = recognizer.recognize_any_from_image(img)
code = merged.get("ean13") or ""
resp = {
"code": code,
"type": "EAN13" if code else "",
"others": merged.get("others", []),
"message": "ok" if code or merged.get("others") else "未识别",
}
logger.info("/recognize/ean13 识别完成: code=%s, others=%d, cost_ms=%.1f", code, len(merged.get("others", []) or []), (time.time()-t0)*1000)
return JSONResponse(resp)
@app.post("/api/barcode/scan")
async def api_barcode_scan(file: UploadFile = File(...)):
t0 = time.time()
content = await file.read()
max_bytes = int(config["app"]["server"]["max_upload_mb"]) * 1024 * 1024
if len(content) > max_bytes:
logger.warning("/api/barcode/scan 上传超限: size=%d, limit=%d", len(content), max_bytes)
raise HTTPException(status_code=413, detail="文件过大")
data = np.frombuffer(content, dtype=np.uint8)
img = cv2.imdecode(data, cv2.IMREAD_COLOR)
if img is None:
logger.error("/api/barcode/scan 解码为图像失败, filename=%s, size=%d", getattr(file, 'filename', ''), len(content))
raise HTTPException(status_code=400, detail="无法解析为图像")
logger.debug("/api/barcode/scan 收到图像: shape=%s, dtype=%s", getattr(img, 'shape', None), getattr(img, 'dtype', None))
merged = recognizer.recognize_any_from_image(img)
ean13 = merged.get("ean13") or ""
others = merged.get("others", []) or []
# 优先返回 EAN-13否则回退到任意码制的第一个结果
if ean13:
resp = {
"success": True,
"barcodeType": "EAN13",
"barcode": ean13,
"others": others,
}
logger.info("/api/barcode/scan 命中 EAN13: code=%s, others=%d, cost_ms=%.1f", ean13, len(others), (time.time()-t0)*1000)
return JSONResponse(resp)
if isinstance(others, list) and others:
first = others[0] if isinstance(others[0], dict) else None
if first and first.get("code"):
resp = {
"success": True,
"barcodeType": first.get("type", ""),
"barcode": first.get("code", ""),
"others": others,
}
logger.info("/api/barcode/scan 命中非 EAN: type=%s, code=%s, cost_ms=%.1f", first.get("type", ""), first.get("code", ""), (time.time()-t0)*1000)
return JSONResponse(resp)
logger.warning("/api/barcode/scan 未识别, others=%d, cost_ms=%.1f", len(others), (time.time()-t0)*1000)
return JSONResponse({"success": False, "message": "无法识别,请重新上传"}, status_code=400)
def main():
host = config["app"]["server"]["host"]
port = int(config["app"]["server"]["port"])
logger.info("启动 FastAPI 服务器: %s:%d", host, port)
uvicorn.run("app.server.main:app", host=host, port=port, reload=False)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,2 @@

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,162 @@
import os
import tkinter as tk
from tkinter import filedialog, messagebox
from typing import Optional
import cv2
from PIL import Image, ImageTk
from ..config_loader import load_config
from ..io_utils import read_image_bgr
from ..pipeline import EAN13Recognizer
class TkEAN13App:
def __init__(self) -> None:
self.config = load_config()
self.root = tk.Tk()
self.root.title(self.config["app"]["ui"]["window_title"])
self.image_panel = tk.Label(self.root)
self.image_panel.pack(side=tk.TOP, padx=10, pady=10)
btn_frame = tk.Frame(self.root)
btn_frame.pack(side=tk.TOP, pady=5)
self.btn_open = tk.Button(btn_frame, text="选择图片", command=self.on_open)
self.btn_open.pack(side=tk.LEFT, padx=5)
self.btn_recognize = tk.Button(btn_frame, text="识别条码", command=self.on_recognize)
self.btn_recognize.pack(side=tk.LEFT, padx=5)
self.btn_camera = tk.Button(btn_frame, text="摄像头识别", command=self.on_camera)
self.btn_camera.pack(side=tk.LEFT, padx=5)
self.result_var = tk.StringVar(value="结果:-")
self.result_label = tk.Label(self.root, textvariable=self.result_var)
self.result_label.pack(side=tk.TOP, pady=5)
self.current_image_path: Optional[str] = None
self.recognizer = EAN13Recognizer()
self.cap = None
self.cam_running = False
def on_open(self) -> None:
path = filedialog.askopenfilename(
title="选择条码图片",
filetypes=[("Image Files", "*.png;*.jpg;*.jpeg;*.bmp;*.tif;*.tiff")],
)
if not path:
return
self.current_image_path = path
img_bgr = read_image_bgr(path)
if img_bgr is None:
messagebox.showerror("错误", "无法读取图片")
return
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
# 限制显示尺寸
max_w = 800
h, w = img_rgb.shape[:2]
if w > max_w:
scale = max_w / float(w)
img_rgb = cv2.resize(img_rgb, (max_w, int(h * scale)), interpolation=cv2.INTER_AREA)
im = Image.fromarray(img_rgb)
self.tk_img = ImageTk.PhotoImage(im)
self.image_panel.configure(image=self.tk_img)
self.result_var.set("结果:-")
def on_recognize(self) -> None:
if not self.current_image_path:
messagebox.showinfo("提示", "请先选择图片")
return
result = self.recognizer.recognize_any_from_path(self.current_image_path)
ean13 = result.get("ean13", "")
if ean13:
self.result_var.set(f"结果EAN-13 {ean13}")
return
others = result.get("others", [])
if others:
first = others[0]
self.result_var.set(f"结果:{first.get('type')} {first.get('code')}")
return
self.result_var.set("结果:未识别")
def on_camera(self) -> None:
if self.cam_running:
# 若已在运行,视为停止
self.stop_camera()
return
cam_cfg = self.config.get("camera", {})
index = int(cam_cfg.get("index", 0))
self.cap = cv2.VideoCapture(index, cv2.CAP_DSHOW)
if not self.cap.isOpened():
messagebox.showerror("错误", f"无法打开摄像头 index={index}")
self.cap.release()
self.cap = None
return
# 设置分辨率(尽力设置)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, float(cam_cfg.get("width", 1280)))
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, float(cam_cfg.get("height", 720)))
self.cam_running = True
self.result_var.set("结果:摄像头开启,正在识别...")
self.btn_camera.configure(text="停止摄像头")
self._camera_loop()
def stop_camera(self) -> None:
self.cam_running = False
try:
if self.cap is not None:
self.cap.release()
finally:
self.cap = None
self.btn_camera.configure(text="摄像头识别")
def _camera_loop(self) -> None:
if not self.cam_running or self.cap is None:
return
ret, frame = self.cap.read()
if not ret:
# 读取失败,稍后重试
self.root.after(int(self.config.get("camera", {}).get("interval_ms", 80)), self._camera_loop)
return
# 显示到面板
show = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
max_w = 800
h, w = show.shape[:2]
if w > max_w:
scale = max_w / float(w)
show = cv2.resize(show, (max_w, int(h * scale)), interpolation=cv2.INTER_AREA)
im = Image.fromarray(show)
self.tk_img = ImageTk.PhotoImage(im)
self.image_panel.configure(image=self.tk_img)
# 识别
result = self.recognizer.recognize_any_from_image(frame)
ean13 = result.get("ean13", "")
if ean13:
self.result_var.set(f"结果EAN-13 {ean13}")
self.stop_camera()
return
others = result.get("others", [])
if others:
first = others[0]
self.result_var.set(f"结果:{first.get('type')} {first.get('code')}")
self.stop_camera()
return
# 未识别,继续下一帧
self.root.after(int(self.config.get("camera", {}).get("interval_ms", 80)), self._camera_loop)
def run(self) -> None:
self.root.mainloop()
def main() -> None:
app = TkEAN13App()
app.run()
if __name__ == "__main__":
main()