Send email from other address restriction check (SOLVED)

Ok.

solved with new script.

Update 1.01.2019: fixed an error; the script now looks up the mailPrimaryAddress of the authenticated user.
Next step: aliases.

Problems:

  1. Kopano web users are not SASL-authenticated.

Because of this, we should set these restrictions in Postfix:

smtpd_recipient_restrictions = check_policy_service unix:private/listfilter,  # check mailing-list restriction
        permit_mynetworks,  # allow Kopano web to send emails without SASL authentication. Kopano has its own "sendas" restriction check.
	check_policy_service unix:private/send_as_filter,  # our new filter goes here!
        permit_sasl_authenticated, 
        reject_unauth_destination,
        reject_unlisted_recipient
  1. In this position our script will check ALL emails; for example, it will also check incoming emails from external mail servers. We have two options:
    a) Don't check emails from external mail servers, and accept getting SPAM with forged sender addresses.
    b) Check external mail servers too, but only if they are trying to send from one of our email addresses.

I prefer b), but you can switch to a) by setting pass_not_authenticated = True in the script.

  1. I don't know how Kopano generates the file /etc/listfilter.secret; it is generated by the listfilter option, so if you use listfilter you don't need to care about it.

postfix configuration:

master.cf.local)

send_as_filter     unix  -       n       n       -       30       spawn user=listfilter argv=/usr/share/univention-mail-postfix/send_as_filter.py
    -b dc=your_ldap_domain,dc=local 

ucr set  mail/postfix/smtpd/restrictions/recipient/20="check_policy_service unix:private/send_as_filter"
univention-config-registry commit /etc/postfix/main.cf
univention-config-registry commit /etc/postfix/master.cf
service postfix reload

/usr/share/univention-mail-postfix/send_as_filter.py

#!/usr/bin/python2.7 -u
#



import univention.uldap
import optparse
import sys
import re
import traceback
import syslog
from ldap.filter import filter_format
from univention.config_registry import ConfigRegistry
import univention.admin.modules

# Command-line interface: -b is the LDAP search base used for every lookup;
# -t, -s and -a are only meant for manual test runs from a shell.
usage = "help"
parser = optparse.OptionParser(usage=usage)
parser.add_option("-b", "--ldap_base", dest="ldap_base", help="ldap base")
parser.add_option("-s", "--sender", dest="sender", help="sender address (for use with -t)")
# The test branch of the script reads options.send_as; without this option
# every -t run failed with AttributeError.
parser.add_option("-a", "--send_as", dest="send_as", help="send_as address (for use with -t)")
parser.add_option("-t", "--test", dest="test", help="test run", action="store_true", default=False)
options, args = parser.parse_args()

syslog.openlog(ident="kopano_send_as_filter", logoption=syslog.LOG_PID, facility=syslog.LOG_MAIL)
ucr = ConfigRegistry()
ucr.load()
# The users/user module is needed later to build the LDAP filter that
# resolves the authenticated user.
univention.admin.modules.update()
usersmod = univention.admin.modules.get("users/user")

# Switch for debug(): when True, debug lines go to syslog (stderr in test mode).
_do_debug = True

# When True, requests without a sasl_username (e.g. mail arriving from
# external mail servers) are answered with DUNNO without any check, so forged
# mail that uses one of our own addresses is not rejected by this filter.
# Set to False to check those requests as well.
pass_not_authenticated = True


def debug(msg, *args):
	"""Emit one debug line if debugging is switched on.

	``msg`` is %-formatted with ``args`` and prefixed with the filter name.
	In test mode (``options.test``) the line is written to stderr, otherwise
	it is sent to syslog with priority LOG_DEBUG. Nothing happens while
	``_do_debug`` is false.
	"""
	if not _do_debug:
		return
	line = "kopano_send_as_filter: {}".format(msg % args)
	if not options.test:
		syslog.syslog(syslog.LOG_DEBUG, line)
		return
	sys.stderr.write("{}\n".format(line))


