• 欢迎光临~

Python实现网络工具

开发技术 开发技术 2022-05-28 次浏览

使用python编写网络工具

基础内容

介绍基本的网络编程

Socket编程

Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯。使用Python中的socket库就可以进行网络相关的编程。

函数 描述
服务器端套接字
s.bind() 绑定地址(host,port)到套接字, 在 AF_INET下,以元组(host,port)的形式表示地址。
s.listen() 开始 TCP 监听。backlog 指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为 1,大部分应用程序设为 5 就可以了。
s.accept() 被动接受TCP客户端连接,(阻塞式)等待连接的到来
客户端套接字
s.connect() 主动初始化TCP服务器连接,。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。
s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常
公共用途的套接字函数
s.recv() 接收 TCP 数据,数据以字符串形式返回,bufsize 指定要接收的最大数据量。flag 提供有关消息的其他信息,通常可以忽略。
s.send() 发送 TCP 数据,将 string 中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于 string 的字节大小。
s.sendall() 完整发送 TCP 数据。将 string 中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回 None,失败则抛出异常。
s.recvfrom() 接收 UDP 数据,与 recv() 类似,但返回值是(data,address)。其中 data 是包含接收数据的字符串,address 是发送数据的套接字地址。
s.sendto() 发送 UDP 数据,将数据发送到套接字,address 是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。
s.close() 关闭套接字

TCP客户端

import socket

host = "www.baidu.com"
port = 80
# 创建socket对象
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 连接服务器
client.connect((host,port))
# 发送数据 HTTP请求
client.send(b"GET / HTTP/1.1rnHost: baidu.comrnrn")
# 接收响应数据
response = client.recv(4096)
# 打印数据
print(response.decode())
client.close()

使用socket函数创建一个socket对象。AF_INET表示使用标准的IPv4地址或主机名,SOCK_STREAM是流式套接字,表示使用TCP。上述程序向www.baidu.com对应的主机发送了一个HTTP GET请求,并且得到了响应

HTTP/1.1 200 OK
Accept-Ranges: bytes
Cache-Control: no-cache
Connection: keep-alive
Content-Length: 9508
Content-Type: text/html
...

UDP客户端

import socket

host = "127.0.0.1"
port = 9091
# 创建socket对象
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 发送数据
client.sendto(b"AAAAAAAAAA",(host,port))
# 接收数据
data,addr = client.recvfrom(4096)
# 打印响应信息
print(data.decode())
client.close()

使用UDP时,socket中的函数中的第二个参数变成了SOCK_DGRAM即数据报套接字,并且发送数据时使用sendto函数,接收数据时使用recvfrom函数。

TCP服务端

import socket
import threading
from urllib import request

IP = '0.0.0.0'
PORT = 9999

def handler(client_socket):
    with client_socket as sock:
        request = sock.recv(1024)
        print(f'[*] Received: {request.decode("utf-8")}')
        sock.send(b"ACK")


def main():
    server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    server.bind((IP,PORT))
    server.listen(5)
    print(f"[*] Listening on {IP}:{PORT}")
    
    while True:
        client,address = server.accept()
        print(f"[*] Accepted connection from {address[0]:{address[1]}}")
        client_handler = threading.Thread(target=handler,args=(client,))
        client_handler.start()

if __name__ == "__main__":
    main()

指定服务器应该监听哪个IP地址和端口,接着让服务器开始监听,并且将最大连接数设置为5。下一步让服务器进入主循环,并在该循环中等待外来连接。当一个客户端成功建立连接时,将接收到的客户端socket对象保存到client变量中,将远程连接的详细信息保存到address变量中。然后,创建一个新的线程,让它指向handler函数,并传入client变量。创建好后,启动这个线程来处理接收到的连接,与此同时服务端的主循环也已经准备好处理下一个外来连接,而handler函数会调用recv接收数据,并给客户端发送一段简单的回复。

ipaddress处理IP地址

文档: ipaddress模块介绍

