Conversation
Reviewer's GuideThis PR implements a unified control stack for a SynthonX liquid‐handling platform, introducing a thread‐safe RS485 bus and Modbus RTU helper, high‐level XYZ motion and pipette abstractions, a CLI and a full tkinter GUI frontend, a YAML registry for UniLab workflows along with advanced flow routines, and a USB relay controller for stirring. It ties together low‐level communication, coordinate transforms, safe homing/motion, JSON‐backed point management, and process sequences into a cohesive package. Sequence diagram for safe movement and pipetting (SynthonX platform)sequenceDiagram
actor User
participant GUI
participant Station
participant SharedXYZController
participant SOPAPipetteYYQ
User->>GUI: Request move and pipette
GUI->>Station: move_to_work_safe(x, y, z)
Station->>SharedXYZController: move_to_work_safe(x, y, z)
SharedXYZController->>SharedRS485Bus: send movement commands
SharedXYZController-->>Station: movement complete
Station->>SOPAPipetteYYQ: aspirate(volume)
SOPAPipetteYYQ->>SharedRS485Bus: send pipette command
SOPAPipetteYYQ-->>Station: aspirate complete
Station-->>GUI: operation result
Class diagram for SynthonX unified control stackclassDiagram
class SharedRS485Bus {
+port: str
+baudrate: int
+timeout: float
+serial
+lock
+open()
+close()
+reset_input()
+write(data: bytes)
+read(n: int)
+read_exact(n: int, overall_timeout: float)
}
class XYZModbus {
+bus: SharedRS485Bus
+ignore_crc_error: bool
+set_ignore_crc(flag: bool)
+read_regs(slave: int, addr: int, count: int)
+write_reg(slave: int, addr: int, val: int)
+write_regs(slave: int, start: int, values: List[int])
}
class SharedXYZController {
+bus: SharedRS485Bus
+mb: XYZModbus
+cfg: MachineConfig
+origin: CoordinateOrigin
+enable(axis: MotorAxis, on: bool)
+emergency_stop(axis: MotorAxis)
+get_motor_status(axis: MotorAxis)
+move_to_steps(axis: MotorAxis, steps: int, speed_rpm: int, accel: int, precision: int)
+wait_for_completion(axis: MotorAxis, timeout: float)
+home_axis(axis: MotorAxis, direction: int)
+home_all()
+set_work_origin_here()
+move_to_work_safe(x, y, z, speed, accel)
+move_rel_z_mm(dz: float, speed, accel)
}
class SOPAPipetteYYQ {
+bus: SharedRS485Bus
+config: SOPAConfig
+is_initialized: bool
+initialize()
+eject_tip()
+aspirate(volume_uL: float)
+dispense(volume_uL: float)
}
class LiquidStation {
+bus: SharedRS485Bus
+xyz: SharedXYZController
+pip: SOPAPipetteYYQ
+connect()
+disconnect()
+home_all()
+set_work_origin_here()
+move_to(x, y, z, speed, accel)
+move_rel_z(dz_mm: float)
+pipette_init()
+eject_tip()
+aspirate(vol_ul: float)
+dispense(vol_ul: float)
+estop_all()
}
class Station {
+bus: SharedRS485Bus
+xyz: SharedXYZController
+pip: SOPAPipetteYYQ
+connect()
+disconnect()
+set_work_origin_here()
+home_safe()
+emergency_stop()
+get_status_mm()
+move_to_work_safe(x, y, z, speed, acc)
+move_to_work_direct(x, y, z, speed, acc, z_order)
+move_relative_direct(dx, dy, dz, speed, acc)
+pip_init()
+pip_eject()
+pip_asp(ul: float)
+pip_dsp(ul: float)
}
class RelayController {
+port: str
+baudrate: int
+timeout: float
+ser
+connect()
+on(wait_response: bool)
+off(wait_response: bool)
+toggle(wait_response: bool)
+state()
+close()
+ensure_off_on_exit()
}
class SynthonXFlowV2 {
+cfg: FlowConfig
+station: Station
+reactor: RelayController
+system_init()
+pick_tip(tip_point: str, down_mm: float)
+drop_tip(tip_point: str, down_mm: float)
+transfer_A_to_B(...)
+transfer_B_to_D(...)
+filtering(...)
+pushing(...)
+load_for_nmr(...)
+stir_on(...)
+stir_off(...)
+stir_for(...)
}
SharedRS485Bus <|-- XYZModbus
SharedRS485Bus <|-- SOPAPipetteYYQ
SharedRS485Bus <|-- SharedXYZController
SharedXYZController <|-- LiquidStation
SharedXYZController <|-- Station
SOPAPipetteYYQ <|-- LiquidStation
SOPAPipetteYYQ <|-- Station
Station <|-- SynthonXFlowV2
RelayController <|-- SynthonXFlowV2
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes - here's some feedback:
- SynthonX.py is massive and mixes RS485 bus, Modbus, XYZ controller, pipette, and CLI logic—consider splitting it into smaller modules (e.g. bus, modbus, controller, pipette) to improve readability and maintainability.
- I noticed SharedXYZController.move_to_work_safe is defined twice with different logic—remove the redundant definition and consolidate CRC‐ignore handling into a single implementation to avoid confusion.
- The GUI duplicates a lot of the CLI/station interaction code; extract common station workflows into reusable APIs so both CLI and GUI can share logic without copying it.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- SynthonX.py is massive and mixes RS485 bus, Modbus, XYZ controller, pipette, and CLI logic—consider splitting it into smaller modules (e.g. bus, modbus, controller, pipette) to improve readability and maintainability.
- I noticed SharedXYZController.move_to_work_safe is defined twice with different logic—remove the redundant definition and consolidate CRC‐ignore handling into a single implementation to avoid confusion.
- The GUI duplicates a lot of the CLI/station interaction code; extract common station workflows into reusable APIs so both CLI and GUI can share logic without copying it.
## Individual Comments
### Comment 1
<location> `unilabos/devices/SynthonX/SynthonX.py:515` </location>
<code_context>
+ if z is not None and (z < 0 or z > self.cfg.max_travel_z):
+ raise CoordinateSystemError(f"Z out of range: {z}")
+
+ def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool:
+ self.check_limits(x, y, z)
+ speed = speed or self.cfg.default_speed
</code_context>
<issue_to_address>
**issue:** Duplicate definition of move_to_work_safe detected.
Remove the first move_to_work_safe definition to prevent confusion and ensure consistent handling of CRC errors.
</issue_to_address>
### Comment 2
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:301-303` </location>
<code_context>
+ p = self._pt(tip_point)
+ self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
+ time.sleep(self.cfg.settle_s)
+ self.station.move_relative_direct(0.0, 0.0, float(down_mm))
+ time.sleep(self.cfg.settle_s)
+ self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
+ time.sleep(self.cfg.settle_s)
+ print(f'{tip_point}枪头已经装载')
</code_context>
<issue_to_address>
**suggestion (bug_risk):** No error handling for pipette pick/drop failures.
Currently, pick_tip and drop_tip always return True, even if move_relative_direct or pip_eject fail. Please add error handling or status checks to ensure failures are detected and handled appropriately.
Suggested implementation:
```python
def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
p = self._pt(tip_point)
ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
ok3 = self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
time.sleep(self.cfg.settle_s)
if not (ok1 and ok2 and ok3):
print(f'ERROR: {tip_point}枪头装载失败')
return False
print(f'{tip_point}枪头已经装载')
return True
```
```python
def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
p = self._pt(tip_point)
ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
ok3 = self.station.pip_eject()
time.sleep(self.cfg.settle_s)
if not (ok1 and ok2 and ok3):
print(f'ERROR: {tip_point}枪头卸载失败')
return False
print(f'{tip_point}枪头已经卸载')
return True
```
If `move_to_work_safe`, `move_relative_direct`, or `pip_eject` do not currently return a status (True/False), you will need to update those methods to do so, or use another way to detect failure (e.g., exception handling).
</issue_to_address>
### Comment 3
<location> `unilabos/devices/SynthonX/SynthonX.py:75-83` </location>
<code_context>
while len(buf) < n:
if time.time() > deadline:
break
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Move a guard clause in a while statement's body into its test ([`while-guard-to-condition`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/while-guard-to-condition))
```suggestion
while len(buf) < n and not time.time() > deadline:
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)
```
<br/><details><summary>Explanation</summary>Removing the guard clause simplifies the code and makes clearer the intention of
the loop.
</details>
</issue_to_address>
### Comment 4
<location> `unilabos/devices/SynthonX/SynthonX.py:79-80` </location>
<code_context>
def read_exact(self, n: int, overall_timeout: float = 0.3) -> bytes:
"""Read exactly n bytes within overall_timeout; return b'' if timeout."""
if n <= 0:
return b""
buf = b""
deadline = time.time() + overall_timeout
while len(buf) < n:
if time.time() > deadline:
break
need = n - len(buf)
chunk = self.read(need)
if chunk:
buf += chunk
else:
time.sleep(0.001)
return buf
</code_context>
<issue_to_address>
**suggestion (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
```suggestion
if chunk := self.read(need):
```
</issue_to_address>
### Comment 5
<location> `unilabos/devices/SynthonX/SynthonX.py:144` </location>
<code_context>
def set_ignore_crc(self, flag: bool):
self.ignore_crc_error = bool(flag)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
self.ignore_crc_error = flag
```
</issue_to_address>
### Comment 6
<location> `unilabos/devices/SynthonX/SynthonX.py:159` </location>
<code_context>
def _xfer(self, slave: int, payload: bytes, retries: int = 3) -> bytes:
req = bytes([slave]) + payload
frame = req + self._crc16(req)
fn_req = payload[0]
# 不做统计,只在最终失败时可选择忽略返回
for attempt in range(1, retries + 1):
with self.bus.lock:
if not self.bus.serial or not self.bus.serial.is_open:
raise ModbusException("Bus not open")
self.bus.reset_input()
self.bus.write(frame)
time.sleep(0.010)
try:
base = 0.30 + 0.15*(attempt-1)
header = self.bus.read_exact(2, overall_timeout=base)
if len(header) < 2:
raise ModbusException("No response")
addr, fn = header[0], header[1]
if addr != slave:
# 把这一帧当成串扰/回波,丢弃后继续本次尝试
time.sleep(0.005)
continue
if (fn & 0x80) != 0:
rest = self.bus.read_exact(3, overall_timeout=base)
resp = header + rest
if len(rest) < 3:
raise ModbusException("Short exception response")
if resp[-2:] != self._crc16(resp[:-2]):
logger.warning(f"CRC mismatch (exception response) attempt {attempt}/{retries} slave={slave} fn=0x{fn_req:02X}")
if attempt >= retries:
if self.ignore_crc_error:
logger.error("CRC mismatch(异常帧)重试耗尽已忽略 (风险:异常码可能失真)")
return resp # 返回未校验异常帧
raise ModbusException("CRC mismatch (exception)")
time.sleep(0.005)
continue
ex_code = resp[2]
raise ModbusException(f"Modbus exception: 0x{ex_code:02X}")
if fn == 0x03:
bc_b = self.bus.read_exact(1, overall_timeout=base)
if len(bc_b) < 1:
raise ModbusException("Short response (no byte count)")
bc = bc_b[0]
data_crc = self.bus.read_exact(bc + 2, overall_timeout=base + 0.20)
resp = header + bc_b + data_crc
if len(data_crc) < bc + 2:
raise ModbusException("Short response (payload)")
elif fn in (0x06, 0x10):
rest = self.bus.read_exact(6, overall_timeout=base + 0.20)
resp = header + rest
if len(rest) < 6:
raise ModbusException("Short response")
else:
tail = self.bus.read_exact(254, overall_timeout=base + 0.30)
resp = header + tail
if len(resp) < 3:
raise ModbusException("Short response")
if resp[-2:] != self._crc16(resp[:-2]):
logger.warning(f"CRC mismatch (attempt {attempt}/{retries}) slave={slave} fn=0x{fn_req:02X}")
if attempt >= retries:
if self.ignore_crc_error:
logger.error("CRC mismatch 重试耗尽已忽略 (风险:数据未校验)")
return resp # 直接返回未校验帧
raise ModbusException("CRC mismatch")
time.sleep(0.005)
continue
if resp[1] != fn_req:
raise ModbusException(f"Unexpected function: {resp[1]:02X} (!={fn_req:02X})")
return resp # 成功
except ModbusException:
if attempt >= retries:
# 已在 CRC 分支处理 ignore 情况;这里直接抛出其他类型异常
raise
time.sleep(0.01)
</code_context>
<issue_to_address>
**issue (code-quality):** Low code quality found in XYZModbus.\_xfer - 7% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
### Comment 7
<location> `unilabos/devices/SynthonX/SynthonX.py:250-253` </location>
<code_context>
def read_regs(self, slave: int, addr: int, count: int) -> List[int]:
fn = 0x03
payload = bytes([fn]) + addr.to_bytes(2, "big") + count.to_bytes(2, "big")
resp = self._xfer(slave, payload)
byte_count = resp[2]
vals = []
for i in range(0, byte_count, 2):
vals.append(int.from_bytes(resp[3 + i:5 + i], "big"))
return vals
</code_context>
<issue_to_address>
**suggestion (code-quality):** We've found these issues:
- Convert for loop into list comprehension ([`list-comprehension`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/list-comprehension/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
```suggestion
return [
int.from_bytes(resp[3 + i : 5 + i], "big")
for i in range(0, byte_count, 2)
]
```
</issue_to_address>
### Comment 8
<location> `unilabos/devices/SynthonX/SynthonX.py:392-395` </location>
<code_context>
def move_to_steps(self, axis: MotorAxis, steps: int, speed_rpm: int = 1000,
accel: int = 1000, precision: int = 100) -> bool:
a = self.addr[axis]
if steps < 0:
steps = (steps + 0x100000000) & 0xFFFFFFFF
hi = (steps >> 16) & 0xFFFF
lo = steps & 0xFFFF
ok = self.mb.write_regs(a, XYZModbus.REG_TARGET_POSITION_HIGH, [
hi, lo, speed_rpm, accel, precision
])
return ok
</code_context>
<issue_to_address>
**suggestion (code-quality):** Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
```suggestion
return self.mb.write_regs(
a,
XYZModbus.REG_TARGET_POSITION_HIGH,
[hi, lo, speed_rpm, accel, precision],
)
```
</issue_to_address>
### Comment 9
<location> `unilabos/devices/SynthonX/SynthonX.py:791` </location>
<code_context>
def main():
print("\n=== Unified XYZ + YYQ SOPA (Single-Port) ===")
port = input("串口端口 (默认 COM3): ").strip() or "COM3"
station = LiquidStation(port)
station.connect()
station.load_points()
init_pip = input("是否初始化移液器? (y/N): ").strip().lower() in ("y", "yes")
if init_pip:
if station.pipette_init():
print("移液器初始化完成。")
else:
print("移液器初始化失败。")
while True:
print("\n" + "=" * 50)
print("1) 全轴回零(Z→X→Y)")
print("2) 设定当前位置为工作原点")
print("3) 安全移动到点 (X/Y/Z,mm)")
print("4) Z 轴相对移动 (mm)")
print("5) 保存/前往点位")
print("6) 移液:初始化 / 弹出枪头 / 吸液 / 排液")
print("7) 直接移动(不抬Z) X/Y/Z + 顺序(first/last/auto)")
print("99) 紧急停止")
print("0) 退出")
choice = input("选择: ").strip()
if choice == "0":
break
elif choice == "1":
print("回零中…")
print("成功" if station.home_all() else "失败")
elif choice == "2":
print("设定工作原点…")
print("成功" if station.set_work_origin_here() else "失败")
elif choice == "3":
x = input("X(mm, 空=跳过): ").strip()
y = input("Y(mm, 空=跳过): ").strip()
z = input("Z(mm, 空=跳过): ").strip()
x = float(x) if x else None
y = float(y) if y else None
z = float(z) if z else None
ok = station.move_to(x, y, z)
print("到位" if ok else "失败")
elif choice == "4":
dz = float(input("Z 相对位移(mm,正=下降): ").strip())
ok = station.move_rel_z(dz)
print("完成" if ok else "失败")
elif choice == "5":
sub = input("(a)保存点 (b)前往点: ").strip().lower()
if sub == "a":
name = input("点名: ").strip()
x = float(input("X(mm): ").strip())
y = float(input("Y(mm): ").strip())
z = float(input("Z(mm): ").strip())
station._points[name] = {"x": x, "y": y, "z": z}
station.save_points()
print("已保存")
else:
name = input("点名: ").strip()
pt = station._points.get(name)
if not pt:
print("未找到该点")
else:
ok = station.move_to(pt["x"], pt["y"], pt["z"])
print("到位" if ok else "失败")
elif choice == "6":
sub = input("(a)初始化 (b)弹出枪头 (c)吸液 (d)排液: ").strip().lower()
if sub == "a":
print("初始化…")
print("完成" if station.pipette_init() else "失败")
elif sub == "b":
print("弹出枪头…")
station.eject_tip()
print("完成")
elif sub == "c":
vol = float(input("吸液体积(µL): ").strip())
station.aspirate(vol)
elif sub == "d":
vol = float(input("排液体积(µL): ").strip())
station.dispense(vol)
else:
print("无效子选项")
elif choice == "7":
x = input("X(mm, 空=跳过): ").strip()
y = input("Y(mm, 空=跳过): ").strip()
z = input("Z(mm, 空=跳过): ").strip()
z_order = input("Z顺序(first/last/auto, 默认auto): ").strip().lower() or "auto"
x = float(x) if x else None
y = float(y) if y else None
z = float(z) if z else None
ok = station.move_to_direct(x=x, y=y, z=z, z_order=z_order)
print("到位" if ok else "失败")
elif choice == "99":
station.estop_all()
print("已急停")
else:
print("无效选项")
station.disconnect()
print("Bye.")
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Swap positions of nested conditionals ([`swap-nested-ifs`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-nested-ifs/))
- Hoist nested repeated code outside conditional statements ([`hoist-similar-statement-from-if`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/hoist-similar-statement-from-if/))
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
- Low code quality found in main - 4% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
### Comment 10
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:56-57` </location>
<code_context>
def _idx_from_name(name: str) -> int:
"""提取 'C1' / 'D49' 的编号 -> 1 / 49"""
try:
return int(''.join(c for c in name if c.isdigit()))
except Exception:
raise ValueError(f"无法解析点名编号: {name!r}")
</code_context>
<issue_to_address>
**suggestion (code-quality):** Explicitly raise from a previous error ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))
```suggestion
except Exception as e:
raise ValueError(f"无法解析点名编号: {name!r}") from e
```
</issue_to_address>
### Comment 11
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:199` </location>
<code_context>
def stir_for(self, seconds: float, wait_response: bool = True) -> bool:
"""阻塞式搅拌指定秒数,超简单实用。"""
_require(seconds > 0, "seconds 必须>0")
if self.stir_on(wait_response=wait_response):
try:
print(f"持续搅拌 {seconds} 秒...")
time.sleep(float(seconds))
finally:
self.stir_off(wait_response=wait_response)
print("搅拌结束")
return True
return False
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
time.sleep(seconds)
```
</issue_to_address>
### Comment 12
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:254` </location>
<code_context>
def _down_rel(self, dz: float):
_require(dz >= 0, "下探距离必须>=0")
self.station.move_relative_direct(0.0, 0.0, -float(dz))
time.sleep(self.cfg.settle_s)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
self.station.move_relative_direct(0.0, 0.0, -dz)
```
</issue_to_address>
### Comment 13
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:259` </location>
<code_context>
def _up_rel(self, dz: float):
_require(dz >= 0, "上移距离必须>=0")
self.station.move_relative_direct(0.0, 0.0, +float(dz))
time.sleep(self.cfg.settle_s)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
self.station.move_relative_direct(0.0, 0.0, +dz)
```
</issue_to_address>
### Comment 14
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:263` </location>
<code_context>
def _xy_rel(self, dx: float, dy: float):
self.station.move_relative_direct(float(dx), float(dy), 0.0)
time.sleep(self.cfg.settle_s)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
self.station.move_relative_direct(dx, dy, 0.0)
```
</issue_to_address>
### Comment 15
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:301-303` </location>
<code_context>
def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
p = self._pt(tip_point)
self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
time.sleep(self.cfg.settle_s)
print(f'{tip_point}枪头已经装载')
return True
</code_context>
<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>
### Comment 16
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:312-316` </location>
<code_context>
def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
p = self._pt(tip_point)
self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
self.station.pip_eject()
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
time.sleep(self.cfg.settle_s)
print(f'枪头已经弃置在{tip_point}')
return True
</code_context>
<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool [×2] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>
### Comment 17
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:349` </location>
<code_context>
def _do_transfer(self,
src_name: str,
dst_names: Union[str, Iterable[str]],
tip_c_name: str,
total_ul: float,
down_src_mm: float,
down_dst_mm: float,
split_volumes: Optional[List[float]] = None,
stir_post_s: Optional[float] = None) -> bool:
# 修改:统一解析目标名
dst_list = _split_names(dst_names)
_require(len(dst_list) >= 1, "目标点名至少1个")
# (1) 取枪头
self.pick_tip(tip_c_name, down_mm=120)
time.sleep(self.cfg.settle_s)
# (2) 到源位
src_p = self._pt(src_name)
self.station.move_to_work_safe(x=src_p["x"], y=src_p["y"], z=src_p["z"])
time.sleep(self.cfg.settle_s)
print('到达源点位')
# (3) 下探源位
self.station.move_relative_direct(0.0, 0.0, float(down_src_mm))
time.sleep(self.cfg.settle_s)
print('下探完成')
# (4) 吸液
self.station.pip_asp(float(total_ul))
time.sleep(self.cfg.settle_s)
print('吸取液体完成')
# (5) 回升源位
self.station.move_relative_direct(0.0, 0.0, -float(down_src_mm))
time.sleep(self.cfg.settle_s)
print('回升完成')
# === 目标处理 ===
if len(dst_list) == 1:
# 单目标
dst_p = self._pt(dst_list[0])
self.station.move_to_work_safe(x=dst_p["x"], y=dst_p["y"], z=dst_p["z"])
time.sleep(self.cfg.settle_s)
print('移动到目标点位')
self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
time.sleep(self.cfg.settle_s)
print('下探')
self.station.pip_dsp(float(total_ul))
time.sleep(self.cfg.settle_s)
time.sleep(self.cfg.delay_after_dispense)
self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm))
time.sleep(self.cfg.settle_s)
else:
# 多目标
if split_volumes is not None:
_require(len(split_volumes) == len(dst_list), "split_volumes 长度需与目标数量一致")
vols = [float(v) for v in split_volumes]
else:
each = float(total_ul) / float(len(dst_list))
vols = [each] * len(dst_list)
first_name = dst_list[0]
first_p = self._pt(first_name)
self.station.move_to_work_safe(x=first_p["x"], y=first_p["y"], z=first_p["z"])
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
time.sleep(self.cfg.settle_s)
self.station.pip_dsp(vols[0])
time.sleep(self.cfg.settle_s)
base_x, base_y = first_p["x"], first_p["y"]
for nm, v in zip(dst_list[1:], vols[1:]):
self.station.move_relative_direct(0.0, 0.0, -float(down_dst_mm))
time.sleep(self.cfg.settle_s)
p = self._pt(nm)
dx, dy = p["x"] - base_x, p["y"] - base_y
self.station.move_relative_direct(float(dx), float(dy), 0.0)
time.sleep(self.cfg.settle_s)
self.station.move_relative_direct(0.0, 0.0, float(down_dst_mm))
time.sleep(self.cfg.settle_s)
self.station.pip_dsp(v)
time.sleep(self.cfg.settle_s)
base_x, base_y = p["x"], p["y"]
# —— 如果设定了加液后搅拌时间,则触发搅拌 ——
if stir_post_s is not None and float(stir_post_s) > 0:
try:
print(f"[搅拌] 加液完成,搅拌 {float(stir_post_s)} s ...")
self.stir_for(float(stir_post_s))
except Exception as e:
print(f"[搅拌] 触发失败:{e}(忽略,不影响主流程)")
# 映射 C 槽到 +48 的弃置位
def upgrade_c_name(name: str) -> str:
m = re.fullmatch(r"C(\d+)", name.strip().upper())
if not m:
return name
idx = int(m.group(1))
return f"C{idx + 48}"
tip_c_name_new = upgrade_c_name(tip_c_name)
# (10) 放枪头
self.drop_tip(tip_c_name_new, down_mm=60.0)
return True
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Remove unnecessary casts to int, str, float or bool [×5] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
- Replace m.group(x) with m[x] for re.Match objects ([`use-getitem-for-re-match-groups`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/use-getitem-for-re-match-groups/))
</issue_to_address>
### Comment 18
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:543` </location>
<code_context>
def pushing(self,
tip_c_name: str) -> bool:
z_down_mm = 136.0
y_forward_mm = 60.0
_require(z_down_mm > 0, "z_down_mm 必须>0")
_require(y_forward_mm > 0, "y_forward_mm 必须>0")
_require(_zone_from_name(tip_c_name) in ("C"), "枪头点名应在C区")
self.pick_tip(tip_c_name)
d25_p = self._pt("D25")
self.station.move_to_work_safe(x=d25_p["x"], y=d25_p["y"], z=d25_p["z"])
time.sleep(self.cfg.settle_s)
print("到达 D25 点位")
self.station.move_relative_direct(0.0, 0.0, float(z_down_mm))
time.sleep(self.cfg.settle_s)
print("下压完成")
self.station.move_relative_direct(0.0, float(y_forward_mm), 0.0)
time.sleep(self.cfg.settle_s)
print("推动完成")
self.station.move_relative_direct(0.0, 0.0, -float(z_down_mm))
time.sleep(self.cfg.settle_s)
print("抬升完成")
def upgrade_c_name(name: str) -> str:
m = re.fullmatch(r"C(\d+)", name.strip().upper())
if not m:
return name
idx = int(m.group(1))
return f"C{idx + 48}"
tip_c_name_new = upgrade_c_name(tip_c_name)
self.drop_tip(tip_c_name_new)
print("推动流程完成")
return True
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Remove unnecessary casts to int, str, float or bool [×3] ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
- Replace m.group(x) with m[x] for re.Match objects ([`use-getitem-for-re-match-groups`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/use-getitem-for-re-match-groups/))
</issue_to_address>
### Comment 19
<location> `unilabos/devices/SynthonX/SynthonX_flow_v3.py:583-592` </location>
<code_context>
def load_for_nmr(self,
src_d_name: str, # 源:D区 (如 D1-D8)
dst_d_name: str, # 目标:D区 (如 D17-D24)
tip_c_name: str, # 枪头:C区
total_ul: float,
stir_post_s: Optional[float] = None) -> bool:
down_src_mm = 138
down_dst_mm = 9
_require(_zone_from_name(src_d_name) == "D", "源位必须在 D 区")
_require(_zone_from_name(dst_d_name) == "D", "目标位必须在 D 区")
_require(src_d_name != dst_d_name, "源与目标不能相同")
_require(_zone_from_name(tip_c_name) == "C", "枪头点名必须在 C 区")
return self._do_transfer(
src_name=src_d_name,
dst_names=dst_d_name,
tip_c_name=tip_c_name,
total_ul=float(total_ul),
down_src_mm=float(down_src_mm),
down_dst_mm=float(down_dst_mm),
split_volumes=None,
stir_post_s=stir_post_s
)
</code_context>
<issue_to_address>
**issue (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
</issue_to_address>
### Comment 20
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:211` </location>
<code_context>
def move_relative_direct(self, dx: float, dy: float, dz: float, speed: Optional[int]=None, acc: Optional[int]=None) -> bool:
"""基于当前位置直接到新目标(工作坐标 Δmm),不抬Z;
策略:若目标Z>当前Z,先XY后Z;若目标Z<=当前Z,先Z后XY。
"""
assert self.xyz is not None
speed = speed or self.cfg.default_speed
acc = acc or self.cfg.default_acceleration
# 当前绝对步(机坐标步数)
sx = self.xyz.get_motor_status(MotorAxis.X).steps
sy = self.xyz.get_motor_status(MotorAxis.Y).steps
sz = self.xyz.get_motor_status(MotorAxis.Z).steps
# Δmm→Δsteps(Δ与零点无关,可直接换算)
tx = sx + self.xyz.mm_to_steps(MotorAxis.X, dx)
ty = sy + self.xyz.mm_to_steps(MotorAxis.Y, dy)
tz = sz + self.xyz.mm_to_steps(MotorAxis.Z, dz)
# 顺序:仅按相对大小决定
order = ("xy","z") if tz > sz else ("z","xy")
ok = True
try:
if "z" in order[0]:
ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc)
ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0)
ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc)
ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc)
ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0)
ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0)
else:
ok &= self.xyz.move_to_steps(MotorAxis.X, tx, speed, acc)
ok &= self.xyz.move_to_steps(MotorAxis.Y, ty, speed, acc)
ok &= self.xyz.wait_for_completion(MotorAxis.X, 20.0)
ok &= self.xyz.wait_for_completion(MotorAxis.Y, 20.0)
ok &= self.xyz.move_to_steps(MotorAxis.Z, tz, speed, acc)
ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0)
except Exception:
ok = False
return bool(ok)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Remove unnecessary casts to int, str, float or bool ([`remove-unnecessary-cast`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unnecessary-cast/))
```suggestion
return ok
```
</issue_to_address>
### Comment 21
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:441` </location>
<code_context>
def xyz_move_relative_inputs(self):
s = self._need_station();
if not s: return
try:
sp = int(self.speed_var.get())
ac = int(self.acc_var.get())
dx, dy, dz = self.rx_var.get(), self.ry_var.get(), self.rz_var.get()
ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
if ok:
# 获取并输出当前绝对坐标(工作坐标)
pos = s.get_status_mm()
self.console.log(
f"相对位移(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
)
# 刷新状态标签
try: self.xyz_refresh()
except Exception: pass
else:
self.console.log(f"相对位移(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})")
except Exception as e:
messagebox.showerror("相对位移失败", str(e))
self.console.log(f"相对位移失败:{e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>
### Comment 22
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:500` </location>
<code_context>
def xyz_move_relative(self):
s = self._need_station();
if not s: return
try:
sp = int(self.speed_var.get())
ac = int(self.acc_var.get())
dx, dy, dz = self.x_var.get(), self.y_var.get(), self.z_var.get()
ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
if ok:
pos = s.get_status_mm()
self.console.log(
f"相对直接移动(工作坐标):OK (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
)
try: self.xyz_refresh()
except Exception: pass
else:
self.console.log(f"相对直接移动(工作坐标):Fail (Δx={dx}, Δy={dy}, Δz={dz}, speed={sp}, acc={ac})")
except Exception as e:
messagebox.showerror("相对移动失败", str(e))
self.console.log(f"相对移动失败:{e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>
### Comment 23
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:618-621` </location>
<code_context>
def _workspace_xy(self):
try:
if self.station and self.station.xyz:
mc = self.station.cfg
else:
mc = MachineConfig()
return float(mc.max_travel_x), float(mc.max_travel_y)
except Exception:
return 340.0, 250.0
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace if statement with if expression ([`assign-if-exp`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/assign-if-exp/))
```suggestion
mc = self.station.cfg if self.station and self.station.xyz else MachineConfig()
```
</issue_to_address>
### Comment 24
<location> `unilabos/devices/SynthonX/SynthonX_gui.py:717` </location>
<code_context>
def map_move_relative(self):
s = self._need_station();
if not s: return
name = self.map_selected.get().strip()
if name not in self.points:
messagebox.showwarning("未选择点", "请先在示意图上点击一个点"); return
p = self.points[name]
try:
cur = s.get_status_mm() # 工作坐标
dx, dy, dz = p['x']-cur['x'], p['y']-cur['y'], p['z']-cur['z']
except Exception:
dx, dy, dz = p['x'], p['y'], p['z']
sp, ac = int(self.speed_var.get()), int(self.acc_var.get())
try:
ok = s.move_relative_direct(dx, dy, dz, speed=sp, acc=ac)
if ok:
pos = s.get_status_mm()
self.console.log(
f"[示意图] 相对移动至 '{name}'(工作坐标): OK (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f}) → 绝对(x={pos['x']:.3f}, y={pos['y']:.3f}, z={pos['z']:.3f})"
)
try: self.draw_map()
except Exception: pass
else:
self.console.log(f"[示意图] 相对移动至 '{name}'(工作坐标): Fail (Δx={dx:.3f}, Δy={dy:.3f}, Δz={dz:.3f})")
except Exception as e:
messagebox.showerror("相对移动失败", str(e))
self.console.log(f"[示意图] 相对移动失败:{e}")
</code_context>
<issue_to_address>
**issue (code-quality):** Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
</issue_to_address>
### Comment 25
<location> `unilabos/devices/SynthonX/SynthonX_reactor.py:34` </location>
<code_context>
def connect(self):
if self.ser and self.ser.is_open:
return
try:
self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
time.sleep(2) # 等待设备初始化
except Exception as e:
raise RuntimeError(f'连接串口失败: {e}')
</code_context>
<issue_to_address>
**suggestion (code-quality):** Explicitly raise from a previous error ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))
```suggestion
raise RuntimeError(f'连接串口失败: {e}') from e
```
</issue_to_address>
### Comment 26
<location> `unilabos/devices/SynthonX/SynthonX_reactor.py:43-44` </location>
<code_context>
def _write(self, data: bytes, expect_response: bool = True) -> bytes | None:
if not self.ser or not self.ser.is_open:
raise RuntimeError('串口未连接, 请先调用 connect()')
self.ser.write(data)
if expect_response:
# 读取直到最后一个字节或超时
end_byte = data[-1:]
resp = self.ser.read_until(end_byte)
return resp
return None
</code_context>
<issue_to_address>
**suggestion (code-quality):** Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
```suggestion
return self.ser.read_until(end_byte)
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| if z is not None and (z < 0 or z > self.cfg.max_travel_z): | ||
| raise CoordinateSystemError(f"Z out of range: {z}") | ||
|
|
||
| def move_to_work_safe(self, x=None, y=None, z=None, speed=None, accel=None) -> bool: |
There was a problem hiding this comment.
issue: Duplicate definition of move_to_work_safe detected.
Remove the first move_to_work_safe definition to prevent confusion and ensure consistent handling of CRC errors.
| self.station.move_relative_direct(0.0, 0.0, float(down_mm)) | ||
| time.sleep(self.cfg.settle_s) | ||
| self.station.move_relative_direct(0.0, 0.0, -float(down_mm)) |
There was a problem hiding this comment.
suggestion (bug_risk): No error handling for pipette pick/drop failures.
Currently, pick_tip and drop_tip always return True, even if move_relative_direct or pip_eject fail. Please add error handling or status checks to ensure failures are detected and handled appropriately.
Suggested implementation:
def pick_tip(self, tip_point: str, down_mm: float = 120) -> bool:
p = self._pt(tip_point)
ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
ok3 = self.station.move_relative_direct(0.0, 0.0, -float(down_mm))
time.sleep(self.cfg.settle_s)
if not (ok1 and ok2 and ok3):
print(f'ERROR: {tip_point}枪头装载失败')
return False
print(f'{tip_point}枪头已经装载')
return True def drop_tip(self, tip_point: str, down_mm: float = 60) -> bool:
p = self._pt(tip_point)
ok1 = self.station.move_to_work_safe(x=p["x"], y=p["y"], z=p["z"])
time.sleep(self.cfg.settle_s)
ok2 = self.station.move_relative_direct(0.0, 0.0, float(down_mm))
time.sleep(self.cfg.settle_s)
ok3 = self.station.pip_eject()
time.sleep(self.cfg.settle_s)
if not (ok1 and ok2 and ok3):
print(f'ERROR: {tip_point}枪头卸载失败')
return False
print(f'{tip_point}枪头已经卸载')
return TrueIf move_to_work_safe, move_relative_direct, or pip_eject do not currently return a status (True/False), you will need to update those methods to do so, or use another way to detect failure (e.g., exception handling).
| while len(buf) < n: | ||
| if time.time() > deadline: | ||
| break | ||
| need = n - len(buf) | ||
| chunk = self.read(need) | ||
| if chunk: | ||
| buf += chunk | ||
| else: | ||
| time.sleep(0.001) |
There was a problem hiding this comment.
suggestion (code-quality): Move a guard clause in a while statement's body into its test (while-guard-to-condition)
| while len(buf) < n: | |
| if time.time() > deadline: | |
| break | |
| need = n - len(buf) | |
| chunk = self.read(need) | |
| if chunk: | |
| buf += chunk | |
| else: | |
| time.sleep(0.001) | |
| while len(buf) < n and not time.time() > deadline: | |
| need = n - len(buf) | |
| chunk = self.read(need) | |
| if chunk: | |
| buf += chunk | |
| else: | |
| time.sleep(0.001) | |
Explanation
Removing the guard clause simplifies the code and makes clearer the intention ofthe loop.
| self.ignore_crc_error = ignore_crc_error | ||
|
|
||
| def set_ignore_crc(self, flag: bool): | ||
| self.ignore_crc_error = bool(flag) |
There was a problem hiding this comment.
suggestion (code-quality): Remove unnecessary casts to int, str, float or bool (remove-unnecessary-cast)
| self.ignore_crc_error = bool(flag) | |
| self.ignore_crc_error = flag |
| crc >>= 1 | ||
| return crc.to_bytes(2, "little") | ||
|
|
||
| def _xfer(self, slave: int, payload: bytes, retries: int = 3) -> bytes: |
There was a problem hiding this comment.
issue (code-quality): Low code quality found in XYZModbus._xfer - 7% (low-code-quality)
Explanation
The quality score for this function is below the quality threshold of 25%.This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines. - Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.
| time.sleep(self.cfg.settle_s) | ||
|
|
||
| def _xy_rel(self, dx: float, dy: float): | ||
| self.station.move_relative_direct(float(dx), float(dy), 0.0) |
There was a problem hiding this comment.
suggestion (code-quality): Remove unnecessary casts to int, str, float or bool [×2] (remove-unnecessary-cast)
| self.station.move_relative_direct(float(dx), float(dy), 0.0) | |
| self.station.move_relative_direct(dx, dy, 0.0) |
| ok &= self.xyz.wait_for_completion(MotorAxis.Z, 20.0) | ||
| except Exception: | ||
| ok = False | ||
| return bool(ok) |
There was a problem hiding this comment.
suggestion (code-quality): Remove unnecessary casts to int, str, float or bool (remove-unnecessary-cast)
| return bool(ok) | |
| return ok |
| if self.station and self.station.xyz: | ||
| mc = self.station.cfg | ||
| else: | ||
| mc = MachineConfig() |
There was a problem hiding this comment.
suggestion (code-quality): Replace if statement with if expression (assign-if-exp)
| if self.station and self.station.xyz: | |
| mc = self.station.cfg | |
| else: | |
| mc = MachineConfig() | |
| mc = self.station.cfg if self.station and self.station.xyz else MachineConfig() |
| self.ser = serial.Serial(self.port, self.baudrate, timeout=self.timeout) | ||
| time.sleep(2) # 等待设备初始化 | ||
| except Exception as e: | ||
| raise RuntimeError(f'连接串口失败: {e}') |
There was a problem hiding this comment.
suggestion (code-quality): Explicitly raise from a previous error (raise-from-previous-error)
| raise RuntimeError(f'连接串口失败: {e}') | |
| raise RuntimeError(f'连接串口失败: {e}') from e |
| resp = self.ser.read_until(end_byte) | ||
| return resp |
There was a problem hiding this comment.
suggestion (code-quality): Inline variable that is immediately returned (inline-immediately-returned-variable)
| resp = self.ser.read_until(end_byte) | |
| return resp | |
| return self.ser.read_until(end_byte) |
全球数智教育创新大赛决赛SynthonX团队代码