SIMCOM Quectel Fbcom 方便 自定义指令 检查网络状态的程序

SIMCOM Quectel Fbcom 方便 自定义指令 检查网络状态的程序 原理类似echo , 不用额外安装串口库等sudo cat /dev/ttyUSB2 sudo echo -ne ATCUSBPIDSWITCH9011,1,1\r\n | sudo tee /dev/ttyUSB2 /dev/null方便一次发送多个指令的程序#!/usr/bin/env python3 import os,termios,time,select,sys ports[/dev/ttyUSB2,/dev/ttyUSB3,/dev/ttyUSB4] baudtermios.B115200 def openp(p): fdos.open(p,os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) oldtermios.tcgetattr(fd) atermios.tcgetattr(fd) a[0]a[1]a[3]0 a[2]baud|termios.CS8|termios.CREAD|termios.CLOCAL a[4]a[5]baud a[6][termios.VMIN]0 a[6][termios.VTIME]0 termios.tcsetattr(fd,termios.TCSANOW,a) termios.tcflush(fd,termios.TCIOFLUSH) return fd,old def rd(fd,t1.5,show1): endtime.time()t rb while time.time()end: a,_,_select.select([fd],[],[],0.2) if a: dos.read(fd,4096) if d: rd if show: print(d.decode(errorsreplace),end) endtime.time()0.3 return r def wr(fd,s): os.write(fd,(s\r\n).encode()) fdoldNone portNone for p in ports: if not os.path.exists(p): continue try: fd,oldopenp(p) wr(fd,AT) rrd(fd,2,0).decode(errorsignore) if OK in r: portp break termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd) except: pass if not port: print(ttyUSB2 ttyUSB3 ttyUSB4 都无响应) sys.exit(1) print(使用,port,115200) print(多条用 ; 分隔 退出输入 q) try: while True: sinput(AT ).strip() if s.lower() in (q,quit,exit): break for c in s.replace(;,\n).splitlines(): cc.strip() if not c: continue print(,c) wr(fd,c) if not rd(fd,1.5,1): print(超时) print(---) finally: termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd)方便选择拨号上网指令的程序#!/usr/bin/env python3 import os,termios,time,select,sys PORTS[/dev/ttyUSB2,/dev/ttyUSB3,/dev/ttyUSB4] BAUDtermios.B115200 COMMANDS AT ATE1 ATCSQ ATCUSBCFGUSBID,1E0E,9001 ATCUSBCFGUSBID,1E0E,9011 ATCUSBCFGUSBID,1E0E,9018 ATQCFGusbnet,0 ATQCFGusbnet,1 ATQCFGusbnet,2 ATQCFGusbnet,3 ATCFUN1,1 # def openp(p): fdos.open(p,os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) oldtermios.tcgetattr(fd) atermios.tcgetattr(fd) a[0]a[1]a[3]0 a[2]BAUD|termios.CS8|termios.CREAD|termios.CLOCAL a[4]a[5]BAUD a[6][termios.VMIN]0 a[6][termios.VTIME]0 termios.tcsetattr(fd,termios.TCSANOW,a) termios.tcflush(fd,termios.TCIOFLUSH) return fd,old def wr(fd,s): os.write(fd,(s\r\n).encode()) def rd(fd,t1.8,show1): endtime.time()t rb while time.time()end: a,_,_select.select([fd],[],[],0.2) if a: dos.read(fd,4096) if d: rd if show: print(d.decode(errorsreplace),end) endtime.time()0.3 return r def closep(fd,old): termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd) fdoldNone PORTNone for p in PORTS: if not os.path.exists(p): continue try: fd,oldopenp(p) wr(fd,AT) if OK in rd(fd,2,0).decode(errorsignore): PORTp break closep(fd,old) fdoldNone except: pass if not PORT: print(ttyUSB2 ttyUSB3 ttyUSB4 都无响应) sys.exit(1) cmds[] for x in COMMANDS.splitlines(): xx.strip() if x and not x.startswith(#): cmds.append(x) print(使用,PORT,115200) print(选择编号发送 a 全部发送 m 手动输入 q 退出) try: while True: print(\n指令列表) for i,c in enumerate(cmds,1): print(f{i}. {c}) sinput(\n选择: ).strip().lower() if s in (q,quit,exit): break if sm: sinput(AT ).strip() todo[x.strip() for x in s.replace(;,\n).splitlines() if x.strip()] elif sa: todocmds else: try: todo[cmds[int(s)-1]] except: print(选择无效) continue for c in todo: print(\n,c) wr(fd,c) if not rd(fd,2,1): print(超时) print(---) time.sleep(1) finally: if fd: closep(fd,old) print(\n串口已关闭)自定义指令方案1#!/usr/bin/env python3 import subprocess, time TTY ttyUSB2 COMMANDS AT ATE1 ATCSQ PORT /dev/ TTY cat subprocess.Popen([sudo, timeout, 6, cat, PORT]) time.sleep(0.2) for cmd in COMMANDS.strip().splitlines(): subprocess.run( [sudo, tee, PORT], input(cmd.strip() \r\n).encode(), stdoutsubprocess.DEVNULL ) time.sleep(0.5) cat.wait()方案2#!/usr/bin/env python3 import os import termios import time import select # 配置区 PORT /dev/ttyUSB3 BAUD termios.B115200 COMMANDS AT ATE1 ATCSQ # cmds [x.strip() for x in COMMANDS.strip().splitlines() if x.strip()] fd os.open(PORT, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) old termios.tcgetattr(fd) try: attr termios.tcgetattr(fd) # 输入 输出 本地模式 关闭特殊处理 attr[0] 0 attr[1] 0 attr[3] 0 # 8N1 本地连接 允许接收 attr[2] BAUD | termios.CS8 | termios.CREAD | termios.CLOCAL # 关键 这里必须设置输入输出波特率 attr[4] BAUD attr[5] BAUD # 非阻塞读 attr[6][termios.VMIN] 0 attr[6][termios.VTIME] 0 termios.tcsetattr(fd, termios.TCSANOW, attr) termios.tcflush(fd, termios.TCIOFLUSH) print(f{PORT} 已打开 115200 8N1) for i, cmd in enumerate(cmds, 1): data (cmd \r\n).encode() os.write(fd, data) print(f\n[{i}] {cmd}) resp b end time.time() 3 while time.time() end: r, _, _ select.select([fd], [], [], 0.2) if not r: continue chunk os.read(fd, 4096) if chunk: resp chunk print(chunk.decode(errorsreplace), end) if b\r\nOK\r\n in resp or b\r\nERROR\r\n in resp: break if not resp: print(超时未收到响应) print(\n---) finally: termios.tcsetattr(fd, termios.TCSANOW, old) os.close(fd) print(串口已关闭)拨号指令import os, termios, time, select fd os.open(/dev/ttyUSB2, os.O_RDWR) attr termios.tcgetattr(fd) attr[2] termios.B115200 | termios.CS8 | termios.CREAD | termios.CLOCAL attr[4] attr[5] 0 attr[6][termios.VMIN] 0 attr[6][termios.VTIME] 20 # 2秒超时 termios.tcsetattr(fd, termios.TCSANOW, attr) termios.tcflush(fd, termios.TCIOFLUSH) #cmd ATCUSBPIDSWITCH9011,1,1\r cmd ATCUSBPIDSWITCH9003,1,1\r os.write(fd, cmd.encode()) time.sleep(0.1) resp b deadline time.monotonic() 3 while time.monotonic() deadline: r, _, _ select.select([fd], [], [], 0.2) if r: data os.read(fd, 1024) if not data: break resp data if bOK in resp or bERROR in resp: break print(resp.decode(errorsreplace)) termios.tcsetattr(fd, termios.TCSANOW, termios.tcgetattr(fd)) os.close(fd)选择查询网络状态#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import re import sys import glob import time import locale import curses import queue import threading import serial import serial.tools.list_ports as list_ports 默认波特率 115200 串口对象 None 接收队列 queue.Queue() 运行中 True 串口写锁 threading.Lock() 批量发送锁 threading.Lock() 自动测试完成事件 threading.Event() 厂家别名 { SIMCOM: SIMCOM, SIM: SIMCOM, SIMTECH: SIMCOM, QUECTEL: QUECTEL, QC: QUECTEL, 移远: QUECTEL, FIBOCOM: FIBOCOM, FBCOM: FIBOCOM, FIBO: FIBOCOM, 广和通: FIBOCOM, } 测试指令表 { SIMCOM: [ (检查SIM卡是否接触良好, ATCPIN?), (检查APN配置与否, ATCGDCONT?), (检查运营商接入情况, ATCOPS?), (检查联网情况, ATCPSI?), (检查是否成功注册到网络, ATCGREG?), (检查当前环境的信号质量, ATCSQ), (检查网络模式设置是否正确, ATCNMP?), (检查是否开射频 关闭飞行模式, ATCFUN?), (检查固件版本, ATSIMCOMATI), ], QUECTEL: [ (开启回显, ATE1), (开启详细错误提示, ATCMEE2), (检查USB拨号模式, ATQCFGusbnet), (检查SIM卡是否接触良好, ATCPIN?), (检查运营商接入情况, ATCOPS?), (检查当前驻网小区, ATQENGservingcell), (检查APN配置与否, ATCGDCONT?), (检查网络模式偏好, ATQNWPREFCFGmode_pref), (检查5G注册状态, ATC5GREG?), (检查固件版本, ATQGMR), (检查模块信息, ATI), ], FIBOCOM: [ (开启回显, ATE1), (检查模块信息, ATI), (检查SIM卡是否接触良好, ATCPIN?), (检查运营商接入情况, ATCOPS?), (检查APN配置与否, ATCGDCONT?), (开启自动连接, ATGTAUTOCONNECT1), (检查USB模式, ATGTUSBMODE?), (检查RNDIS状态, ATGTRNDIS?), ], } def 安全显示(stdscr, y, x, text, attr0): try: height, width stdscr.getmaxyx() if y 0 or y height: return if x 0 or x width: return text str(text) text text[:max(1, width - x - 1)] if attr: stdscr.addstr(y, x, text, attr) else: stdscr.addstr(y, x, text) except Exception: pass def 获取串口列表(): ports set() try: for p in list_ports.comports(): if p.device: ports.add(p.device) except Exception: pass patterns [ /dev/serial/by-id/*, /dev/ttyUSB*, /dev/ttyACM*, /dev/ttyAMA*, /dev/ttyS*, ] for pattern in patterns: for dev in glob.glob(pattern): if os.path.exists(dev): ports.add(dev) def 排序规则(dev): name os.path.basename(dev) if dev.startswith(/dev/serial/by-id/): group 0 elif name.startswith(ttyS): group 1 elif name.startswith(ttyUSB): group 2 elif name.startswith(ttyACM): group 3 elif name.startswith(ttyAMA): group 4 else: group 9 nums re.findall(r\d, name) num int(nums[-1]) if nums else 9999 return group, num, dev return sorted(ports, key排序规则) def 补全串口名称(name): name name.strip() if not name: return if name.startswith(/dev/): return name if name.startswith(serial/by-id/): return /dev/ name return /dev/ name def 识别厂家(name): if not name: return key name.strip().upper() return 厂家别名.get(key, ) def 接收线程(): global 串口对象 global 运行中 while 运行中: try: if 串口对象 is None or not 串口对象.is_open: time.sleep(0.05) continue data 串口对象.read(4096) if data: text data.decode(utf-8, errorsreplace) 接收队列.put(text) except Exception as e: 接收队列.put(\n[接收异常] str(e) \n) break def 发送一条AT指令(指令, 来源手动发送): global 串口对象 指令 指令.strip() if not 指令: return try: with 串口写锁: if 串口对象 is None or not 串口对象.is_open: 接收队列.put(\n[发送失败] 串口未打开\n) return 接收队列.put(f\n[{来源}] {指令}\n) 串口对象.write((指令 \r\n).encode(utf-8)) except Exception as e: 接收队列.put(\n[发送失败] str(e) \n) def 自动发送测试指令(厂家): global 运行中 if not 批量发送锁.acquire(blockingFalse): 接收队列.put(\n[提示] 上一轮测试指令还没有发送完成\n) return try: 自动测试完成事件.clear() time.sleep(0.5) cmds 测试指令表.get(厂家, []) 接收队列.put(f\n 开始自动发送 {厂家} 测试指令 \n) for 说明, 指令 in cmds: if not 运行中: break 接收队列.put(f\n[说明] {说明}\n) 发送一条AT指令(指令, 自动发送) time.sleep(1) 接收队列.put(f\n {厂家} 测试指令发送完成 \n) 接收队列.put(\n现在可以在底部输入自定义 AT 指令 回车发送\n) 自动测试完成事件.set() finally: 批量发送锁.release() def 选择项目界面(stdscr, 标题, 项目列表, 空提示没有可选项目): curses.curs_set(0) stdscr.keypad(True) stdscr.timeout(-1) index 0 top 0 while True: stdscr.erase() height, width stdscr.getmaxyx() 安全显示(stdscr, 0, 0, 标题 ↑↓移动 Enter选中 q退出 r刷新) if not 项目列表: 安全显示(stdscr, 2, 0, 空提示) else: 可显示行数 max(1, height - 3) if index top: top index if index top 可显示行数: top index - 可显示行数 1 显示列表 项目列表[top:top 可显示行数] for i, item in enumerate(显示列表): real_index top i y i 2 if real_index index: 安全显示(stdscr, y, 0, item, curses.A_REVERSE) else: 安全显示(stdscr, y, 0, item) stdscr.noutrefresh() curses.doupdate() key stdscr.getch() if key in (ord(q), ord(Q)): return None if key in (ord(r), ord(R)) and 串口 in 标题: 项目列表[:] 获取串口列表() index 0 top 0 continue if not 项目列表: continue if key curses.KEY_UP: index max(0, index - 1) elif key curses.KEY_DOWN: index min(len(项目列表) - 1, index 1) elif key in (10, 13, curses.KEY_ENTER): return 项目列表[index] def 打开串口(port, baud): return serial.Serial( portport, baudratebaud, bytesizeserial.EIGHTBITS, parityserial.PARITY_NONE, stopbitsserial.STOPBITS_ONE, timeout0.05, write_timeout1 ) def 串口调试界面(stdscr, 厂家, port, baud): global 串口对象 global 运行中 curses.curs_set(1) stdscr.keypad(True) stdscr.timeout(100) 接收显示 [] 输入内容 AT历史 [] 历史位置 None 需要刷新 True 上次刷新时间 0 try: 串口对象 打开串口(port, baud) 串口对象.reset_input_buffer() 串口对象.reset_output_buffer() except Exception as e: stdscr.erase() 安全显示(stdscr, 0, 0, 打开串口失败) 安全显示(stdscr, 2, 0, str(e)) 安全显示(stdscr, 4, 0, 按任意键退出) stdscr.noutrefresh() curses.doupdate() stdscr.timeout(-1) stdscr.getch() return t1 threading.Thread(target接收线程, daemonTrue) t1.start() t2 threading.Thread(target自动发送测试指令, args(厂家,), daemonTrue) t2.start() def 重画界面(): stdscr.erase() height, width stdscr.getmaxyx() 状态行 f{厂家} {port} {baud}bps CtrlX退出 CtrlL清屏 CtrlT重发测试 CtrlU清输入 Enter发送 安全显示(stdscr, 0, 0, 状态行, curses.A_REVERSE) 接收文本 .join(接收显示) 接收行 接收文本.splitlines(True) 可显示行数 max(1, height - 5) 显示行 接收行[-可显示行数:] for i, line in enumerate(显示行): line line.replace(\r, ) 安全显示(stdscr, i 1, 0, line) if 自动测试完成事件.is_set(): 提示文字 自定义AT模式 输入 AT 指令后回车发送 ↑↓调历史 else: 提示文字 正在自动发送测试指令 也可以提前输入 AT 指令回车发送 安全显示(stdscr, height - 3, 0, - * (width - 1)) 安全显示(stdscr, height - 2, 0, 提示文字) 输入提示 输入内容 安全显示(stdscr, height - 1, 0, 输入提示) try: stdscr.move(height - 1, min(len(输入提示), width - 1)) except Exception: pass stdscr.noutrefresh() curses.doupdate() while True: 有新数据 False try: while True: data 接收队列.get_nowait() 接收显示.append(data) 有新数据 True except queue.Empty: pass if len(接收显示) 1000: 接收显示 接收显示[-500:] 有新数据 True if 有新数据: 需要刷新 True key stdscr.getch() if key ! -1: 需要刷新 True if key 24: break elif key 12: 接收显示.clear() elif key 21: 输入内容 历史位置 None elif key 20: t2 threading.Thread(target自动发送测试指令, args(厂家,), daemonTrue) t2.start() elif key curses.KEY_UP: if AT历史: if 历史位置 is None: 历史位置 len(AT历史) - 1 else: 历史位置 max(0, 历史位置 - 1) 输入内容 AT历史[历史位置] elif key curses.KEY_DOWN: if AT历史 and 历史位置 is not None: 历史位置 1 if 历史位置 len(AT历史): 历史位置 None 输入内容 else: 输入内容 AT历史[历史位置] elif key in (10, 13, curses.KEY_ENTER): 指令 输入内容.strip() if 指令: 发送一条AT指令(指令, 自定义发送) if not AT历史 or AT历史[-1] ! 指令: AT历史.append(指令) if len(AT历史) 50: AT历史 AT历史[-50:] 输入内容 历史位置 None elif key in (curses.KEY_BACKSPACE, 127, 8): 输入内容 输入内容[:-1] 历史位置 None elif 32 key 126: 输入内容 chr(key) 历史位置 None 当前时间 time.time() if 需要刷新 and 当前时间 - 上次刷新时间 0.15: 重画界面() 上次刷新时间 当前时间 需要刷新 False 运行中 False try: if 串口对象 and 串口对象.is_open: 串口对象.close() except Exception: pass def 解析命令参数(): args sys.argv[1:] 厂家 port baud 默认波特率 for arg in args: maybe_vendor 识别厂家(arg) if maybe_vendor: 厂家 maybe_vendor continue if arg.isdigit(): baud int(arg) continue port 补全串口名称(arg) return 厂家, port, baud def 主程序(stdscr): global 运行中 厂家, port, baud 解析命令参数() if not 厂家: 厂家 选择项目界面( stdscr, 请选择模组厂家, [SIMCOM, QUECTEL, FIBOCOM], 没有可选厂家 ) if not 厂家: return if not port: ports 获取串口列表() port 选择项目界面( stdscr, 请选择串口, ports, 没有识别到串口 请检查设备连接 或尝试 sudo 运行 ) if not port: return 运行中 True 串口调试界面(stdscr, 厂家, port, baud) if __name__ __main__: locale.setlocale(locale.LC_ALL, ) try: curses.wrapper(主程序) except KeyboardInterrupt: pass