在下面案例中会用到的几个重要方法

import ipaddress

addr = ipaddress.ip_address('192.0.2.1') # 返回一个object
print(addr.version) # 打印IP版本
net4 = ipaddress.ip_network('192.0.2.0/24') # 返回接口
print(net4.num_addresses) # 答打印地址数
for host in net4.hosts(): # 遍历
    print(host)

输出

4
256
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
192.0.2.5
192.0.2.6
192.0.2.7
192.0.2.8
192.0.2.9
192.0.2.10
192.0.2.11

如上代码迭代显示该网段上的"可用"的独立地址

基于SOCKET的网络工具

大部分操作系统都会执行一个操作: 向一台主机发送一格UDP数据包时,如果主机上的UDP端口没有开启,一般会返回一个ICMP包来提示目标端口不可访问,可以以此来判断主机是否存活。

嗅探流量包

使用Python的socket库,代码如下

import socket
import os

host = '10.81.226.234'

def main():
    # 判断系统是否为windows
    if os.name == 'nt':
        # windwos允许嗅探任何协议
        socket_protocol = socket.IPPROTO_IP
    else:
        # Linux强制指定一个协议进行嗅探
        socket_protocol = socket.IPPROTO_ICMP

    # 创建socket对象
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((host,0))
    # 设置包含IP头
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    # 对windows机器 额外启用混杂模式
    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 读取一个数据包
    print(sniffer.recvfrom(65565))

    # 关闭混杂模式
    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)


if __name__ == '__main__':
    main()

host变量为本机IP地址,然后创建一个socket对象,使用bind函数绑定socket对象,使用setsockopt函数修改socket设置,使其在抓包时包含IP头部。之后会判断平台是否为windows,如果是则启动网卡的混杂模式。

运行输出

$ sudo python3 sock_sniffer.py 
(b'Exc0x00Xx88jx00x00@x01x17x04nQxe2xeanQxe2xeax03x01?+x00x00x00x00Ex00x00<n)@x00@x06xc8x0fnQxe2xeaoxb2x0bx96xafx1ex00Px1fxd8Yxfdx00x00x00x00xa0x02xfaxf0hxb2x00x00x02x04x05xb4x04x02x08nx9ex9dxda}x00x00x00x00x01x03x03x07', ('10.81.226.234', 0))

毫无疑问,这些输出相当凌乱,可以将其进行解码

解码IP层

进行更深层次解析分析,提取诸如协议类型、源IP地址和目的IP地址等有用的信息

如下是IPv4报文头

Python实现网络工具

因为是二进制数据,要对IP头各个数据段进行分割

使用ctypes库或者struct库

Ctypes库拆解IP字段

ctypes提供了各种兼容C语言的数据结构,可调用符合C语言标准的共享库中的函数

文档介绍: ctypes — A foreign function library for Python

示例代码

from ctypes import *
import socket
import struct

from numpy import uint32

class IP(Structure):
    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ushort, 8),
        ("len", c_ushort, 16),
        ("id", c_ushort, 16),
        ("offset", c_ushort, 16),
        ("ttl", c_ubyte, 8),
        ("protocol_num", c_ubyte, 8),
        ("sum", c_ushort, 16),
        ("src", c_uint32, 32),
        ("dst", c-uint32, 32)
    ]

    def __new__(cls, socket_buffer=None):
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffert=None):
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

这个类创建了一个名为_fields_的结构,用于定义IP头各个部分。该结构使用了ctypes里定义的C语言数据类型,例如代表unsigned char 类型的c_ubyte,代表unsigned short的c_ushort等。上述代码中定义的字段和上图中的IP头部中的字段一一对应。各个字段的定义有3个字段组成: 字段名称、数据类型以及字段位数。设置字段位数使得能够以位为单位指定数据长度,这意味着能够自由指定想要的长度。

