-
Notifications
You must be signed in to change notification settings - Fork 7
/
create-partitions.lisp
58 lines (52 loc) · 2.31 KB
/
create-partitions.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
;;; Copyright (C) 2018-2020 Sahil Kang <sahil.kang@asilaycomputing.com>
;;;
;;; This file is part of cl-rdkafka.
;;;
;;; cl-rdkafka is free software: you can redistribute it and/or modify
;;; it under the terms of the GNU General Public License as published by
;;; the Free Software Foundation, either version 3 of the License, or
;;; (at your option) any later version.
;;;
;;; cl-rdkafka is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;; GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with cl-rdkafka. If not, see <http://www.gnu.org/licenses/>.
(in-package #:cl-rdkafka)
(defgeneric create-partitions (client topic partitions &key timeout-ms)
  (:documentation
   "Raise the partition count of TOPIC up to PARTITIONS using CLIENT.

Returns PARTITIONS when the operation succeeds."))
(defun make-newpart (topic partitions errstr errstr-len)
  "Build and return a newpartitions handle for TOPIC with PARTITIONS.

TOPIC is copied into a temporary foreign string for the duration of the
call to rd-kafka-newpartitions-new.  ERRSTR is a foreign buffer of
ERRSTR-LEN elements that is handed to that call; when the call returns a
null pointer, an ALLOCATION-ERROR is signalled whose description is read
back from ERRSTR (at most ERRSTR-LEN minus one characters)."
  (cffi:with-foreign-string (topic-buf topic)
    (let ((handle (cl-rdkafka/ll:rd-kafka-newpartitions-new
                   topic-buf partitions errstr errstr-len)))
      (if (cffi:null-pointer-p handle)
          ;; A null pointer means the handle was not created; report
          ;; whatever text was left in ERRSTR.
          (error 'allocation-error
                 :name "newpart"
                 :description (cffi:foreign-string-to-lisp
                               errstr
                               :max-chars (1- errstr-len)))
          handle))))
(defun %create-partitions (rd-kafka-client topic partitions timeout-ms)
  "Run a createpartitions admin operation for TOPIC against RD-KAFKA-CLIENT.

Builds admin options and a newpartitions handle, applies TIMEOUT-MS to the
options, and hands both to PERFORM-ADMIN-OP.  Whatever was successfully
created is destroyed on the way out, whether or not an error is signalled."
  ;; Both handles start as NIL so the cleanup forms can tell which ones
  ;; were actually created before a non-local exit.
  (let ((admin-options nil)
        (newpart nil))
    (unwind-protect
         (cffi:with-foreign-object (errstr :char +errstr-len+)
           ;; Assign each handle as soon as it exists: if MAKE-NEWPART
           ;; signals, ADMIN-OPTIONS is already recorded for cleanup.
           (setf admin-options (make-admin-options rd-kafka-client))
           (setf newpart
                 (make-newpart topic partitions errstr +errstr-len+))
           (set-timeout admin-options timeout-ms errstr +errstr-len+)
           (perform-admin-op createpartitions
                             rd-kafka-client
                             admin-options
                             newpart))
      (unless (null newpart)
        (cl-rdkafka/ll:rd-kafka-newpartitions-destroy newpart))
      (unless (null admin-options)
        (cl-rdkafka/ll:rd-kafka-adminoptions-destroy admin-options)))))
;;; Define the CREATE-PARTITIONS methods through the DEF-ADMIN-METHODS
;;; macro (defined elsewhere in the project).  TOPIC is specialized on
;;; STRING and PARTITIONS on FIXNUM; TIMEOUT-MS defaults to 5000.
(def-admin-methods
create-partitions
(client (topic string) (partitions fixnum) &key (timeout-ms 5000))
;; NOTE(review): POINTER is not bound in the lambda list above; presumably
;; DEF-ADMIN-METHODS binds it to the client's native handle -- confirm
;; against the macro definition.
(%create-partitions pointer topic partitions timeout-ms)
;; Last body form: PARTITIONS, matching the generic function's documented
;; return value on success.
partitions)