当前位置:首页 » 《随便一记》 » 正文

python的websocket方法教程

18 人参与  2024年04月05日 10:30  分类 : 《随便一记》  评论

点击全文阅读


WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。

websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。

2. 在Python中使用WebSocket

Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。

要安装websockets库,你可以使用pip:

pip install websockets

3. 创建WebSocket服务器

使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:

import asyncioimport websocketsasync def echo(websocket, path):    async for message in websocket:        print(f"Received message: {message}")        await websocket.send(f"Echo: {message}")start_server = websockets.serve(echo, "localhost", 8765)asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()

在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。

然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。

4. 创建WebSocket客户端

要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:

import asyncioimport websocketsasync def main():    async with websockets.connect("ws://localhost:8765") as websocket:        message = "Hello, server!"        await websocket.send(message)        print(f"Sent: {message}")        response = await websocket.recv()        print(f"Received: {response}")asyncio.run(main())

在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。

5. 总结

WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。

6. 通过websockets这个项目,从大型开源项目中学习asyncio库。

一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。

Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
在这里插入图片描述

Transport对象提供以下常用函数——

is_reading:判断该Transport是否在读。

set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。

write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。

abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。

在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
在这里插入图片描述

二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
在这里插入图片描述

用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——

connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。

connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。

pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。

resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。

data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。

eof_received:当被Transport对象被调用write_eof时被调用。

在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
在这里插入图片描述

而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。

websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在这里插入图片描述

在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。

从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。

附录:进阶版本:

python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect: 在client端使用,用于建立连接。

send:发送数据

recv:接收数据

close:关闭连接

服务端

#!/usr/bin/python3# 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求import asyncioimport websocketsfrom datetime import datetimeasync def handler(websocket):    data = await websocket.recv()    reply = f"Data received as \"{data}\".  time: {datetime.now()}"    print(reply)    await websocket.send(reply)    print("Send reply")async def main():    async with websockets.serve(handler, "localhost", 9999):        await asyncio.Future()  # run foreverif __name__ == "__main__":    asyncio.run(main())

客户端

import asyncioimport websocketsimport timeasync def ws_client(url):    for i in range(1, 40):        async with websockets.connect(url) as websocket:            await websocket.send("Hello, I am PyPy.")            response = await websocket.recv()        print(response)        time.sleep(1)asyncio.run(ws_client('ws://localhost:9999'))

服务端

import asyncioimport websocketsIP_ADDR = "127.0.0.1"IP_PORT = "9090"# 握手,通过接收Hi,发送"success"来进行双方的握手。async def serverHands(websocket):    while True:        recv_text = await websocket.recv()        print("recv_text=" + recv_text)        if recv_text == "Hi":            print("connected success")            await websocket.send("success")            return True        else:            await websocket.send("connected fail")# 接收从客户端发来的消息并处理,再返给客户端successasync def serverRecv(websocket):    while True:        recv_text = await websocket.recv()        print("recv:", recv_text)        await websocket.send("success,get mess:"+ recv_text)# 握手并且接收数据async def serverRun(websocket, path):    print(path)    await serverHands(websocket)    await serverRecv(websocket)# main functionif __name__ == '__main__':    print("======server======")    server = websockets.serve(serverRun, IP_ADDR, IP_PORT)    asyncio.get_event_loop().run_until_complete(server)    asyncio.get_event_loop().run_forever()

客户端

import asyncioimport websocketsIP_ADDR = "127.0.0.1"IP_PORT = "9090"async def clientHands(websocket):    while True:        # 通过发送hello握手        await websocket.send("Hi")        response_str = await websocket.recv()        # 接收"success"来进行双方的握手        if "success" in response_str:            print("握手成功")            return True# 向服务器端发送消息async def clientSend(websocket):    while True:        input_text = input("input text: ")        if input_text == "exit":            print(f'"exit", bye!')            await websocket.close(reason="exit")            return False        await websocket.send(input_text)        recv_text = await websocket.recv()        print(f"{recv_text}")# 进行websocket连接async def clientRun():    ipaddress = IP_ADDR + ":" + IP_PORT    async with websockets.connect("ws://" + ipaddress) as websocket:        await clientHands(websocket)        await clientSend(websocket)# main functionif __name__ == '__main__':    print("======client======")    asyncio.get_event_loop().run_until_complete(clientRun())

服务端

