import re
import threading
import time
import tkinter as tk
import tkinter.ttk as ttk

from scapy.all import *
from scapy.layers.l2 import *
from scapy.layers.ppp import *
# Interface name handed to scapy's sendp()/sniff() calls in this file.
IFACE = "Realtek PCIe GbE Family Controller"

# PPPoE discovery packet names mapped to the values compared with PPPoED.code.
PPP_CODES = dict(
    PADI=0x09,
    PADO=0x07,
    PADR=0x19,
    PADS=0x65,
    PADT=0xA7,
)

# Shared monitor state, rebound by packet_callback() as packets arrive.
start_time = end_time = 0
packet_sent = packet_rec = 0
packet_loss_percentage = 0

# Results table widget; stays None until guithread() creates it.
tree = None
def return_tag(pkt, id):
    """Return the first quoted ``tag_value`` found in a layer dump.

    The layer selected by ``pkt[PPPoED][id]`` is rendered with
    ``show(dump=True)`` and the text is searched for ``tag_value = '...'``.

    Args:
        pkt: Packet that contains a PPPoED layer.
        id: Index passed to ``pkt[PPPoED][...]`` to pick the layer to dump.
            The name shadows the builtin but is kept so keyword callers
            continue to work.

    Returns:
        The text between the quotes of the first non-empty ``tag_value``,
        or an empty string when the dump contains none.
    """
    dump = str(pkt[PPPoED][id].show(dump=True))
    found = re.search(r"tag_value = '(.+?)'", dump)
    # re.search() returns None when nothing matches; calling .group() on it
    # would raise AttributeError in the middle of the sniff callback.
    return found.group(1) if found else ""
def send_padi(smac="00:2B:67:F8:1F:78", vlan=0):
    """Broadcast one PPPoE Active Discovery Initiation (PADI) frame on IFACE.

    Args:
        smac: Source MAC address placed in the Ethernet header.
        vlan: VLAN ID for the 802.1Q header. Pass ``None`` to send the frame
            untagged; any other value, including the default ``0``, adds a
            Dot1Q header carrying that ID.
    """
    # NOTE(review): the PADI is sent without any tags; RFC 2516 expects a
    # Service-Name tag in a PADI -- confirm whether the peers require it.
    padi = PPPoED(version=1, type=1, code=PPP_CODES['PADI'], sessionid=0x0)

    if vlan is not None:
        # Set the encapsulated EtherType explicitly (0x8863, the same value
        # the untagged branch puts in the Ethernet header) instead of
        # relying on a layer binding to fill it in.
        frame = (
            Ether(src=smac, dst="ff:ff:ff:ff:ff:ff")
            / Dot1Q(vlan=vlan, type=0x8863)
            / padi
        )
    else:
        frame = Ether(src=smac, dst="ff:ff:ff:ff:ff:ff", type=0x8863) / padi

    sendp(frame, iface=IFACE)
def retrive_service_name(x):
    """Extract the Service-Name tag values from a packet dump.

    Args:
        x: Text to search, e.g. the output of ``show(dump=True)``.

    Returns:
        The non-empty ``tag_value`` strings of every ``Service-Name`` tag,
        joined with ``", "``; an empty string when there are none.
    """
    # The tempered ``(?:(?!tag_type).)*?`` keeps each match inside a single
    # tag. A plain ``.*?`` with DOTALL runs past a Service-Name tag whose
    # value is empty and captures the value of whichever tag comes next
    # (for example the AC-Name), reporting it as a service name.
    pattern = (
        r"tag_type\s+=\s+Service-Name"
        r"(?:(?!tag_type).)*?"
        r"tag_value\s+=\s+'([^']+)'"
    )
    return ', '.join(re.findall(pattern, x, re.DOTALL))
