时间:2022-09-07 09:59:34 | 栏目:Python代码 | 点击:次
bsonrpc 是python中⼀个基于json或bson的远程过程调⽤的库,提供了服务端与客户端实现,其底层采⽤的是基于TCP连接的通信。
bsonrpc主要包括以下⽂件:
本⽂主要描述库包中对于不同协议的分包组包的处理,涉及到socket_queue.py和framing.py⽂件,主要采⽤的是对象组合的技术。
socket_queue.py中的SocketQueue类是⽤来处理从socket接收数据,主要的⽅法为_receiver()和put()⽅法,分别对应分包和组包,分包的主要内容如下:
def _receiver(self): bbuffer = b'' while True: try: chunk = self.socket.recv(self.BUFSIZE) # 从socket上接收数据 bbuffer = self._to_queue(bbuffer + chunk) # 数据分包 except DecodingError as e: self._queue.put(e) # 后⾯省略... def _to_queue(self, bbuffer): b_msg, bbuffer = self.codec.extract_message(bbuffer) # 解码器提取完整的信息 while b_msg is not None: self._queue.put(self.codec.loads(b_msg)) # 解码后的消息放⼊消息队列中等待处理 b_msg, bbuffer = self.codec.extract_message(bbuffer) return bbuffer
组包的主要内容如下:
def put(self, item): if self._closed: raise BsonRpcError('Attempt to put items to closed queue.') msg_bytes = self.codec.into_frame(self.codec.dumps(item)) # 组包 with self._lock: self.socket.sendall(msg_bytes)
如上图所示,程序采⽤的是对象组合的⽅式实现消息分包处理的。对象组合是继承之外的另⼀种选择,对象组合要求被组合的对象具有良好定义的接⼝,通过接⼝的⽅式调⽤其他对象的功能,这个也被“⿊箱复⽤”,因为对象的内部细节是不可⻅的。SocketQueue中依赖Codec的extract_message()接⼝⽅法,不⽤关⼼其具体的实现⽅法。具体实现由JSONCodec和BSONCode进⾏实现。JSONCodec中依赖JSONFrame中的extract_message()接⼝⽅法,该接⼝⽅法的实现由JSONFramingNone、JSONFramingNetstring、JSONFramingRFC7464进⾏实现。SocketQueue消息组包过程依赖于into_frame()⽅法,也是通过对象组合实现的。
注:图中的接⼝为了⼤家容易理解才加上了,源码⾥⾯并没有。