在上述代码中,IP类继承自ctypes库的Structure类,要求创建对象前必须定义_fields_结构。为了向fields结构里填充数据,Structure类利用了_new_函数。此函数的第一个参数是指向当前类的引用,new函数用该引用创建当前类的第一个对象。之后这个对象被传给_init_函数进行初始化。

struct库拆解IP字段

该库提供了一些格式字符,用来定义二进制数据的结构

import ipaddress
import struct

class IP:
    def __init__(self,buff=None):
        header = struct.unpack('<BBHHHBBH4s4s',buff)
        self.ver = header[0] >> 4
        self.ihl = header[0] & 0xF

        self.tos = header[1]
        self.len = header[2]
        self.id = header[3]
        self.offset = header[4]
        self.ttl = header[5]
        self.protocol = header[6]
        self.sum = header[7]
        self.src = header[8]
        self.dst = header[9]

        self.src_address = ipaddress.ip_address(self.src)
        self.dst_address = ipaddress.ip_address(self.dst)

        # 映射
        self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}

<BBHHHBBH4s4s中的<表示数据字节序,这里表示小端序。

BBHHHBBH4s4s表示IP头的各部分。struct库中提供了若干格式字符。B是1字节,H是2字节,ns是n个字节的数组。

因为不能按位指定想要的长度了,但这里先获取version,只要取第一个字节的高4位,所以使用>>4进行偏移,将高4位向低位偏移4位,原来的高位用0补齐,原来的低4为被覆盖,这样就得到了version。ihl是第一个字节的低4位,将第一个字节与二进制数00001111相与即可得到,对其他字段的获取只要顺次按下标获取即可。

IP解码器

如下是完整代码实现

import ipaddress
import os
from shutil import ExecError
import socket
import struct
import sys


class IP:
    def __init__(self, buff=None):
        header = struct.unpack('<BBHHHBBH4s4s', buff)
        self.ver = header[0] >> 4
        self.ihl = header[0] & 0xF

        self.tos = header[1]
        self.len = header[2]
        self.id = header[3]
        self.offset = header[4]
        self.ttl = header[5]
        self.protocol_num = header[6]
        self.sum = header[7]
        self.src = header[8]
        self.dst = header[9]

        self.src_address = ipaddress.ip_address(self.src)
        self.dst_address = ipaddress.ip_address(self.dst)

        # 映射
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            print(f'No protocol {self.protocol_num}')
            self.protocol = str(self.protocol_num)


def sniff(host):
    if os.name == 'nt':
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    sniffer.bind((host, 0))
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

    try:
        while True:
            raw_buffer = sniffer.recvfrom(65535)[0]
            ip_header = IP(raw_buffer[0:20])
            print(
                f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}")

    except KeyboardInterrupt:
        if os.name == 'nt':
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
        sys.exit()


if __name__ == "__main__":
    if len(sys.argv) == 2:
        host = sys.argv[1]
    else:
        host = "127.0.0.1"

    sniff(host)

首先实现IP类,它定义了一个Python结构,可以把数据包的前20字节映射到读/写的IP头对象,将IP数据包中的字段拆解出来,可供用户识别。将之前实现的抓包程序合并进来,就实现了完整的流程。

测试运行:

因为是Linux平台,所以只能看到ICMP协议数据,运行程序后

使用ping命令 ping baidu.com

如下是程序输出

$ sudo python3 ip_decoder.py 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234

ICMP解码器

解码了IP层数据后还要解码ICMP响应。不同的ICMP消息之间千差万别,但一定存在3个字段:类型、代码和校验和。类型和代码表示了要接收的ICMP消息是什么类型的,也就指明了如何正确地解码其中的数据。

字段 长度 含义
Type 1字节 消息类型:

- 0:回显应答报文
- 8:请求回显报文
Code 1字节 消息代码,此处值为0。
Checksum 2字节 检验和。
Identifier 2字节 标识符,发送端标示此发送的报文
Sequence Number 2字节 序列号,发送端发送的报文的顺序号。每发送一次顺序号就加1。
Data 可变 选项数据,是一个可变长的字段,其中包含要返回给发送者的数据。回显应答通常返回与所收到的数据完全相同的数据。

