feat: AI套图分层方案 + Gemini集成 - 4种图案类型处理 + 正片叠底 + 宽高比 + 模型选择

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-02-07 16:59:56 +08:00
parent 12395d8eca
commit dae906aba7
277 changed files with 15009 additions and 19922 deletions

620
Server/pltreader.py Normal file
View File

@@ -0,0 +1,620 @@
'''
参考资料:
https://www.gnu.org/software/hp2xx/hp2xx.html
https://zhuanlan.zhihu.com/p/622090369
'''
import os
import typing
import string
import re
import warnings
from enum import Enum
import shapely
import shapely.ops
from shapely.geometry import LineString, Polygon, MultiPolygon, MultiLineString
import numpy as np
import cv2
import itertools
import logging
from shapely.validation import make_valid
SHOW_PLT_WARNINGS = False
# 如果没配置日志则配置日志
if not logging.getLogger().handlers:
logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(name)s][%(levelname)s] %(message)s')
def assert_warning(condition, message):
if SHOW_PLT_WARNINGS and (not condition):
logging.warning(message)
class PltCommand(Enum):
IN = 0
PU = 1
PD = 2
PA = 3
AbstractSyntaxTree = typing.List[typing.Dict]
class VirtualMachineOutput(object):
'''虚拟机执行后的结果'''
MIN_FILTER_AREA_FACTOR = 0.0068 ** 2
MAX_FILTER_AREA_FACTOR = 0.8 ** 2
def __init__(self, polygons :typing.List[Polygon], dispensable: typing.List[LineString]) -> None:
for poly in polygons:
assert isinstance(poly, Polygon)
for line in dispensable:
assert isinstance(line, LineString)
# 过滤结果
# 计算画布大小
min_x, min_y, max_x, max_y = MultiPolygon(polygons).bounds
filter_polygons = []
for polygon in polygons:
if (max_x - min_x) * (max_y - min_y) * VirtualMachineOutput.MIN_FILTER_AREA_FACTOR < polygon.area < (max_x - min_x) * (max_y - min_y) * VirtualMachineOutput.MAX_FILTER_AREA_FACTOR:
filter_polygons.append(polygon)
else:
boundary = polygon.boundary
if isinstance(boundary, LineString):
dispensable.append(boundary)
elif isinstance(boundary, MultiLineString):
dispensable.extend(list(boundary.geoms))
logging.info(f"过滤了{len(polygons) - len(filter_polygons)}个多边形")
polygons = filter_polygons
# 计算多边形的包含关系,构建多边形树
self.nodes = [{"data":polygon, "child":[], "parent":None, "index":index} for index, polygon in enumerate(polygons)]
self.dispensable = dispensable
# 找每个node的最佳parent
for node1 in self.nodes:
best_parent = None
for node2 in self.nodes:
if node1 == node2:
continue
if node1["index"] == 8 and node2["index"] == 9:
pass
# 判断是否包含
#if node2["data"].contains(node1["data"]):
if node2["data"].area > node1["data"].area:
intersection = node2["data"].intersection(node1["data"])
if intersection.area > node1["data"].area * 0.99:
if best_parent is None:
best_parent = node2
else:
if best_parent["data"].contains(node2["data"]):
best_parent = node2
node1["parent"] = best_parent
# 填充child
self.tree = []
for node in self.nodes:
if node["parent"] is not None:
node["parent"]["child"].append(node)
else:
self.tree.append(node)
# 构建广度优先遍历
self.bfs = self.tree.copy()
index = 0
while index < len(self.bfs):
self.bfs.extend(self.bfs[index]["child"])
index += 1
def _draw_nodes(self, nodes: typing.List, scale_factor: float, show_id: bool = True) -> np.ndarray:
# 计算画布大小
min_x, min_y, max_x, max_y = MultiPolygon([node["data"] for node in nodes]).bounds
max_x = int(np.ceil(max_x * scale_factor))
max_y = int(np.ceil(max_y * scale_factor))
min_x = int(np.floor(min_x * scale_factor))
min_y = int(np.floor(min_y * scale_factor))
max_x -= min_x
max_y -= min_y
#print("画布大小", max_x, max_y)
result_img = np.zeros((max_y, max_x, 4), dtype=np.uint8)
nodes = sorted(nodes, key=lambda node: self.bfs.index(node))
for node in nodes:
pts = (np.asarray(node["data"].exterior.coords) * scale_factor - [min_x, min_y]).astype(np.int32)
pts[:,1] = max_y - pts[:,1]
cv2.fillPoly(result_img, [pts], (255,255,255,255))
cv2.polylines(result_img, [pts], True, (0,0,0,255), 2)
# 画无关紧要的东西
multiPolygon = MultiPolygon([node["data"] for node in nodes])
multiPolygon = make_valid(multiPolygon.buffer(0))
for shape in self.dispensable:
if multiPolygon.contains(shape):
pts = (np.asarray(shape.coords) * scale_factor - [min_x, min_y]).astype(np.int32)
pts[:,1] = max_y - pts[:,1]
cv2.polylines(result_img, [pts], False, (0,0,0,255), 1)
# 画parent为None的id
if show_id:
for node in nodes:
if node["parent"] is None:
centroid = node["data"].centroid
text_pos = (int((centroid.x * scale_factor) - min_x), int(max_y - (centroid.y * scale_factor) - min_y))
cv2.putText(result_img, str(node["index"]), text_pos, cv2.FONT_HERSHEY_SIMPLEX, 3, (255,0,0,255), 5)
return result_img
def full_sheet(self, scale_factor: float = 0.1) -> np.ndarray:
'''整幅输出'''
return self._draw_nodes(self.bfs, scale_factor)
def debug_full_sheet(self, scale_factor: float = 0.1) -> np.ndarray:
"""
用随机颜色画出所有多边形和被过滤掉的线段
"""
# 计算画布大小画布大小应该考虑dispensable
all_shapes = [node["data"] for node in self.nodes] + self.dispensable
temp = []
for shape in all_shapes:
if isinstance(shape, LineString):
convex_hull = shape.convex_hull
if isinstance(convex_hull, Polygon):
temp.append(convex_hull)
else:
temp.append(shape)
min_x, min_y, max_x, max_y = MultiPolygon(temp).bounds
max_x = int(np.ceil(max_x * scale_factor))
max_y = int(np.ceil(max_y * scale_factor))
min_x = int(np.floor(min_x * scale_factor))
min_y = int(np.floor(min_y * scale_factor))
max_x -= min_x
max_y -= min_y
result_img = np.zeros((max_y, max_x, 4), dtype=np.uint8)
# 画多边形
for node in self.nodes:
#overlay = result_img.copy()
color = tuple(np.random.randint(0,256,size=3).tolist() + [255])
pts = (np.asarray(node["data"].exterior.coords) * scale_factor - [min_x, min_y]).astype(np.int32)
pts[:,1] = max_y - pts[:,1]
cv2.fillPoly(result_img, [pts], color)
if node["parent"] is None:
cv2.polylines(result_img, [pts], True, (0,0,255,255), 1)
else:
cv2.polylines(result_img, [pts], True, (0,0,0,255), 1)
#alpha = 0.5
#cv2.addWeighted(overlay, alpha, result_img, 1 - alpha, 0, result_img)
# 画被过滤掉的线段
for shape in self.dispensable:
#overlay = result_img.copy()
color = tuple(np.random.randint(0,256,size=3).tolist() + [255])
pts = (np.asarray(shape.coords) * scale_factor - [min_x, min_y]).astype(np.int32)
pts[:,1] = max_y - pts[:,1]
cv2.polylines(result_img, [pts], False, color, 1)
#cv2.addWeighted(overlay, alpha, result_img, 1 - alpha, 0, result_img)
# 画parent为None的id
for node in self.nodes:
if node["parent"] is None:
centroid = node["data"].centroid
text_pos = (int((centroid.x * scale_factor) - min_x), int(max_y - (centroid.y * scale_factor) - min_y))
cv2.putText(result_img, str(node["index"]), text_pos, cv2.FONT_HERSHEY_SIMPLEX, 3, (0,0,255,255), 5)
return result_img
def _distance_between_cluster(self, cluster1, cluster2):
min_distance = float("inf")
for node1 in cluster1:
for node2 in cluster2:
assert isinstance(node1["data"], Polygon)
assert isinstance(node2["data"], Polygon)
d = node1["data"].distance(node2["data"])
if d < min_distance:
min_distance = d
return min_distance
def get_single_size_info(self, size_num: int) -> typing.List[typing.List]:
'''获取单码信息'''
logging.info(f"开始进行距离聚类,目标聚类数目: {size_num}")
# 计算所有多边形两两之间的距离
distances: list[tuple[float, int, int]] = []
for i in range(len(self.nodes)):
for j in range(i + 1, len(self.nodes)):
poly1 = self.nodes[i]["data"]
poly2 = self.nodes[j]["data"]
dist = poly1.distance(poly2)
distances.append((dist, i, j))
distances.sort(key=lambda x: x[0])
# 使用并查集进行聚类
parent = list(range(len(self.nodes)))
def find(i):
if parent[i] != i:
parent[i] = find(parent[i]) # 路径压缩
return parent[i]
def union(i, j):
root_i = find(i)
root_j = find(j)
parent[root_i] = root_j
return root_i != root_j
# 进行聚类直到聚类数目达到size_num
current_cluster_count = len(self.nodes)
for _, i, j in distances:
if current_cluster_count <= size_num:
break
if union(i, j):
current_cluster_count -= 1
# 构建聚类结果
clusters_dict = {}
for i in range(len(self.nodes)):
root = find(i)
if root not in clusters_dict:
clusters_dict[root] = []
clusters_dict[root].append(self.nodes[i])
clusters = list(clusters_dict.values())
logging.info(f"距离聚类得到{len(clusters)}个聚类")
# 排序
sorted_clusters = sorted(clusters, key=lambda x: min([node["index"] for node in x]))
return list(sorted_clusters)
def single_size(self, size_num: int, scale_factor: float = 0.1) -> typing.List[np.ndarray]:
'''单码输出'''
sorted_clusters = self.get_single_size_info(size_num)
result = []
for cluster in sorted_clusters:
result.append(self._draw_nodes(cluster, scale_factor))
return result
def single_piece(self, scale_factor: float = 0.1) -> typing.List[np.ndarray]:
'''单片输出'''
result = []
for node in self.nodes:
# 如果是顶层则新建一副图像
if node["parent"] is None:
nodes = [node] + node['child']
result.append(self._draw_nodes(nodes, scale_factor))
return result
class _VirtualMachine(object):
'''运行PLT文件的虚拟机'''
def __init__(self, tolerance) -> None:
self.tolerance = tolerance
def __run_v2(self, abstract_syntax_tree: AbstractSyntaxTree) -> VirtualMachineOutput:
current_x, current_y = 0, 0
pen_state = 'UP'
raw_segments: typing.List[typing.List[typing.Tuple[float, float]]] = []
current_path: typing.List[typing.Tuple[float, float]] = []
for code in abstract_syntax_tree:
try:
if code["cmd"] == PltCommand.IN:
logging.info("初始化")
elif code["cmd"] == PltCommand.PA:
new_x = float(code["args"][0])
new_y = float(code["args"][1])
if pen_state == 'DOWN':
if not current_path:
current_path.append((current_x, current_y))
current_path.append((new_x, new_y))
elif pen_state == 'UP':
pass
# 更新当前位置
current_x, current_y = new_x, new_y
elif code["cmd"] == PltCommand.PU:
pen_state = 'UP'
if current_path:
assert len(current_path) > 1 and len(set(current_path)) > 1
raw_segments.append(current_path)
current_path = []
elif code["cmd"] == PltCommand.PD:
pen_state = 'DOWN'
except Exception as e:
assert_warning(False, f"[code {code}] 执行指令异常: {e}")
# 处理最后一条路径
if current_path:
assert len(current_path) > 1 and len(set(current_path)) > 1
raw_segments.append(current_path)
current_path = []
merged_geometry = shapely.ops.linemerge(raw_segments)
final_lines: typing.List[LineString] = []
if isinstance(merged_geometry, LineString):
final_lines.append(merged_geometry)
elif isinstance(merged_geometry, MultiLineString):
final_lines.extend(merged_geometry.geoms)
return self.__build_polygons_and_dispensable(final_lines)
def run(self, abstract_syntax_tree: AbstractSyntaxTree, version: int = 2) -> VirtualMachineOutput:
if version == 1:
logging.warning("版本1已过时 请使用版本2")
return self.__run_v1(abstract_syntax_tree)
elif version == 2:
return self.__run_v2(abstract_syntax_tree)
else:
raise ValueError(f"不支持的版本号: {version}")
def __run_v1(self, abstract_syntax_tree: AbstractSyntaxTree) -> VirtualMachineOutput:
context = abstract_syntax_tree.copy()
current_x = 0
current_y = 0
self.pen_state = 'UP'
lines :typing.List[typing.List[typing.Tuple]] = []
lines_flag :typing.List[int] = []
while len(context) != 0:
code = context[0]
try:
if code["cmd"] == PltCommand.IN:
logging.info("初始化")
elif code["cmd"] == PltCommand.PA:
if self.pen_state == 'UP':
pass
elif self.pen_state == 'DOWN':
new_x = float(code["args"][0])
new_y = float(code["args"][1])
# 判断是否与已有的线相连接
for i in range(len(lines) - 1, -1, -1):
if lines_flag[i] == 1:
continue
if lines[i][0] == (current_x, current_y):
lines[i].insert(0, (new_x, new_y))
if lines[i][0] == lines[i][-1]:
lines_flag[i] = 1
break
elif lines[i][0] == (new_x, new_y):
lines[i].insert(0, (current_x, current_y))
if lines[i][0] == lines[i][-1]:
lines_flag[i] = 1
break
elif lines[i][-1] == (current_x, current_y):
lines[i].append((new_x, new_y))
if lines[i][0] == lines[i][-1]:
lines_flag[i] = 1
break
elif lines[i][-1] == (new_x, new_y):
lines[i].append((current_x, current_y))
if lines[i][0] == lines[i][-1]:
lines_flag[i] = 1
break
else:
# 不与已有的线相连接
lines.append([(current_x, current_y), (new_x, new_y)])
lines_flag.append(0)
current_x = float(code["args"][0])
current_y = float(code["args"][1])
elif code["cmd"] == PltCommand.PU:
self.pen_state = 'UP'
elif code["cmd"] == PltCommand.PD:
self.pen_state = 'DOWN'
except Exception as e:
assert_warning(False, f"[code {code}] 执行指令异常: {e}")
finally:
context = context[1:]
# 创建LineString :typing.List[LineString] = []
lineStrings :typing.List[LineString] = []
lineStrings = [LineString(line) for line in lines if len(set(line)) > 1]
'''
import matplotlib.pyplot as plt
for line in lineStrings:
x, y = line.xy
plt.plot(x, y)
plt.show()
'''
return self.__build_polygons_and_dispensable(lineStrings)
def __build_polygons_and_dispensable(self, lineStrings: typing.List[LineString]) -> VirtualMachineOutput:
# 为非常近的线搭桥
tree = shapely.STRtree(lineStrings)
bridges = []
# 遍历所有线段的端点,寻找需要搭桥的地方
for line in lineStrings:
endpoints = [shapely.Point(line.coords[0]), shapely.Point(line.coords[-1])]
for p in endpoints:
for idx in tree.query(p.buffer(self.tolerance)):
other_line = lineStrings[idx]
if other_line == line:
continue
dist = p.distance(other_line)
if 0 < dist <= self.tolerance:
_, p_target = shapely.ops.nearest_points(p, other_line)
bridge = LineString([p, p_target])
bridges.append(bridge)
lineStrings.extend(bridges)
# 节点化并提取闭合区域
all_shapes = shapely.ops.unary_union(lineStrings)
assert isinstance(all_shapes, MultiLineString)
polygons :typing.List[Polygon] = list(shapely.ops.polygonize(all_shapes))
# 合并重叠多边形
merged_polygons = shapely.unary_union(polygons)
assert isinstance(merged_polygons, MultiPolygon)
polygons = list(merged_polygons.geoms)
# 只保留外环
polygons = [Polygon(poly.exterior) for poly in polygons]
# 找到多余的线段
dispensable :typing.List[LineString] = []
closed_rings = [poly.exterior for poly in polygons]
if closed_rings:
leftovers = all_shapes.difference(shapely.ops.unary_union(closed_rings))
assert isinstance(leftovers, shapely.MultiLineString)
for shape in leftovers.geoms:
dispensable.append(shape)
else:
dispensable = list(all_shapes.geoms)
for i in range(len(polygons)):
polygons[i] = polygons[i].buffer(0)
logging.info(f"共生成多边形{len(polygons)}个,线段{len(dispensable)}")
output = VirtualMachineOutput(polygons, dispensable)
return output
class _Preprocessor(object):
'''PLT文件预处理器'''
def __init__(self) -> None:
pass
def get_abstract_syntax_tree(self, src_content: str) -> AbstractSyntaxTree:
content = src_content
# 换行符转分号 某些文件的语句末尾没有分号
content = content.replace("\n", ";")
content = content.replace(" ", ",")
# 删除所有空白字符
for space in string.whitespace:
content = content.replace(space, "")
result = []
# 遍历每一行命令
for line_index, line in enumerate(content.split(";")):
if line == "":
continue
# 提取开头的英文
pattern = r'^[a-zA-Z]+'
match = re.search(pattern, line)
if match is None:
assert_warning(False, f"[line {line_index}] 未找到命令: {line}")
continue
# 获取对应的指令
command_str = match.group().upper()
try:
command = PltCommand[command_str]
except KeyError:
assert_warning(False, f"[line {line_index}] 不支持的指令: {command_str}")
continue
# 获取参数
if len(command_str) == len(line):
stack = []
else:
stack = line[len(command_str):].split(",")
if command == PltCommand.IN:
result.append({'cmd': PltCommand.IN, 'args': []})
elif command == PltCommand.PA:
for i in range(len(stack) // 2):
result.append({'cmd': PltCommand.PA, 'args': [stack[0], stack[1]]})
stack = stack[2:]
elif command == PltCommand.PU:
result.append({'cmd': PltCommand.PU, 'args': []})
for i in range(len(stack) // 2):
result.append({'cmd': PltCommand.PA, 'args': [stack[0], stack[1]]})
stack = stack[2:]
elif command == PltCommand.PD:
result.append({'cmd': PltCommand.PD, 'args': []})
for i in range(len(stack) // 2):
result.append({'cmd': PltCommand.PA, 'args': [stack[0], stack[1]]})
stack = stack[2:]
assert_warning(len(stack) == 0, f"[line {line_index}] 栈非空: {stack}")
return result
class PltReader(object):
'''PLT文件读取器'''
def __init__(self, pltfile: typing.TextIO) -> None:
self.pltfile = pltfile
logging.info("正在预处理PLT文件...")
preprocessor = _Preprocessor()
self.abstract_syntax_tree = preprocessor.get_abstract_syntax_tree(self.pltfile.read())
logging.info("预处理完成")
def get_output(self, tolerance=5, version: int = 2) -> VirtualMachineOutput:
machine = _VirtualMachine(tolerance=tolerance)
output = machine.run(self.abstract_syntax_tree, version)
return output
if __name__ == "__main__":
filepath = "标准.plt"
with open(filepath, 'r') as f:
reader = PltReader(f)
output = reader.get_output()
debug_full_sheet_path = "debug_full_sheet.png"
cv2.imwrite(debug_full_sheet_path, output.debug_full_sheet(scale_factor=72/1016))
print(f"调试整幅输出已写入{debug_full_sheet_path}")
full_sheet_path = "full_sheet.png"
cv2.imwrite(full_sheet_path, output.full_sheet(scale_factor=72/1016))
print(f"整幅输出已写入{full_sheet_path}")
single_size_path = "single_size"
for index, img in enumerate(output.single_size(size_num=5, scale_factor=72/1016)):
cv2.imwrite(os.path.join(single_size_path, f"{index}.png"), img)
print(f"单码输出已写入{single_size_path}")
single_piece_path = "single_piece"
for index, img in enumerate(output.single_piece(scale_factor=72/1016)):
cv2.imwrite(os.path.join(single_piece_path, f"{index}.png"), img)
print(f"单片输出已写入{single_piece_path}")