# mini.pkg_tool source code

import asyncio
import enum
import os

import websockets

import mini
from mini import WiFiDevice as _WiFiDevice
from mini.channels import msg_utils
from mini.pb2.pccodemao_message_pb2 import Message as _Message
from mini.pb2.pccodemao_messageheader_pb2 import MessageHeader as _MessageHeader

# Cache of devices that were already located, keyed by robot id, so that
# repeated calls for the same robot skip the device search.
_found_devices = {}


@enum.unique
class _PCPyCmdId(enum.Enum):
    """
    脱机工具命令号
    """
    PYPI_INSTALL_WHEEL_REQUEST = 1
    """
    安装程序包
    """

    PYPI_UNINSTALL_WHEEL_REQUEST = 2
    """
    卸载程序包
    """

    PYPI_GET_WHEEL_INFO_REQUEST = 3
    """
    获取程序包信息
    """

    PYPI_RUN_WHEEL_REQUEST = 4
    """
    执行程序
    """

    PYPI_GET_WHEEL_LIST_REQUEST = 5
    """
    获取安装程序列表
    """

    PYPI_START_RECORD_REQUEST = 6
    """获取录音数据--不支持
    """

    PYPI_SWITCH_ADB_REQUEST = 7
    """开关ADB
    """

    PYPI_UPLOAD_SCRIPT_REQUEST = 8
    """上传python脚本到机器人
    """

    PYPI_CHECK_UPLOAD_SCRIPT_REQUEST = 9
    """检查python脚本是否已上传到机器人
    """

    PYPI_RUN_UPLOAD_SCRIPT_REQUEST = 10
    """执行已上传的某个python脚本
    """

    PYPI_STOP_UPLOAD_SCRIPT_REQUEST = 11
    """停止执行已上传的某个python脚本
    """

    PYPI_LIST_UPLOAD_SCRIPT_REQUEST = 12
    """获取已上传的某个python脚本列表
    """


def _read_into_buffer(filename) -> bytes:
    buf = bytearray(os.path.getsize(filename))
    with open(filename, 'rb') as f:
        f.readinto(buf)
    return bytes(buf)


