'''
Reader for PLT (HP-GL style) plotter files.

The module parses the pen commands of a PLT file, replays them to recover
the drawn strokes, turns closed strokes into polygons and renders the result
with OpenCV (whole sheet, one image per cluster, or one image per piece).

References:
    https://www.gnu.org/software/hp2xx/hp2xx.html
    https://zhuanlan.zhihu.com/p/622090369
'''
import itertools
import logging
import os
import re
import string
import typing
import warnings
from enum import Enum

import cv2
import numpy as np
import shapely
import shapely.ops
from shapely.geometry import LineString, Polygon, MultiPolygon, MultiLineString
from shapely.validation import make_valid

# When False, parser / virtual machine diagnostics are suppressed entirely.
SHOW_PLT_WARNINGS = False

# Configure logging only if the host application has not configured it yet.
if not logging.getLogger().handlers:
    logging.basicConfig(level=logging.INFO, format='[%(asctime)s][%(name)s][%(levelname)s] %(message)s')


def assert_warning(condition, message):
    '''Log ``message`` as a warning when ``condition`` is false.

    Nothing is logged unless the module-level ``SHOW_PLT_WARNINGS`` switch
    is enabled.
    '''
    if SHOW_PLT_WARNINGS and (not condition):
        logging.warning(message)


class PltCommand(Enum):
    '''The PLT instructions understood by the preprocessor and the VM.'''
    IN = 0
    PU = 1
    PD = 2
    PA = 3


# A parsed program: a flat list of ``{'cmd': PltCommand, 'args': [...]}``.
AbstractSyntaxTree = typing.List[typing.Dict]


