Send email from other address restriction check (SOLVED)

kopano

#1

Hello, it’s a complex problem.
1:
By default, Postfix as shipped by Univention allows authenticated users to send email from any address.
This can be solved with
ucr set mail/postfix/smtpd/restrictions/recipient/20="reject_authenticated_sender_login_mismatch"

2:
But this is not a good solution if you use Kopano together with mail clients configured for SMTP/IMAP.

Kopano adds custom schema attributes to LDAP, such as k4uUserSendAsPrivilege, which allow a user to send email as another user. This only works if you use the web interface; Postfix has no check scripts for it.

So if you set “reject_authenticated_sender_login_mismatch” to close the bug, you will not be able to use “UserSendAsPrivilege” over SMTP.

  1. There is a Univention script, listfilter.py, that checks whether users are allowed to send email to a group. It works as a policy filter in

smtpd_recipient_restrictions = check_policy_service unix:private/listfilter,

  1. Is there an existing solution for k4uUserSendAsPrivilege, or should I write a similar script for it?

#2

Crosspost: https://forum.kopano.io/topic/1964/k4uusersendasprivilege-checking-for-postfix/

Edit: if you decide to write your own filter script you could consider to integrate it with the integration package: https://wiki.z-hub.io/display/K4U/Contributor+Guidelines


#5

I quickly rewrote the existing script; it works in a test, but I need to google how to pass the real user and the fake user to the script through the Postfix pipe.

I will be back tomorrow.


#7

Ok.

Solved with a new script.

Update 1.01.2019: fixed an error; the script now looks up the mailPrimaryAddress of the authenticated user.
Next step: aliases.

Problems:

  1. Kopano web users are not SASL authenticated.

Because of this we have to set these restrictions in Postfix:

smtpd_recipient_restrictions = check_policy_service unix:private/listfilter,  # check mailing-list restrictions
        permit_mynetworks,  # allow Kopano web access to send mail without SASL authentication; Kopano has its own "send as" restriction check
	check_policy_service unix:private/send_as_filter,  # here is our new filter!
        permit_sasl_authenticated, 
        reject_unauth_destination,
        reject_unlisted_recipient
  1. In this position our script will check ALL emails; for example, it will also check incoming emails from external mail servers. So we have two options:
    a) don’t check emails from external mail servers, and receive spam with forged sender addresses
    b) check external mail servers too, but only if they try to send from one of our email addresses.

I prefer b), but you can switch to a) by setting pass_not_authenticated = True in the script.

  1. I don’t know how Kopano generates the file /etc/listfilter.secret; it is generated by the listfilter option, but if you use listfilter you don’t need to care about it.

postfix configuration:

master.cf.local)

send_as_filter     unix  -       n       n       -       30       spawn user=listfilter argv=/usr/share/univention-mail-postfix/send_as_filter.py
    -b dc=your_ldap_domain,dc=local 

ucr set  mail/postfix/smtpd/restrictions/recipient/20="check_policy_service unix:private/send_as_filter"
univention-config-registry commit /etc/postfix/main.cf
univention-config-registry commit /etc/postfix/master.cf
service postfix reload

/usr/share/univention-mail-postfix/send_as_filter.py

#!/usr/bin/python2.7 -u
#



import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

