一个基于python的即时通信程序

5月17日更新:

广播信息、用户列表、信息确认列表以及通信信息,从原来的用字符串存储改为使用字典来存储,使代码更清晰,更容易扩展,具体更改的格式如下:

广播信息(上线):
{
  'status': 信息状态标志,
  'user_info': 本机的用户名和主机名,
  'pub_key': 本机生成的公钥,
}
广播信息(下线):
{
  'status': 信息状态标志,
  'user_info': 本机的用户名和主机名,
}

用户列表的元素:
{
  'user_info': 对应用户的用户名和主机名,
  'pub_key': 对应用户的公钥,
  'addr': 用户对应的ip,
}

信息确认列表的元素:
{
  'confirm_seq': 信息序列号,
  'user': 发送信息的用户的用户名,
  'msg': 发送的信息,
  'addr': 信息的目的ip和端口,
}


通信信息:
{
  'status': 信息序列号,
  'user': 发送信息的用户的用户名,
  'msg': 发送的信息,
}

更新后的代码如下:

!/usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/

import socket
import os
import threading
import traceback
import rsa

user_list = []
confirm_list = []
username = os.environ['USER']
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1()


class MyThread(threading.Thread):
    '''这个类用于创建新的线程'''

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        apply(self.func, self.args)


def broadcast(broADDR, status):
    '''发送广播信息模块
    用于发送广播信息给其他主机,通知其他主机本主机上线下线状态,以及发送本机的信息给其他主机。
    这个模块会在广播信息前添加上status这个参数的值。在本程序中,当需要通知其他主机,本机已经上线时,
    会传递"online"给status,当需要通知其他主机本机即将下线时,会传递"offline"给status。
    '''
    global username, hostname, pub

    def broadcast_send(oMsg):
        udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        udpSock2.sendto(oMsg, broADDR)
    oMsg = {}
    if status == 'online':
        oMsg = {
            'status': status,
            'user_info': ' '.join([username, hostname]),
            'pub_key': pub,
        }
        broadcast_send(str(oMsg))
    elif status == 'offline':
        oMsg = {
            'status': status,
            'user_info': ' '.join([username, hostname]),
        }
        broadcast_send(str(oMsg))


def recv_msg(localADDR, BUFSIZ, udpSock, port):
    '''信息接收模块
    这个模块的主要功能是,跟据接收到的广播信息更新用户列表,以及处理对端发送过来信息
    '''
    global user_list, confirm_list, username, hostname, pub, privkey

    while True:
        try:
            data, addr = udpSock.recvfrom(BUFSIZ)
        except:
            break
        if data:
            data = eval(data)
        addr = addr[0]
        if data['status'] == 'online':
            user_list_info = {
                'user_info': data['user_info'],
                'pub_key': data['pub_key'],
                'addr': addr,
            }
            if user_list_info not in user_list:
                user_list.append(user_list_info)
                # 把对方添加上用户列表的同时,还要把自己的信息发给对方,以便对方更新用户列表
                respond_msg = {
                    'status': 'respon_online',
                    'user_info': ' '.join([username, hostname]),
                    'pub_key': pub,
                }
                udpSock.sendto(str(respond_msg), (addr, port))

        elif data['status'] == 'offline':
            user_list_info = {
                'user_info': data['user_info'],
            }
            for i in xrange(len(user_list)):
                for k, v in user_list[i].iteritems():
                    if user_list_info['user_info'] == v:
                        del user_list[i]

        elif data['status'] == 'respon_online':
            user_list_info = {
                'user_info': data['user_info'],
                'pub_key': data['pub_key'],
                'addr': addr,
            }
            if user_list_info not in user_list:
                user_list.append(user_list_info)

        elif data['status'] == 'quit':
            print "对方已断开连接,请输入'quit'或'q'返回主菜单"
            continue

        elif data['status'] == 'local_quit':
            continue

        else:
            confirm_msg = data['status']
            # 假如收到的确认标志和确认表中的某项匹配,删除该项
            for i in xrange(len(confirm_list)):
                if confirm_list[i]['confirm_seq'] == confirm_msg:
                    del confirm_list[i]
            if not data['msg']:
                continue
            addr_list = []
            for x in user_list:
                # 提取出用户表中所有用户的地址,存到addr_list中:
                addr_list.append(x['addr'])

            # 检查发送信息的用户的地址是否在用户列表当中:
            if addr in addr_list:
                # 反馈收到确认信息给对方:
                confirm_res = {'status': confirm_msg, 'msg': 0}
                udpSock.sendto(str(confirm_res), (addr, port))
                # 打印信息:
                data_user = data['user']
                try:
                    data_msg = rsa.decrypt((data['msg']), privkey)
                except DecryptionError:
                    print "解码出现异常,请重新连接"
                    continue
                print data_user, ":", data_msg