class VirtualMachineOutput(object):
    '''Result of running a PLT program.

    Attributes set by the constructor:
        nodes: one dict per kept polygon with the keys ``"data"`` (the
            polygon), ``"child"`` (list of child nodes), ``"parent"``
            (parent node or ``None``) and ``"index"``.
        tree: the nodes whose parent is ``None``.
        bfs: every node in breadth-first order starting from ``tree``.
        dispensable: line strings that are not the outline of a kept polygon.
    '''

    # A polygon is kept only if its area lies strictly between these two
    # fractions of the bounding-box area of all polygons.
    MIN_FILTER_AREA_FACTOR = 0.0068 ** 2
    MAX_FILTER_AREA_FACTOR = 0.8 ** 2
    # Fraction of a polygon's area that must overlap a larger polygon for the
    # larger one to count as containing it.
    CONTAINMENT_RATIO = 0.99

    def __init__(self, polygons: typing.List[Polygon], dispensable: typing.List[LineString]) -> None:
        '''Filter ``polygons`` by area and build their containment tree.

        Args:
            polygons: candidate polygons.
            dispensable: loose line strings; the outlines of polygons that
                are filtered out are added to a copy of this list.
        '''
        for poly in polygons:
            assert isinstance(poly, Polygon)
        for line in dispensable:
            assert isinstance(line, LineString)

        # Work on a copy so the caller's list is not modified.
        dispensable = list(dispensable)

        # Area thresholds are relative to the bounding box of all polygons.
        min_x, min_y, max_x, max_y = MultiPolygon(polygons).bounds
        canvas_area = (max_x - min_x) * (max_y - min_y)
        min_area = canvas_area * VirtualMachineOutput.MIN_FILTER_AREA_FACTOR
        max_area = canvas_area * VirtualMachineOutput.MAX_FILTER_AREA_FACTOR

        kept_polygons = []
        for polygon in polygons:
            if min_area < polygon.area < max_area:
                kept_polygons.append(polygon)
                continue
            # A rejected polygon is demoted to plain line work.
            boundary = polygon.boundary
            if isinstance(boundary, LineString):
                dispensable.append(boundary)
            elif isinstance(boundary, MultiLineString):
                dispensable.extend(boundary.geoms)
        logging.info(f"过滤了{len(polygons) - len(kept_polygons)}个多边形")

        self.nodes = [
            {"data": polygon, "child": [], "parent": None, "index": index}
            for index, polygon in enumerate(kept_polygons)
        ]
        self.dispensable = dispensable

        # Pick the best parent of every node: among the larger polygons that
        # overlap (almost) all of it, prefer one contained in the current pick.
        for node in self.nodes:
            node_polygon = node["data"]
            best_parent = None
            for candidate in self.nodes:
                if candidate is node:
                    continue
                candidate_polygon = candidate["data"]
                if candidate_polygon.area <= node_polygon.area:
                    continue
                overlap = candidate_polygon.intersection(node_polygon)
                if overlap.area <= node_polygon.area * VirtualMachineOutput.CONTAINMENT_RATIO:
                    continue
                if best_parent is None or best_parent["data"].contains(candidate_polygon):
                    best_parent = candidate
            node["parent"] = best_parent

        # Link children to their parents and collect the roots.
        self.tree = []
        for node in self.nodes:
            if node["parent"] is not None:
                node["parent"]["child"].append(node)
            else:
                self.tree.append(node)

        # Breadth-first order: the list grows while it is being scanned.
        self.bfs = self.tree.copy()
        index = 0
        while index < len(self.bfs):
            self.bfs.extend(self.bfs[index]["child"])
            index += 1

    @staticmethod
    def _canvas_bounds(shapes, scale_factor: float) -> typing.Tuple[int, int, int, int]:
        '''Return ``(min_x, min_y, width, height)`` in pixels for ``shapes``.

        Raises:
            ValueError: if ``shapes`` contains no non-empty geometry.
        '''
        bounds = [shape.bounds for shape in shapes if not shape.is_empty]
        if not bounds:
            raise ValueError("cannot compute a canvas for an empty set of shapes")
        min_xs, min_ys, max_xs, max_ys = zip(*bounds)
        min_x = int(np.floor(min(min_xs) * scale_factor))
        min_y = int(np.floor(min(min_ys) * scale_factor))
        max_x = int(np.ceil(max(max_xs) * scale_factor))
        max_y = int(np.ceil(max(max_ys) * scale_factor))
        return min_x, min_y, max_x - min_x, max_y - min_y

    @staticmethod
    def _to_pixels(coords, scale_factor: float, min_x: int, min_y: int, height: int) -> np.ndarray:
        '''Scale ``coords``, shift them to the canvas origin and flip the y axis.'''
        pts = (np.asarray(coords) * scale_factor - [min_x, min_y]).astype(np.int32)
        pts[:, 1] = height - pts[:, 1]
        return pts

    @staticmethod
    def _label_position(point, scale_factor: float, min_x: int, min_y: int, height: int) -> typing.Tuple[int, int]:
        '''Pixel position of ``point``, using the same mapping as ``_to_pixels``.'''
        x = int(point.x * scale_factor - min_x)
        y = int(height - (point.y * scale_factor - min_y))
        return x, y

    def _draw_nodes(self, nodes: typing.List, scale_factor: float, show_id: bool = True) -> np.ndarray:
        '''Render ``nodes`` (white fill, black outline) into a BGRA image.

        Dispensable lines that lie inside the drawn polygons are drawn too.
        When ``show_id`` is true, the index of every parentless node is
        written at its centroid.
        '''
        min_x, min_y, width, height = self._canvas_bounds(
            [node["data"] for node in nodes], scale_factor)
        result_img = np.zeros((height, width, 4), dtype=np.uint8)

        # Draw in breadth-first order so that children are painted after
        # (on top of) their parents.
        bfs_position = {id(node): position for position, node in enumerate(self.bfs)}
        nodes = sorted(nodes, key=lambda node: bfs_position[id(node)])
        for node in nodes:
            pts = self._to_pixels(node["data"].exterior.coords, scale_factor, min_x, min_y, height)
            cv2.fillPoly(result_img, [pts], (255, 255, 255, 255))
            cv2.polylines(result_img, [pts], True, (0, 0, 0, 255), 2)

        # Draw the loose lines that fall inside the drawn polygons.
        covered_area = MultiPolygon([node["data"] for node in nodes])
        covered_area = make_valid(covered_area.buffer(0))
        for shape in self.dispensable:
            if covered_area.contains(shape):
                pts = self._to_pixels(shape.coords, scale_factor, min_x, min_y, height)
                cv2.polylines(result_img, [pts], False, (0, 0, 0, 255), 1)

        # Label the nodes that have no parent.
        if show_id:
            for node in nodes:
                if node["parent"] is None:
                    text_pos = self._label_position(
                        node["data"].centroid, scale_factor, min_x, min_y, height)
                    cv2.putText(result_img, str(node["index"]), text_pos,
                                cv2.FONT_HERSHEY_SIMPLEX, 3, (255, 0, 0, 255), 5)
        return result_img

    def full_sheet(self, scale_factor: float = 0.1) -> np.ndarray:
        '''Render every polygon into a single image.'''
        return self._draw_nodes(self.bfs, scale_factor)

    def debug_full_sheet(self, scale_factor: float = 0.1) -> np.ndarray:
        '''Render all polygons and all dispensable lines in random colours.'''
        # The canvas has to cover the dispensable lines as well.
        all_shapes = [node["data"] for node in self.nodes] + self.dispensable
        min_x, min_y, width, height = self._canvas_bounds(all_shapes, scale_factor)
        result_img = np.zeros((height, width, 4), dtype=np.uint8)

        # Polygons: random fill; red outline for roots, black for the rest.
        for node in self.nodes:
            color = tuple(np.random.randint(0, 256, size=3).tolist() + [255])
            pts = self._to_pixels(node["data"].exterior.coords, scale_factor, min_x, min_y, height)
            cv2.fillPoly(result_img, [pts], color)
            outline = (0, 0, 255, 255) if node["parent"] is None else (0, 0, 0, 255)
            cv2.polylines(result_img, [pts], True, outline, 1)

        # Dispensable lines.
        for shape in self.dispensable:
            color = tuple(np.random.randint(0, 256, size=3).tolist() + [255])
            pts = self._to_pixels(shape.coords, scale_factor, min_x, min_y, height)
            cv2.polylines(result_img, [pts], False, color, 1)

        # Label the nodes that have no parent.
        for node in self.nodes:
            if node["parent"] is None:
                text_pos = self._label_position(
                    node["data"].centroid, scale_factor, min_x, min_y, height)
                cv2.putText(result_img, str(node["index"]), text_pos,
                            cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 0, 255, 255), 5)
        return result_img

    def _distance_between_cluster(self, cluster1, cluster2):
        '''Smallest polygon-to-polygon distance between two clusters.

        Returns ``float("inf")`` when either cluster is empty.
        '''
        return min(
            (node1["data"].distance(node2["data"]) for node1 in cluster1 for node2 in cluster2),
            default=float("inf"),
        )

    def get_single_size_info(self, size_num: int) -> typing.List[typing.List]:
        '''Group the nodes into (at most) ``size_num`` clusters by distance.

        The closest pairs of polygons are merged first until ``size_num``
        clusters remain. Clusters are returned ordered by their smallest
        node index.
        '''
        logging.info(f"开始进行距离聚类,目标聚类数目: {size_num}")

        # Pairwise distances between all polygons, closest first.
        distances: typing.List[typing.Tuple[float, int, int]] = [
            (self.nodes[i]["data"].distance(self.nodes[j]["data"]), i, j)
            for i, j in itertools.combinations(range(len(self.nodes)), 2)
        ]
        distances.sort(key=lambda item: item[0])

        # Union-find over node positions.
        parent = list(range(len(self.nodes)))

        def find(i):
            root = i
            while parent[root] != root:
                root = parent[root]
            # Path compression.
            while parent[i] != root:
                parent[i], i = root, parent[i]
            return root

        def union(i, j):
            root_i = find(i)
            root_j = find(j)
            parent[root_i] = root_j
            return root_i != root_j

        # Merge until the requested number of clusters is reached.
        current_cluster_count = len(self.nodes)
        for _, i, j in distances:
            if current_cluster_count <= size_num:
                break
            if union(i, j):
                current_cluster_count -= 1

        clusters_dict: typing.Dict[int, typing.List] = {}
        for i, node in enumerate(self.nodes):
            clusters_dict.setdefault(find(i), []).append(node)
        clusters = list(clusters_dict.values())
        logging.info(f"距离聚类得到{len(clusters)}个聚类")

        return sorted(clusters, key=lambda cluster: min(node["index"] for node in cluster))

    def single_size(self, size_num: int, scale_factor: float = 0.1) -> typing.List[np.ndarray]:
        '''Render one image per cluster returned by ``get_single_size_info``.'''
        return [
            self._draw_nodes(cluster, scale_factor)
            for cluster in self.get_single_size_info(size_num)
        ]

    def single_piece(self, scale_factor: float = 0.1) -> typing.List[np.ndarray]:
        '''Render one image per parentless node together with its direct children.'''
        return [
            self._draw_nodes([node] + node['child'], scale_factor)
            for node in self.nodes
            if node["parent"] is None
        ]


