Newer
Older
from ldap3 import Server,Connection,HASHED_MD5,MODIFY_REPLACE,ServerPool
from ldap3.utils.hashed import hashed
from re import match,findall
from requests import post
class ldap_control:
def __init__(self,LDAP_CONFIG):
"""
LDAP_CONFIG ={ "port" : "389",
"host":"example.com",
"admin_dn" :"cn=admin,dc=example,dc=com",
"admin_passwd" : "tttttttt",
"user_dn" : "dc=example,dc=com"}
"""
self.user_dn,self.admin_dn,self.admin_passwd,self.host,self.port = LDAP_CONFIG["user_dn"],LDAP_CONFIG["admin_dn"],LDAP_CONFIG["admin_passwd"],LDAP_CONFIG["host"],LDAP_CONFIG["port"]
self.s = Server(host = self.host,port = self.port)
self.c = Connection(self.s,user = self.admin_dn,password = self.admin_passwd,auto_bind=True)
print(self.s.check_availability())
def keep(self):
if (self.s.check_availability() != True):
self.s = Server(host = self.host,port = self.port)
self.c = Connection(self.s,user = self.admin_dn,password = self.admin_passwd,auto_bind=True)
def result(self):
print(self.c.result)
def name_to_mail(self,name):
self.c.search(self.user_dn,search_filter=f'(cn={name})')
s = str(self.c.entries)
mail = findall('(?<=mail\=).*?(?=,)',s)
print(mail)
return mail[0]
def mail_to_username(self,mail):
#检查邮箱
self.c.search(self.user_dn,search_filter=f"(mail={mail})")
s = str(self.c.entries)
name = findall('(?<=\=).*?(?=,)',s)
print(name[0])
return name[0]
def select_email_or_name(self,str):
"""
判断 输入为用户名还是邮箱
:param str: 用户输入
:return: ldap 查询用数据
"""
if '@' in str :
result = f'mail={str}'
else:
result = f'cn={str}'
return result
def check_in(self,username,email):
username = self.select_email_or_name(username)
email = self.select_email_or_name(email)
self.c.search(self.user_dn,f'({username})')
if (self.c.entries == []):
self.c.search(self.user_dn,f'({email})')
if (self.c.entries == []):
return 1
def add_user(self,username,email,description,passwd):
"""
:param username:
:param email:
:param description:
:return: 1,成功 0,失败
"""
self.keep()
if self.check_in(username,email) == 0:
return 0
att ={"cn":f'{username}',
"Mail":f"{email}",
"sn":f"{description}",
"userPassword":""
}
#try:
self.c.add(dn = f'cn={username},{self.user_dn}', object_class=['inetOrgPerson', 'top'], attributes=att)
return 1
#except:
#return 0
"""
通过用户名 (CN) 或邮箱(mail)重置密码
:param username:
:param newpasswd:
:return: 1 成功 0 失败
"""
hashed_password = hashed(HASHED_MD5, newpasswd)
changes = {
'userPassword': [(MODIFY_REPLACE, [hashed_password])]
}
#try:
self.c.modify(user_dn, changes=changes)
print(self.c.result)
return 1
#except:
#return 0
class User :
def __init__(self,user_info):
self.name = user_info['user_name']
self.mail = user_info['email']
self.passwd = user_info['password']
self.description = user_info['description']
def check_user(self):
if self.name.isalnum() :
if match(r"^[A-Za-z0-9\.\+_-]+@[A-Za-z0-9\._-]+\.[a-zA-Z]*$", self.mail):
return 1
else:
return 0
else:
return 0