from ldap3 import Server,Connection,HASHED_MD5,MODIFY_REPLACE,ServerPool from ldap3.utils.hashed import hashed from re import match,findall from requests import post def send_mail(to_aadr,content): url = 'http://new.lmzdx.me:8000/send_mail/' data = {'to':to_aadr,'content':f'{content}'} post(url,data) class ldap_control: def __init__(self,LDAP_CONFIG): """ LDAP_CONFIG ={ "port" : "389", "host":"example.com", "admin_dn" :"cn=admin,dc=example,dc=com", "admin_passwd" : "tttttttt", "user_dn" : "dc=example,dc=com"} """ self.user_dn,self.admin_dn,self.admin_passwd,self.host,self.port = LDAP_CONFIG["user_dn"],LDAP_CONFIG["admin_dn"],LDAP_CONFIG["admin_passwd"],LDAP_CONFIG["host"],LDAP_CONFIG["port"] self.s = Server(host = self.host,port = self.port) self.c = Connection(self.s,user = self.admin_dn,password = self.admin_passwd,auto_bind=True) print(self.s.check_availability()) def keep(self): if (self.s.check_availability() != True): self.s = Server(host = self.host,port = self.port) self.c = Connection(self.s,user = self.admin_dn,password = self.admin_passwd,auto_bind=True) def result(self): print(self.c.result) def name_to_mail(self,name): self.c.search(self.user_dn,search_filter=f'(cn={name})') s = str(self.c.entries) mail = findall('(?<=mail\=).*?(?=,)',s) print(mail) return mail[0] def mail_to_username(self,mail): #检查邮箱 self.c.search(self.user_dn,search_filter=f"(mail={mail})") s = str(self.c.entries) name = findall('(?<=\=).*?(?=,)',s) print(name[0]) return name[0] def select_email_or_name(self,str): """ 判断 输入为用户名还是邮箱 :param str: 用户输入 :return: ldap 查询用数据 """ if '@' in str : result = f'mail={str}' else: result = f'cn={str}' return result def check_in(self,username,email): username = self.select_email_or_name(username) email = self.select_email_or_name(email) self.c.search(self.user_dn,f'({username})') if (self.c.entries == []): self.c.search(self.user_dn,f'({email})') if (self.c.entries == []): return 1 else: return 0 def add_user(self,username,email,description,passwd): """ :param username: :param email: :param description: :return: 1,成功 0,失败 """ self.keep() if self.check_in(username,email) == 0: return 0 att ={"cn":f'{username}', "Mail":f"{email}", "sn":f"{description}", "userPassword":"" } #try: self.c.add(dn = f'cn={username},{self.user_dn}', object_class=['inetOrgPerson', 'top'], attributes=att) self.reset_passwd(username,passwd) return 1 #except: #return 0 def reset_passwd(self,username,newpasswd): """ 通过用户名 (CN) 或邮箱(mail)重置密码 :param username: :param newpasswd: :return: 1 成功 0 失败 """ self.keep() print(f'username:{username}') user_dn = f'cn={username},{self.user_dn}' hashed_password = hashed(HASHED_MD5, newpasswd) changes = { 'userPassword': [(MODIFY_REPLACE, [hashed_password])] } #try: self.c.modify(user_dn, changes=changes) print(self.c.result) return 1 #except: #return 0 class User : def __init__(self,user_info): self.name = user_info['user_name'] self.mail = user_info['email'] if self.check_user() == 0: return(0) self.passwd = user_info['password'] self.description = user_info['description'] def check_user(self): if self.name.isalnum() : if match(r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$", self.mail): return 1 else: return 0 else: return 0 def creat_ldap_account(self,ldap_control): ldap_control.add_user(self.name,self.mail,self.description,self.passwd)