class _VirtualMachine(object):
    '''Virtual machine that replays a parsed PLT program.'''

    def __init__(self, tolerance) -> None:
        '''``tolerance`` is the largest gap between a stroke end point and
        another stroke that is still bridged before polygons are built.'''
        self.tolerance = tolerance

    @staticmethod
    def _is_drawable(path) -> bool:
        '''True if ``path`` has at least two distinct points.'''
        return len(path) > 1 and len(set(path)) > 1

    @staticmethod
    def _parts_of_type(geometry, geom_type) -> typing.List:
        '''Return the non-empty members of ``geometry`` that are ``geom_type``.

        ``geometry`` may be a single geometry, a multi-part geometry, a
        geometry collection, or empty.
        '''
        if geometry is None or geometry.is_empty:
            return []
        if isinstance(geometry, geom_type):
            return [geometry]
        return [
            part for part in getattr(geometry, "geoms", ())
            if isinstance(part, geom_type) and not part.is_empty
        ]

    def __run_v2(self, abstract_syntax_tree: AbstractSyntaxTree) -> VirtualMachineOutput:
        '''Replay the program, collecting one path per pen-down stroke.'''
        current_x, current_y = 0, 0
        pen_state = 'UP'
        raw_segments: typing.List[typing.List[typing.Tuple[float, float]]] = []
        current_path: typing.List[typing.Tuple[float, float]] = []

        def flush_path() -> None:
            '''Store the current stroke if it is drawable; always start afresh.'''
            nonlocal current_path
            if current_path:
                if self._is_drawable(current_path):
                    raw_segments.append(current_path)
                else:
                    assert_warning(False, f"丢弃退化路径: {current_path}")
            current_path = []

        for code in abstract_syntax_tree:
            try:
                cmd = code["cmd"]
                if cmd == PltCommand.IN:
                    logging.info("初始化")
                elif cmd == PltCommand.PA:
                    new_x = float(code["args"][0])
                    new_y = float(code["args"][1])
                    if pen_state == 'DOWN':
                        if not current_path:
                            # A stroke starts where the pen currently is.
                            current_path.append((current_x, current_y))
                        current_path.append((new_x, new_y))
                    current_x, current_y = new_x, new_y
                elif cmd == PltCommand.PU:
                    pen_state = 'UP'
                    flush_path()
                elif cmd == PltCommand.PD:
                    pen_state = 'DOWN'
            except (KeyError, IndexError, TypeError, ValueError) as e:
                assert_warning(False, f"[code {code}] 执行指令异常: {e}")

        # The program may end while the pen is still down.
        flush_path()

        merged_geometry = shapely.ops.linemerge(raw_segments)
        final_lines: typing.List[LineString] = self._parts_of_type(merged_geometry, LineString)
        return self.__build_polygons_and_dispensable(final_lines)

    def run(self, abstract_syntax_tree: AbstractSyntaxTree, version: int = 2) -> VirtualMachineOutput:
        '''Execute the program with the requested implementation.

        Raises:
            ValueError: if ``version`` is neither 1 nor 2.
        '''
        if version == 1:
            logging.warning("版本1已过时 请使用版本2")
            return self.__run_v1(abstract_syntax_tree)
        elif version == 2:
            return self.__run_v2(abstract_syntax_tree)
        else:
            raise ValueError(f"不支持的版本号: {version}")

    def __run_v1(self, abstract_syntax_tree: AbstractSyntaxTree) -> VirtualMachineOutput:
        '''Legacy implementation: joins each drawn segment onto an open line
        that shares an end point with it.'''
        current_x = 0
        current_y = 0
        self.pen_state = 'UP'
        lines: typing.List[typing.List[typing.Tuple]] = []
        # lines_flag[i] == 1 marks lines[i] as closed (first point == last point).
        lines_flag: typing.List[int] = []

        for code in abstract_syntax_tree:
            try:
                cmd = code["cmd"]
                if cmd == PltCommand.IN:
                    logging.info("初始化")
                elif cmd == PltCommand.PA:
                    new_x = float(code["args"][0])
                    new_y = float(code["args"][1])
                    if self.pen_state == 'DOWN':
                        start = (current_x, current_y)
                        end = (new_x, new_y)
                        # Try to attach the segment to an open line, newest first.
                        for i in range(len(lines) - 1, -1, -1):
                            if lines_flag[i] == 1:
                                continue
                            line = lines[i]
                            if line[0] == start:
                                line.insert(0, end)
                            elif line[0] == end:
                                line.insert(0, start)
                            elif line[-1] == start:
                                line.append(end)
                            elif line[-1] == end:
                                line.append(start)
                            else:
                                continue
                            if line[0] == line[-1]:
                                lines_flag[i] = 1
                            break
                        else:
                            # Not connected to any open line: start a new one.
                            lines.append([start, end])
                            lines_flag.append(0)
                    current_x, current_y = new_x, new_y
                elif cmd == PltCommand.PU:
                    self.pen_state = 'UP'
                elif cmd == PltCommand.PD:
                    self.pen_state = 'DOWN'
            except (KeyError, IndexError, TypeError, ValueError) as e:
                assert_warning(False, f"[code {code}] 执行指令异常: {e}")

        lineStrings: typing.List[LineString] = [
            LineString(line) for line in lines if len(set(line)) > 1
        ]
        return self.__build_polygons_and_dispensable(lineStrings)

    def __build_polygons_and_dispensable(self, lineStrings: typing.List[LineString]) -> VirtualMachineOutput:
        '''Turn strokes into polygons plus the left-over ("dispensable") lines.'''
        # Work on a copy so the caller's list is not extended with bridges.
        lines = list(lineStrings)

        # Bridge small gaps: connect every stroke end point to any other
        # stroke that is closer than the tolerance without touching it.
        bridges = []
        if lines:
            tree = shapely.STRtree(lines)
            for line_index, line in enumerate(lines):
                endpoints = [shapely.Point(line.coords[0]), shapely.Point(line.coords[-1])]
                for p in endpoints:
                    for idx in tree.query(p.buffer(self.tolerance)):
                        if idx == line_index:
                            continue
                        other_line = lines[idx]
                        dist = p.distance(other_line)
                        if 0 < dist <= self.tolerance:
                            _, p_target = shapely.ops.nearest_points(p, other_line)
                            bridges.append(LineString([p, p_target]))
        lines.extend(bridges)

        # Node the line work and extract the closed regions. The union may be
        # a single LineString, a MultiLineString or empty.
        all_shapes = shapely.ops.unary_union(lines)
        noded_lines = self._parts_of_type(all_shapes, LineString)
        polygons: typing.List[Polygon] = list(shapely.ops.polygonize(noded_lines))

        # Merge overlapping polygons, then keep outer rings only.
        merged_polygons = shapely.unary_union(polygons)
        polygons = [
            Polygon(poly.exterior)
            for poly in self._parts_of_type(merged_polygons, Polygon)
        ]

        # Whatever line work is not a polygon outline is dispensable.
        closed_rings = [poly.exterior for poly in polygons]
        if closed_rings:
            leftovers = all_shapes.difference(shapely.ops.unary_union(closed_rings))
            dispensable: typing.List[LineString] = self._parts_of_type(leftovers, LineString)
        else:
            dispensable = noded_lines

        # buffer(0) repairs the polygons; it may split one or return nothing.
        repaired: typing.List[Polygon] = []
        for poly in polygons:
            repaired.extend(self._parts_of_type(poly.buffer(0), Polygon))
        polygons = repaired

        logging.info(f"共生成多边形{len(polygons)}个,线段{len(dispensable)}条")
        return VirtualMachineOutput(polygons, dispensable)