async def _send_msg1(websocket, message: _Message) -> str:
    """Send one request over ``websocket`` and handle the replies.

    The request is serialized and base64-encoded before sending. Each reply
    is base64-decoded, parsed and dispatched on ``header.command``:

    * install / uninstall / run wheel: the response message is printed and
      the loop keeps reading further replies.
    * get wheel info: the response message is returned.
    * get wheel list: the error text is returned when ``resultCode`` is
      non-zero, otherwise the wheel names joined by newlines.
    * upload / check / run / stop uploaded script: the result code and
      message are printed and the message is returned.
    * list uploaded scripts: the script list is printed and ``"ex"`` is
      returned.
    * any other command: an "Unsupported cmd" notice is printed and the loop
      keeps reading.

    Args:
        websocket: Open connection offering awaitable ``send`` and ``recv``.
        message: The request to send.

    Returns:
        str: The result text described above, or ``""`` when the connection
        was closed normally before a returning reply arrived.

    Raises:
        Exception: Every error other than ``websockets.ConnectionClosedOK``
            raised while receiving or decoding propagates to the caller.
    """

    def parse_body(response_cls, body):
        # Decode the protobuf payload of a reply into a fresh response object.
        response = response_cls()
        response.ParseFromString(body)
        return response

    await websocket.send(msg_utils.base64_encode(message.SerializeToString()))

    # These commands all answer with an UploadScriptResponse and are reported
    # in exactly the same way.
    upload_script_cmds = (
        _PCPyCmdId.PYPI_UPLOAD_SCRIPT_REQUEST.value,
        _PCPyCmdId.PYPI_CHECK_UPLOAD_SCRIPT_REQUEST.value,
        _PCPyCmdId.PYPI_RUN_UPLOAD_SCRIPT_REQUEST.value,
        _PCPyCmdId.PYPI_STOP_UPLOAD_SCRIPT_REQUEST.value,
    )

    try:
        while True:
            raw = await websocket.recv()
            msg: _Message = msg_utils.parse_msg(msg_utils.base64_decode(raw))
            header: _MessageHeader = msg.header
            command = header.command

            # Response modules are imported lazily, only for the command seen.
            if command == _PCPyCmdId.PYPI_INSTALL_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_InstallWheel_pb2 import InstallWheelResponse
                response = parse_body(InstallWheelResponse, msg.bodyData)
                print("{0}".format(response.message))
            elif command == _PCPyCmdId.PYPI_UNINSTALL_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_UninstallWheel_pb2 import UninstallWheelResponse
                response = parse_body(UninstallWheelResponse, msg.bodyData)
                print("{0}".format(response.message))
            elif command == _PCPyCmdId.PYPI_RUN_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_RunWheel_pb2 import RunWheelResponse
                response = parse_body(RunWheelResponse, msg.bodyData)
                print("{0}".format(response.message))
            elif command == _PCPyCmdId.PYPI_GET_WHEEL_INFO_REQUEST.value:
                from mini.tool.pb2.PyPi_GetWheelInfo_pb2 import GetWheelInfoResponse
                return parse_body(GetWheelInfoResponse, msg.bodyData).message
            elif command == _PCPyCmdId.PYPI_GET_WHEEL_LIST_REQUEST.value:
                from mini.tool.pb2.PyPi_GetWheelList_pb2 import GetWheelListResponse
                response = parse_body(GetWheelListResponse, msg.bodyData)
                if response.resultCode != 0:
                    return response.error
                return "\n".join(response.Wheels)
            elif command in upload_script_cmds:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScriptResponse
                response = parse_body(UploadScriptResponse, msg.bodyData)
                print("command {0} return <{1}, {2}>".format(command, response.resultCode, response.message))
                return response.message
            elif command == _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import ListUploadScriptResponse
                response = parse_body(ListUploadScriptResponse, msg.bodyData)
                print("command {0} return {1}".format(command, response.uploadScripts))
                return "ex"
            else:
                print(f"Unsupported cmd={command}")
    except websockets.ConnectionClosedOK:
        # The peer closed the connection normally: nothing more to read.
        pass
    return ""


async def _send_msg0(message: _Message, device: _WiFiDevice) -> str:
    """Open a websocket to ``device`` on port 8801 and send ``message``.

    Args:
        message: The request to send.
        device: Target device; its ``address`` attribute is used as the host.

    Returns:
        str: Whatever ``_send_msg1`` returns, or ``""`` when connecting or
        the exchange fails. Failures are reported on stdout instead of being
        dropped silently, so callers can tell a failure from an empty result.
    """
    try:
        async with websockets.connect(f'ws://{device.address}:8801') as websocket:
            return await _send_msg1(websocket, message)
    except Exception as e:
        # Best effort: keep the "" contract, but make the failure visible.
        print(f"Request to robot failed: {e!r}")
        return ""


def _get_eggInfo_path():
    """Find an ``egg-info`` directory directly under the current directory.

    Returns:
        The absolute path of a directory whose path ends with ``egg-info``,
        or ``None`` when there is none.
    """
    return _get_file('.', 'egg-info', is_dir=True)


def _get_file(dir_path: str, suffix: str, is_dir: bool = False):
    for temp_path in os.listdir(dir_path):
        temp_path = os.path.join(dir_path, temp_path)
        if is_dir:
            if not os.path.isdir(temp_path):
                continue
        else:
            if not os.path.isfile(temp_path):
                continue

        if temp_path.endswith(suffix):
            result = os.path.abspath(temp_path)
            return result


def _remove_dir(dir_path: str):
    """
    删除目录
    :param dir_path: 需要删除的目录
    """
    if isinstance(dir_path, str) and os.path.exists(dir_path) and os.path.isdir(dir_path):
        import shutil
        shutil.rmtree(dir_path)
    elif dir_path is not None and os.path.exists(dir_path):
        os.remove(dir_path)