def listfilter(attrib):
	"""Decide whether the authenticated user may use the given envelope sender.

	attrib is the dict of attributes from one Postfix policy request. Only
	"sasl_username" (the authenticated user, called ``sender`` below) and
	"sender" (the envelope sender address, called ``send_as`` below) are read.

	Returns the policy action as a string:
	  * "DUNNO ..."  - no objection: the request is unauthenticated and
	    pass_not_authenticated is set, the address is not a Kopano user's
	    mailPrimaryAddress, the user owns the address, or the user's uidNumber
	    is listed in the address owner's kopanoSendAsPrivilege.
	  * "REJECT ..." - empty envelope sender, unauthenticated use of one of
	    our addresses, or an authenticated user without send-as privilege.
	  * "443 ..."    - no LDAP base was configured.
	  * "WARN ..."   - the check could not be carried out (LDAP error, or the
	    authenticated user cannot be found); the mail is not blocked.
	"""
	sender = attrib.get("sasl_username", None)
	send_as = attrib.get("sender", None)

	debug("sender=%r send_as=%r", sender, send_as)
	debug("attrib=%r", attrib)

	if pass_not_authenticated and not sender:
		return "DUNNO not sasl_authenticated. pass_not_authenticated = True option is enabled"
	if not options.ldap_base:
		return "443 LDAP base not set."
	if not send_as:
		return "REJECT Access denied for empty sender."

	# Best effort: if anything in the LDAP handling fails, the mail is let
	# through with a WARN instead of being blocked.
	try:
		ldap = univention.uldap.getMachineConnection(ldap_master=False, secret_file="/etc/listfilter.secret")

		# Does the envelope sender belong to one of our Kopano users?
		owner_filter = filter_format(
			'(&(mailPrimaryAddress=%s)(|(objectclass=kopano-user)))',
			(send_as,))
		result = ldap.search(base=options.ldap_base, filter=owner_filter, attr=["kopanoSendAsPrivilege"])
		if not result:
			return "DUNNO %s is not our email address" % send_as

		# The address is ours. Without an authenticated user there is nothing
		# to compare against, and filter_format() cannot escape None, so this
		# has to be decided before the user lookup.
		if not sender:
			return "REJECT Access denied for not authenticated sender to send as user %s" % send_as

		# Resolve the authenticated user once and fetch both attributes needed below.
		user_filter = filter_format('(uid=%s)', (sender,))
		user_result = ldap.search(
			base=options.ldap_base,
			filter=str(usersmod.lookup_filter(user_filter)),
			attr=["mailPrimaryAddress", "uidNumber"])
		if not user_result:
			# Previously this case surfaced as an IndexError that ended in the
			# generic WARN below; keep letting the mail through, but say why.
			return "WARN authenticated user %s not found in LDAP, cannot check send as %s" % (sender, send_as)
		user_attrs = user_result[0][1]

		primarymail = (user_attrs.get("mailPrimaryAddress") or [None])[0]
		debug("mail=%r, sender=%r", primarymail, send_as)
		# Compare case-insensitively: the LDAP search above presumably matched
		# the address without regard to case, so the owner check should agree.
		if primarymail and primarymail.lower() == send_as.lower():
			return "DUNNO user is mail owner"

		# kopanoSendAsPrivilege of the address owner is compared against the
		# authenticated user's uidNumber.
		allowed_user_dns = list(result[0][1].get("kopanoSendAsPrivilege", []))
		debug("allowed_user_dns=%r", allowed_user_dns)
		user_dn = (user_attrs.get("uidNumber") or [None])[0]
		debug("user_dn=%r", user_dn)
		if user_dn and user_dn in allowed_user_dns:
			return "DUNNO allowed per user uidNumber"

		return "REJECT Access denied for %s to send as user  %s" % (sender, send_as)
	except Exception:
		return "WARN Error with sender={} send_as={} attrib={}, traceback={}".format(
			sender, send_as, attrib, traceback.format_exc().replace("\n", " "))

# main
attr = {}

if options.test:
	# Manual test run: build a single request from the command line and print
	# the resulting action.
	_do_debug = True
	# The option parser may not define --send_as; read it defensively and
	# fall back to the first positional argument.
	send_as = getattr(options, "send_as", None) or (args[0] if args else None)
	if not options.sender or not send_as:
		sys.stderr.write("sender and send_as are required\n")
		parser.print_help()
		sys.exit(1)
	# listfilter() takes the authenticated user from "sasl_username" and the
	# address to send as from "sender", so use exactly those keys.
	attr["sasl_username"] = options.sender
	attr["sender"] = send_as
	action = listfilter(attr)
	print("action={}\n".format(action))
else:
	# Policy daemon loop: read "name=value" lines from stdin; an empty line
	# ends one request. python -u is required for unbuffered streams.
	attr_line = re.compile(r'([^=]+)=(.*)\n')
	while True:
		data = sys.stdin.readline()
		m = attr_line.match(data)
		if m:
			attr[m.group(1).strip()] = m.group(2).strip()

		elif data == "\n":
			if attr.get("request", None) == "smtpd_access_policy":
				action = listfilter(attr)
				debug("action=%s", action)
				sys.stdout.write("action=%s\n\n" % action)
				sys.stdout.flush()
			else:
				sys.stderr.write("unknown action in %s\n" % attr)
				# A policy reply must be "action=..." followed by an empty
				# line; the bare text sent before was not a valid reply.
				sys.stdout.write("action=defer_if_permit Service temporarily unavailable\n\n")
				sys.stdout.flush()
				syslog.syslog(syslog.LOG_ERR, "unknown action in '{}', exiting.".format(attr))
				sys.exit(1)
			attr = {}
		elif data == "":
			# Postfix telling us to shutdown (max_idle).
			debug("shutting down (max_idle)")
			sys.exit(0)
		else:
			syslog.syslog(syslog.LOG_ERR, "received bad data: '{}', exiting.".format(data))
			sys.exit(1)