#!/usr/bin/env python # -*- coding: utf-8 -*- """ ICMP Knock Server ~~~~~~~~~~~~~~~~~ Listen for ICMP packets. Overview -------- This program uses a concept similar to port knocking, which basically involves waiting for a range of packets that match given criteria before executing some action (typically exposing a port, e.g. for SSH connections, that is kept closed or blocked otherwise). But instead of listening for TCP or UDP packets to arrive on specific ports in a specific order, it accepts ICMP echo requests and checks if their payload lengths and order of arrival match the given pattern. The advantage is that this approach works with the standard PING tool. Programs with custom code or (although common) networking tools like netcat plus the ability to execute them are not required. Usage ----- Using common PING utilities, information can be passed by setting a special length:: Windows: ping -l 123 11.22.33.44 Linux: ping -s 123 11.22.33.44 Prospect -------- This is not yet the end of what can be done using ICMP (or other protocols). By setting the payload data, there are more possibilities for transmitting a secret on which the server might react. It's also possible to tunnel any data (like shell commands) through ICMP, even bypassing firewalls that don't block or analyze ICMP packets to that effect. However, setting the payload is not supported by common PING utilities and would eleminate the advantage mentioned above. TODO/Ideas ---------- - reset accept window and action after X seconds - shorter output display (leave out type code?) - no-output/quiet mode - ignore packets that are too big - allow special length to be specified as parameter - make usable as module by accepting a callback or so - allow for a list of different packet sizes as secret code - pad sequences smaller than `max(len(seq.key))` to avoid interferences? - start checking smaller sequences first, break and reset on match For details, refer to :RFC:`791` (Internet Protocol) and :RFC:`792` (Internet Control Message Protocol). .. _RFC 791: http://www.faqs.org/rfcs/rfc791.html .. _RFC 297: http://www.faqs.org/rfcs/rfc792.html .. _portknocking.org: http://www.portknocking.org/ .. _Port Knocking: http://www.linuxjournal.com/article/6811 :Copyright: 2007-2008 Jochen Kupperschmidt :Date: 04-Apr-2008 :License: MIT `<http://www.opensource.org/licenses/mit-license.php>`_ """ import socket from struct import unpack import time # Decorator to bind execution of the wrapped callable to the occurence of the # given value (= payload length) sequence. actions = {} def react_on(*values): """Decorate a function to be called if the given value sequence is found.""" def wrapper(func): seq = tuple(map(int, values)) actions[seq] = func def inner(*args, **kwargs): return func(*args, **kwargs) return inner return wrapper # custom action definitions @react_on(1, 2, 3) def count_to_three(): """A sample action.""" print('One, two, three.') @react_on(22, 22) @react_on(5, 5, 5, 5, 5, 6) def open_ssh(): """Here goes stuff to adjust your firewall or something.""" print('Opening SSH...') # protocol stuff ICMP_TYPES = { 0: 'echo reply', 3: 'destination unreachable', 4: 'source quench', 5: 'redirect', 8: 'echo', 11: 'time exceeded', 12: 'parameter problem', 13: 'timestamp', 14: 'timestamp reply', 15: 'information request', 16: 'information reply', } class IPPacket(object): """An Internet Protocol packet (see :RFC:`791`).""" def __init__(self, data): header, self.payload = data[:20], data[20:] # Unpack IP header. (self.version_ihl, self.tos, self.length, self.ident, self.flags_fragoffset, self.ttl, self.proto, self.hdr_chksum) = unpack('!BBHHHBBH', header[:12]) # Get IP addresses directly from packed data. self.src_addr = socket.inet_ntoa(header[12:16]) self.dst_addr = socket.inet_ntoa(header[16:20]) class ICMPPacket(object): """An Internet Control Message Protocol packet (see :RFC:`792`).""" def __init__(self, data): header, self.payload = data[:8], data[8:] # Unpack header of ICMP echo message. (self.type, self.code, self.checksum, self.ident, self.seq_num) = unpack('!BBHHH', header) def parse_packet(data): """Parse packet and return the payload length.""" ip = IPPacket(data) icmp = ICMPPacket(ip.payload) print('ICMP message from %s, type %d (%s), code %d, %d byte payload.') % ( ip.src_addr, icmp.type, ICMP_TYPES[icmp.type], icmp.code, len(icmp.payload)) return len(icmp.payload) # server stuff class Server(object): def __init__(self, actions, knock_delay=5): self.actions = actions self.key_lengths = tuple(sorted(len(key) for key in actions.iterkeys())) self.max_key_length = max(self.key_lengths) self.seq = [] self.knock_delay = knock_delay self.last_knock = time.time() # Open socket. self.sock = socket.socket( socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) self.sock.bind(('', 1)) def append_to_seq(self, value): now = time.time() if now > (self.last_knock + self.knock_delay): # Too much time passed since last knock. self.seq = [] self.last_knock = now self.seq.append(value) if len(self.seq) > self.max_key_length: del self.seq[0] # Try to find sequence in action mapping. for i in self.key_lengths: key = tuple(self.seq[-i:]) try: self.actions[key]() self.seq = [] break except KeyError: pass def handle_request(self): """Receive and process a request.""" try: data = self.sock.recv(1024) except socket.error, e: if e[0] == 10040: print('Message too long, ignoring.') return raise self.append_to_seq(parse_packet(data)) def serve_forever(self): while True: self.handle_request() if __name__ == '__main__': try: Server(actions).serve_forever() except KeyboardInterrupt: exit('Ctrl-C pressed, aborting...')