def setup_py_pkg(project_dir: str) -> str:
    """Build a Python project into a ``.whl`` file.

    The process working directory is switched to ``project_dir`` and stays
    there after the call. Existing ``build``, ``dist`` and egg-info
    directories of the project are deleted before building, but only once
    ``setup.py`` has been found, so pointing this function at a wrong
    directory no longer wipes that directory's ``build``/``dist`` folders.

    Args:
        project_dir: Root directory of the project.

    Returns:
        str: Absolute path of the generated ``.whl`` file, or ``""`` when
        ``project_dir`` is not a directory, ``setup.py`` is missing, or no
        wheel was produced.
    """
    # Validate the directory.
    if not os.path.isdir(project_dir):
        print('project_dir must be a directory')
        return ""

    # Switch the working directory of the current process to the project.
    os.chdir(project_dir)

    # Locate setup.py before deleting anything.
    setup_path = 'setup.py'
    if not os.path.isfile(setup_path):
        print('setup.py not exist')
        return ""

    # Clear the output directories of previous builds.
    build_path = 'build'
    dist_path = 'dist'
    _remove_dir(build_path)
    _remove_dir(_get_eggInfo_path())
    _remove_dir(dist_path)
    os.mkdir(build_path)
    os.mkdir(dist_path)

    # Build. This module requires Python 3 (it uses f-strings), so only the
    # interpreter name differs per platform.
    import platform
    interpreter = 'python' if platform.system() == 'Windows' else 'python3'
    os.system(f'{interpreter} {setup_path} sdist bdist_wheel')

    # Delete the remaining temporary outputs.
    _remove_dir(build_path)
    _remove_dir(_get_eggInfo_path())

    result = _get_file(dist_path, '.whl')
    print(f'result {result}')
    # Keep the declared ``str`` return type when no wheel was produced.
    return result or ""
def _build_install_py_pkg_msg(package_path: str, debug: bool) -> _Message:
    """Build the request message that installs a wheel file.

    Args:
        package_path: Path of the wheel file to send.
        debug: Value stored in the request's ``debug`` field.

    Returns:
        _Message: The install request, carrying the file name and content.
    """
    from mini.tool.pb2.PyPi_InstallWheel_pb2 import InstallWheelRequest

    req = InstallWheelRequest()
    # Only the file name of the path is sent as the wheel name.
    req.wheelName = os.path.basename(package_path)
    # The file content is sent as raw bytes.
    req.serializePacket = _read_into_buffer(package_path)
    req.debug = debug

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_INSTALL_WHEEL_REQUEST.value, 0, req)
def install_py_pkg(package_path: str, robot_id: str, debug: bool = False):
    """Install a Python package file on the robot with the given serial number.

    Args:
        package_path: Absolute path of the ``.whl`` package.
        robot_id: Serial number of the robot.
        debug: Whether to print the robot-side log of the installation.

    Returns:
        None
    """
    # Validate the file.
    if not os.path.isfile(package_path):
        print(f'file is not exist:{package_path}')
        return

    if not os.path.basename(package_path).endswith('.whl'):
        print('Not a PiPy package ')
        return

    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return
        _found_devices[robot_id] = device

    # Upload.
    asyncio.run(_send_msg0(_build_install_py_pkg_msg(package_path, debug), device))
def _build_uninstall_py_pkg_msg(pkg_name: str, debug: bool) -> _Message:
    """Build the request message that uninstalls a package.

    Args:
        pkg_name: Name of the package; only its base name is sent.
        debug: Value stored in the request's ``debug`` field.

    Returns:
        _Message: The uninstall request.
    """
    from mini.tool.pb2.PyPi_UninstallWheel_pb2 import UninstallWheelRequest

    req = UninstallWheelRequest()
    req.wheelName = os.path.basename(pkg_name)
    req.debug = debug

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_UNINSTALL_WHEEL_REQUEST.value, 0, req)
def uninstall_py_pkg(pkg_name: str, robot_id: str, debug: bool = False):
    """Uninstall an installed Python program from the robot with the given serial number.

    For example, for a program whose ``setup.py`` is configured as::

        setuptools.setup(
            name="tts_demo",
            ...  # omitted
        ),

    the ``pkg_name`` is ``"tts_demo"``.

    Args:
        pkg_name: Name of the program.
        robot_id: Serial number of the robot.
        debug: Whether to print the robot-side log of the uninstallation.

    Returns:
        None
    """
    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return
        _found_devices[robot_id] = device

    # Uninstall.
    asyncio.run(_send_msg0(_build_uninstall_py_pkg_msg(pkg_name, debug), device))