# -*- coding:utf8 -*-import jsonimport socketimport asyncioimport loggingimport websocketsimport multiprocessingIP = '127.0.0.1'PORT_CHAT = 9090USERS ={}#提供聊天的后台async def ServerWs(websocket,path):    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',                        filename="chat.log",                        level=logging.INFO)    # 握手    await websocket.send(json.dumps({"type": "handshake"}))    async for message in websocket:        data = json.loads(message)        message = ''        # 用户发信息        if data["type"] == 'send':            name = '404'            for k, v in USERS.items():                if v == websocket:                    name = k            data["from"] = name            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list                message = json.dumps(                    {"type": "user", "content": data["content"], "from": name})        # 用户注册        elif data["type"] == 'register':            try:                USERS[data["uuid"]] = websocket                if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list                    message = json.dumps(                        {"type": "login", "content": data["content"], "user_list": list(USERS.keys())})            except Exception as exp:                print(exp)        # 用户注销        elif data["type"] == 'unregister':            del USERS[data["uuid"]]            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list                message = json.dumps(                    {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})        #打印日志        logging.info(data)        # 群发        await asyncio.wait([user.send(message) for user in USERS.values()])def server_run():    print("server")    start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)    asyncio.get_event_loop().run_until_complete(start_server)    asyncio.get_event_loop().run_forever()if __name__ == "__main__":    from multiprocessing import Process    multiprocessing.freeze_support()    server = Process(target=server_run, daemon=False)    server.start()

服务端

import asyncioimport websocketsimport timeimport jsonimport threading# 功能模块class OutputHandler():    async def run(self,message,send_ms,websocket):        # 用户发信息        await send_ms(message, websocket)        # 单发消息        # await send_ms(message, websocket)        # 群发消息        #await s('hi起来')# 存储所有的客户端Clients = {}# 服务端class WS_Server():    def __init__(self):        self.ip = "127.0.0.1"        self.port = 9090    # 回调函数(发消息给客户端)    async def callback_send(self, msg, websocket=None):        await self.sendMsg(msg, websocket)    # 发送消息    async def sendMsg(self, msg, websocket):        print('sendMsg:', msg)        # websocket不为空,单发,为空,群发消息        if websocket != None:            await websocket.send(msg)        else:            # 群发消息            await self.broadcastMsg(msg)        # 避免被卡线程        await asyncio.sleep(0.2)    # 群发消息    async def broadcastMsg(self, msg):        for user in Clients:            await user.send(msg)    # 针对不同的信息进行请求,可以考虑json文本    async def runCaseX(self,jsonMsg,websocket):        print('runCase')        op = OutputHandler()        # 参数:消息、方法、socket        await op.run(jsonMsg,self.callback_send,websocket)    # 连接一个客户端,起一个循环监听    async def echo(self,websocket, path):        # 添加到客户端列表        # Clients.append(websocket)        # 握手        await websocket.send(json.dumps({"type": "handshake"}))        # 循环监听        while True:            # 接受信息            try:                # 接受文本                recv_text = await websocket.recv()                message = "Get message: {}".format(recv_text)                # 返回客户端信息                await websocket.send(message)                # 转json                data = json.loads(recv_text)                # 用户发信息                if data["type"] == 'send':                    name = '404'                    for k, v in Clients.items():                        if v == websocket:                            name = k                    data["from"] = name                    if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list                        message = json.dumps({"type": "send", "content": data["content"], "from": name})                        await self.runCaseX(jsonMsg=message, websocket=websocket)                # 用户注册                elif data["type"] == 'register':                    try:                        Clients[data["uuid"]] = websocket                        if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list                            message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})                            await self.runCaseX(jsonMsg=message, websocket=websocket)                    except Exception as exp:                        print(exp)                # 用户注销                elif data["type"] == 'unregister':                    del Clients[data["uuid"]]                # 对message进行解析,跳进不同功能区                # await self.runCaseX(jsonMsg=data,websocket=websocket)            # 链接断开            except websockets.ConnectionClosed:                print("ConnectionClosed...", path)                # del Clients                break            # 无效状态            except websockets.InvalidState:                print("InvalidState...")                # del Clients                break            # 报错            except Exception as e:                print("ws连接报错",e)                # del Clients                break    # 启动服务器    async def runServer(self):        async with websockets.serve(self.echo, self.ip, self.port):            await asyncio.Future()  # run forever# 多协程模式,防止阻塞主线程无法做其他事情    def WebSocketServer(self):        asyncio.run(self.runServer())    # 多线程启动    def startServer(self):        # 多线程启动,否则会堵塞        thread = threading.Thread(target=self.WebSocketServer)        thread.start()        # thread.join()if __name__=='__main__':    print("server")    s = WS_Server()    s.startServer()

点击全文阅读


本文链接:http://zhangshiyu.com/post/90673.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1