scoket模块
需要进行网络编程就要创建套接字,而在python中要创建套接字,就必须使用socket.scoket()
函数,它的一般语法如下:
socket(scoket_family,scoket_type,protocol=0)
scoket_family
可以是AF_UNIX
或AF_INET(6)
- AF_UNIX:UNIX,用于单一的
UNIX
系统进程之间的通信 - AF_INET(6):因特网,IPv4或IPv6
scoket_type
可以是SOCK_STREAM
或SOCK_DGRAM
- SOCK_STREAM:TCP,面向连接的套接字(流套接字,虚拟电路),主要协议是TCP(传输控制协议)
- SOCK_DGRAM:UDP,无连接的套接字(数据报),主要协议为UDP(用户数据报协议)protocol
protocol
参数为与特定的地址家族相关的协议,默认为0(根据地址格式和套接类别,自动选择一个合适的协议),该参数通常省略
创建TCP/IP
套接字
tcpSock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
创建UPD/IP
套接字
udpSock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
服务器套接字方法
名称 | 描述 |
---|---|
s.bind() | 将地址(主机名字),绑定到套接字上,参数需要为元祖格式 |
s.listen() | 设置并启动TCP监听器,参数为最大挂起连接数 |
s.accept() | 被动接受TCP客户端连接,一直等待到连接到达(阻塞) |
客户端套接字方法
名称 | 描述 |
---|---|
s.connect() | 主动发起TCP服务器连接 |
s.connect_ex() | connect的扩展版本,会以错误码形式返回问题,而不是抛出一个异常 |
普通套接字方法
名称 | 描述 |
---|---|
s.recv() | 接受TCP消息 |
s.send() | 发送TCP消息 |
s.sendall | 完整的发送TCP消息 |
s.recvfrom() | 接收UDP消息 |
s.sendto() | 发送UDP消息 |
s.shutdown() | 关闭连接 |
s.close() | 关闭套接字 |
TCP时间戳服务器
服务器
from socket import *
from time import ctime
HOST = "0.0.0.0"
PORT = 22222
BUFSIZ = 1024
ADDR = (HOST,PORT)
tcpSerSock = socket(AF_INET,SOCK_STREAM)
tcpSerSock.bind(ADDR)
tcpSerSock.listen(5)
while(True):
print("Waiting for connection...")
tcpCliSock, addr = tcpSerSock.accept()
print("...connected from:{}".format(addr))
while(True):
data = tcpCliSock.recv(BUFSIZ)
if not data:
break
tcpCliSock.send("[{}] {}".format(ctime(),data.decode()).encode())
tcpCliSock.close()
tcpSerSock.close()
客户端
from socket import *
from time import ctime
HOST = "192.168.0.102"
PORT = 22222
BUFSIZ = 1024
ADDR = (HOST,PORT)
tcpCliSock = socket(AF_INET,SOCK_STREAM)
tcpCliSock.connect(ADDR)
while(True):
data = input('> ')
if not data:
break
tcpCliSock.send(data.encode())
data = tcpCliSock.recv(BUFSIZ)
if not data:
break
print(data.decode())
tcpCliSock.close()
效果如下
UDP时间戳服务器
服务器
from socket import *
from time import ctime
HOST = "0.0.0.0"
PORT = 22222
BUFSIZ = 1024
ADDR = (HOST,PORT)
udpSerSock = socket(AF_INET,SOCK_DGRAM)
udpSerSock.bind(ADDR)
while(True):
print("Waiting for message...")
data, addr = udpSerSock.recvfrom(BUFSIZ)
udpSerSock.sendto("[{}] {}".format(ctime(),data.decode()).encode(),addr)
print("...received from and returned to :{}".format(addr))
udpSerSock.close()
客户端
from socket import *
HOST = "192.168.0.102"
PORT = 22222
BUFSIZ = 1024
ADDR = (HOST,PORT)
udpCliSock = socket(AF_INET,SOCK_DGRAM)
while(True):
data = input('> ')
if not data:
break
udpCliSock.sendto(data.encode(),ADDR)
data, ADDR = udpCliSock.recvfrom(BUFSIZ)
if not data:
break
print(data.decode())
udpCliSock.close()
效果如下
scoketserver模块
socketserver
是标准库中的一个高级别的模块。用于简化实现网络客户端与服务器所需要的大量样板代码。模块中已经实现了一些可以使用的类
类 | 描述 |
---|---|
BaseServer | 包含核心服务器功能和mix-in的钩子,仅用于推导,不创建类的实例 |
TCPServer/UDPServer | 基础的网络同步TCP/UDP服务器 |
UnixStreamServer/UnixDatagramServer | 基于文件的基础同步TCP/UDP服务器 |
BaseRequestHandler | 包含处理服务器请求的核心功能,仅用于推导,不创建类的实例 |
StreamRequestHandler/DatagramRequestHandler | 实现TCP/UDP服务器的服务处理器 |
scoketserver TCP 服务器
服务器
from socketserver import (TCPServer as TCP, StreamRequestHandler as SRH)
from time import ctime
HOST = "0.0.0.0"
PORT = 22222
ADDR = (HOST,PORT)
class MyRequestHandler(SRH):
def handle(self):
print("...conneceted from :{}".format(self.client_address))
self.wfile.write("[{}] {}".format(ctime(),self.rfile.readline().decode()).encode())
tcpServ = TCP(ADDR,MyRequestHandler)
print("waiting for connection...")
tcpServ.serve_forever()
客户端
from socket import *
HOST = "192.168.0.102"
PORT = 22222
BUFSIZ = 1024
ADDR = (HOST,PORT)
while(True):
tcpCliSock = socket(AF_INET,SOCK_STREAM)
tcpCliSock.connect(ADDR)
data = input('> ')
if not data:
break
tcpCliSock.send("{}\r\n".format(data).encode())
data = tcpCliSock.recv(BUFSIZ)
if not data:
break
print(data.decode().strip())
tcpCliSock.close()
效果如下
若没有本文 Issue,您可以使用 Comment 模版新建。
GitHub Issues