Simple PPPoE scanner Python Script

  • Thread starter Thread starter JB701
  • Start date Start date
  • Replies Replies 25
  • Views Views 7,528
  • Tags Tags
    ppppoe
I tried to run this script; it is my first encounter with Python, by the way. I followed "how to" guides found on Google to install Python and run the script. It says "Sent 1 packets." and nothing comes after that; I tried a few times. Then I connected directly to my modem in bridge mode and set up a broadband connection, the same way you would set up a dial-up connection. When I click Connect in those settings, something does get printed, but there is no MAC address.
 
You could try this software, PPPoE Monitor (download), instead. It has a GUI.
 
I tried this. The sniff function does work: if I dial PPPoE using pppd while this script is running, it shows the information about the AC it is connecting to. But the scanner function, the one which sends the PADI packets, is not working for me (on an Airtel connection, by the way).
I was testing if I could connect to another AC
 
Code:
import threading
import time
from scapy.all import *
from scapy.layers.ppp import *

# Name of the network interface handed to scapy's sendp() and sniff() below;
# change it to match the adapter on the machine running the script.
IFACE = "Realtek PCIe GbE Family Controller #4"

# PPPoE discovery packet names mapped to the value carried in the PPPoED
# `code` field.
PPP_CODES = {
    'PADI': 0x09,
    'PADO': 0x07,
    'PADR': 0x19,
    'PADS': 0x65,
    'PADT': 0xa7
}

def send_padi(smac="00:2B:67:F8:1F:78"):
    """Broadcast a single PADI discovery frame on IFACE.

    Args:
        smac: Source MAC address written into the Ethernet header.
    """
    ethernet = Ether(src=smac, dst="ff:ff:ff:ff:ff:ff", type=0x8863)
    discovery = PPPoED(
        version=1,
        type=1,
        code=PPP_CODES['PADI'],
        sessionid=0x0,
    )
    sendp(ethernet / discovery, iface=IFACE)

def packet_callback(pkt):
    """Print a full field dump of every sniffed PADO packet.

    Packets without a PPPoED layer, or whose discovery code is not PADO,
    are ignored.

    Args:
        pkt: Packet handed over by scapy's ``sniff``.
    """
    if not pkt.haslayer(PPPoED):
        return
    if pkt[PPPoED].code != PPP_CODES['PADO']:
        return

    for candidate in pkt:
        print(candidate.show(dump=True))

def sniff_background():
    """Sniff PPPoE discovery/session frames on IFACE, feeding packet_callback."""
    capture_options = {
        "filter": "(pppoed or pppoes)",
        "prn": packet_callback,
        "store": 0,  # packets are handled by the callback, not kept
        "iface": IFACE,
    }
    sniff(**capture_options)

def main():
    """Start the sniffer thread, send five PADI frames, then wait on the sniffer."""
    listener = threading.Thread(target=sniff_background)
    listener.start()

    remaining = 5
    while remaining:
        send_padi()
        time.sleep(0.25)
        remaining -= 1

    # Blocks until the sniffer thread finishes.
    listener.join()

# Run the scan only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Here you go. This should work.
 
Thanks, that worked. Only AIRBRAS_PUN-3 is replying (the AC I am currently on), so it looks like Airtel has configured it that way?
 
So, I made a GUI version of the script. It's not complete and I'm sure there are bugs, but it "works":


Source

dtyf.jpg
 
@JB701, can you modify the script to make it scan for and show the VLAN tag used by the OLT? A few members of our forum, including me, own an xz000-g7, and unlike the xz000-g3 it doesn't have a feature to scan for the VLAN ID. For most of us, VLAN 100 is not working for PPPoE discovery. Thanks in advance.
 
Code:
import re
import threading
import time
import tkinter as tk
import tkinter.ttk as ttk

from scapy.all import *
from scapy.layers.l2 import *
from scapy.layers.ppp import *

# Name of the network interface handed to scapy's sendp() and sniff() below;
# change it to match the adapter on the machine running the script.
IFACE = "Realtek PCIe GbE Family Controller"

# PPPoE discovery packet names mapped to the value carried in the PPPoED
# `code` field.
PPP_CODES = {
    'PADI': 0x09,
    'PADO': 0x07,
    'PADR': 0x19,
    'PADS': 0x65,
    'PADT': 0xa7
}

# Shared state updated by packet_callback() through its `global` statement.
# start_time / end_time: time.perf_counter() readings taken when the most
# recent PADI / PADO packet was seen by the sniffer.
start_time = 0
end_time = 0
# Running counts of PADI and PADO packets seen by the sniffer.
packet_sent = 0
packet_rec = 0
# Recomputed on every PADO; packet_callback() stores the string "NA" here
# when no PADI has been counted yet.
packet_loss_percentage = 0
# The ttk.Treeview results table; stays None until guithread() creates it.
tree = None


def return_tag(pkt, id):
    """Return the quoted ``tag_value`` found in one layer of a PPPoE packet.

    Args:
        pkt: Packet that is indexed as ``pkt[PPPoED][id]``.
        id: Index applied to the PPPoED layer to select the layer whose
            ``show()`` dump is searched. The name shadows the builtin but is
            kept so existing callers keep working.

    Returns:
        The text between the quotes of the first ``tag_value = '...'`` entry
        in the dump, or an empty string when the dump has no such entry.
    """
    dump = str(pkt[PPPoED][id].show(dump=True))
    match = re.search(r"tag_value = '(.+?)'", dump)
    # No match (e.g. an empty value) used to raise AttributeError on
    # None.group(); return "" so the sniff callback keeps running.
    return match.group(1) if match else ""


