使用python 实现icmp测试主机存活性
代碼:
??????
#!/usr/bin/env python#coding:utf-8import os, sys, socket, struct, select, time# From /usr/include/linux/icmp.h; your milage may vary.ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.def checksum(source_string):??"""??I'm not too confident that this is right but testing seems??to suggest that it gives the same answers as in_cksum in ping.c??"""??sum = 0??countTo = (len(source_string)/2)*2??count = 0??while count<countTo:????thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])????sum = sum + thisVal????sum = sum & 0xffffffff # Necessary?????count = count + 2??if countTo<len(source_string):????sum = sum + ord(source_string[len(source_string) - 1])????sum = sum & 0xffffffff # Necessary???sum = (sum >> 16) + (sum & 0xffff)??sum = sum + (sum >> 16)??answer = ~sum??answer = answer & 0xffff??# Swap bytes. Bugger me if I know why.??answer = answer >> 8 | (answer << 8 & 0xff00)??return answerdef receive_one_ping(my_socket, ID, timeout):??"""??receive the ping from the socket.??"""??timeLeft = timeout??while True:????startedSelect = time.time()????whatReady = select.select([my_socket], [], [], timeLeft)????howLongInSelect = (time.time() - startedSelect)????if whatReady[0] == []: # Timeout??????return????timeReceived = time.time()????recPacket, addr = my_socket.recvfrom(1024)????icmpHeader = recPacket[20:28]????type, code, checksum, packetID, sequence = struct.unpack(??????"bbHHh", icmpHeader????)????if packetID == ID:??????bytesInDouble = struct.calcsize("d")??????timeSent = struct.unpack("d", recPacket[28:28 + bytesInDouble])[0]??????return timeReceived - timeSent????timeLeft = timeLeft - howLongInSelect????if timeLeft <= 0:??????returndef send_one_ping(my_socket, dest_addr, ID):??"""??Send one ping to the given >dest_addr<.??"""??dest_addr = socket.gethostbyname(dest_addr)??# Header is type (8), code (8), checksum (16), id (16), sequence (16)??my_checksum = 0??# Make a dummy heder with a 0 checksum.??header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1) #壓包??#a1 = struct.unpack("bbHHh",header)? #my test??bytesInDouble = struct.calcsize("d")??data = (192 - bytesInDouble) * "Q"??data = struct.pack("d", time.time()) + data??# Calculate the checksum on the data and the dummy header.??my_checksum = checksum(header + data)??# Now that we have the right checksum, we put that in. It's just easier??# to make up a new header than to stuff it into the dummy.??header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1)??packet = header + data??my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1def do_one(dest_addr, timeout):??"""??Returns either the delay (in seconds) or none on timeout.??"""??icmp = socket.getprotobyname("icmp")??try:????my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)??except socket.error, (errno, msg):????if errno == 1:??????# Operation not permitted??????msg = msg + (????????" - Note that ICMP messages can only be sent from processes"????????" running as root."??????)??????raise socket.error(msg)????raise # raise the original error??my_ID = os.getpid() & 0xFFFF??send_one_ping(my_socket, dest_addr, my_ID)??delay = receive_one_ping(my_socket, my_ID, timeout)??my_socket.close()??return delaydef verbose_ping(dest_addr, timeout = 2, count = 100):??"""??Send >count< ping to >dest_addr< with the given >timeout< and display??the result.??"""??for i in xrange(count):????print "ping %s..." % dest_addr,????try:??????delay = do_one(dest_addr, timeout)????except socket.gaierror, e:??????print "failed. (socket error: '%s')" % e[1]??????break????if delay == None:??????print "failed. (timeout within %ssec.)" % timeout????else:??????delay = delay * 1000??????print "get ping in %0.4fms" % delayif __name__ == '__main__':??verbose_ping("www.163.com",2,1)
用到的模塊解析:
struct:
最近在學習python網絡編程這一塊,在寫簡單的socket通信代碼時,遇到了struct這個模塊的使用,當時不太清楚這到底有和作用,后來查閱了相關資料大概了解了,在這里做一下簡單的總結。
??? 了解c語言的人,一定會知道struct結構體在c語言中的作用,它定義了一種結構,里面包含不同類型的數據(int,char,bool等等),方便對某一結構對象進行處理。而在網絡通信當中,大多傳遞的數據是以二進制流(binary data)存在的。當傳遞字符串時,不必擔心太多的問題,而當傳遞諸如int、char之類的基本數據的時候,就需要有一種機制將某些特定的結構體類型打包成二進制流的字符串然后再網絡傳輸,而接收端也應該可以通過某種機制進行解包還原出原始的結構體數據。python中的struct模塊就提供了這樣的機制,該模塊的主要作用就是對python基本類型值與用python字符串格式表示的C struct類型間的轉化(This module performs conversions between Python values and C structs represented as Python strings.)。stuct模塊提供了很簡單的幾個函數,下面寫幾個例子。
??? struct提供用format specifier方式對數據進行打包和解包(Packing and Unpacking)。例如:
| 123456789101112 | import structimport binasciivalues = (1, 'abc', 2.7)s = struct.Struct('I3sf')packed_data = s.pack(*values)unpacked_data = s.unpack(packed_data)print 'Original values:', valuesprint 'Format string :', s.formatprint 'Uses :', s.size, 'bytes'print 'Packed Value :', binascii.hexlify(packed_data)print 'Unpacked Type :', type(unpacked_data), ' Value:', unpacked_data |
輸出:
Original values: (1, 'abc', 2.7) ?
Format string : I3sf ?
Uses : 12 bytes ?
Packed Value : 0100000061626300cdcc2c40 ?
Unpacked Type : <type 'tuple'>? Value: (1, 'abc', 2.700000047683716)
代碼中,首先定義了一個元組數據,包含int、string、float三種數據類型,然后定義了struct對象,并制定了format‘I3sf’,I 表示int,3s表示三個字符長度的字符串,f 表示 float。最后通過struct的pack和unpack進行打包和解包。通過輸出結果可以發現,value被pack之后,轉化為了一段二進制字節串,而unpack可以把該字節串再轉換回一個元組,但是值得注意的是對于float的精度發生了改變,這是由一些比如操作系統等客觀因素所決定的。打包之后的數據所占用的字節數與C語言中的struct十分相似。
select 模塊:
Python中的select模塊專注于I/O多路復用,提供了select ?poll ?epoll三個方法(其中后兩個在Linux中可用,windows僅支持select),另外也提供了kqueue方法(freeBSD系統)
select方法:
進程指定內核監聽哪些文件描述符(最多監聽1024個fd)的哪些事件,當沒有文件描述符事件發生時,進程被阻塞;當一個或者多個文件描述符事件發生時,進程被喚醒。
當我們調用select()時:
1 上下文切換轉換為內核態
2 將fd從用戶空間復制到內核空間
3 ?內核遍歷所有fd,查看其對應事件是否發生
4 ?如果沒發生,將進程阻塞,當設備驅動產生中斷或者timeout時間后,將進程喚醒,再次進行遍歷
5 返回遍歷后的fd
6 ?將fd從內核空間復制到用戶空間
fd:file descriptor 文件描述符
fd_r_list,?fd_w_list,?fd_e_list?=?.秒,之后返回三個空列表,如果監聽的句柄有變化,則直接執行。
轉載于:https://blog.51cto.com/linux1989/1925082
總結
以上是生活随笔為你收集整理的使用python 实现icmp测试主机存活性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: springMVC3学习--ModelA
- 下一篇: iinflux数据库使用