def _build_query_py_pkg_msg(pkg_name: str) -> _Message:
    """Build the request message that queries information about a package.

    Args:
        pkg_name: Name of the package; only its base name is sent.

    Returns:
        _Message: The package-info request.
    """
    from mini.tool.pb2.PyPi_GetWheelInfo_pb2 import GetWheelInfoRequest

    req = GetWheelInfoRequest()
    req.wheelName = os.path.basename(pkg_name)

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_GET_WHEEL_INFO_REQUEST.value, 0, req)
def query_py_pkg(pkg_name: str, robot_id: str) -> str:
    """Query the details of the Python program ``pkg_name`` on the robot.

    For example, for a program whose ``setup.py`` is configured as::

        setuptools.setup(
            name="tts_demo",
            version="0.0.2",
            author='Gino Deng',
            author_email='jingjing.deng@ubtrobot.com',
            description="demo with mini_sdk",
            long_description='demo with mini_sdk,xxxxxxx',
            long_description_content_type="text/markdown",
            license="GPLv3",
            packages=setuptools.find_packages(),
            classifiers=[
                "Programming Language :: Python :: 3",
                "Programming Language :: Python :: 3.6",
                "Programming Language :: Python :: 3.7",
                "Programming Language :: Python :: 3.8",
                "License :: OSI Approved :: MIT License",
                "Operating System :: OS Independent",
            ],
            install_requires=[
                'alphamini',
            ],
        ),

    querying with ``pkg_name="tts_demo"`` returns information like::

        Name: tts-demo
        Version: 0.0.2
        Summary: demo with mini_sdk
        Home-page: UNKNOWN
        Author: Gino Deng
        Author-email: jingjing.deng@ubtrobot.com
        License: GPLv3
        Location: /data/data/com.termux/files/usr/lib/python3.8/site-packages
        Requires: alphamini
        Required-by:

    Args:
        pkg_name: Name of the Python program.
        robot_id: Serial number of the robot.

    Returns:
        str: Detailed information about the package, or ``""`` when the robot
        cannot be found.
    """
    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return ""
        _found_devices[robot_id] = device

    # Query.
    return asyncio.run(_send_msg0(_build_query_py_pkg_msg(pkg_name), device))
def _build_list_py_pkg_msg():
    """Build the request message that lists the installed packages.

    Returns:
        The message produced by ``msg_utils.build_request_msg`` for an empty
        ``GetWheelListRequest``.
    """
    from mini.tool.pb2.PyPi_GetWheelList_pb2 import GetWheelListRequest

    req = GetWheelListRequest()

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_GET_WHEEL_LIST_REQUEST.value, 0, req)
def list_py_pkg(robot_id: str) -> str:
    """List the Python programs installed on the robot.

    For example::

        Package    Version
        ---------- -------
        alphamini  1.1.1
        ifaddr     0.1.7
        pip        20.1.1
        protobuf   3.12.2
        setuptools 47.3.1
        six        1.15.0
        tts-demo   0.0.2
        websockets 8.1
        wheel      0.34.2

    Args:
        robot_id: Serial number of the robot.

    Returns:
        str: Name and version of every Python program, or ``""`` when the
        robot cannot be found.
    """
    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return ""
        _found_devices[robot_id] = device

    # Query.
    return asyncio.run(_send_msg0(_build_list_py_pkg_msg(), device))