# Command line: -b is passed by Postfix' spawn(8) entry, -s/-a/-t are for
# manual test runs only.
usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="sender", help="sender address (for use with -t)")
# The -t branch at the bottom of the script reads options.send_as; without
# this option every test run died with an AttributeError.
parser.add_option("-a", "--send_as", dest="send_as", help="address to send as (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
univention.admin.modules.update()
# users/user module; its lookup_filter() is used to build the user search filter.
usersmod = univention.admin.modules.get("users/user")
# Debug output is always on in this version of the script.
_do_debug = True

# Should mails from clients that are not SASL authenticated (e.g. external
# mail servers) pass without any check? If enabled, mail with a forged sender
# from our own addresses can reach us.
pass_not_authenticated = True


def debug(msg, *args):
	"""Emit one debug line if debugging is enabled.

	msg is a %-style format string and args are its values. The formatted
	text is prefixed with the filter name and written to stderr in test mode
	(options.test), otherwise to syslog with priority LOG_DEBUG.
	"""
	if not _do_debug:
		return
	line = "kopano_send_as_filter: {}".format(msg % args)
	if not options.test:
		syslog.syslog(syslog.LOG_DEBUG, line)
		return
	sys.stderr.write("{}\n".format(line))


def listfilter(attrib):
	"""Decide whether a client may use the envelope sender it presented.

	attrib is the dict of name=value pairs of one Postfix policy request.
	Only two keys are read: "sasl_username" (the authenticated login, called
	"sender" below) and "sender" (the envelope sender address, called
	"send_as" below).

	Returns the action string that the caller sends back after "action=":
	  * "DUNNO ..."  - no objection: the client is unauthenticated and
	    pass_not_authenticated is set, the address is not the
	    mailPrimaryAddress of a kopano-user, the login owns the address, or
	    the login's uidNumber is listed in the address owner's
	    kopanoSendAsPrivilege.
	  * "REJECT ..." - empty envelope sender, an unauthenticated or unknown
	    login using one of our addresses, or no send-as privilege.
	  * "443 ..."    - the script was started without -b/--ldap_base.
	  * "WARN ..."   - an exception was raised during the LDAP work.
	"""
	sender = attrib.get("sasl_username", None)
	send_as = attrib.get("sender", None)

	debug("sender=%r send_as=%r", sender, send_as)
	debug("attrib=%r", attrib)

	if pass_not_authenticated and not sender:
		return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

	if not options.ldap_base:
		return "443 LDAP base not set."
	if not send_as:
		return "REJECT Access denied for empty sender."

	try:
		ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

		# Is the envelope sender the primary address of one of our Kopano users?
		ldap_filter = filter_format(
			'(&(mailPrimaryAddress=%s)(|(objectclass=kopano-user)))',
			(send_as,))
		result = ldap.search(base=options.ldap_base, filter=ldap_filter, attr=["kopanoSendAsPrivilege"])
		if not result:
			return "DUNNO %s is not our email address" % send_as

		# From here on the address is ours. An unauthenticated client must be
		# refused *before* the uid lookup: looking up an empty login raises,
		# and the handler below would turn that into a fail-open WARN.
		if not sender:
			return "REJECT Access denied for not authenticated sender to send as user %s" % send_as

		# One lookup of the authenticated user yields both attributes needed
		# below (primary address and uidNumber).
		user_filter = filter_format('(uid=%s)', (sender,))
		user_ldap_filter = str(usersmod.lookup_filter(user_filter))
		user_result = ldap.search(
			base=options.ldap_base,
			filter=user_ldap_filter,
			attr=["mailPrimaryAddress", "uidNumber"])
		if not user_result:
			# Unknown login: nothing can prove ownership or a privilege.
			return "REJECT Access denied for %s to send as user  %s" % (sender, send_as)
		user_attrs = user_result[0][1]

		# The owner of the address may always use it. The LDAP search above
		# matches addresses case-insensitively, so compare the same way.
		primarymail = user_attrs.get("mailPrimaryAddress", [None])[0]
		debug("mail=%r, sender=%r", primarymail, send_as)
		if primarymail and primarymail.lower() == send_as.lower():
			return "DUNNO user is mail owner"

		# kopanoSendAsPrivilege of the address owner is compared against the
		# uidNumber of the authenticated user.
		allowed_user_dns = list(result[0][1].get("kopanoSendAsPrivilege", []))
		if allowed_user_dns:
			debug("allowed_user_dns=%r", allowed_user_dns)
			user_dn = user_attrs.get("uidNumber", [None])[0]
			debug("user_dn=%r", user_dn)
			if user_dn and user_dn in allowed_user_dns:
				return "DUNNO allowed per user uidNumber"

		return "REJECT Access denied for %s to send as user  %s" % (sender, send_as)
	except Exception:
		# Deliberately best effort: if the LDAP work itself fails, the mail is
		# not blocked; the problem is only reported.
		return "WARN Error with sender={} send_as={} attrib={}, traceback={}".format(
			sender, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

# testing
if options.test:
	_do_debug = True
	# -s/--sender is the authenticated login, send_as the address to be used.
	# getattr() keeps this branch from crashing when the parser does not
	# define one of the options; the usage text is printed instead.
	test_sender = getattr(options, "sender", None)
	test_send_as = getattr(options, "send_as", None)
	if not test_sender or not test_send_as:
		sys.stderr.write("sender and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	# Use the attribute names listfilter() actually reads from a policy
	# request: "sasl_username" for the login, "sender" for the address.
	attr["sasl_username"] = test_sender
	attr["sender"] = test_send_as
	action = listfilter(attr)
	print("action={}\n".format(action))
else:
	# Policy daemon mode: read name=value lines from stdin; an empty line
	# ends one request. python -u is required for unbuffered streams.
	while True:
		data = sys.stdin.readline()
		m = re.match(r'([^=]+)=(.*)\n', data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = listfilter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
				sys.stdout.flush()
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				# A reply must have the same shape as the one above:
				# "action=..." followed by an empty line.
				sys.stdout.write("action=defer_if_permit Service temporarily unavailable\n\n")
				sys.stdout.flush()
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)

#8

https://jira.z-hub.io/projects/KUCS/issues/KUCS-40


#9

This looks great! FYI, there is also a feature request for UCS: http://forge.univention.org/bugzilla/show_bug.cgi?id=40609


#10

script updated. bug fix


#11

Many bugs fixed.

Now it works like this:

  1. Check whether the sender domain is one of the UCS mail domains – pass if not.
  2. Check whether the send_as address exists in LDAP and get its uid; if the SASL uid equals the send_as uid, pass (this covers primary addresses, aliases, etc.).
  3. Check whether the send_as entry has a kopanoSendAsPrivilege that contains the SASL user – pass if yes.
  4. Deny.
#!/usr/bin/python2.7 -u
#
# by Konstantin Frank


#Config
#turn debug on
#ucr set mail/postfix/policy/send_as_filter/debug=True
#optional 
#ucr set mail/send_from_any_email_users=relay

import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

# Command line: -b is passed by Postfix' spawn(8) entry, -s/-u/-t are for
# manual test runs only.
usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="send_as", help="send_as address (for use with -t)")
parser.add_option("-u", "--user", dest="sasl_username", help="sasl username (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
univention.admin.modules.update()
# users/user module; its lookup_filter() is used to build the user search filter.
usersmod = univention.admin.modules.get("users/user")
# Debug logging follows the UCR switch again (off unless set), instead of
# being forced on:
#   ucr set mail/postfix/policy/send_as_filter/debug=True
# Test mode (-t) turns it on regardless.
_do_debug = ucr.is_true("mail/postfix/policy/send_as_filter/debug", False)
# Should mails from clients that are not SASL authenticated (e.g. external
# mail servers) pass without any check? If enabled, mail with a forged sender
# from our own addresses can reach us. Off unless set:
#   ucr set mail/postfix/policy/send_as_filter/pass_not_authenticated=True
pass_not_authenticated = ucr.is_true("mail/postfix/policy/send_as_filter/pass_not_authenticated", False)




def debug(msg, *args):
	"""Write a debug message when _do_debug is set.

	msg is %-formatted with args and prefixed with the filter name. In test
	mode (options.test) the line goes to stderr; otherwise it is sent to
	syslog with priority LOG_DEBUG.
	"""
	if not _do_debug:
		return
	text = "kopano_send_as_filter: {}".format(msg % args)
	if options.test:
		sys.stderr.write("{}\n".format(text))
		return
	syslog.syslog(syslog.LOG_DEBUG, text)


def send_as_filter(attrib):
	"""Decide whether a client may use the envelope sender it presented.

	attrib is the dict of name=value pairs of one Postfix policy request.
	Only "sasl_username" (the authenticated login) and "sender" (the envelope
	sender address, called send_as below) are read.

	Checks, in order:
	  1. unauthenticated client and pass_not_authenticated set -> DUNNO
	  2. empty envelope sender -> DUNNO if unauthenticated, else REJECT
	  3. sender domain not in UCR mail/hosteddomains -> DUNNO if
	     unauthenticated, REJECT if authenticated
	  4. login listed in UCR mail/send_from_any_email_users -> DUNNO
	  5. address not found in LDAP -> DUNNO
	  6. unauthenticated client using one of our addresses -> REJECT
	  7. login equals the uid of an entry owning the address -> DUNNO
	  8. login's uidNumber listed in kopanoSendAsPrivilege of an entry
	     owning the address -> DUNNO
	  9. otherwise -> REJECT

	Returns the action string that the caller sends back after "action=".
	"443 ..." is returned when the script was started without
	-b/--ldap_base, and "DUNNO Error ..." when the LDAP work raised.
	"""
	sasl_username = attrib.get("sasl_username", None)
	send_as = attrib.get("sender", None)

	if sasl_username:
		debug("sasl=%r send_as=%r", sasl_username, send_as)
		debug("attrib=%r", attrib)

	# pass not sasl_authenticated?
	if pass_not_authenticated and not sasl_username:
		return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"

	# An empty envelope sender must be handled before its domain is taken
	# apart; splitting it raised outside the try block and killed the daemon.
	# An empty sender is not one of our addresses, so an unauthenticated
	# client passes like any other foreign sender.
	if not send_as:
		if sasl_username:
			return "REJECT Access denied for empty send_as."
		return "DUNNO empty sender, nothing to check"

	#####################################################################
	# PASS if not sasl authenticated and send_as is not from our mail domains.
	#####################################################################
	# "or ''" covers an unset variable; split() without argument yields an
	# empty list for an empty value, so the error branch below is reachable.
	# Domains are compared lower-cased, otherwise user@EXAMPLE.COM would be
	# treated as foreign and skip every check.
	ucs_mail_domains = [d.lower() for d in (ucr.get("mail/hosteddomains") or "").split()]
	if "@" in send_as:
		sender_domain = send_as.rsplit("@", 1)[1].lower()
	else:
		sender_domain = ""
	if not ucs_mail_domains:
		return "DUNNO Error: cannot get ucs domain list"
	if sender_domain not in ucs_mail_domains:
		if sasl_username:
			return "REJECT not allowed for authenticated user to send from not our domain"
		return "DUNNO %s not our domain" % sender_domain

	#####################################################################
	# special exception for users who can send from any email, use
	# "ucr set mail/send_from_any_email_users=username1 localrelay2 username3"
	# NOTE(review): this runs after the domain check, so these users are
	# still limited to hosted domains - confirm that this is intended.
	#####################################################################
	send_from_any_email_users = (ucr.get("mail/send_from_any_email_users") or "").split()
	if sasl_username and sasl_username in send_from_any_email_users:
		return "DUNNO local relay server"

	if not options.ldap_base:
		return "443 LDAP base not set."

	try:
		ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

		# Find every entry that carries send_as as primary address, alias
		# or mail attribute.
		ldap_attr = ["kopanoSendAsPrivilege", "mailPrimaryAddress", "uid"]
		ldap_filter = filter_format(
			'(|(mailPrimaryAddress=%s)(mailAlternativeAddress=%s)(mail=%s))',
			(send_as, send_as, send_as))
		result = ldap.search(base=options.ldap_base, filter=ldap_filter, attr=ldap_attr)
		if not result:
			return "DUNNO %s is not our email address" % send_as

		# The address is ours: only authenticated users may use it.
		if not sasl_username:
			return "REJECT Access denied for not authenticated sasl_username to send as %s" % send_as

		######################################################################
		# check if sasl_uid == send_as uid, if yes = pass.
		######################################################################
		# The uid attribute is used instead of cutting the DN apart, and all
		# matching entries are considered, not only the first one returned.
		allowed_user_dns = []
		for dn, entry in result:
			if not dn:
				continue
			for send_as_uid in entry.get("uid", []):
				debug("send_as_uid=%r, sasl_username=%r", send_as_uid, sasl_username)
				if send_as_uid.lower() == sasl_username.lower():
					return "DUNNO user is mail owner"
			allowed_user_dns.extend(entry.get("kopanoSendAsPrivilege", []))

		######################################################################
		# check if the send_as entry has kopanoSendAsPrivilege and compare
		# that list with the uidNumber of the sasl user
		######################################################################
		if allowed_user_dns:
			debug("allowed_user_dns=%r", allowed_user_dns)

			user_filter = filter_format('(uid=%s)', (sasl_username,))
			ldap_filter = usersmod.lookup_filter(user_filter)
			user_result = ldap.search(base=options.ldap_base, filter=str(ldap_filter), attr=["uidNumber"])
			if user_result:
				user_dn = user_result[0][1].get("uidNumber", [None])[0]
				debug("user_dn=%r", user_dn)
				if user_dn and user_dn in allowed_user_dns:
					return "DUNNO allowed per user uidNumber"

		return "REJECT Access denied for %s to send as user  %s" % (sasl_username, send_as)
	except Exception:
		# Deliberately best effort: if the LDAP work itself fails, the mail is
		# not blocked.
		return "DUNNO Error with sasl_username={} send_as={} attrib={}, traceback={}".format(
			sasl_username, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

# testing
if options.test:
	_do_debug = True
	if not options.sasl_username or not options.send_as:
		sys.stderr.write("sasl_username and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	attr["sasl_username"] = options.sasl_username
	# send_as_filter() reads the address from the "sender" attribute, exactly
	# as Postfix delivers it; a "send_as" key would be ignored.
	attr["sender"] = options.send_as
	action = send_as_filter(attr)
	print("action={}\n".format(action))
else:
	# Policy daemon mode: read name=value lines from stdin; an empty line
	# ends one request. python -u is required for unbuffered streams.
	while True:
		data = sys.stdin.readline()
		m = re.match(r'([^=]+)=(.*)\n', data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = send_as_filter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
				sys.stdout.flush()
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				# A reply must have the same shape as the one above:
				# "action=..." followed by an empty line.
				sys.stdout.write("action=defer_if_permit Service temporarily unavailable\n\n")
				sys.stdout.flush()
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)