Skip to content

Commit

Permalink
fix(topicdata): Use the partition leader from partition info (#1388)
Browse files Browse the repository at this point in the history
relate to #657 #1364
  • Loading branch information
justinmchase authored and tchiotludo committed Apr 4, 2023
1 parent 301937c commit 397cd27
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/main/java/org/akhq/models/Partition.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
@Getter
@NoArgsConstructor
public class Partition {
private Node.Partition leader;
private int id;
private String topic;
private List<Node.Partition> nodes;
Expand All @@ -27,27 +28,36 @@ public Partition(String topic, TopicPartitionInfo partitionInfo, List<LogDir> lo
this.firstOffset = offsets.getFirstOffset();
this.lastOffset = offsets.getLastOffset();
this.nodes = new ArrayList<>();

for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) {
nodes.add(new Node.Partition(
Node.Partition partition = new Node.Partition(
replica,
partitionInfo.leader().id() == replica.id(),
partitionInfo.isr().stream().anyMatch(node -> node.id() == replica.id())
));
);

this.nodes.add(partition);
if (partition.isLeader()) {
this.leader = partition;
}
}

if (this.leader == null) {
org.apache.kafka.common.Node leader = partitionInfo.leader();
this.leader = new Node.Partition(
leader,
true,
partitionInfo.isr().stream().anyMatch(node -> node.id() == leader.id())
);
}
}

public Node.Partition getLeader() {
return nodes
.stream()
.filter(Node.Partition::isLeader)
.findFirst()
.orElseThrow(() -> new NoSuchElementException("Leader not found"));
return this.leader;
}

public long getLogDirSize() {
return this.getLogDir().stream()
.filter(logDir -> logDir.getBrokerId() == this.getLeader().getId())
.filter(logDir -> this.leader != null && logDir.getBrokerId() == this.leader.getId())
.map(LogDir::getSize)
.reduce(0L, Long::sum);
}
Expand Down

0 comments on commit 397cd27

Please sign in to comment.