如上所示,数据包开头的8个二进制位代表类型,其后的8个二进制位代表ICMP消息代码。

完整代码实现

import ipaddress
import os
import socket
import struct
import sys

class IP:
    def __init__(self, buff=None):
        header = struct.unpack('<BBHHHBBH4s4s', buff)
        self.ver = header[0] >> 4
        self.ihl = header[0] & 0xF

        self.tos = header[1]
        self.len = header[2]
        self.id = header[3]
        self.offset = header[4]
        self.ttl = header[5]
        self.protocol_num = header[6]
        self.sum = header[7]
        self.src = header[8]
        self.dst = header[9]

        self.src_address = ipaddress.ip_address(self.src)
        self.dst_address = ipaddress.ip_address(self.dst)

        # 映射
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            print(f'No protocol {self.protocol_num}')
            self.protocol = str(self.protocol_num)

class ICMP:
    def __init__(self,buff):
        header = struct.unpack('<BBHHH',buff)
        self.type = header[0]
        self.code = header[1]
        self.sum = header[2]
        self.id = header[3]
        self.seq = header[4]

def sniff(host):
    if os.name == 'nt':
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    sniffer.bind((host, 0))
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    if os.name == 'nt':
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

    try:
        while True:
            raw_buffer = sniffer.recvfrom(65535)[0]
            ip_header = IP(raw_buffer[0:20])
            if ip_header.protocol == "ICMP":
                print(f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}")
                print(f"Version: {ip_header.ver}")
                print(f"Header Length: {ip_header.ihl} TTL: {ip_header.ttl}")

                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset+8]
                icmp_header = ICMP(buf)
                print(f"ICMP -> Type: {icmp_header.type} Code: {icmp_header.code}")

    except KeyboardInterrupt:
        if os.name == 'nt':
            sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
        sys.exit()

if __name__ == "__main__":
    if len(sys.argv) == 2:
        host = sys.argv[1]
    else:
        host = "127.0.0.1"

    sniff(host)

如上代码中增加了ICMP消息结构及其解析。在sniff中也增加了一系列流程:

在接受数据包的主循环中判断接收到的数据包是否为ICMP数据包,然后计算出ICMP数据在原始数据包中的偏移,最后将数据按照ICMP结构进行解析,输出其中的类型(type)和代码(code)字段。IP头的长度是基于IP头中的ihl字段计算的,该字段记录了IP头中有多少个32位(4字节)长的数据块,将这个字段乘以4就能计算出IP头的大小,以及数据包中下一网络层开始的位置。

运行测试

$ sudo python3 icmp_decoder.py 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 64
ICMP -> Type: 3 Code: 1
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 52
ICMP -> Type: 0 Code: 0
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 52
ICMP -> Type: 0 Code: 0
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4

基于UDP的主机扫描器

完整代码如下

import ipaddress
import os
import socket
import struct
import sys
import threading
import time

SUBNET = '10.81.226.0/24'
MESSAGE = 'PYTHONRULES' # 签名

# IP消息结构
class IP:
    def __init__(self, buff=None):
        header = struct.unpack('<BBHHHBBH4s4s', buff)
        self.ver = header[0] >> 4
        self.ihl = header[0] & 0xF

        self.tos = header[1]
        self.len = header[2]
        self.id = header[3]
        self.offset = header[4]
        self.ttl = header[5]
        self.protocol_num = header[6]
        self.sum = header[7]
        self.src = header[8]
        self.dst = header[9]

        self.src_address = ipaddress.ip_address(self.src)
        self.dst_address = ipaddress.ip_address(self.dst)

        # 映射
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            print(f'No protocol {self.protocol_num}')
            self.protocol = str(self.protocol_num)

