添加socket编程

This commit is contained in:
External trust 2025-03-24 18:03:40 +08:00
parent 7f2880cd32
commit bcb88e2836
6 changed files with 643 additions and 0 deletions

View File

@ -0,0 +1,131 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 16:28:21
# @version : python3
# @Update time :
# @Description : TCP客户端示例代码(可发送心跳)
'''
import socket
import threading
import time
class TCPClient:
def __init__(self, host='127.0.0.1', port=9999, heartbeat_interval=30):
"""
初始化 TCP 客户端
:param host: 服务端的 IP 地址默认为 '127.0.0.1'
:param port: 服务端的端口号默认为 9999
:param heartbeat_interval: 心跳间隔时间默认为 30
"""
self.host = host
self.port = port
self.client_socket = None
self.is_running = False
self.heartbeat_interval = heartbeat_interval
def connect(self):
"""
连接到服务端
"""
try:
# 创建 TCP socket
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.connect((self.host, self.port))
self.is_running = True
print(f"[*] 已连接到服务端 {self.host}:{self.port}")
# 启动接收线程
threading.Thread(target=self.receive_messages, daemon=True).start()
# 启动心跳线程
# threading.Thread(target=self.send_heartbeat, daemon=True).start()
except Exception as e:
print(f"[!] 连接失败: {e}")
def send_message(self, message):
"""
发送消息给服务端
:param message: 要发送的消息
"""
if self.client_socket and self.is_running:
try:
self.client_socket.sendall(message.encode('utf-8'))
except Exception as e:
print(f"[!] 发送消息失败: {e}")
self.disconnect()
def receive_messages(self):
"""
持续接收服务端的消息
"""
self.client_socket.settimeout(30.0)
while self.is_running:
try:
# 接收服务端的消息(长时间阻塞会超时)
data = self.client_socket.recv(1024).decode('utf-8')
if not data:
print("[!] 服务端已断开连接.")
break
print(f"[*] 收到服务端消息: {data}")
except ConnectionResetError:
print("[!] 服务端强制断开连接.")
break
except socket.timeout:
print("接收数据超时")
except Exception as e:
print(f"[!] 接收消息时发生错误: {e}")
break
# 断开连接
self.disconnect()
def send_heartbeat(self):
"""
定期发送心跳消息以保持连接活跃
"""
while self.is_running:
try:
if self.client_socket:
self.client_socket.sendall("PING".encode('utf-8'))
print("[*] 发送心跳消息: PING")
time.sleep(self.heartbeat_interval)
except Exception as e:
print(f"[!] 发送心跳消息失败: {e}")
break
def disconnect(self):
"""
断开与服务端的连接
"""
if self.client_socket:
try:
self.is_running = False
self.client_socket.close()
print("[*] 已断开与服务端的连接.")
except Exception as e:
print(f"[!] 断开连接时发生错误: {e}")
# 测试代码
if __name__ == "__main__":
# 创建并连接客户端
client = TCPClient(host='127.0.0.1', port=9999, heartbeat_interval=30)
client.connect()
# 主线程用于发送消息
try:
while client.is_running:
message = input("请输入要发送的消息 (输入 'exit' 退出): ")
if message.lower() == "exit":
break
client.send_message(message)
except KeyboardInterrupt:
print("\n[!] 正在断开连接...")
# 断开客户端
client.disconnect()

View File

@ -0,0 +1,123 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 16:08:20
# @version : python3
# @Update time :
# @Description : TCP服务器端示例代码(适应多连接,优化端口占用)
'''
import socket
import threading
class TCPServer:
def __init__(self, host='0.0.0.0', port=9999):
"""
初始化 TCP 服务端
:param host: 监听的 IP 地址默认为 '0.0.0.0'
:param port: 监听的端口号默认为 9999
"""
self.host = host
self.port = port
self.server_socket = None
self.is_running = False
def start(self):
"""
启动 TCP 服务端
"""
# 创建 TCP socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5) # 最大连接数设为5
self.is_running = True
print(f"[*] 服务端正在监听 {self.host}:{self.port} ...")
while self.is_running:
try:
# 等待客户端连接
client_socket, client_address = self.server_socket.accept()
# 设置超时时间,避免 recv() 无限阻塞
client_socket.settimeout(60) # 超时时间为 60 秒
# 创建新线程处理客户端
client_handler = threading.Thread(
target=self.handle_client,
args=(client_socket, client_address)
)
client_handler.daemon = True # 设置为守护线程,主线程退出时自动结束
client_handler.start()
except Exception as e:
if self.is_running:
print(f"[!] 服务端发生错误: {e}")
break
def stop(self):
"""
停止 TCP 服务端
"""
if self.server_socket:
self.is_running = False
self.server_socket.close()
print("[*] 服务端已停止.")
def handle_client(self, client_socket, client_address):
"""
处理客户端连接
:param client_socket: 客户端套接字
:param client_address: 客户端地址
"""
print(f"[+] {client_address} 已连接.")
while self.is_running:
try:
# 接收客户端发送的数据
data = client_socket.recv(1024).decode('utf-8')
if not data:
print(f"[-] {client_address} 断开连接.")
break
print(f"[*] 收到来自 {client_address} 的消息: {data}")
# 自定义处理逻辑(默认回显)
response = self.process_data(data)
# 发送响应给客户端
client_socket.sendall(response.encode('utf-8'))
except socket.timeout:
print(f"[!] {client_address} 连接超时.")
break
except ConnectionResetError:
print(f"[-] {client_address} 强制断开连接.")
break
except Exception as e:
print(f"[!] 处理 {client_address} 时发生错误: {e}")
break
# 确保关闭客户端连接
try:
client_socket.close()
except Exception as e:
print(f"[!] 关闭 {client_address} 连接时发生错误: {e}")
def process_data(self, data):
"""
自定义数据处理逻辑
:param data: 客户端发送的消息
:return: 要返回给客户端的响应
"""
# 默认实现:回显消息
return data
# 测试代码
if __name__ == "__main__":
# 创建并启动服务端
server = TCPServer(host='0.0.0.0', port=9999)
try:
server.start()
except KeyboardInterrupt:
print("\n[!] 正在关闭服务端...")
server.stop()