def packet_callback(pkt):
    """Update the shared counters and the results table for one packet.

    A PADI records the send timestamp and increments ``packet_sent``. A PADO
    records the receive timestamp, increments ``packet_rec``, recomputes the
    round-trip time and loss percentage, and inserts or refreshes the table
    row whose first column equals the PADO's source MAC. Every other packet
    is ignored.

    Args:
        pkt: Packet handed over by the sniffer.
    """
    global start_time, end_time, packet_sent, packet_rec, packet_loss_percentage

    if not pkt.haslayer(PPPoED):
        return
    code = pkt[PPPoED].code

    if code == PPP_CODES['PADI']:
        start_time = time.perf_counter()
        packet_sent += 1
        return
    if code != PPP_CODES['PADO']:
        return

    end_time = time.perf_counter()
    packet_rec += 1
    if packet_sent:
        # perf_counter() counts seconds; the column is labelled "RTT(ms)",
        # so the factor is 1000.
        rtt = (end_time - start_time) * 1000
        # More than one PADO can answer a single PADI; clamp so the loss
        # figure never goes negative.
        packet_loss_percentage = max(
            0.0, ((packet_sent - packet_rec) / packet_sent) * 100
        )
    else:
        # No PADI has been observed yet: there is no send timestamp to
        # measure against and no denominator for the loss ratio.
        rtt = "NA"
        packet_loss_percentage = "NA"

    src_mac = pkt[Ether].src
    try:
        ac_name = return_tag(pkt, 3)
    except (AttributeError, IndexError):
        # A PADO lacking the expected layer or tag value must not abort
        # the callback; show an empty AC name instead.
        ac_name = ""
    service_name = retrive_service_name(pkt[PPPoED].show(dump=True))

    if tree is None:
        # guithread() has not created the table yet; nothing to update.
        return

    data = (src_mac, ac_name, service_name, str(rtt), str(packet_sent),
            str(packet_rec), str(packet_loss_percentage))
    try:
        for item in tree.get_children():
            if tree.item(item, 'values')[0] == src_mac:
                tree.item(item, values=data)
                break
        else:
            # No existing row for this MAC address: append a new one.
            tree.insert("", "end", values=data)
    except (tk.TclError, RuntimeError):
        # Best effort: the widget may already be destroyed, or Tk may
        # reject a call made from this thread. Skip the refresh rather
        # than stop the sniffer.
        pass
def sniff_background():
    """Sniff on IFACE and pass every captured packet to packet_callback.

    The capture uses the filter ``(pppoed or pppoes or vlan)`` and
    ``store=0``; no count, timeout or stop condition is given to ``sniff``.
    """
    capture_filter = "(pppoed or pppoes or vlan)"
    sniff(
        iface=IFACE,
        filter=capture_filter,
        prn=packet_callback,
        store=0,
    )
def guithread():
    """Build the "PPPoE Monitor" window and run its Tk event loop.

    Creates the root window, stores a seven-column Treeview in the module
    global ``tree`` so packet_callback() can reach it, and then blocks in
    ``mainloop()``.
    """
    global tree

    column_names = (
        "MAC Address", "AC Name", "Service Name", "RTT(ms)",
        "Pkt Sent", "Pkt Rec", "Pkt Loss",
    )

    root = tk.Tk()
    tree = ttk.Treeview(root, columns=column_names)
    root.title("PPPoE Monitor")
    banner = tk.Label(root, text="PPPoE Monitor")

    # Each column's heading text is the column identifier itself.
    for name in column_names:
        tree.heading(name, text=name)

    # Pack order: the table first, then the label.
    tree.pack()
    banner.pack()
    root.mainloop()
def main():
    """Start the sniffer and GUI threads, then send 150 PADIs 2 s apart."""
    # sniff_background() calls sniff() without a stop condition, so a
    # non-daemon sniffer thread would keep the process alive forever after
    # the window is closed and the send loop has finished.
    sniff_thread = threading.Thread(target=sniff_background, daemon=True)
    sniff_thread.start()

    gui_thread = threading.Thread(target=guithread)
    gui_thread.start()

    for _ in range(150):
        send_padi()
        time.sleep(2)


if __name__ == "__main__":
    main()