# ICMP消息结构
class ICMP:
    def __init__(self, buff):
        header = struct.unpack('<BBHHH', buff)
        self.type = header[0]
        self.code = header[1]
        self.sum = header[2]
        self.id = header[3]
        self.seq = header[4]

# 发送UDP数据包
def udp_sender():
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
        for ip in ipaddress.ip_network(SUBNET).hosts(): # 遍历子网IP地址
            sender.sendto(bytes(MESSAGE, 'utf8'), (str(ip), 65212))

# 扫描
class Scanner:
    def __init__(self, host):
        self.host = host
        if os.name == 'nt':
            socket_protocol = socket.IPPROTO_IP
        else:
            socket_protocol = socket.IPPROTO_ICMP

        self.socket = socket.socket(
            socket.AF_INET, socket.SOCK_RAW, socket_protocol)
        self.socket.bind((host, 0))
        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

        if os.name == 'nt':
            self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
    
    # 获取数据包
    def sniff(self):
        hosts_up = set([f"{str(self.host)}"])
        try:
            while True:
                raw_buffer = self.socket.recvfrom(65535)[0]
                ip_header = IP(raw_buffer[0:20])
                if ip_header.protocol == "ICMP":
                    offset = ip_header.ihl * 4
                    buff = raw_buffer[offset:offset+8]
                    icmp_header = ICMP(buff)
                    if icmp_header.code == 3 and icmp_header.type == 3:
                        if ipaddress.ip_address(ip_header.src_address) in ipaddress.IPv4Network(SUBNET):
                            if raw_buffer[len(raw_buffer) - len(MESSAGE):] == bytes(MESSAGE, 'utf8'):
                                target = str(ip_header.src_address)
                                if target != self.host and target not in hosts_up:
                                    hosts_up.add(str(ip_header.src_address))
                                    print(f"Host UP: {target}")
        # 处理键盘中断
        except KeyboardInterrupt:
            if os.name == 'nt':
                self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

            print('nUser interrupted')
            if hosts_up:
                print(f'nnSummary: Hosts up on {SUBNET}')
            for host in sorted(hosts_up):
                print(f'{host}')
            print('')
            sys.exit()


if __name__ == "__main__":
    if len(sys.argv) == 2:
        host = sys.argv[1]
    else:
        host = '127.0.0.1'
    scan = Scanner(host)
    time.sleep(3)
    thread = threading.Thread(target=udp_sender)
    thread.start()
    scan.sniff()

程序的开头定义了一个字符串作为签名,udp_sender负责遍历子网下的IP地址,发送UDP数据包。接着定义了一个名叫Scanner的类,向其中传入扫描器所在主机的IP地址来初始化它。sniff函数负责嗅探网络上的数据,解析并提取报文格式,并将在线的主机记录下来。接收到ICMP消息时,要检查这个响应是不是来自正在扫描的网段,然后检查ICMP消息里有没有自定义的签名("PYTHONRULES!")。如果检查通过则打印发送这条消息的主机IP地址,如果使用Ctrl+C中断该程序,就会把目前扫描出来的结果打印。

运行测试

$ sudo python3 scanner.py 10.81.226.234
Host UP: 10.81.226.3
Host UP: 10.81.226.1
Host UP: 10.81.226.251
Host UP: 10.81.226.14
Host UP: 10.81.226.51
Host UP: 10.81.226.37
Host UP: 10.81.226.34
Host UP: 10.81.226.39
Host UP: 10.81.226.23
...

Scapy的使用

使用Scapy进行流量嗅探

安装python3版本:$ sudo pip3 install scapy

捕获数据

编写一个能分解并输出数据包内容的基础嗅探器。Scapy提供了一个接口函数sniff:

sniff(filter="",iface="any",prn=function,count=N)

filter参数允许指定一个Berkeley数据包过滤器,用于过滤嗅探到的数据包。将此参数留空则代表要嗅探所有数据包。

iface参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。

prn参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器会将该数据包传给这个回调函数,这是该函数接收的唯一参数。

