Source code for mini.pkg_tool

import asyncio
import enum
import os

import websockets

import mini
from mini import WiFiDevice as _WiFiDevice
from mini.channels import msg_utils
from mini.pb2.pccodemao_message_pb2 import Message as _Message
from mini.pb2.pccodemao_messageheader_pb2 import MessageHeader as _MessageHeader

_found_devices = {}


@enum.unique
class _PCPyCmdId(enum.Enum):
    """
    Tool command number
     """
    PYPI_INSTALL_WHEEL_REQUEST = 1
    """
    Installation package
    """

    PYPI_UNINSTALL_WHEEL_REQUEST = 2
    """
    Uninstall package
    """

    PYPI_GET_WHEEL_INFO_REQUEST = 3
    """
    Get package information
    """

    PYPI_RUN_WHEEL_REQUEST = 4
    """
    execute program
    """

    PYPI_GET_WHEEL_LIST_REQUEST = 5
    """
    Get a list of installers
    """

    PYPI_START_RECORD_REQUEST = 6
    """Get recording data, only supported by Mini
    """

    PYPI_SWITCH_ADB_REQUEST = 7
    """ switch ADB
    """

    PYPI_UPLOAD_SCRIPT_REQUEST = 8
    """Upload python script to the robot
    """

    PYPI_CHECK_UPLOAD_SCRIPT_REQUEST = 9
    """Check if the python script has been uploaded to the robot
    """

    PYPI_RUN_UPLOAD_SCRIPT_REQUEST = 10
    """Execute a python script that has been uploaded
    """

    PYPI_STOP_UPLOAD_SCRIPT_REQUEST = 11
    """Stop executing a certain python script that has been uploaded
    """

    PYPI_LIST_UPLOAD_SCRIPT_REQUEST = 12
    """Get a list of uploaded python scripts
    """


def _read_into_buffer(filename) -> bytes:
    buf = bytearray(os.path.getsize(filename))
    with open(filename, 'rb') as f:
        f.readinto(buf)
    return bytes(buf)


async def _send_msg1(websocket, message: _Message) -> str:
    await websocket.send(msg_utils.base64_encode(message.SerializeToString()))
    result: str = ""
    while True:
        try:
            _data = await websocket.recv()
            _bytes = msg_utils.base64_decode(_data)
            msg: _Message = msg_utils.parse_msg(_bytes)
            header: _MessageHeader = msg.header
            if header.command == _PCPyCmdId.PYPI_INSTALL_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_InstallWheel_pb2 import InstallWheelResponse
                response: InstallWheelResponse = InstallWheelResponse()
                response.ParseFromString(msg.bodyData)
                print("{0}".format(response.message))
            elif header.command == _PCPyCmdId.PYPI_UNINSTALL_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_UninstallWheel_pb2 import UninstallWheelResponse
                response: UninstallWheelResponse = UninstallWheelResponse()
                response.ParseFromString(msg.bodyData)
                print("{0}".format(response.message))
            elif header.command == _PCPyCmdId.PYPI_RUN_WHEEL_REQUEST.value:
                from mini.tool.pb2.PyPi_RunWheel_pb2 import RunWheelResponse
                response: RunWheelResponse = RunWheelResponse()
                response.ParseFromString(msg.bodyData)
                print("{0}".format(response.message))
            elif header.command == _PCPyCmdId.PYPI_GET_WHEEL_INFO_REQUEST.value:
                from mini.tool.pb2.PyPi_GetWheelInfo_pb2 import GetWheelInfoResponse
                response: GetWheelInfoResponse = GetWheelInfoResponse()
                response.ParseFromString(msg.bodyData)
                return response.message
            elif header.command == _PCPyCmdId.PYPI_GET_WHEEL_LIST_REQUEST.value:
                from mini.tool.pb2.PyPi_GetWheelList_pb2 import GetWheelListResponse
                response: GetWheelListResponse = GetWheelListResponse()
                response.ParseFromString(msg.bodyData)
                if response.resultCode != 0:
                    return response.error
                else:
                    return "\n".join(response.Wheels)
            elif header.command == _PCPyCmdId.PYPI_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScriptResponse
                response: UploadScriptResponse = UploadScriptResponse()
                response.ParseFromString(msg.bodyData)
                print("command {0} return <{1}, {2}>".format(header.command, response.resultCode, response.message))
                return response.message
            elif header.command == _PCPyCmdId.PYPI_CHECK_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScriptResponse
                response: UploadScriptResponse = UploadScriptResponse()
                response.ParseFromString(msg.bodyData)
                print("command {0} return <{1}, {2}>".format(header.command, response.resultCode, response.message))
                return response.message
            elif header.command == _PCPyCmdId.PYPI_RUN_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScriptResponse
                response: UploadScriptResponse = UploadScriptResponse()
                response.ParseFromString(msg.bodyData)
                print("command {0} return <{1}, {2}>".format(header.command, response.resultCode, response.message))
                return response.message
            elif header.command == _PCPyCmdId.PYPI_STOP_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScriptResponse
                response: UploadScriptResponse = UploadScriptResponse()
                response.ParseFromString(msg.bodyData)
                print("command {0} return <{1}, {2}>".format(header.command, response.resultCode, response.message))
                return response.message
            elif header.command == _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST.value:
                from mini.tool.pb2.PyPi_UploadScript_pb2 import ListUploadScriptResponse
                response: ListUploadScriptResponse = ListUploadScriptResponse()
                response.ParseFromString(msg.bodyData)
                print("command {0} return {1}".format(header.command, response.uploadScripts))
                return "ex"
            elif header.target == -1:
                print(f"不支持的指令cmd={header.command}")
            else:
                print(f"不支持的指令cmd={header.command}")

        except Exception as e:
            if isinstance(e, websockets.ConnectionClosedOK):
                # print(f"connection closed ok!")
                break
            else:
                raise e
    return result