def print_userlist():
    '''打印用户列表模块'''
    global user_list
    user_list_len = len(user_list)
    print "当前有%d个用户在线:" % user_list_len
    for i in xrange(user_list_len):
        print "ID:", i+1, ":", user_list[i]['user_info'].strip('
'), 
            "come from:", user_list[i]['addr']


def send_msg(udpSock, cli_addr, cli_pub_key, port):
    '''信息发送模块'''
    import random
    global username, confirm_list
    quit_list = ['q', 'quit', 'exit']
    cli_pub_key_rip = rsa.PublicKey.load_pkcs1(cli_pub_key)

    while True:
        msg = raw_input("> ")
        if msg in quit_list:
            # quit_msg_to_local用于通知本机对话结束,回收socket
            quit_msg_to_local = {'status': 'local_quit'}
            quit_msg_to_cli = {'status': 'quit'}
            udpSock.sendto(str(quit_msg_to_local), ('localhost', port))
            udpSock.sendto(str(quit_msg_to_cli), cli_addr)
            break

        random_num = random.randint(0, 1000)
        msg = rsa.encrypt(msg, cli_pub_key_rip)
        output_msg = {
            'status': str(random_num),
            'user': username,
            'msg': msg,
        }
        confirm_list_member = {
            'confirm_seq': str(random_num),
            'user': username,
            'msg': msg,
            'addr': cli_addr,
        }
        confirm_list.append(confirm_list_member)

        udpSock.sendto(str(output_msg), cli_addr)


def confirm_successd(udpSock):
    '''确认信息到达模块
    采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(
    也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。
    '''
    import time
    global confirm_list

    while True:
        confirm_list_len = len(confirm_list)
        if confirm_list_len > 5:
            for i in xrange(confirm_list_len/2):
                repeat_output_msg = {
                    'status': confirm_list[i]['confirm_seq'],
                    'user': confirm_list[i]['user'],
                    'msg': confirm_list[i]['msg'],
                }
                #msg = confirm_list[i][0]
                addr = confirm_list[i]['addr']
                udpSock.sendto(str(repeat_output_msg), addr)
            time.sleep(5)
        else:
            time.sleep(5)


def option(udpSock, BUFSIZ, broADDR, port):
    '''选项菜单模块'''
    while True:
        print '''
        请输入您的选项:
        1 显示用户列表
        2 连接到指定用户,并开始对话
        3 退出
        '''
        action = raw_input("> ")
        if action is '1':
            print_userlist()

        elif action is '2':
            client_option = raw_input("您想连接到哪个用户?,请输入对应的id号:
")
            try:
                # 获取对端的地址
                cli_addr = (user_list[int(client_option)-1]['addr'], port)
                cli_pub_key = user_list[int(client_option)-1]['pub_key']
            except IndexError:
                print "没有这个用户,请重新选择:"
                continue
            print "已建立好连接,可以开始对话,输入quit或q可以结束会话"
            threads = []
            t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub_key, port), send_msg.__name__)
            threads.append(t2)
            t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
            threads.append(t3)
            for t in threads:
                t.setDaemon(True)
                t.start()
            t2.join()#send_msg中止之前,让父线程一直在阻塞状态
            print "连接中断,返回主菜单"

        elif action is '3':
            broadcast(broADDR, 'offline')
            udpSock.close()
            print "再见!"
            break

        else:
            pass


def main():
    '''主函数'''
    host = ''
    port = 2425
    broADDR = ('<broadcast>', port)
    localADDR = (host, port)
    BUFSIZ = 1024
    try:
        broadcast(broADDR, 'online')
        udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock.bind(localADDR)
        t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
                       recv_msg.__name__)
        t1.setDaemon(True)
        t1.start()
        option(udpSock, BUFSIZ, broADDR, port)
    except (KeyboardInterrupt, SystemError):
        udpSock.close()
        raise
    except:
        traceback.print_exc