count参数可以用来指定要嗅探的数据包数量

from scapy.all import sniff

# 回调函数 接收数据包
def packet_callback(packet):
    print(packet.show())

def main():
    sniff(prn=packet_callback,count=1)

if __name__ == "__main__":
    main()

如上代码所示,该程序使用sniff获取一个数据包并显示

使用管理员权限运行

$ sudo python3 sniff.py 
###[ Ethernet ]### 
  dst       = 14:14:4b:81:96:23
  src       = 34:cf:f6:89:e3:83
  type      = IPv4
###[ IP ]### 
     version   = 4
     ihl       = 5
     tos       = 0x0
     len       = 60
     id        = 44584
     flags     = DF
     frag      = 0
     ttl       = 64
     proto     = tcp
     chksum    = 0x934c
     src       = 10.81.226.234
     dst       = 8.8.4.4
     options   
###[ TCP ]### 
        sport     = 54080
        dport     = https
        seq       = 2355326062
        ack       = 0
        dataofs   = 10
        reserved  = 0
        flags     = S
        window    = 64240
        chksum    = 0xf975
        urgptr    = 0
        options   = [('MSS', 1460), ('SAckOK', b''), ('Timestamp', (889222417, 0)), ('NOP', None), ('WScale', 7)]

None

使用show方法将数据包显示在终端。可以看到打印出的信息按照数据链路层、网络层、传输层的层次显示。

下面解释上述部分信息的含义:

Ethernet是以太网,该层下的信息对应了以太网帧结构的一部分: 目的地址、源地址、数据字段。以太网帧中的数据字段存放了IP数据报,到IP层被解析

IP层下对应的信息是IP数据包中的关键字段:

version是版本号,4即IPv4,ihl是报文头长度,以字节为单位

tos是服务类型,该字段使不同类型的数据报能区分开来,len是数据报长度

id flags frag是标识、标志、片偏移,与IP分片有关

ttl是寿命,即Time-To-Live,这确保了数据报不会永远在网络中循环,当TTL=0,则该数据报会被丢弃

proto是协议,该字段值指示了IP数据报的数据部分应交给哪个特定的运输层协议

checksum是首部校验和,帮助路由器检测数据报中的比特错误

srcdst分别是源IP和目的IP

上述打印结果的最后一层是TCP字段报文结构,sport是源端口号,dport是目的端口。

seqack是序号和确认号,这些字段被用于实现TCP的可靠数据传输服务(三次握手)

BPF语法

概念 描述 示例
描述词 想匹配的数据 host, net, port
数据流方向 数据行进的方向 src, dst, src or dst
通信协议 发送数据所用协议 ip, ip6, tcp, udp

examples:

只捕获来源于网络中某一IP的主机流量: src host 192.168.10.1

只捕获80端口: port 80

只捕获ICMP流量: ICMP

保存数据

示例代码

from scapy.all import *

def capture(packet):
    print("*"*30)
    print(f"source:{packet[IP].src}:{packet.sport} target:{packet[IP].dst}:{packet.sport}")
    print(packet.show())
    print("*"*30)

if __name__ == "__main__":
    packets = sniff(filter="tcp and port 80",prn=capture,count=10)
    # 保存输出文件
    wrpcap("res.pcap",packets)

该程序只获取TCP端口为80的数据包,使用wrpcap函数进行保存,pcap文件可以使用wireshark打开。

发送数据

send(IP(dst="www.baidu.com",ttl=1)/ICMP())

使用send函数发送ICMP数据包,可以设置字段值,指定协议名称,从第三层发送数据包。

sendp(Ether()/IP(dst="127.0.0.1",ttl=1)/ICMP())

使用sendp函数,从第二层发送数据包,上面两个函数都没有接收功能。

sr函数可以发送数据包,并且接收数据,会返回2个列表,第一个是有应答的answer,第二个是无应答的answer

from scapy.all import sr,sr1,IP,ICMP