async def _send_msg0(message: _Message, device: _WiFiDevice) -> str:
    try:
        async with websockets.connect('ws://{}:{!r}'.format(device.address, 8801)) as websocket:
            return await _send_msg1(websocket, message)
    except Exception as e:
        return ""


def _get_eggInfo_path():
    return _get_file('.', 'egg-info', True)


def _get_file(dir_path: str, suffix: str, is_dir: bool = False):
    for temp_path in os.listdir(dir_path):
        temp_path = os.path.join(dir_path, temp_path)
        if is_dir:
            if not os.path.isdir(temp_path):
                continue
        else:
            if not os.path.isfile(temp_path):
                continue

        if temp_path.endswith(suffix):
            result = os.path.abspath(temp_path)
            return result


def _remove_dir(dir_path: str):
    """
    删除目录
    :param dir_path: 需要删除的目录
    """
    if isinstance(dir_path, str) and os.path.exists(dir_path) and os.path.isdir(dir_path):
        import shutil
        shutil.rmtree(dir_path)
    elif dir_path is not None and os.path.exists(dir_path):
        os.remove(dir_path)


[docs]def setup_py_pkg(project_dir: str) -> str: """ Package a py project into a .whl file. Args: project_dir: project file root directory Returns: str: the absolute path of the generated .whl file """ # 校验目录 if not os.path.isdir(project_dir): print('project_dir must be a directory') return "" # 将当前进程工作目录切换到工程目录 os.chdir(project_dir) # 清空产物目录 build_path = 'build' egg_info_path = _get_eggInfo_path() dist_path = 'dist' _remove_dir(build_path) _remove_dir(egg_info_path) _remove_dir(dist_path) os.mkdir(build_path) os.mkdir(dist_path) # 找到setup文件 setup_path = 'setup.py' if not os.path.isfile(setup_path): print('setup.py not exist') return "" # 打包 import sys if sys.version_info < (3, 0): os.system(f"python {setup_path} sdist bdist_wheel") else: import platform if platform.system() == 'Windows': os.system(f'python {setup_path} sdist bdist_wheel') else: os.system(f'python3 {setup_path} sdist bdist_wheel') # 删除其他临时文件 egg_info_path = _get_eggInfo_path() _remove_dir(build_path) _remove_dir(egg_info_path) result = _get_file(dist_path, '.whl') print(f'result {result}') return result
def _build_install_py_pkg_msg(package_path: str, debug: bool) -> _Message: from mini.tool.pb2.PyPi_InstallWheel_pb2 import InstallWheelRequest request = InstallWheelRequest() # 获取文件路径中的文件名 request.wheelName = os.path.basename(package_path) # 把文件转成字节 request.serializePacket = _read_into_buffer(package_path) request.debug = debug cmd_id = _PCPyCmdId.PYPI_INSTALL_WHEEL_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def install_py_pkg(package_path: str, robot_id: str, debug: bool = False): """ Install a py program installation package on the robot with the specified serial number. Args: package_path: the absolute path of the installation package robot_id: robot serial number debug: Whether to print the log when pkg is uninstalled on the robot side Returns: None """ # 校验文件 if not os.path.isfile(package_path): print(f'file is not exist:{package_path}') return base_name = os.path.basename(package_path) if not base_name.endswith('.whl'): print(f'文件不是一个pypi安装包') return device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return else: _found_devices[robot_id] = device # 上传 asyncio.run(_send_msg0(_build_install_py_pkg_msg(package_path, debug), device))
def _build_uninstall_py_pkg_msg(pkg_name: str, debug: bool) -> _Message: from mini.tool.pb2.PyPi_UninstallWheel_pb2 import UninstallWheelRequest request = UninstallWheelRequest() # pkg name request.wheelName = os.path.basename(pkg_name) request.debug = debug cmd_id = _PCPyCmdId.PYPI_UNINSTALL_WHEEL_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def uninstall_py_pkg(pkg_name: str, robot_id: str, debug: bool = False): """ Uninstall an installed py program from the robot with the specified serial number. For example, there is a py program whose setup.py file is configured as follows: setuptools.setup( name="tts_demo", ...#Omitted ), Then, its pkg_name is "tts_demo". Args: pkg_name: program name robot_id: robot serial number debug: Whether to print the log when pkg is uninstalled on the robot side Returns: None """ device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return else: _found_devices[robot_id] = device # 卸载 asyncio.run(_send_msg0(_build_uninstall_py_pkg_msg(pkg_name, debug), device))
def _build_query_py_pkg_msg(pkg_name: str) -> _Message: from mini.tool.pb2.PyPi_GetWheelInfo_pb2 import GetWheelInfoRequest request = GetWheelInfoRequest() # pkg name request.wheelName = os.path.basename(pkg_name) cmd_id = _PCPyCmdId.PYPI_GET_WHEEL_INFO_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def query_py_pkg(pkg_name: str, robot_id: str) -> str: """ Query the py program specified by pkg_name on the robot, and its detailed information, for example, there is a py program, and its setup.py file is configured as follows: setuptools.setup( name="tts_demo", version="0.0.2", author='Gino Deng', author_email='jingjing.deng@ubtrobot.com', description="demo with mini_sdk", long_description='demo with mini_sdk,xxxxxxx', long_description_content_type="text/markdown", license="GPLv3", packages=setuptools.find_packages(), classifiers=[ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], install_requires=[ 'alphamini', ], ), When querying, specify pkg_name="tts_demo", and the returned information is as follows: Name: tts-demo Version: 0.0.2 Summary: demo with mini_sdk Home-page: UNKNOWN Author: Gino Deng Author-email: jingjing.deng@ubtrobot.com License: GPLv3 Location: /data/data/com.termux/files/usr/lib/python3.8/site-packages Requires: alphamini Required-by: Args: pkg_name: py program name, robot_id: robot serial number Returns: str: install package trust information """ device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return "" else: _found_devices[robot_id] = device # 查询 return asyncio.run(_send_msg0(_build_query_py_pkg_msg(pkg_name), device))
def _build_list_py_pkg_msg(): from mini.tool.pb2.PyPi_GetWheelList_pb2 import GetWheelListRequest request = GetWheelListRequest() cmd_id = _PCPyCmdId.PYPI_GET_WHEEL_LIST_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def list_py_pkg(robot_id: str) -> str: """ List the py programs installed on the robot, for example: Package Version ---------- ------- alphamini 0.1.0 ifaddr 0.1.7 pip 20.1.1 protobuf 3.12.2 setuptools 47.3.1 six 1.15.0 tts-demo 0.0.2 websockets 8.1 wheel 0.34.2 Args: robot_id: robot serial number Returns: str: name of all py programs-version number """ device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return "" else: _found_devices[robot_id] = device # 查询 return asyncio.run(_send_msg0(_build_list_py_pkg_msg(), device))
def _build_run_py_pkg_msg(entry_point: str, debug: bool) -> _Message: from mini.tool.pb2.PyPi_RunWheel_pb2 import RunWheelRequest request = RunWheelRequest() # pkg main entry request.wheelName = entry_point request.debug = debug cmd_id = _PCPyCmdId.PYPI_RUN_WHEEL_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def run_py_pkg(entry_point: str, robot_id: str, debug: bool = False): """ Trigger a specified py program to run on the specified robot. : Args: entry_point: The name of the console scripts of the py program, for example, in the setup.py file, such as configuration entry_points=( 'console_scripts': [ 'XXX = Packages.Modules:XXX' ], }, the program entry name is xxx robot_id: robot serial number debug: Whether to return the log when the program is executed Returns: None """ device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return else: _found_devices[robot_id] = device # 触发 asyncio.run(_send_msg0(_build_run_py_pkg_msg(entry_point, debug), device))
def _build_switch_adb_msg(switch: bool): from mini.tool.pb2.PyPi_AdbSwitch_pb2 import AdbSwitchRequest request = AdbSwitchRequest() # pkg main entry request.open = switch cmd_id = _PCPyCmdId.PYPI_SWITCH_ADB_REQUEST.value # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message
[docs]def switch_adb(robot_id: str, switch: bool = True): """ Turn on the robot ADB debugging switch Args: switch: bool, True or False robot_id: robot serial number Returns: None """ device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return else: _found_devices[robot_id] = device # 触发 asyncio.run(_send_msg0(_build_switch_adb_msg(switch), device))
def _build_upload_script_msg(file_name: str = None, content: bytes = None, cmd_id: int = 1): from mini.tool.pb2.PyPi_UploadScript_pb2 import UploadScript request = UploadScript() if file_name is not None: request.fileName = file_name if content is not None: request.content = content # message message: _Message = msg_utils.build_request_msg(cmd_id, 0, request) return message # 上传python脚本到机器人
[docs]def upload_script(cmd_id: int, robot_id: str, file_name: str = None, content: bytes = None): pyCmdId = _PCPyCmdId.PYPI_UPLOAD_SCRIPT_REQUEST if cmd_id == 1: pyCmdId = _PCPyCmdId.PYPI_UPLOAD_SCRIPT_REQUEST elif cmd_id == 2: pyCmdId = _PCPyCmdId.PYPI_CHECK_UPLOAD_SCRIPT_REQUEST elif cmd_id == 3: pyCmdId = _PCPyCmdId.PYPI_RUN_UPLOAD_SCRIPT_REQUEST elif cmd_id == 4: pyCmdId = _PCPyCmdId.PYPI_STOP_UPLOAD_SCRIPT_REQUEST elif cmd_id == 5: pyCmdId = _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST else: pyCmdId = _PCPyCmdId.PYPI_LIST_UPLOAD_SCRIPT_REQUEST device: _WiFiDevice = _found_devices.get(robot_id) # 搜索设备 if device is None: device = asyncio.get_event_loop().run_until_complete(mini.get_device_by_name(robot_id, 10)) if device is None: print(f"不能找到机器人:{robot_id}") return else: _found_devices[robot_id] = device # 触发 return asyncio.run(_send_msg0(_build_upload_script_msg(file_name, content, pyCmdId.value), device))