Skip to content

Commit

Permalink
RANGER-4400: Fixed ConcurrentModificationException in RangerKafkaAudi…
Browse files Browse the repository at this point in the history
…tHandler.processResults(Collection<RangerAccessResult> results) (#285)

(cherry picked from commit 376c8f7)
  • Loading branch information
fateh288 authored and mneethiraj committed Jun 20, 2024
1 parent c2f13fe commit 8c06079
Showing 1 changed file with 29 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,105 +28,85 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;

public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
private static final Logger LOG = LoggerFactory.getLogger(RangerKafkaAuditHandler.class);

private AuthzAuditEvent auditEvent = null;

private ArrayList<AuthzAuditEvent> auditEventList = new ArrayList<>();

public RangerKafkaAuditHandler(){
}

@Override
public void processResult(RangerAccessResult result) {
// If Cluster Resource Level Topic Creation is not Allowed we don't audit.
// Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited.
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.processResult()");
if(LOG.isTraceEnabled()) {
LOG.trace("==> RangerKafkaAuditHandler.processResult()");
}
if (!isAuditingNeeded(result)) {
return;
}
auditEvent = super.getAuthzEvents(result);
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuditHandler.processResult()");
if(LOG.isTraceEnabled()) {
LOG.trace("<== RangerKafkaAuditHandler.processResult()");
}
}
@Override
public void processResults(Collection<RangerAccessResult> results) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.processResults(" + results + ")");
if(LOG.isTraceEnabled()) {
LOG.trace("==> RangerKafkaAuditHandler.processResults(" + results + ")");
}
for(RangerAccessResult res: results){
if (isAuditingNeeded(res)){
AuthzAuditEvent event = super.getAuthzEvents(res);
if(event!=null){
if(LOG.isDebugEnabled()) {
LOG.debug("Got event=" + event + " for RangerAccessResult=" + res);
}
auditEventList.add(event);
}
else{
if(LOG.isDebugEnabled()) {
LOG.debug("No audit event for :" + res);
}
}
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("Auditing not required for :"+res);
}
if (results!=null){
for(RangerAccessResult res: results){
processResult(res);
flushAudit();
}
}

if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuditHandler.processResults(" + results + ")");
if(LOG.isTraceEnabled()) {
LOG.trace("<== RangerKafkaAuditHandler.processResults(" + results + ")");
}
}


private boolean isAuditingNeeded(final RangerAccessResult result) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.isAuditingNeeded()");
if(LOG.isTraceEnabled()) {
LOG.trace("==> RangerKafkaAuditHandler.isAuditingNeeded()");
}
boolean ret = true;
boolean isAllowed = result.getIsAllowed();
RangerAccessRequest request = result.getAccessRequest();
RangerAccessRequest request = result.getAccessRequest();
RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource();
String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
if (resourceName != null) {
if (request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE) && !isAllowed) {
ret = false;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("RangerKafkaAuditHandler: isAuditingNeeded()");
LOG.debug("request:"+request);
LOG.debug("resource:"+resource);
LOG.debug("resourceName:"+resourceName);
LOG.debug("request.getAccessType():"+request.getAccessType());
LOG.debug("isAllowed:"+isAllowed);
LOG.debug("ret="+ret);
LOG.debug("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result);
if(LOG.isTraceEnabled()) {
LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()");
LOG.trace("request:"+request);
LOG.trace("resource:"+resource);
LOG.trace("resourceName:"+resourceName);
LOG.trace("request.getAccessType():"+request.getAccessType());
LOG.trace("isAllowed:"+isAllowed);
LOG.trace("ret="+ret);
LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result);
}
return ret;
}

public void flushAudit() {
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent +" list="+ auditEventList+ ")");
if(LOG.isTraceEnabled()) {
LOG.trace("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent+")");
}
if (auditEvent != null) {
super.logAuthzAudit(auditEvent);
}
else if (auditEventList.size()>0){
super.logAuthzAudits(auditEventList);
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerKafkaAuditHandler.flushAudit()");
if(LOG.isTraceEnabled()) {
LOG.trace("<== RangerKafkaAuditHandler.flushAudit()");
}
}
}

0 comments on commit 8c06079

Please sign in to comment.