a,b=sr(IP(dst="www.baidu.com",ttl=1)/ICMP())
print(a.show())
print(b.show())

输出

$ sudo python3 send.py
Begin emission:
Finished sending 1 packets.
.*
Received 2 packets, got 1 answers, remaining 0 packets
0000 IP / ICMP 10.81.226.234 > 180.101.49.11 echo-request 0 ==> IP / ICMP 10.81.255.254 > 10.81.226.234 time-exceeded ttl-zero-during-transit / IPerror / ICMPerror / Padding
None
None

sr1函数只返回1个列表,有应答的answer。

from scapy.all import sr,sr1,IP,ICMP

p =sr1(IP(dst="www.baidu.com",ttl=1)/ICMP())
print(p.show())

输出

$ sudo python3 send.py
Begin emission:
Finished sending 1 packets.
.*
Received 2 packets, got 1 answers, remaining 0 packets
###[ IP ]### 
  version   = 4
  ihl       = 5
  tos       = 0xc0
  len       = 70
  id        = 27585
  flags     = 
  frag      = 0
  ttl       = 64
  proto     = icmp
  chksum    = 0x16ab
  src       = 10.81.255.254
  dst       = 10.81.226.234
  options   
###[ ICMP ]### 
     type      = time-exceeded
     code      = ttl-zero-during-transit
     chksum    = 0xf4ff
     reserved  = 0
     length    = 0
     unused    = 0
###[ IP in ICMP ]### 
        version   = 4
        ihl       = 5
        tos       = 0x0
        len       = 28
        id        = 1
        flags     = 
        frag      = 0
        ttl       = 1
        proto     = icmp
        chksum    = 0xe734
        src       = 10.81.226.234
        dst       = 180.101.49.11
        options   
###[ ICMP in ICMP ]### 
           type      = echo-request
           code      = 0
           chksum    = 0xf7ff
           id        = 0x0
           seq       = 0x0
           unused    = ''
###[ Padding ]### 
              load      = 'x00x00x00x00x00x00x00x00x00x00x00x00x00x00'

None

基于ICMP的主机探测

发送一个ICMP请求消息给目标主机,若源主机收到目标主机的应答响应消息,则表示目标可达。

from socket import timeout
from tabnanny import verbose
from scapy.all import IP,ICMP,sr1
from random import randint
from optparse import OptionParser
import ipaddress
import logging

# 关闭报错
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)

hosts = [] # 存放结果
def scan(ip):
    # 生成随机字段
    ip_id = randint(1,65535) # IP报文标识符
    icmp_id = randint(1,65535) # ICMP报文标识符 
    icmp_seq = randint(1,65535) # ICMP序列号
    packet = IP(dst=ip,ttl=64,id=ip_id)/ICMP(id=icmp_id,seq=icmp_seq)/b'rootkit'
    result = sr1(packet,timeout=1,verbose=False) # 发送并接收数据包
    if result:
        for rcv in result:
            scan_ip = rcv[IP].src
            print(scan_ip,'Host is up')
            hosts.append(scan_ip)
    else:
        print(ip,'Host is down')

def main():
    # 命令行参数选项
    parser = OptionParser("Usage: %prog -i <target host>") # 输出帮助信息
    parser.add_option('-i',type='string',dest='IP',help='specify target host') # 获取IP地址参数
    options,args = parser.parse_args() # 解析参数
    print("Scan report for " + options.IP + "n")
    # 判断单台主机还是网段
    if '/' not in options.IP:
        scan(options.IP)
    else:
        net4 = ipaddress.ip_network(options.IP)
        for ip in net4.hosts():
            scan(str(ip))
    
    print("scan result:")
    for host in hosts:
        print(host)

if __name__ == "__main__":
    main()

该程序首先解析命令行参数,然后判断参数是单个IP还是网段,如果是网段则遍历其子网IP进行处理。

程序员灯塔
转载请注明原文链接:Python实现网络工具
喜欢 (0)