From e7fb876b906d2cae2e8d119359c8d94f768614fd Mon Sep 17 00:00:00 2001 From: Fateh Singh Date: Mon, 2 Oct 2023 23:43:17 -0700 Subject: [PATCH] RANGER-4400: Fixed ConcurrentModificationException in RangerKafkaAuditHandler.processResults(Collection results) --- .../authorizer/RangerKafkaAuditHandler.java | 78 +++++++------------ 1 file changed, 29 insertions(+), 49 deletions(-) diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java index 459e874f10..f16fbd6b6e 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collection; public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler { @@ -36,8 +35,6 @@ public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler { private AuthzAuditEvent auditEvent = null; - private ArrayList auditEventList = new ArrayList<>(); - public RangerKafkaAuditHandler(){ } @@ -45,56 +42,42 @@ public RangerKafkaAuditHandler(){ public void processResult(RangerAccessResult result) { // If Cluster Resource Level Topic Creation is not Allowed we don't audit. // Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited. - if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuditHandler.processResult()"); + if(LOG.isTraceEnabled()) { + LOG.trace("==> RangerKafkaAuditHandler.processResult()"); } if (!isAuditingNeeded(result)) { return; } auditEvent = super.getAuthzEvents(result); - if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuditHandler.processResult()"); + if(LOG.isTraceEnabled()) { + LOG.trace("<== RangerKafkaAuditHandler.processResult()"); } } @Override public void processResults(Collection results) { - if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuditHandler.processResults(" + results + ")"); + if(LOG.isTraceEnabled()) { + LOG.trace("==> RangerKafkaAuditHandler.processResults(" + results + ")"); } - for(RangerAccessResult res: results){ - if (isAuditingNeeded(res)){ - AuthzAuditEvent event = super.getAuthzEvents(res); - if(event!=null){ - if(LOG.isDebugEnabled()) { - LOG.debug("Got event=" + event + " for RangerAccessResult=" + res); - } - auditEventList.add(event); - } - else{ - if(LOG.isDebugEnabled()) { - LOG.debug("No audit event for :" + res); - } - } - } - else { - if(LOG.isDebugEnabled()) { - LOG.debug("Auditing not required for :"+res); - } + if (results!=null){ + for(RangerAccessResult res: results){ + processResult(res); + flushAudit(); } } - if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuditHandler.processResults(" + results + ")"); + if(LOG.isTraceEnabled()) { + LOG.trace("<== RangerKafkaAuditHandler.processResults(" + results + ")"); } } + private boolean isAuditingNeeded(final RangerAccessResult result) { - if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuditHandler.isAuditingNeeded()"); + if(LOG.isTraceEnabled()) { + LOG.trace("==> RangerKafkaAuditHandler.isAuditingNeeded()"); } boolean ret = true; boolean isAllowed = result.getIsAllowed(); - RangerAccessRequest request = result.getAccessRequest(); + RangerAccessRequest request = result.getAccessRequest(); RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource(); String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER); if (resourceName != null) { @@ -102,31 +85,28 @@ private boolean isAuditingNeeded(final RangerAccessResult result) { ret = false; } } - if(LOG.isDebugEnabled()) { - LOG.debug("RangerKafkaAuditHandler: isAuditingNeeded()"); - LOG.debug("request:"+request); - LOG.debug("resource:"+resource); - LOG.debug("resourceName:"+resourceName); - LOG.debug("request.getAccessType():"+request.getAccessType()); - LOG.debug("isAllowed:"+isAllowed); - LOG.debug("ret="+ret); - LOG.debug("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result); + if(LOG.isTraceEnabled()) { + LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()"); + LOG.trace("request:"+request); + LOG.trace("resource:"+resource); + LOG.trace("resourceName:"+resourceName); + LOG.trace("request.getAccessType():"+request.getAccessType()); + LOG.trace("isAllowed:"+isAllowed); + LOG.trace("ret="+ret); + LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result); } return ret; } public void flushAudit() { - if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent +" list="+ auditEventList+ ")"); + if(LOG.isTraceEnabled()) { + LOG.trace("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent+")"); } if (auditEvent != null) { super.logAuthzAudit(auditEvent); } - else if (auditEventList.size()>0){ - super.logAuthzAudits(auditEventList); - } - if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuditHandler.flushAudit()"); + if(LOG.isTraceEnabled()) { + LOG.trace("<== RangerKafkaAuditHandler.flushAudit()"); } } }