if __name__ == '__main__':
    main()
cjyfffIM_v0.3

4月23日更新:

已实现RSA加密功能

4月18日:

额。。。本来想用弄一个类似于“飞鸽传书”那样的软件的,目前已经实现了一部分功能了,还有一部分功能没有实现,暂时把这篇文章当作是开发文档,以后添加了新功能后再修改这篇文章吧。有什么错漏的地方请各位大牛提出哈。

目前已经实现了的功能:

1 自动发现局域网内也运行了本程序的机器,把该机器添加到客户列表,假如对方下线的话,自动在客户列表中删除对应的信息。

2 具备确认机制,对方收到我方发送的信息后会反馈回来确认信息,没有收到确认的信息将会在一段时间之后重新发送。

3 信息采用RSA加密

待实现的功能:

1 实现文件传输功能

2 优化代码,使代码的可读性增强 

程序各个模块的简单逻辑关系如下图:

 一个基于python的即时通信程序

各个模块功能表述:

一、选项菜单模块(option)

这个模块有3个选项,分别是:

选项1 打印用户列表。通过调用print_userlist()函数把当前用户列表中的用户打印出来。

选项2 与指定的用户建立连接。根据用户输入的id号,与用户列表中的指定用户建立连接。

选项3 退出程序。在退出前首先会调用发送广播信息模块(broadcast),向局域网广播一条信息通知本机即将下线,然后关闭socket,最后再退出程序。

二、发送广播信息模块(broadcast)

这个模块的作用是在程序启动(退出)时,向局域网内的其他机器发送广播,通知其他机器在各自的用户列表中添加(删除)此用户。

假设本机的用户名是Mike,主机名是Mike‘PC

本机上线的广播信息将是:online^Mike Mike’PC‘^Mike’PC的rsa公钥

本机下线的广播信息将是:offline^Mike Mike’PC’

三、信息发送模块(send_msg)

这个模块运行在一个循环当中,不断的处理用户的输入。

假如用户输入退出指令('q', 'quit', 'exit'),这时候这个模块首先向本机发送一个“local^quit”信息,让本机的信息接收模块(recv_msg)停止接收数据,同时发送一个“quit”给对方,通知对方连接即将中断,然后退出循环,让程序回到选项菜单模块(option)。

假如用户输入的不是退出指令,那么就认为用户将要发送的是正常信息。这里要提一下这个程序中确认机制的实现原理:本机在发送一个消息出去的时候,会在消息的头部加上一个(0~9999)的随机数作为确认标记,同时把这个消息添加到信息确认列表(confirm_list)。对端收到这条消息后,会把确认标记发送回来,然后本机就会根据所接收到的确认标记删除信息确认列表(confirm_list)所对应的条目,这样就认为一条消息对方已经成功接收。

回到具体实现的过程,这个模块会在输入的信息之前加上一个(0~9999)的随机数作为标记,同时加上用户名。例如本机Mike用户向对端一个ip地址为192.168.1.10的用户发送一个“Hello”,那么经这个模块发送出去的信息可能是这样:“1255^Mike^Hello”。同时这个模块会在信息确认列表(confirm_list)中添加上“[1255^Mike^Hello,192.168.1.10]”这样的一条记录。

四、信息接收模块(recv_msg)

这个模块的主要功能是,跟据接收到的广播信息更新用户列表(confirm_list),以及处理对端发送过来信息。

