import base64
import logging
from functools import wraps
from google.protobuf import message as _message
from ..pb2.pccodemao_message_pb2 import Message
from ..pb2.pccodemao_messageheader_pb2 import MessageHeader
# Module-level logger for this file. A NullHandler is attached, and the level
# is raised to WARNING only when no explicit level has been set on it yet.
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
if not log.level:  # logging.NOTSET is 0, i.e. no level configured so far
    log.setLevel(logging.WARNING)
def build_request_msg(cmd: int, send_serial: int, request: _message.Message) -> Message:
    """Wrap a request protobuf in a ``Message`` envelope.

    Args:
        cmd: Value stored in the header's ``command`` field.
        send_serial: Serial number; its string form is stored in the
            header's ``id`` field.
        request: Protobuf message whose serialized bytes become the
            envelope's ``bodyData``.

    Returns:
        A new ``Message`` with header and body populated.
    """
    header: MessageHeader = MessageHeader()
    header.id = str(send_serial)
    header.command = cmd
    # The header's responseSerial field is not set here.

    envelope = Message()
    envelope.header.CopyFrom(header)
    envelope.bodyData = request.SerializeToString()
    return envelope
def build_response_msg(cmd: int, send_serial: int, response: _message.Message) -> Message:
    """Wrap a response protobuf in a ``Message`` envelope.

    Args:
        cmd: Value stored in the header's ``command`` field.
        send_serial: Serial number; its string form is stored in the
            header's ``id`` field.
        response: Protobuf message whose serialized bytes become the
            envelope's ``bodyData``.

    Returns:
        A new ``Message`` with header and body populated.
    """
    msg = Message()
    msg.bodyData = response.SerializeToString()
    msg_header = MessageHeader()
    msg_header.command = cmd
    # build_request_msg stores the serial as a string in this same field, so
    # convert here as well; protobuf rejects a value of the wrong type.
    # NOTE(review): assumes MessageHeader.id is a string field -- confirm
    # against the .proto definition.
    msg_header.id = str(send_serial)
    # The header's responseSerial field is not set here.
    msg.header.CopyFrom(msg_header)
    return msg
def parse_msg(recv: bytes) -> Message:
    """Deserialize a ``Message`` envelope from its wire form.

    Args:
        recv: Serialized protobuf data, passed unchanged to
            ``Message.ParseFromString`` (which operates on bytes).

    Returns:
        A new ``Message`` populated from ``recv``.
    """
    msg = Message()
    msg.ParseFromString(recv)
    return msg
def parse_body_msg(recv: bytes, clazz: type(_message.Message)) -> _message.Message:
    """Deserialize a message body into a new instance of ``clazz``.

    Args:
        recv: Serialized protobuf data for the body.
        clazz: Message class to instantiate; it is called with no arguments
            and the result's ``ParseFromString`` is given ``recv``.

    Returns:
        The newly created ``clazz`` instance after parsing ``recv`` into it.
    """
    body = clazz()
    # ParseFromString fills ``body`` in place; its return value is not the
    # message, so return the instance itself rather than the call result.
    body.ParseFromString(recv)
    return body
def base64_encode(b: bytes) -> str:
    """Base64-encode ``b`` and append a trailing ``&``.

    Args:
        b: Raw bytes to encode.

    Returns:
        The Base64 text of ``b`` followed by a single ``&`` character.
    """
    encoded = base64.b64encode(b)
    # Lazy %-style arguments: the message is only formatted when DEBUG is on.
    log.debug("%s -----> %s", b, encoded)
    # Base64 output is pure ASCII, so decode it directly instead of slicing
    # the ``b'...'`` repr of the bytes object.
    return encoded.decode("ascii") + "&"
def base64_decode(b: str) -> bytes:
    """Drop the last character of ``b`` and Base64-decode the remainder.

    The final character is removed unconditionally; ``base64_encode`` in this
    module appends a ``&`` terminator, which is what this strips when the
    input comes from that function.

    Args:
        b: Base64 text followed by one trailing character.

    Returns:
        The decoded bytes.
    """
    # Lazy %-style arguments: the message is only formatted when DEBUG is on.
    log.debug("%s", b)
    return base64.b64decode(b[:-1])
def coroutine(func):
    """Decorator that advances a generator once before handing it back.

    The wrapped callable is invoked with the caller's arguments, ``next()``
    is applied once to its result, and that same object is then returned.

    Args:
        func: Callable producing a generator (or other iterator).

    Returns:
        A wrapper carrying ``func``'s metadata (via ``functools.wraps``).
    """
    @wraps(func)
    def start(*args, **kwargs):
        primed = func(*args, **kwargs)
        next(primed)  # run up to the first yield; the yielded value is discarded
        return primed

    return start
@coroutine
def id_generator():
    """Yield consecutive integers, counting up from 0 without end.

    The ``@coroutine`` decorator calls ``next()`` once on the generator
    before returning it, so the initial 0 is already consumed and the first
    value a caller obtains is 1.
    """
    current = 0
    while True:
        yield current
        current = current + 1