class _Preprocessor(object):
    '''Parser that turns PLT source text into an ``AbstractSyntaxTree``.'''

    _COMMAND_PATTERN = re.compile(r'^[a-zA-Z]+')
    _STRIP_WHITESPACE = str.maketrans("", "", string.whitespace)

    def __init__(self) -> None:
        pass

    def get_abstract_syntax_tree(self, src_content: str) -> AbstractSyntaxTree:
        '''Parse ``src_content`` into a flat instruction list.

        ``PU``/``PD`` emit a pen instruction followed by one ``PA`` per
        coordinate pair; ``PA`` emits one ``PA`` per pair. Unknown
        instructions are skipped with a (switchable) warning.
        '''
        # Some files have no ';' at the end of a statement, so a newline also
        # ends a statement. A space is treated as an argument separator.
        content = src_content.replace("\n", ";").replace(" ", ",")
        # Drop the remaining whitespace characters.
        content = content.translate(self._STRIP_WHITESPACE)

        result = []
        for line_index, line in enumerate(content.split(";")):
            if line == "":
                continue

            # The instruction is the leading run of letters.
            match = self._COMMAND_PATTERN.match(line)
            if match is None:
                assert_warning(False, f"[line {line_index}] 未找到命令: {line}")
                continue
            command_str = match.group().upper()
            try:
                command = PltCommand[command_str]
            except KeyError:
                assert_warning(False, f"[line {line_index}] 不支持的指令: {command_str}")
                continue

            # Empty tokens come from a space after the instruction or from
            # doubled separators ("PA 100, 200"); they carry no value.
            stack = [token for token in line[len(command_str):].split(",") if token]

            if command != PltCommand.PA:
                result.append({'cmd': command, 'args': []})
            if command != PltCommand.IN:
                paired = len(stack) - len(stack) % 2
                for k in range(0, paired, 2):
                    result.append({'cmd': PltCommand.PA, 'args': [stack[k], stack[k + 1]]})
                stack = stack[paired:]
            assert_warning(len(stack) == 0, f"[line {line_index}] 栈非空: {stack}")
        return result