View File

@ -0,0 +1,106 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 17:06:06
# @version : python3
# @Update time :
# @Description : UDP客户端示例代码
'''
import socket
import threading
class UDPClient:
def __init__(self, host='127.0.0.1', port=9999):
"""
初始化 UDP 客户端
:param host: 服务端的 IP 地址默认为 '127.0.0.1'
:param port: 服务端的端口号默认为 9999
"""
self.host = host
self.port = port
self.client_socket = None
self.is_running = False
def connect(self):
"""
初始化并启动客户端
"""
try:
# 创建 UDP socket
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.is_running = True
print(f"[*] UDP 客户端已启动,目标服务端地址: {self.host}:{self.port}")
# 启动接收线程
threading.Thread(target=self.receive_messages, daemon=True).start()
except Exception as e:
print(f"[!] 客户端初始化失败: {e}")
def send_message(self, message):
"""
发送消息给服务端
:param message: 要发送的消息
"""
if not message.strip(): # 检查消息是否为空或只包含空白字符
print("[!] 消息为空,未发送.")
return
if self.client_socket and self.is_running:
try:
print(f"[*] 向服务端发送消息: {message}")
self.client_socket.sendto(message.encode('utf-8'), (self.host, self.port))
except Exception as e:
print(f"[!] 发送消息失败: {e}")
self.disconnect()
def receive_messages(self):
"""
持续接收服务端的消息
"""
while self.is_running:
try:
# 接收服务端的消息
data, server_address = self.client_socket.recvfrom(1024)
data = data.decode('utf-8')
print(f"[*] 收到服务端消息: {data}")
except Exception as e:
print(f"[!] 接收消息时发生错误: {e}")
break
# 断开连接
self.disconnect()
def disconnect(self):
"""
断开客户端
"""
if self.client_socket:
try:
self.is_running = False
self.client_socket.close()
print("[*] UDP 客户端已断开连接.")
except Exception as e:
print(f"[!] 断开连接时发生错误: {e}")
# 测试代码
if __name__ == "__main__":
# 创建并启动客户端
client = UDPClient(host='127.0.0.1', port=9999)
client.connect()
try:
while client.is_running:
message = input("请输入要发送的消息 (输入 'exit' 退出): ")
if message.lower() == "exit":
break
client.send_message(message.strip()) # 去除首尾空白字符
except KeyboardInterrupt:
print("\n[!] 正在断开连接...")
# 断开客户端
client.disconnect()

View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 16:59:42
# @version : python3
# @Update time :
# @Description : UDP服务器端示例代码
'''
import socket
import threading
class UDPServer:
def __init__(self, host='0.0.0.0', port=9999):
"""
初始化 UDP 服务端
:param host: 监听的 IP 地址默认为 '0.0.0.0'
:param port: 监听的端口号默认为 9999
"""
self.host = host
self.port = port
self.server_socket = None
self.is_running = False
def start(self):
"""
启动 UDP 服务端
"""
# 创建 UDP socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((self.host, self.port))
self.is_running = True
print(f"[*] UDP 服务端正在监听 {self.host}:{self.port} ...")
while self.is_running:
try:
# 接收数据包(阻塞等待)
data, client_address = self.server_socket.recvfrom(1024)
data = data.decode('utf-8')
print(f"[*] 收到来自 {client_address} 的消息: {data}")
# 处理数据并生成响应
response = self.process_data(data)
# 发送响应给客户端
self.server_socket.sendto(response.encode('utf-8'), client_address)
except Exception as e:
if self.is_running:
print(f"[!] 服务端发生错误: {e}")
break
def stop(self):
"""
停止 UDP 服务端
"""
if self.server_socket:
self.is_running = False
self.server_socket.close()
print("[*] UDP 服务端已停止.")
def process_data(self, data):
"""
自定义数据处理逻辑
:param data: 客户端发送的消息
:return: 要返回给客户端的响应
"""
# 默认实现:回显消息
return data
# 测试代码
if __name__ == "__main__":
# 创建并启动服务端
server = UDPServer(host='0.0.0.0', port=9999)
try:
server.start()
except KeyboardInterrupt:
print("\n[!] 正在关闭服务端...")
server.stop()

