Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

Commit

Permalink
add Administrative API
Browse files Browse the repository at this point in the history
  • Loading branch information
nmred committed May 5, 2017
1 parent 22fa29e commit 56704d9
Show file tree
Hide file tree
Showing 12 changed files with 1,102 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .php_cs.cache
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"php":"5.5.26","version":"2.1.0","rules":{"encoding":true,"full_opening_tag":true,"blank_line_after_namespace":true,"braces":true,"class_definition":true,"elseif":true,"function_declaration":true,"indentation_type":true,"line_ending":true,"lowercase_constants":true,"lowercase_keywords":true,"method_argument_space":true,"no_closing_tag":true,"no_spaces_after_function_name":true,"no_spaces_inside_parenthesis":true,"no_trailing_whitespace":true,"no_trailing_whitespace_in_comment":true,"single_blank_line_at_eof":true,"single_class_element_per_statement":["property"],"single_import_per_statement":true,"single_line_after_imports":true,"switch_case_semicolon_to_colon":true,"switch_case_space":true,"visibility_required":true},"hashes":{"src\/Kafka\/Exception.php":1887794156,"src\/Kafka\/Exception\/NotSupported.php":2083554785,"src\/Kafka\/Exception\/OutOfRange.php":613717588,"src\/Kafka\/Exception\/Protocol.php":2787269416,"src\/Kafka\/Exception\/Socket.php":2743402340,"src\/Kafka\/Exception\/SocketConnect.php":2883015980,"src\/Kafka\/Exception\/SocketEOF.php":1195314501,"src\/Kafka\/Exception\/SocketTimeout.php":1079612001,"src\/Kafka\/Exception\/Config.php":2153825859,"src\/Kafka\/Protocol\/CommitOffset.php":3711823135,"src\/Kafka\/Protocol\/Fetch.php":4166817029,"src\/Kafka\/Protocol\/FetchOffset.php":2615769477,"src\/Kafka\/Protocol\/GroupCoordinator.php":3030713974,"src\/Kafka\/Protocol\/Heartbeat.php":3269486857,"src\/Kafka\/Protocol\/JoinGroup.php":332592611,"src\/Kafka\/Protocol\/Metadata.php":1609652106,"src\/Kafka\/Protocol\/Offset.php":3262177831,"src\/Kafka\/Protocol\/Produce.php":1894065123,"src\/Kafka\/Protocol\/Protocol.php":476565724,"src\/Kafka\/Protocol\/SyncGroup.php":3584049845,"src\/Kafka\/Protocol.php":87023264,"src\/Kafka\/SingletonTrait.php":3424400384,"src\/Kafka\/Socket.php":1512067958,"src\/Kafka\/ConsumerConfig.php":2320744655,"src\/Kafka\/ProducerConfig.php":1925922788,"src\/Kafka\/Config.php":913412482,"src\/Kafka\/Broker.php":598094486,"src\/Kafka\/Consumer.php":674526766,"src\/Kafka\/Consumer\/Assignment.php":626810662,"src\/Kafka\/Consumer\/Process.php":511720065,"src\/Kafka\/Consumer\/State.php":1179310365,"src\/Kafka\/LoggerTrait.php":1234965748,"src\/Kafka\/Producer.php":2558655402,"src\/Kafka\/Producer\/State.php":3349306988,"src\/Kafka\/Producer\/Process.php":3384630694,"tests\/KafkaMock\/Protocol\/Encoder.php":1383970351,"tests\/KafkaTest\/ClientTest.php":347003966,"tests\/KafkaTest\/Produce\/ProduceTest.php":868593207,"tests\/KafkaTest\/Protocol\/DecoderTest.php":1953905898,"tests\/KafkaTest\/Protocol\/EncoderTest.php":118406869,"tests\/TestConfiguration.php":1872544484,"tests\/_autoload.php":2725147420,"tests\/run-tests.php":3681452361,"tests\/Bootstrap.php":3397345411,"tests\/KafkaTest\/Protocol\/GroupCoordinatorTest.php":3938006454,"tests\/KafkaTest\/Base\/ProtocolTest.php":3926873193,"tests\/KafkaTest\/Base\/BrokerTest.php":3957314909,"tests\/KafkaTest\/Base\/ConsumerConfigTest.php":901088941,"tests\/KafkaTest\/Base\/ProducerConfigTest.php":1821493122,"tests\/KafkaTest\/Protocol\/ProduceTest.php":3670305798,"tests\/KafkaTest\/Protocol\/OffsetTest.php":3799352786,"tests\/KafkaTest\/Protocol\/FetchTest.php":2392957613,"tests\/KafkaTest\/Protocol\/CommitOffsetTest.php":3999089971,"tests\/KafkaTest\/Protocol\/SyncGroupTest.php":1695142498,"tests\/KafkaTest\/Protocol\/HeartbeatTest.php":3012473440,"tests\/KafkaTest\/Protocol\/FetchOffsetTest.php":1945252662,"tests\/KafkaTest\/Protocol\/MetadataTest.php":1723320331,"tests\/KafkaTest\/Protocol\/JoinGroupTest.php":3735777277}}
{"php":"5.5.26","version":"2.1.0","rules":{"encoding":true,"full_opening_tag":true,"blank_line_after_namespace":true,"braces":true,"class_definition":true,"elseif":true,"function_declaration":true,"indentation_type":true,"line_ending":true,"lowercase_constants":true,"lowercase_keywords":true,"method_argument_space":true,"no_closing_tag":true,"no_spaces_after_function_name":true,"no_spaces_inside_parenthesis":true,"no_trailing_whitespace":true,"no_trailing_whitespace_in_comment":true,"single_blank_line_at_eof":true,"single_class_element_per_statement":["property"],"single_import_per_statement":true,"single_line_after_imports":true,"switch_case_semicolon_to_colon":true,"switch_case_space":true,"visibility_required":true},"hashes":{"src\/Kafka\/Exception.php":1887794156,"src\/Kafka\/Exception\/NotSupported.php":2083554785,"src\/Kafka\/Exception\/OutOfRange.php":613717588,"src\/Kafka\/Exception\/Protocol.php":2787269416,"src\/Kafka\/Exception\/Socket.php":2743402340,"src\/Kafka\/Exception\/SocketConnect.php":2883015980,"src\/Kafka\/Exception\/SocketEOF.php":1195314501,"src\/Kafka\/Exception\/SocketTimeout.php":1079612001,"src\/Kafka\/Exception\/Config.php":2153825859,"src\/Kafka\/Protocol\/CommitOffset.php":3711823135,"src\/Kafka\/Protocol\/Fetch.php":4166817029,"src\/Kafka\/Protocol\/FetchOffset.php":2615769477,"src\/Kafka\/Protocol\/GroupCoordinator.php":3030713974,"src\/Kafka\/Protocol\/Heartbeat.php":3269486857,"src\/Kafka\/Protocol\/JoinGroup.php":332592611,"src\/Kafka\/Protocol\/Metadata.php":1609652106,"src\/Kafka\/Protocol\/Offset.php":3262177831,"src\/Kafka\/Protocol\/Produce.php":1894065123,"src\/Kafka\/Protocol\/Protocol.php":476565724,"src\/Kafka\/Protocol\/SyncGroup.php":3584049845,"src\/Kafka\/Protocol.php":338507625,"src\/Kafka\/SingletonTrait.php":3424400384,"src\/Kafka\/Socket.php":1512067958,"src\/Kafka\/ConsumerConfig.php":2320744655,"src\/Kafka\/ProducerConfig.php":1925922788,"src\/Kafka\/Config.php":913412482,"src\/Kafka\/Broker.php":598094486,"src\/Kafka\/Consumer.php":674526766,"src\/Kafka\/Consumer\/Assignment.php":626810662,"src\/Kafka\/Consumer\/Process.php":511720065,"src\/Kafka\/Consumer\/State.php":1179310365,"src\/Kafka\/LoggerTrait.php":1234965748,"src\/Kafka\/Producer.php":2558655402,"src\/Kafka\/Producer\/State.php":3349306988,"src\/Kafka\/Producer\/Process.php":3384630694,"tests\/KafkaMock\/Protocol\/Encoder.php":1383970351,"tests\/KafkaTest\/ClientTest.php":347003966,"tests\/KafkaTest\/Produce\/ProduceTest.php":868593207,"tests\/KafkaTest\/Protocol\/DecoderTest.php":1953905898,"tests\/KafkaTest\/Protocol\/EncoderTest.php":118406869,"tests\/TestConfiguration.php":1872544484,"tests\/_autoload.php":2725147420,"tests\/run-tests.php":4228663547,"tests\/Bootstrap.php":3397345411,"tests\/KafkaTest\/Protocol\/GroupCoordinatorTest.php":3938006454,"tests\/KafkaTest\/Base\/ProtocolTest.php":3926873193,"tests\/KafkaTest\/Base\/BrokerTest.php":3957314909,"tests\/KafkaTest\/Base\/ConsumerConfigTest.php":901088941,"tests\/KafkaTest\/Base\/ProducerConfigTest.php":1821493122,"tests\/KafkaTest\/Protocol\/ProduceTest.php":3670305798,"tests\/KafkaTest\/Protocol\/OffsetTest.php":3799352786,"tests\/KafkaTest\/Protocol\/FetchTest.php":2392957613,"tests\/KafkaTest\/Protocol\/CommitOffsetTest.php":3999089971,"tests\/KafkaTest\/Protocol\/SyncGroupTest.php":1695142498,"tests\/KafkaTest\/Protocol\/HeartbeatTest.php":3012473440,"tests\/KafkaTest\/Protocol\/FetchOffsetTest.php":1945252662,"tests\/KafkaTest\/Protocol\/MetadataTest.php":1723320331,"tests\/KafkaTest\/Protocol\/JoinGroupTest.php":3735777277,"src\/Kafka\/Protocol\/LeaveGroup.php":3263763308,"src\/Kafka\/Protocol\/ListGroup.php":3673950913,"src\/Kafka\/Protocol\/DescribeGroups.php":1369242879,"tests\/KafkaTest\/Protocol\/LeaveGroupTest.php":1567634470,"tests\/KafkaTest\/Protocol\/DescribeGroupsTest.php":759837642,"tests\/KafkaTest\/Protocol\/ListGroupTest.php":3375737744}}
117 changes: 117 additions & 0 deletions example/protocol/DescribeGroups.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
<?php
require '../../vendor/autoload.php';