class PltReader(object):
    '''Reads a PLT file and produces a ``VirtualMachineOutput``.'''

    def __init__(self, pltfile: typing.TextIO) -> None:
        '''Read and parse ``pltfile`` immediately; the file may be closed afterwards.'''
        self.pltfile = pltfile
        logging.info("正在预处理PLT文件...")
        preprocessor = _Preprocessor()
        self.abstract_syntax_tree = preprocessor.get_abstract_syntax_tree(self.pltfile.read())
        logging.info("预处理完成")

    def get_output(self, tolerance=5, version: int = 2) -> VirtualMachineOutput:
        '''Run the parsed program.

        Args:
            tolerance: gap-bridging tolerance passed to the virtual machine.
            version: virtual machine implementation (1 is deprecated).
        '''
        machine = _VirtualMachine(tolerance=tolerance)
        return machine.run(self.abstract_syntax_tree, version)


def _main() -> None:
    '''Render the sample file in every output mode.'''
    filepath = "标准.plt"
    with open(filepath, 'r') as f:
        reader = PltReader(f)
    output = reader.get_output()
    # NOTE(review): presumably 1016 plotter units per inch rendered at 72 DPI - confirm.
    scale_factor = 72 / 1016

    debug_full_sheet_path = "debug_full_sheet.png"
    cv2.imwrite(debug_full_sheet_path, output.debug_full_sheet(scale_factor=scale_factor))
    print(f"调试整幅输出已写入{debug_full_sheet_path}")

    full_sheet_path = "full_sheet.png"
    cv2.imwrite(full_sheet_path, output.full_sheet(scale_factor=scale_factor))
    print(f"整幅输出已写入{full_sheet_path}")

    # cv2.imwrite does not create missing directories.
    single_size_path = "single_size"
    os.makedirs(single_size_path, exist_ok=True)
    for index, img in enumerate(output.single_size(size_num=5, scale_factor=scale_factor)):
        cv2.imwrite(os.path.join(single_size_path, f"{index}.png"), img)
    print(f"单码输出已写入{single_size_path}")

    single_piece_path = "single_piece"
    os.makedirs(single_piece_path, exist_ok=True)
    for index, img in enumerate(output.single_piece(scale_factor=scale_factor)):
        cv2.imwrite(os.path.join(single_piece_path, f"{index}.png"), img)
    print(f"单片输出已写入{single_piece_path}")


if __name__ == "__main__":
    _main()