View File

@ -0,0 +1,101 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 17:30:50
# @version : python3
# @Update time :
# @Description : WebSocket客户端示例代码
'''
import asyncio
import websockets
class WebSocketClient:
def __init__(self, uri='ws://127.0.0.1:8765'):
"""
初始化 WebSocket 客户端
:param uri: 服务端的 WebSocket URI默认为 'ws://127.0.0.1:8765'
"""
self.uri = uri
self.websocket = None
async def connect(self):
"""
连接到 WebSocket 服务端
"""
try:
self.websocket = await websockets.connect(self.uri)
print(f"[*] 已连接到 WebSocket 服务端: {self.uri}")
# 启动接收消息的任务
asyncio.create_task(self.receive_messages())
except Exception as e:
print(f"[!] 连接失败: {e}")
async def send_message(self, message):
"""
发送消息给服务端
:param message: 要发送的消息
"""
if not message.strip(): # 检查消息是否为空
print("[!] 消息为空,未发送.")
return
if self.websocket:
try:
print(f"[*] 向服务端发送消息: {message}")
await self.websocket.send(message)
except Exception as e:
print(f"[!] 发送消息失败: {e}")
await self.disconnect()
async def receive_messages(self):
"""
持续接收服务端的消息
"""
while True:
try:
message = await self.websocket.recv()
print(f"[*] 收到服务端消息: {message}")
except websockets.exceptions.ConnectionClosed:
print("[!] 服务端已断开连接.")
break
except Exception as e:
print(f"[!] 接收消息时发生错误: {e}")
break
# 断开连接
await self.disconnect()
async def disconnect(self):
"""
断开与服务端的连接
"""
if self.websocket:
try:
await self.websocket.close()
print("[*] 已断开与 WebSocket 服务端的连接.")
except Exception as e:
print(f"[!] 断开连接时发生错误: {e}")
# 测试代码
if __name__ == "__main__":
async def main():
client = WebSocketClient(uri='ws://127.0.0.1:8765')
await client.connect()
try:
while True:
message = input("请输入要发送的消息 (输入 'exit' 退出): ")
if message.lower() == "exit":
break
await client.send_message(message.strip()) # 去除首尾空白字符
except KeyboardInterrupt:
print("\n[!] 正在断开连接...")
await client.disconnect()
asyncio.run(main())

View File

@ -0,0 +1,101 @@
#!/usr/bin/env python
# -*-coding:utf-8 -*-
'''
# @Author :幸运锦鲤
# @Time : 2025-03-24 17:30:27
# @version : python3
# @Update time :
# @Description : WebSocket服务器端示例代码
# @env : pip install websockets
'''
import asyncio
import websockets
class WebSocketServer:
def __init__(self, host='0.0.0.0', port=8765):
"""
初始化 WebSocket 服务端
:param host: 监听的 IP 地址默认为 '0.0.0.0'
:param port: 监听的端口号默认为 8765
"""
self.host = host
self.port = port
self.clients = set() # 存储所有连接的客户端
async def handle_client(self, websocket, path):
"""
处理客户端连接
:param websocket: 客户端 WebSocket 连接对象
:param path: 客户端请求的路径可选
"""
# 添加客户端到集合中
self.clients.add(websocket)
print(f"[+] 客户端已连接: {websocket.remote_address}")
try:
async for message in websocket:
print(f"[*] 收到来自 {websocket.remote_address} 的消息: {message}")
# 自定义处理逻辑
response = self.process_message(message)
# 将响应发送回客户端
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print(f"[-] 客户端断开连接: {websocket.remote_address}")
finally:
# 移除客户端
self.clients.remove(websocket)
def process_message(self, message):
"""
自定义消息处理逻辑
:param message: 客户端发送的消息
:return: 要返回给客户端的响应
"""
# 默认实现:回显消息
return message
async def broadcast(self, message):
"""
广播消息给所有连接的客户端
:param message: 要广播的消息
"""
if not self.clients:
print("[!] 没有客户端连接,无法广播消息.")
return
print(f"[*] 广播消息: {message}")
for client in self.clients:
try:
await client.send(message)
except Exception as e:
print(f"[!] 向 {client.remote_address} 发送消息失败: {e}")
def start(self):
"""
启动 WebSocket 服务端
"""
print(f"[*] WebSocket 服务端正在监听 ws://{self.host}:{self.port} ...")
server = websockets.serve(self.handle_client, self.host, self.port)
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()
def stop(self):
"""
停止 WebSocket 服务端
"""
print("[*] 正在停止 WebSocket 服务端...")
asyncio.get_event_loop().stop()
# 测试代码
if __name__ == "__main__":
server = WebSocketServer(host='0.0.0.0', port=8765)
try:
server.start()
except KeyboardInterrupt:
print("\n[!] 正在关闭服务端...")
server.stop()