-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcluster.clj
126 lines (112 loc) · 5.32 KB
/
cluster.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
(ns franzy.admin.cluster
(:require [franzy.admin.codec :as codec]
[franzy.common.configuration.codec :as config-codec])
(:import (kafka.utils ZkUtils)
(kafka.admin AdminUtils)
(kafka.common BrokerEndPointNotAvailableException)
(clojure.lang IPersistentMap)))
;
;(defprotocol BrokerWriter
; ;;TODO: find if underlying data structure for this
; (register-broker [this id host port endpoints jmx-port]))
;
;(defprotocol ClusterMetadataProvider
; (cluster-info [this]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Reading Brokers
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn all-brokers
"Returns a list of all the broker info for each broker in the cluster."
[^ZkUtils zk-utils]
(->> (.getAllBrokersInCluster zk-utils)
(codec/decode)))
(defn broker-ids
"Gets a sorted list of broker IDs."
[^ZkUtils zk-utils]
(->>
(.getSortedBrokerList zk-utils)
(codec/decode)))
;;TODO: catch exception and return nil?
(defn broker-metadata
"Returns broker info from Zookeeper, given a broker id.
If the broker dies before the Zookeeper query finishes, an exception will be thrown."
[^ZkUtils zk-utils broker-id]
(->> (int broker-id)
(.getBrokerInfo zk-utils)
(codec/decode)))
(defn broker-endpoints-for-channel
"Returns all broker info in the cluster matching a keyword representing the given protocol type.
Valid values are:
:ssl
:plaintext
:sasl_plaintext
:sasl_ssl
:trace"
[^ZkUtils zk-utils protocol-type]
;Scala throws a BrokerEndPointNotAvailableException, but not sure why we care if we're querying.
;There is one exception, which is when a broker dies but we know it should be on a particular channel, in which case returning nil might be less informative.
;Might want to not swallow this, but seems like throwing an exception here makes this api less useful
(try
(->> protocol-type
(codec/encode-security-protocol)
(.getAllBrokerEndPointsForChannel zk-utils)
(codec/decode))
(catch BrokerEndPointNotAvailableException e)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Write Brokers
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;TODO - this needs an explicit encoding as the nested maps cause issues unless using records
;(defn register-broker!
; "Registers a broker in the cluster.
;
; Example:
;
; ```
; (let [id 1005
; host \"127.0.0.1\"
; port 9092
; ;;Security Protocol/Endpoint
; endpoints {:plaintext {:host \"127.0.0.1\"
; :port 9092
; :protocol-type :plaintext}}
; jmx-port -1]
; (register-broker! zk-utils id host port endpoints jmx-port)"
; ([^ZkUtils zk-utils {:keys [id host port endpoints jmx-port]
; :or {jmx-port -1}}]
; (register-broker! zk-utils id host port endpoints jmx-port))
; ([^ZkUtils zk-utils id ^String host ^String port endpoints jmx-port]
; (.registerBrokerInZk zk-utils id host port (codec/encode endpoints) jmx-port)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Replicas
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn manual-replication-assignment
"Creates a manual replication assignment map, given a string of replica assignments, a set of broker ids, a start
partition id, and an optional check if a broker is available."
([^String replica-assignments broker-ids start-partition-id]
(manual-replication-assignment replica-assignments broker-ids start-partition-id true))
([^String replica-assignments broker-ids start-partition-id check-broker-available?]
(AdminUtils/getManualReplicaAssignment replica-assignments (set broker-ids) (int start-partition-id) check-broker-available?)))
(defn assign-replicas-to-brokers
([broker-ids partitions replication-factor]
(assign-replicas-to-brokers broker-ids partitions replication-factor -1 -1))
([broker-ids partitions replication-factor fixed-start-index start-partition-id]
(-> broker-ids
(codec/encode)
(AdminUtils/assignReplicasToBrokers (int partitions) (int replication-factor) (int fixed-start-index) (int start-partition-id))
(codec/decode))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Zookeeper
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn delete-broker-topic-path!
"Deletes the broker topic path in Zookeeper."
[^ZkUtils zk-utils broker-id ^String topic]
(.deletePartition zk-utils (int broker-id) topic))
(defn broker-ids-path []
"Returns the path in Zookeeper used for broker ids."
(ZkUtils/BrokerIdsPath))
(defn broker-sequence-id-path []
"Returns the broker sequence id path in Zookeeper."
(ZkUtils/BrokerSequenceIdPath))
(defn broker-topics-path []
"Returns the broker topics path in Zookeeper."
(ZkUtils/BrokerTopicsPath))