class DescribeGroups {
protected $group = array();
// {{{ functions
// {{{ protected function joinGroup()

protected function joinGroup() {
$data = array(
'group_id' => 'test',
'session_timeout' => 6000,
'rebalance_timeout' => 6000,
'member_id' => '',
'data' => array(
array(
'protocol_name' => 'group',
'version' => 0,
'subscription' => array('test'),
'user_data' => '',
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4));
$this->group = $result;
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ protected function syncGroup()

protected function syncGroup() {
$this->joinGroup();
$data = array(
'group_id' => 'test',
'generation_id' => $this->group['generationId'],
'member_id' => $this->group['memberId'],
'data' => array(
array(
'version' => 0,
'member_id' => $this->group['memberId'],
'assignments' => array(
array(
'topic_name' => 'test',
'partitions' => array(
0
),
),
),
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4));
//echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ public function run()

public function run() {
$this->joinGroup();
$this->syncGroup();
$data = array(
'test'
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::DESCRIBE_GROUPS_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::DESCRIBE_GROUPS_REQUEST, substr($data, 4));
echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// }}}
}

$describe = new DescribeGroups();
$describe->run();

118 changes: 118 additions & 0 deletions example/protocol/LeaveGroup.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php
require '../../vendor/autoload.php';

class LeaveGroup {
protected $group = array();
// {{{ functions
// {{{ protected function joinGroup()

protected function joinGroup() {
$data = array(
'group_id' => 'test',
'session_timeout' => 6000,
'rebalance_timeout' => 6000,
'member_id' => '',
'data' => array(
array(
'protocol_name' => 'group',
'version' => 0,
'subscription' => array('test'),
'user_data' => '',
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4));
$this->group = $result;
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ protected function syncGroup()

protected function syncGroup() {
$this->joinGroup();
$data = array(
'group_id' => 'test',
'generation_id' => $this->group['generationId'],
'member_id' => $this->group['memberId'],
'data' => array(
array(
'version' => 0,
'member_id' => $this->group['memberId'],
'assignments' => array(
array(
'topic_name' => 'test',
'partitions' => array(
0
),
),
),
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4));
//echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ public function run()

public function run() {
$this->joinGroup();
$this->syncGroup();
$data = array(
'group_id' => 'test',
'member_id' => $this->group['memberId'],
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::LEAVE_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::LEAVE_GROUP_REQUEST, substr($data, 4));
echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// }}}
}

$leave = new LeaveGroup();
$leave->run();

116 changes: 116 additions & 0 deletions example/protocol/ListGroup.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php
require '../../vendor/autoload.php';

class ListGroup {
protected $group = array();
// {{{ functions
// {{{ protected function joinGroup()

protected function joinGroup() {
$data = array(
'group_id' => 'test',
'session_timeout' => 6000,
'rebalance_timeout' => 6000,
'member_id' => '',
'data' => array(
array(
'protocol_name' => 'group',
'version' => 0,
'subscription' => array('test'),
'user_data' => '',
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::JOIN_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::JOIN_GROUP_REQUEST, substr($data, 4));
$this->group = $result;
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ protected function syncGroup()

protected function syncGroup() {
$this->joinGroup();
$data = array(
'group_id' => 'test',
'generation_id' => $this->group['generationId'],
'member_id' => $this->group['memberId'],
'data' => array(
array(
'version' => 0,
'member_id' => $this->group['memberId'],
'assignments' => array(
array(
'topic_name' => 'test',
'partitions' => array(
0
),
),
),
),
),
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::SYNC_GROUP_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::SYNC_GROUP_REQUEST, substr($data, 4));
//echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// {{{ public function run()

public function run() {
$this->joinGroup();
$this->syncGroup();
$data = array(
);

$protocol = \Kafka\Protocol::init('0.9.1.0');
$requestData = \Kafka\Protocol::encode(\Kafka\Protocol::LIST_GROUPS_REQUEST, $data);

$socket = new \Kafka\Socket('127.0.0.1', '9192');
$socket->SetonReadable(function($data) {
$coodid = \Kafka\Protocol\Protocol::unpack(\Kafka\Protocol\Protocol::BIT_B32, substr($data, 0, 4));
$result = \Kafka\Protocol::decode(\Kafka\Protocol::LIST_GROUPS_REQUEST, substr($data, 4));
echo json_encode($result);
Amp\stop();
});

$socket->connect();
$socket->write($requestData);
Amp\run(function () use ($socket, $requestData) {
});
}

// }}}
// }}}
}

$list = new ListGroup();
$list->run();

6 changes: 3 additions & 3 deletions src/Kafka/Protocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ public static function init($version, $logger = null)
\Kafka\Protocol\Protocol::GROUP_COORDINATOR_REQUEST => 'GroupCoordinator',
\Kafka\Protocol\Protocol::JOIN_GROUP_REQUEST => 'JoinGroup',
\Kafka\Protocol\Protocol::HEART_BEAT_REQUEST => 'Heartbeat',
//\Kafka\Protocol\Protocol::LEAVE_GROUP_REQUEST => '',
\Kafka\Protocol\Protocol::LEAVE_GROUP_REQUEST => 'LeaveGroup',
\Kafka\Protocol\Protocol::SYNC_GROUP_REQUEST => 'SyncGroup',
//\Kafka\Protocol\Protocol::DESCRIBE_GROUPS_REQUEST => '',
//\Kafka\Protocol\Protocol::LIST_GROUPS_REQUEST => '',
\Kafka\Protocol\Protocol::DESCRIBE_GROUPS_REQUEST => 'DescribeGroups',
\Kafka\Protocol\Protocol::LIST_GROUPS_REQUEST => 'ListGroup',
);

$namespace = '\\Kafka\\Protocol\\';
Expand Down
Loading

0 comments on commit 56704d9

Please sign in to comment.