def send_padi(smac="00:2B:67:F8:1F:78", vlan=0):
    """Broadcast a single PADI discovery frame on IFACE.

    Args:
        smac: Source MAC address written into the Ethernet header.
        vlan: VLAN id for the 802.1Q tag. Any value other than ``None``
            (including the default ``0``) produces a tagged frame; ``None``
            sends the frame untagged.
    """
    discovery = PPPoED(version=1, type=1, code=PPP_CODES['PADI'], sessionid=0x0)

    if vlan is None:
        # Untagged: the Ethernet header names the PPPoE discovery type itself.
        frame = Ether(src=smac, dst="ff:ff:ff:ff:ff:ff", type=0x8863) / discovery
    else:
        frame = Ether(src=smac, dst="ff:ff:ff:ff:ff:ff") / Dot1Q(vlan=vlan) / discovery

    sendp(frame, iface=IFACE)


def retrive_service_name(x):
    """Collect the Service-Name tag values from a packet's ``show()`` dump.

    Args:
        x: Text dump in which each tag appears as a ``tag_type = ...`` entry
            followed by a ``tag_value = '...'`` entry.

    Returns:
        The non-empty Service-Name values joined with ``", "``, or an empty
        string when there are none.
    """
    # The tempered ``(?:(?!tag_type).)*?`` stops the scan at the next tag.
    # A plain ``.*?`` would, for a Service-Name whose value is empty (''),
    # run on into the following tag and report that tag's value instead
    # (for example the AC-Name).
    regex_pattern = (
        r"tag_type\s+=\s+Service-Name(?:(?!tag_type).)*?"
        r"tag_value\s+=\s+'([^']+)'"
    )
    return ', '.join(re.findall(regex_pattern, x, re.DOTALL))


def packet_callback(pkt):
    """Update the counters and the results table for one sniffed packet.

    A PADI packet records the send time and bumps the sent counter. A PADO
    packet bumps the received counter, computes the round-trip time and the
    loss percentage, and writes a row for the replying MAC address into the
    shared ``tree`` widget, replacing that MAC's existing row if present.
    Any other packet is ignored.

    Args:
        pkt: Packet handed over by scapy's ``sniff``.
    """
    global start_time, end_time, packet_sent, packet_rec, packet_loss_percentage, tree

    if not pkt.haslayer(PPPoED):
        return
    code = pkt[PPPoED].code

    if code == PPP_CODES['PADI']:
        start_time = time.perf_counter()
        packet_sent += 1
        return

    if code != PPP_CODES['PADO']:
        return

    end_time = time.perf_counter()
    # perf_counter() counts seconds and the column is labelled "RTT(ms)",
    # so the factor is 1000 (the previous factor of 100 under-reported it).
    rtt = (end_time - start_time) * 1000
    packet_rec += 1

    # packet_rec was just incremented, so only packet_sent can be zero here.
    if packet_sent:
        packet_loss_percentage = ((packet_sent - packet_rec) / packet_sent) * 100
    else:
        packet_loss_percentage = "NA"

    src_mac = pkt[Ether].src
    try:
        ac_name = return_tag(pkt, 3)
    except (AttributeError, IndexError):
        # No usable tag at that position: show a blank AC name instead of
        # letting the exception escape the sniff callback.
        ac_name = ""
    service_name = retrive_service_name(pkt[PPPoED].show(dump=True))

    if tree is None:
        # The GUI thread has not created the table yet; nothing to update.
        return

    data = (src_mac, ac_name, service_name, str(rtt), str(packet_sent), str(packet_rec),
            str(packet_loss_percentage))

    try:
        for item in tree.get_children():
            if tree.item(item, 'values')[0] == src_mac:
                tree.item(item, values=data)
                break
        else:
            # Loop ended without finding this MAC: add a new row.
            tree.insert("", "end", values=data)
    except Exception as exc:
        # Table updates stay best-effort, but report the failure instead of
        # hiding it (a bare except also swallowed KeyboardInterrupt).
        print("PPPoE Monitor: could not update table:", exc)


def sniff_background():
    """Sniff PPPoE and VLAN-tagged frames on IFACE, feeding packet_callback."""
    capture_options = {
        "filter": "(pppoed or pppoes or vlan)",
        "prn": packet_callback,
        "store": 0,  # packets are handled by the callback, not kept
        "iface": IFACE,
    }
    sniff(**capture_options)


def guithread():
    """Build the "PPPoE Monitor" window and run its Tk main loop.

    The results table is stored in the module-level ``tree`` so that
    packet_callback() can write rows into it.
    """
    global tree
    column_names = ("MAC Address", "AC Name", "Service Name", "RTT(ms)", "Pkt Sent",
                    "Pkt Rec", "Pkt Loss")

    window = tk.Tk()
    tree = ttk.Treeview(window, columns=column_names)
    window.title("PPPoE Monitor")
    caption = tk.Label(window, text="PPPoE Monitor")

    # Each column's header text is identical to its identifier.
    for name in column_names:
        tree.heading(name, text=name)

    # Pack order is significant for layout: table first, then the caption.
    tree.pack()
    caption.pack()

    window.mainloop()


def main():
    """Start the sniffer and GUI threads, then send 150 PADIs two seconds apart."""
    # Sniffer first, GUI second, each started right after it is created.
    for worker in (sniff_background, guithread):
        threading.Thread(target=worker).start()

    sends_left = 150
    while sends_left:
        send_padi()
        time.sleep(2)
        sends_left -= 1


# Run the monitor only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

To scan a different VLAN, change the `vlan` default in the send_padi function.
 
@JB701, thanks for the great work on this script!
Can you please explain what exactly we are supposed to modify in this script? And if I understood correctly, our computer has to be connected to our ONT, which must be in bridge mode, right?
 

Back