假如收到以“online”开头的信息,这个模块会认为这是对端发送过来的通知上线的广播信息,于是便会在信息中提取出用户名以及主机名,再加上对端的ip地址和端口,添加到用户列表中。并且以一条以“respon_online”开头的信息反馈给对方本机的信息,以便对方也可以更新用户列表。例如收到从192.168.1.11发送过来的一条“online^Kate Kate'PC'^Kate'PC'的rsa公钥”这样一条广播信息后,本机将在用户列表中添加上“[['Kate Kate'PC', Kate'PC'的rsa公钥], ('192.168.1.11', 12345)]”(这个端口号是随机分配的),同时本机返回一条这样的信息给对方:respon_online^'Mike Mike'PC'^Mike'PC'的rsa公钥。

假如是本机收到以“respon_online”开头的信息的话,那就跟上面“online”的情况一样,提取出用户名、主机名、ip地址和端口,添加到用户列表(confirm_list)上。

假如收到的是以“offline”开头的信息,就提取出用户名、主机名、ip地址和端口,检查用户列表(confirm_list)中有没有对应的条目,假如有的话就删除掉对应的条目。

假如收到的是“quit”信息,说明对端即将断开连接,这个时候本模块将提示用户输入退出命令,以便退出连接。

假如收到的是“local^quit”信息,说明本机即将断开连接,这个时候本模块将返回模块的开头,准备接收新的信息。

假如接收到的信息不满足以上的条件,就会被认为是用户间发送的正常消息:

首先要提取消息头部的确认标志。如果收到的信息除了确认标志外没有其他内容了,那么这条消息会被认为是对端在收到本机发送出去的信息后,反馈回来的确认信息,因此接下来的工作就是根据确认标志,查找信息确认列表(confirm_list)所对应的条目并删除。

假如处理确认标志外还有其他内容,那么这条信息就是对端用户所输入的信息,于是首先提取出确认标志返回给对端,然后再本机上打印出对方所输入的内容。

五、确认信息到达模块(confirm_successd)

 这个模块采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。

最后是这个程序的代码:

#! /usr/bin/env python
#coding=utf-8
#author: cjyfff
#blog: http://www.cnblogs.com/cjyfff/

import socket
import os
import pwd
import threading
import traceback
import random
import time
import rsa

user_list = []
confirm_list = []
username = pwd.getpwuid(os.getuid())[0]
hostname = os.popen('hostname').read()
(pubkey, privkey) = rsa.newkeys(1024)
pub = pubkey.save_pkcs1()


class MyThread(threading.Thread):
    '''这个类用于创建新的线程'''

    def __init__(self, func, args, name=''):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args

    def run(self):
        apply(self.func, self.args)


def broadcast(broADDR, status):
    '''发送广播信息模块
    用于发送广播信息给其他主机,通知其他主机本主机上线下线状态,以及发送本机的信息给其他主机。
    这个模块会在广播信息前添加上status这个参数的值。在本程序中,当需要通知其他主机,本机已经上线时,
    会传递"online"给status,当需要通知其他主机本机即将下线时,会传递"offline"给status。
    '''
    global username, hostname, pub

    def broadcast_send(oMsg):
        udpSock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        udpSock2.sendto(oMsg, broADDR)

    if status == 'online':
        oMsg = status + "^" + username + ' ' + hostname + "^" + pub
        broadcast_send(oMsg)
    elif status == 'offline':
        oMsg = status + "^" + username + ' ' + hostname
        broadcast_send(oMsg)


def recv_msg(localADDR, BUFSIZ, udpSock, port):
    '''信息接收模块
    这个模块的主要功能是,跟据接收到的广播信息更新用户列表,以及处理对端发送过来信息
    '''
    global user_list, confirm_list, username, hostname, pub, privkey

    while True:
        try:
            data, addr = udpSock.recvfrom(BUFSIZ)
        except:
            break

        if data.startswith('online'):
            data = data.split('^')[1:]
            if [data, addr] not in user_list:
                user_list.append([data, addr])
                # 把对方添加上用户列表的同时,还要把自己的信息发给对方,以便对方把更新用户列表
                res_msg = 'respon_online^' + username + ' ' + hostname + "^" + pub
                udpSock.sendto(res_msg, (addr[0], port))

        elif data.startswith('offline'):
            data = data.split('^')[1]
            for i in xrange(len(user_list)):
                if user_list[i][0][0] == data:
                    del user_list[i]

        elif data.startswith('respon_online'):
            data = data.split('^')[1:]
            if [data, addr] not in user_list:
                user_list.append([data, addr])

        elif data == 'quit':
            print "对方已断开连接,请输入'quit'或'q'返回主菜单"
            continue

        elif data == 'local^quit':
            continue

        else:
            confirm_recv = data.split('^')[0]
            # 假如收到的确认标志和确认表中的某项匹配,删除该项
            for i in xrange(len(confirm_list)):
                if confirm_list[i][0].split('^')[0] == confirm_recv:
                    del confirm_list[i]
            data = data.split('^')[1:]
            if not data:
                continue
            addr_list = []
            for x in user_list:
                # 提取出用户表中所有用户的地址,存到addr_list中:
                addr_list.append(x[1][0])
            addr = addr[0]
            # 检查发送信息的用户的地址是否在用户列表当中:
            if addr in addr_list:
                # 反馈收到确认信息给对方:
                udpSock.sendto(str(confirm_recv), (addr, port))
                # 打印信息:
                data_name = data[0]
                data_msg = rsa.decrypt((data[1]), privkey)
                print data_name, ":", data_msg


def print_userlist():
    '''打印用户列表模块'''
    global user_list
    print "当前有%d个用户在线:" % len(user_list)
    for i in xrange(len(user_list)):
        print "ID: ", i+1, ":", user_list[i][0][0]


def send_msg(udpSock, cli_addr, cli_pub, port):
    '''信息发送模块'''
    global username, user_list, confirm_list
    quit_list = ['q', 'quit', 'exit']
    cli_pubkey = rsa.PublicKey.load_pkcs1(cli_pub)

    while True:
        msg = raw_input("> ")
        if msg in quit_list:
            udpSock.sendto('local^quit', ('localhost', port))
            udpSock.sendto('quit', cli_addr)
            break

        random_num = random.randint(0, 1000)
        msg = rsa.encrypt(msg, cli_pubkey)
        out_msg = '%s' % random_num + '^' + username + '^' + msg
        confirm_list.append([out_msg, cli_addr])
        udpSock.sendto(out_msg, cli_addr)


def confirm_successd(udpSock):
    '''确认信息到达模块
    采用类似于最久未使用(LRU)算法,每隔5秒钟检查一下信息确认列表(confirm_list),当信息确认列表长度大于5时(
    也就是说未确认接收的信息大于5),把信息确认列表前一半的信息再一次发送。
    '''
    global confirm_list

    while True:
        lenght = len(confirm_list)
        if lenght > 5:
            for i in xrange(lenght/2):
                msg = confirm_list[i][0]
                addr = confirm_list[i][1]
                udpSock.sendto(msg, addr)
            time.sleep(5)
        else:
            time.sleep(5)


def option(udpSock, BUFSIZ, broADDR, port):
    '''选项菜单模块'''
    while True:
        print '''
        输入您的选项:
显示用户列表
连接到指定用户,并开始对话
退出
        '''
        action = raw_input("> ")
        if action is '1':
            print_userlist()

        elif action is '2':
            client_id = raw_input("您想连接到哪个用户?,请输入对应的id号:
")
            try:
                # 获取对端的地址
                cli_addr = (user_list[int(client_id)-1][1][0], port)
                cli_pub = user_list[int(client_id)-1][0][1]
            except IndexError:
                print "没有这个用户,请重新选择:"
                continue
            print "已建立好连接,可以开始对话"
            threads = []
            t2 = MyThread(send_msg, (udpSock, cli_addr, cli_pub, port), send_msg.__name__)
            threads.append(t2)
            t3 = MyThread(confirm_successd, (udpSock, ), confirm_successd.__name__)
            threads.append(t3)
            for t in threads:
                t.setDaemon(True)
                t.start()
            t2.join()#send_msg中止之前,让父线程一直在阻塞状态

            print "连接中断,返回主菜单"

        elif action is '3':
            broadcast(broADDR, 'offline')
            udpSock.close()
            print "再见!"
            break

        else:
            pass


def main():
    '''主函数'''
    host = ''
    port = 2425
    broADDR = ('<broadcast>', port)
    localADDR = (host, port)
    BUFSIZ = 1024
    try:
        broadcast(broADDR, 'online')
        udpSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udpSock.bind(localADDR)
        t1 = MyThread(recv_msg, (localADDR, BUFSIZ, udpSock, port, ),
                       recv_msg.__name__)
        t1.setDaemon(True)
        t1.start()
        option(udpSock, BUFSIZ, broADDR, port)
    except (KeyboardInterrupt, SystemError):
        udpSock.close()
        raise
    except:
        traceback.print_exc

if __name__ == '__main__':
    main()
cjyfffIM_v0.1