def _build_run_py_pkg_msg(entry_point: str, debug: bool) -> _Message:
    """Build the request message that runs an installed program.

    Args:
        entry_point: Entry point of the program, sent as the wheel name.
        debug: Value stored in the request's ``debug`` field.

    Returns:
        _Message: The run request.
    """
    from mini.tool.pb2.PyPi_RunWheel_pb2 import RunWheelRequest

    req = RunWheelRequest()
    req.wheelName = entry_point
    req.debug = debug

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_RUN_WHEEL_REQUEST.value, 0, req)
def run_py_pkg(entry_point: str, robot_id: str, debug: bool = False):
    """Trigger a Python program to run on the given robot.

    Args:
        entry_point: Console-script name of the program. For example, if
            ``setup.py`` is configured with::

                entry_points={
                    'console_scripts': [
                        'XXX = Packages.Modules:XXX'
                    ],
                },

            the entry point name is ``XXX``.
        robot_id: Serial number of the robot.
        debug: Whether to send back the log produced while the program runs.

    Returns:
        None
    """
    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return
        _found_devices[robot_id] = device

    # Trigger.
    asyncio.run(_send_msg0(_build_run_py_pkg_msg(entry_point, debug), device))
def _build_switch_adb_msg(switch: bool):
    """Build the request message that switches ADB on or off.

    Args:
        switch: Value stored in the request's ``open`` field.

    Returns:
        The message produced by ``msg_utils.build_request_msg``.
    """
    from mini.tool.pb2.PyPi_AdbSwitch_pb2 import AdbSwitchRequest

    req = AdbSwitchRequest()
    req.open = switch

    return msg_utils.build_request_msg(_PCPyCmdId.PYPI_SWITCH_ADB_REQUEST.value, 0, req)
def switch_adb(robot_id: str, switch: bool = True):
    """Switch the ADB debugging of the robot on or off.

    Args:
        robot_id: Serial number of the robot.
        switch: ``True`` or ``False``; sent as the ``open`` field of the
            request.

    Returns:
        None
    """
    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return
        _found_devices[robot_id] = device

    # Trigger.
    asyncio.run(_send_msg0(_build_switch_adb_msg(switch), device))
def _build_upload_script_msg(file_name: str = None, content: bytes = None, cmd_id: int = 1):
    """Build a request message for the uploaded-script commands.

    Args:
        file_name: Stored in the request's ``fileName`` field unless ``None``.
        content: Stored in the request's ``content`` field unless ``None``.
        cmd_id: Command number passed to ``msg_utils.build_request_msg``.

    Returns:
        The message produced by ``msg_utils.build_request_msg``.
    """
    from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScript

    req = UploadScript()
    # Fields that were not supplied are left unset on the request.
    if file_name is not None:
        req.fileName = file_name
    if content is not None:
        req.content = content

    return msg_utils.build_request_msg(cmd_id, 0, req)


# Upload a python script to the robot
def upload_script(cmd_id: int, robot_id: str, file_name: str = None, content: bytes = None):
    """Send one of the uploaded-script commands to the robot.

    Args:
        cmd_id: Operation selector: 1 upload, 2 check upload, 3 run,
            4 stop, 5 list. Any other value is treated as list.
        robot_id: Serial number of the robot.
        file_name: Script file name put into the request, if given.
        content: Script content put into the request, if given.

    Returns:
        The text returned by the exchange with the robot (``""`` when the
        exchange fails), or ``None`` when the robot cannot be found.
    """
    commands = {
        1: _PCPyCmdId.PYPI_UPLOAD_SCRIPT_REQUEST,
        2: _PCPyCmdId.PYPI_CHECK_UPLOAD_SCRIPT_REQUEST,
        3: _PCPyCmdId.PYPI_RUN_UPLOAD_SCRIPT_REQUEST,
        4: _PCPyCmdId.PYPI_STOP_UPLOAD_SCRIPT_REQUEST,
        5: _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST,
    }
    # Unknown selectors fall back to the list command.
    py_cmd_id = commands.get(cmd_id, _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST).value

    device: _WiFiDevice = _found_devices.get(robot_id)

    # Search for the device unless it was found before.
    if device is None:
        # asyncio.run() is used for the search as well: get_event_loop()
        # raises RuntimeError once an earlier asyncio.run() has cleared the
        # current event loop, which broke looking up a second robot.
        device = asyncio.run(mini.get_device_by_name(robot_id, 10))
        if device is None:
            print(f"Can't find AlphaMini of id (:{robot_id})")
            return
        _found_devices[robot_id] = device

    # Trigger.
    return asyncio.run(_send_msg0(_build_upload_script_msg(file_name, content, py_cmd_id), device))