forked from telosprotocol/xbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
xobject.h
executable file
·862 lines (750 loc) · 48 KB
/
xobject.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
// Copyright (c) 2018-2020 Telos Foundation & contributors
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#pragma once
#include <string>
#include <memory>
#include <functional>
#include "xbase.h"
#include "xint.h"
#include "xmem.h"
namespace top
{
namespace base
{
class xcall_t;
class xpacket_t;
class xmailbox_t;
class xdatabox_t;
class xiothread_t;
class xiosignaler_t;
class xcontext_t;
// Intrusive reference-counting base class.
// The constructor and destructor are protected, so the type is only usable as a
// base sub-object; destroy() is the overridable disposal hook.
// NOTE(review): std::atomic is used here but <atomic> is not included directly by
// this header - presumably it arrives through xbase.h/<memory>; confirm.
class xrefcount_t
{
protected:
    xrefcount_t();
    virtual ~xrefcount_t();

public:
    virtual int32_t add_ref();
    virtual int32_t release_ref();

public:
    // Atomically reads and returns the current reference count.
    int32_t get_refcount() const
    {
        return m_refcount.load();
    }

protected:
    // The default implementation deletes the object, so every object derived from
    // xrefcount_t must be created with operator new (unless destroy() is overridden).
    // Always reports true.
    virtual bool destroy()
    {
        delete this;
        return true;
    }

private:
    std::atomic<int32_t> m_refcount; // reference count, accessed atomically
};
//xobject_t instances must be created with operator new and destroyed through release_ref()
// Type tags stored in every xobject_t (read back with get_obj_type()).
// Positive values are open to applications; zero and negative values are
// reserved for xbase and the core modules.
enum enum_xobject_type
{
    // ---- application range ----
    // NOTE(review): the original note gives this range as [1,1280] while the
    // largest enumerator defined here is 1279 - confirm the intended bound.
    enum_xobject_type_max            = 1279,

    enum_xobject_type_consensus_max  = 1152,
    enum_xobject_type_consensus_min  = 1025,

    enum_xobject_type_data_max       = 356,
    enum_xobject_type_data_min       = 100,

    enum_xobject_type_xcons_max      = 99,   // for xconsensus
    enum_xobject_type_xcons_min      = 65,   // for xconsensus

    enum_xobject_type_system_contract = 64,
    enum_xobject_app_type_undefine   = 1,

    // ---- xbase / core range ----
    // NOTE(review): the original note reserves [0,-32767]; the smallest
    // enumerator defined here is enum_xobject_type_min = -255.
    enum_xobject_type_base_max       = 0,    // anything above this is open for applications
    enum_xobject_type_base           = 0,
    enum_xobject_type_object         = 0,
    enum_xobject_type_iobject        = -1,
    enum_xobject_type_iohandle       = -2,
    enum_xobject_type_signaler       = -3,
    enum_xobject_type_mailbox        = -4,
    enum_xobject_type_databox        = -5,
    enum_xobject_type_timer          = -6,
    enum_xobject_type_thread         = -7,
    enum_xobject_type_woker          = -8,   // worker thread object
    enum_xobject_type_vevent         = -9,   // general event
    enum_xobject_type_ionode         = -10,  // general io-node
    enum_xobject_type_dataunit       = -11,  // general data unit
    enum_xobject_type_datapdu        = -12,  // general data pdu/msg
    enum_xobject_type_dataobj        = -13,  // general data object
    enum_xobject_type_endpoint       = -14,  // general endpoint

    enum_xobject_type_socket         = -23,  // xsocket object
    enum_xobject_type_connection     = -24,  // xconnection object
    enum_xobject_type_node           = -25,  // xnode_t object
    enum_xobject_type_service        = -27,  // service
    enum_xobject_type_vaccount       = -28,  // account

    enum_xobject_type_xdbgplugin     = -39,  // used for xdbgplugin_t

    // ---- block-chain related ----
    enum_xobject_type_xhashplugin    = -40,  // universal hash function, refer to the xhash_t object
    enum_xobject_type_vqccert        = -41,  // quorum certification
    enum_xobject_type_vheader        = -42,  // general virtual block header
    enum_xobject_type_vblock         = -43,  // general virtual block object
    enum_xobject_type_vbstore        = -44,  // manages vblock on db/disk
    enum_xobject_type_vcauth         = -45,  // Certificate-Authority
    enum_xobject_type_vnodesvr       = -46,  // service for node management
    enum_xobject_type_vcache         = -47,  // cache layer with persistent store
    enum_xobject_type_ventity        = -48,  // general virtual entity of a block body
    enum_xobject_type_binventity     = -49,  // general binary entity of a block body
    enum_xobject_type_vinput         = -50,  // general virtual input of a block body
    enum_xobject_type_voutput        = -51,  // general virtual output of a block body

    enum_xobject_type_min            = -255,
};
//every xobject_t and every subclass instance must be created with operator new
// Root of the typed object hierarchy. On top of the reference counting inherited
// (virtually) from xrefcount_t it carries an object id, a type tag, a flag word,
// a last-error slot, a closed marker and a load value.
class xobject_t : virtual public xrefcount_t
{
    friend class xiosignaler_t;

public:
    enum { enum_obj_type = enum_xobject_type_base };

public:
    xobject_t();
    xobject_t(const int16_t _enum_xobject_type_); // _enum_xobject_type_: refer to enum_xobject_type

protected:
    virtual ~xobject_t();

private: // copy operations are declared private, so outside code cannot copy
    xobject_t(const xobject_t &);
    xobject_t & operator = (const xobject_t &);

public:
    // Plain accessors; each returns the stored member unchanged.
    int64_t get_obj_id() const     { return m_object_id; }
    int     get_obj_type() const   { return m_object_type; }
    int     get_obj_flags() const  { return m_object_flags; }
    int     get_obj_load() const   { return m_load; }
    int     get_last_error() const { return m_last_error; }

    // Each object may expose its own name when used as a plugin; empty by default.
    virtual std::string get_obj_name() const
    {
        return std::string();
    }

    virtual bool is_close();
    virtual bool close(bool force_async = true);

    // note: query_interface searches vertically: subclass -> parent class -> root class of this object
    // note: query_interface does not add_ref, so the caller must do that itself to keep the returned ptr longer
    // the caller is responsible for casting the (void*) to the related interface ptr
    virtual void* query_interface(const int32_t _enum_xobject_type_);

    // plugin query and registration
    // note: query_plugin searches horizontally from this object to its parent for the plugin
    // the caller casts the (xobject_t*) to the related object ptr and must release_ref it,
    // because query_plugin has done add_ref before returning
    // the uri must be formatted as ./name, ../name, */name, or name
    // The base implementation finds nothing.
    virtual xobject_t* query_plugin(const std::string & plugin_uri)
    {
        return nullptr;
    }

    // register a plugin under the name of the object; all registration must be finished
    // at the init stage of the object to stay multi-thread safe
    // The base implementation refuses registration.
    virtual bool register_plugin(xobject_t * plugin_ptr)
    {
        return false;
    }

    virtual std::string dump() const; // for debugging only

#if defined(__USE_MEMORY_POOL__)
    void* operator new(size_t size);
    void  operator delete(void *p);
#endif

    // Flag helpers operate on the 16-bit flag word; subclasses have to arrange the bits themselves.
    void set_obj_flag(const uint16_t flag)
    {
        m_object_flags |= flag;
    }
    // subclasses need to make sure the flag carries just 1 bit
    void reset_obj_flag(const uint16_t flag)
    {
        m_object_flags &= (~flag);
    }
    bool check_obj_flag(const uint16_t flag) const
    {
        return (m_object_flags & flag) != 0;
    }
    void reset_obj_flags()
    {
        m_object_flags = 0;
    }

protected:
    void set_load(const uint8_t load)
    {
        m_load = load;
    }
    void set_last_error(const int16_t err)
    {
        m_last_error = err;
    }
    bool set_type(const int16_t _enum_xobject_type_); // use carefully: only allowed while m_object_type is 0

private:
    // an xobject may bind an xiosignal object that can trigger and wake up the host thread when needed
    // the signal is about to close when error_code is enum_xerror_code_close
    // returning false means the object did not handle this event (the default here)
    virtual bool on_signal_up(int32_t error_code,int32_t cur_thread_id, uint64_t time_now_ms)
    {
        return false;
    }

private: // ARM requires 4/8 byte alignment: do not reorder these members, otherwise EXC_ARM_DA_ALIGN may be raised
    aligned_int64_t m_object_id;    // unique object id
    int16_t         m_object_type;  // pre-defined object type, see enum_xobject_type
    uint16_t        m_object_flags; // object flags
    int16_t         m_last_error;   // 0: successful, internal error when < 0, system error when > 0
    uint8_t         m_closed;       // indicates whether the object is closed but not destroyed yet
    uint8_t         m_load;         // [0,101]; the object is in an invalid state when load > 100
};
//An external/3rd-party module may register a debug plugin with xcontext_t to track memory and the lifecycle of xiobject instances. Use it as follows:
//step#1: xcontext_t::instance().set_debug_modes(enum_debug_mode_memory_check)
//step#2: xcontext_t::instance().set_debug_plugin(xdbgplugin_t * plugin);
//Abstract debug-plugin interface: declares four pure-virtual notifications
//(object create / destroy / add-ref / release-ref), each receiving the affected xobject_t.
//NOTE(review): who invokes these hooks and what the bool result means is not visible in this header - confirm against xcontext_t.
class xdbgplugin_t : public xobject_t
{
protected:
xdbgplugin_t();
virtual ~xdbgplugin_t();
private:
//copy operations are declared private
xdbgplugin_t(const xdbgplugin_t &);
xdbgplugin_t & operator = (const xdbgplugin_t &);
public:
virtual void* query_interface(const int32_t type) override;
public://subclasses must override all of the following
virtual bool on_object_create(xobject_t* target) = 0;
virtual bool on_object_destroy(xobject_t* target) = 0;
virtual bool on_object_addref(xobject_t* target) = 0;
virtual bool on_object_releaseref(xobject_t* target) = 0;
};
// Generic job signatures taking one opaque pointer argument:
// xfunction_t is the std::function flavour, xfunction_ptr the plain function-pointer flavour.
using xfunction_t   = std::function<void(void*)>;
using xfunction_ptr = void (*)(void*);
// Tagged parameter value: one of several payload kinds, identified by param_type.
// Pointer and integer payloads share a single union; a string payload lives in string_val.
class xparam_t
{
public:
    enum enum_xparam_type
    {
        enum_xparam_type_null      = 0, // empty
        enum_xparam_type_int64     = 1, // int64...
        enum_xparam_type_uint64    = 2, // uint64...
        enum_xparam_type_uint256   = 3, // uint256..
        enum_xparam_type_xobject   = 4, // xobject_t*
        enum_xparam_type_xfunction = 5, // xfunction_t*
        enum_xparam_type_stdstring = 6, // std::string
        enum_xparam_type_xcall     = 7, // xcallback_t*
    };

public:
    // Converting constructors, one per supported payload kind.
    xparam_t();
    xparam_t(const int32_t val);
    xparam_t(const int64_t val);
    xparam_t(const uint64_t val);
    xparam_t(xobject_t * val);
    xparam_t(const uint256_t & val);
    xparam_t(const xfunction_t & func_obj);
    xparam_t(const xfunction_t* func_ptr);
    xparam_t(const xfunction_ptr void_ptr);
    xparam_t(const std::string & _strval);

    ~xparam_t();

    xparam_t(const xparam_t & obj);
    xparam_t & operator = (const xparam_t & right);

    void copy_from(const xparam_t & obj);
    void move_from(xparam_t & obj);
    void close();

public:
    // Accessors hand back the stored member as-is without checking param_type,
    // so callers should consult get_type() before picking a getter.
    enum_xparam_type   get_type() const     { return param_type; }
    xobject_t*         get_object() const   { return object_ptr; }
    int64_t            get_int64() const    { return int64_val; }
    uint64_t           get_uint64() const   { return uint64_val; }
    uint256_t*         get_uint256() const  { return uint256_ptr; }
    xfunction_t*       get_function() const { return function_ptr; }
    const std::string& get_string() const   { return string_val; }

private:
    union // the members below overlap in storage
    {
        xobject_t*   object_ptr;
        xfunction_t* function_ptr;
        uint256_t*   uint256_ptr;
        int64_t      int64_val;
        uint64_t     uint64_val;
    };
    std::string      string_val;
    enum_xparam_type param_type;
};
typedef std::function<bool(xcall_t&,const int32_t thread_id,const uint64_t timenow_ms) > xcallback_t;
typedef bool (*xcallback_ptr)(xcall_t&,const int32_t thread_id,const uint64_t timenow_ms);
// A callable bundled with up to three xparam_t arguments, an optional result slot,
// a task id and a last-error code. Invoking the object forwards to the stored callback.
// NOTE(review): by-value and non-const-reference constructor overloads for xcallback_t
// coexist; a call with a named non-const xcallback_t may be ambiguous - confirm with callers.
class xcall_t
{
public:
    xcall_t();

    // initialised from a std::function object
    xcall_t(xcallback_t call);
    xcall_t(xcallback_t call,xparam_t param1);                                  // m_param1 <- param1
    xcall_t(xcallback_t call,xparam_t param1,xparam_t param2);                  // m_param1, m_param2 <- param1, param2
    xcall_t(xcallback_t call,xparam_t param1,xparam_t param2,xparam_t param3);  // all three params initialised

    // initialised from a reference
    xcall_t(xcallback_t &call);
    xcall_t(xcallback_t &call,xparam_t param1);                                 // m_param1 <- param1
    xcall_t(xcallback_t &call,xparam_t param1,xparam_t param2);                 // m_param1, m_param2 <- param1, param2
    xcall_t(xcallback_t &call,xparam_t param1,xparam_t param2,xparam_t param3); // all three params initialised

    // initialised from a function pointer
    xcall_t(xcallback_ptr call);
    xcall_t(xcallback_ptr call,xparam_t param1);                                 // m_param1 <- param1
    xcall_t(xcallback_ptr call,xparam_t param1,xparam_t param2);                 // m_param1, m_param2 <- param1, param2
    xcall_t(xcallback_ptr call,xparam_t param1,xparam_t param2,xparam_t param3); // all three params initialised

    ~xcall_t();

    xcall_t(const xcall_t & obj);
    xcall_t & operator = (const xcall_t & obj);

public: // optimised to reduce memory copies of xparam_t
    void bind(xparam_t & param1);                                     // m_param1 is overwritten by param1
    void bind(xparam_t & param1,xparam_t & param2);                   // m_param1 and m_param2 are overwritten
    void bind(xparam_t & param1,xparam_t & param2,xparam_t & param3); // m_param1, m_param2, m_param3 are overwritten
    void bind_result(xparam_t & result);                              // m_result is overwritten by result
    void bind_taskid(const uint32_t task_id)
    {
        m_task_id = task_id;
    }

    void move_from(xcall_t & obj);
    void copy_from(const xcall_t & obj);
    void close();
    void init() {} // construction already initialises everything; kept for template compatibility

public:
    xparam_t& get_param1() { return m_param1; }
    xparam_t& get_param2() { return m_param2; }
    xparam_t& get_param3() { return m_param3; }
    xparam_t* get_result() { return m_result; }

    uint32_t get_taskid() const        { return m_task_id; }
    int      get_last_err_code() const { return m_last_err_code; }
    void     set_last_err_code(const int code)
    {
        m_last_err_code = code;
    }

public:
    // Runs the stored callback with this call object, the thread id and the timestamp.
    // NOTE(review): timestamp_ms is int64_t here while xcallback_t expects uint64_t, so the
    // value is converted implicitly on the way through.
    bool operator()(const int32_t thread_id,const int64_t timestamp_ms)
    {
        return m_function(*this, thread_id, timestamp_ms);
    }

private:
    xcallback_t m_function;
    xparam_t    m_param1;
    xparam_t    m_param2;
    xparam_t    m_param3;
    xparam_t*   m_result;        // most of the time m_result is empty because calls are asynchronous
    uint32_t    m_task_id;       // task id of this call
    int         m_last_err_code;
};
//io-related status
// Lifecycle states of an io-object. The numeric values are not contiguous (3 and 4 are unused).
enum enum_xobject_status
{
    enum_xobject_status_invalid  = 0, // invalid status
    enum_xobject_status_inited   = 1, // io-object has been initialised
    enum_xobject_status_attached = 2, // io-object is attached to its host thread
    enum_xobject_status_open     = 5, // ready to use
    enum_xobject_status_error    = 6, // an error has happened
    enum_xobject_status_closed   = 7, // has been closed
};
//xiobject_t hard requirement: the object must be plugged into the io-thread identified by m_thread_id; it may also own a mailbox
//an io-object can receive commands at its host thread, and commands can also be delivered to an xiobject_t
//every io-object must be created with operator new
// Base class of objects that are bound to an io-thread of an xcontext_t.
// It stores the owning thread id and status, optional dedicated mailbox/databox/signaler
// objects, and a fixed-size table of plugins, and offers send/dispatch/post entry points
// that hand an xcall_t over to the target thread.
class xiobject_t : public xobject_t
{
    friend class xdatabox_t;
    friend class xmailbox_t;
    friend class xiosignaler_t;
    enum {enum_max_plugins_count = 8};
protected:
    //note: _context must stay valid until the application/process exits
    xiobject_t(xcontext_t & _context,enum_xobject_type eType);//attach the io-object to the current thread
    xiobject_t(xcontext_t & _context,const int32_t target_thread_id,enum_xobject_type eType);//attach this object to target_thread_id
    virtual ~xiobject_t();
private: //default construction and copying are declared private
    xiobject_t();
    xiobject_t(const xiobject_t&);
    xiobject_t& operator = (const xiobject_t &);
public:
    uint64_t get_time_now();
    inline enum_xobject_status get_status() const {return m_status;}
    inline int32_t get_thread_id() const {return m_thread_id;}//0 means not bound to any thread yet
    inline xdatabox_t* get_databox() const {return m_ptr_databox;}//packets are delivered to on_databox_packet
    inline xiosignaler_t* get_signaler() const {return m_ptr_signaler;}
    inline xcontext_t* get_context() const {return m_ptr_context;}
public: //multi-thread safe
    virtual bool is_close() override;
    virtual bool close(bool force_async = true) override; //close must be called before releasing the object, otherwise the object is never cleaned up
    virtual void* query_interface(const int32_t type) override;//the caller is responsible for casting the (void*) to the related interface ptr
    //virtual int32_t add_ref() override;
    //virtual int32_t release_ref() override;
    //allow creating an own mailbox object
    bool create_mailbox(int32_t min_batch_read = -1,int32_t max_batch_read = -1,int32_t max_queue_len = 65535);
    //allow creating an own databox object
    bool create_databox(int32_t min_batch_read = -1,int32_t max_batch_read = -1,int32_t max_queue_len = 65535);
    //allow creating an own signaler object
    bool create_signaler();
public: //multi-thread safe; returns an error code on failure, refer to enum_xerror_code
    //Note: the signal/post api executes the xcall_t at the target thread through the object's own mailbox or the thread's mailbox
    //send() is 100% asynchronous and ensures the calls execute at the target thread in order,
    //pass 0 for cur_thread_id when the actual thread id is unknown
    virtual int32_t send_call(xcall_t & call,int32_t cur_thread_id = 0); //send the cmd and wake up the target io-thread; can be called from any thread
    //dispatch() might execute immediately when already running at the target thread, otherwise it does send()
    virtual int32_t dispatch_call(xcall_t & call,int32_t cur_thread_id = 0);
    //post is the optimisation for a large amount of xcall_t that are delivered to the target thread in bunch mode
    virtual int32_t post_call(xcall_t & call,int32_t cur_thread_id = 0); //just passes the data, does not signal to wake up the thread immediately
    virtual int32_t signal_call(int32_t cur_thread_id = 0); //just wakes up the io-thread of this io-object
    virtual int32_t count_calls(int64_t & total_in, int64_t & total_out); //counts how many calls are pending in the queue; useful for debugging
public: //plugin query and registration
    //note: query_plugin searches horizontally from this object to its parent for the plugin
    //the caller casts the (xobject_t*) to the related object ptr and must release_ref it, as query_plugin has done add_ref before returning
    virtual xobject_t* query_plugin(const std::string & plugin_uri) override; //the uri must be formatted as ./name, ../name, */name, or name
    //register a plugin under the name of the object; all registration must be finished at the init stage of the object to stay multi-thread safe
    virtual bool register_plugin(xobject_t * plugin_ptr) override;
protected:
    //on_object_close is called when the close command is processed at the host thread; logic flow: Caller(Thread#A)->Close()->wake this object's thread(B)->clean up and then execute on_object_close
    virtual bool on_object_close(); //notifies the subclass that the object is closed
    //the packet comes from send(xpacket_t & packet) or dispatch(xpacket_t & packet) of xdatabox_t
    //subclasses overwrite this virtual function when they need to support signal(xpacket_t) or send(xpacket_t); only called internally
    virtual bool on_databox_open(xpacket_t & packet,int32_t cur_thread_id, uint64_t time_now_ms){return false;}
    //notifies the owner of the mailbox and gives the owner one chance to process something else
    //the mailbox does the default action (executes the xcall_t) when false is returned (the owner did not process it)
    virtual bool on_mailbox_open(xcall_t & call,int32_t cur_thread_id, uint64_t time_now_ms){return false;}//return false when not handled
    //an xobject may bind an xiosignal object that can trigger and wake up the host thread when needed
    //the signal is about to close when error_code is enum_xerror_code_close
    //returning false means the object did not handle this event; this override reports true
    virtual bool on_signal_up(int32_t error_code,int32_t cur_thread_id, uint64_t time_now_ms) override {return true;}
public://allow send/post/dispatch of a general function object to execute at the target thread
#ifdef __GCC_50_OR_ABOVE__
    //Wraps job_function and a copy of margs into an xcall_t and hands it to send_call(xcall_t&,int32_t).
    //Returns true only when that call reports enum_xcode_successful.
    //The copy of job_function is held by a shared_ptr captured in the callback, so it is released
    //when the last copy of the callback is destroyed - also when the call is never executed -
    //instead of being deleted by hand after the first execution.
    template<class _Rp, class ..._ArgTypes>
    bool send_call(const std::function<_Rp(_ArgTypes...)> & job_function,_ArgTypes... margs)
    {
        auto _job_function_holder = std::make_shared<std::function<_Rp(_ArgTypes...)>>(job_function);
        auto _internal_asyn_function = [_job_function_holder,margs...](base::xcall_t & call, const int32_t cur_thread_id,const uint64_t timenow_ms)->bool{
            (*_job_function_holder)(margs...);
            return true;
        };
        //this is passed as param1 of the call (as in the original code) so the callback may use it
        xcall_t asyn_call(_internal_asyn_function,static_cast<xobject_t*>(this));
        return (send_call(asyn_call,0) == enum_xcode_successful);
    }
    //Same wrapping as the templated send_call, but forwards to dispatch_call(xcall_t&,int32_t).
    //The copy of job_function is owned by a shared_ptr, see the templated send_call.
    template<class _Rp, class ..._ArgTypes>
    bool dispatch_call(const std::function<_Rp(_ArgTypes...)> & job_function,_ArgTypes... margs)
    {
        auto _job_function_holder = std::make_shared<std::function<_Rp(_ArgTypes...)>>(job_function);
        auto _internal_asyn_function = [_job_function_holder,margs...](base::xcall_t & call, const int32_t cur_thread_id,const uint64_t timenow_ms)->bool{
            (*_job_function_holder)(margs...);
            return true;
        };
        xcall_t asyn_call(_internal_asyn_function,static_cast<xobject_t*>(this));
        return (dispatch_call(asyn_call,0) == enum_xcode_successful);
    }
    //Same wrapping as the templated send_call, but forwards to post_call(xcall_t&,int32_t).
    //The copy of job_function is owned by a shared_ptr, see the templated send_call.
    template<class _Rp, class ..._ArgTypes>
    bool post_call(const std::function<_Rp(_ArgTypes...)> & job_function,_ArgTypes... margs)
    {
        auto _job_function_holder = std::make_shared<std::function<_Rp(_ArgTypes...)>>(job_function);
        auto _internal_asyn_function = [_job_function_holder,margs...](base::xcall_t & call, const int32_t cur_thread_id,const uint64_t timenow_ms)->bool{
            (*_job_function_holder)(margs...);
            return true;
        };
        xcall_t asyn_call(_internal_asyn_function,static_cast<xobject_t*>(this));
        return (post_call(asyn_call,0) == enum_xcode_successful);
    }
#else
    //fallback declarations (defined out of line) taking a single opaque parameter
    bool send_call(const xfunction_t& job_function,void* param);
    bool dispatch_call(const xfunction_t& job_function,void* param);
    bool post_call(const xfunction_t& job_function,void* param);
#endif //end of __GCC_50_OR_ABOVE__
protected:
    inline xmailbox_t* get_mailbox() const {return m_ptr_mailbox;}
    inline xiothread_t* get_thread() const {return m_ptr_thread;}
    int32_t get_current_thread_id();
    void set_status(enum_xobject_status newstatue);
    uint64_t update_time_now();//refreshes the time for more accuracy and returns the latest time; careful: it must be called at the host thread
private:
    virtual bool lock() {return false;} //a subclass may provide lock and unlock when needed; the defaults do nothing and report false
    virtual bool unlock(){return false;}
private:
    xiosignaler_t* m_ptr_signaler; //dedicated signaler of this object, usually NULL
    xdatabox_t * m_ptr_databox; //dedicated databox of this object, usually NULL
    xmailbox_t * m_ptr_mailbox; //dedicated mailbox of this object, usually NULL
    xiothread_t * m_ptr_thread; //io thread associated with this io-object
    xcontext_t * m_ptr_context; //associated global context object
    int32_t m_thread_id; //the logical thread id this object belongs to under the context
    enum_xobject_status m_status; //status of the io-object
private: //note: at most 8 plugins per object, considering size and reality
    xobject_t* m_plugins[enum_max_plugins_count];
};
// Direction in which an event travels through the layers/objects.
enum enum_xevent_route_path
{
    enum_xevent_route_path_down = 0, // from a higher layer/object down to a lower layer
    enum_xevent_route_path_up   = 1, // from a lower layer/object up to an upper layer
};
// Event type ids: positive values belong to applications, zero is invalid,
// and the negative values listed here are reserved by xbase.
enum enum_xevent_type
{
    // application range
    enum_xevent_app_type_max           = 32765,
    enum_xevent_app_type_min           = 1,

    enum_xevent_type_invalid           = 0,  // the event types below are reserved by xbase

    enum_xevent_core_type_pdu          = -5, // a pdu event (usually from/to the network)
    enum_xevent_core_type_timer        = -6, // a system time event, mostly about NTP changes
    enum_xevent_core_type_clock        = -7, // an event of the global clock (distributed logic clock)
    enum_xevent_core_type_create_block = -8, // an event to create a block, raised by the upper layer and passed back to the lower layer
    enum_xevent_core_type_tc           = -9, // an event of a time cert block
};
//general event wrap
// General event object: carries a type, priority, error code and result data together with
// source/target xip addresses, an application clock value and an application cookie.
class xvevent_t : public xobject_t
{
protected:
    xvevent_t(const int _event_type);
    virtual ~xvevent_t();

private: // default construction and copying are declared private
    xvevent_t();
    xvevent_t(const xvevent_t & obj);
    xvevent_t& operator = (const xvevent_t & obj);

public:
    // Read accessors; each returns the stored member.
    // NOTE(review): the top-level const on the by-value return types has no effect for callers.
    const int get_type() const
    {
        return m_event_type;
    }
    const int get_priority() const
    {
        return m_event_priority;
    }
    const int get_error_code() const
    {
        return m_error_code;
    }
    const std::string& get_result_data() const
    {
        return m_result_data;
    }
    const xvip2_t& get_from_xip() const
    {
        return m_from_xip;
    }
    const xvip2_t& get_to_xip() const
    {
        return m_to_xip;
    }
    const uint64_t get_cookie() const
    {
        return m_event_cookie;
    }
    const uint64_t get_clock() const
    {
        return m_event_clock;
    }

    // Write accessors; each overwrites the matching member.
    void set_from_xip(const xvip2_t & from)
    {
        m_from_xip = from;
    }
    void set_to_xip(const xvip2_t & to)
    {
        m_to_xip = to;
    }
    void set_cookie(const uint64_t cookie)
    {
        m_event_cookie = cookie;
    }
    void set_clock(const uint64_t clock)
    {
        m_event_clock = clock;
    }

    enum_xevent_route_path get_route_path() const;      // default is enum_xevent_route_path_down
    void set_route_path(enum_xevent_route_path path);   // marks the direction this event will be routed

    virtual void* query_interface(const int32_t type) override; // the caller casts the (void*) to the related interface ptr

private:
    xvip2_t  m_from_xip;     // source address
    xvip2_t  m_to_xip;       // target address: -1 means broadcast to everyone, 0 means anyone may handle it
    uint64_t m_event_clock;  // latest clock set by the application (may be global clock time or height)
    uint64_t m_event_cookie; // cookie the application lets the event carry
    int16_t  m_event_type;   // initialised as 0, which is invalid

protected:
    int16_t     m_event_priority;     // priority level of the event
    int16_t     m_reserved_for_event;
    int16_t     m_error_code;         // 0 = successful by default
    std::string m_result_data;        // empty by default
};
//xionode_t manage chain structure with parent & child xiobject
//Node of a parent/child chain of io-objects: keeps one parent pointer, one child pointer and
//its own xip2 address, and declares the hooks used to route events up and down that chain.
class xionode_t : public xiobject_t
{
protected:
xionode_t(xionode_t & parent_object,enum_xobject_type eType);
xionode_t(xcontext_t & _context,const int32_t thread_id,enum_xobject_type eType);
virtual ~xionode_t();
private:
//default construction and copying are declared private
xionode_t();
xionode_t(const xionode_t &);
xionode_t & operator = (const xionode_t &);
public:
xionode_t* get_parent_node() const;
xionode_t* get_child_node() const;
//the three accessors below return the stored xip2 address, or its low/high part
const xvip2_t & get_xip2_addr() const{return m_xip2_addr;}
const xvip_t get_xip2_low_addr() const{return m_xip2_addr.low_addr;}
const uint64_t get_xip2_high_addr()const{return m_xip2_addr.high_addr;}
//note: query_interface searches vertically: subclass -> parent class -> root class of this object
//note: query_interface does not add_ref, so the caller must do that itself to keep the returned ptr longer
//the caller is responsible for casting the (void*) to the related interface ptr
virtual void* query_interface(const int32_t type) override;
//note: query_plugin searches horizontally from this object to its parent for the plugin
//the caller casts the (xobject_t*) to the related object ptr and must release_ref it, as query_plugin has done add_ref before returning
virtual xobject_t* query_plugin(const std::string & plugin_uri) override; //the uri must be formatted as ./name, ../name, */name, or name
virtual bool is_match(const xvip2_t& xip_address);
public://attach/detach a child node to this node, multi-thread safe
virtual bool attach_child_node(xionode_t * child_node,const xvip2_t & child_address,const std::string extra_data);
virtual bool detach_child_node(xionode_t * child_node,const std::string extra_data);
public: //multi-thread safe
//push_event_up: throws an event from the lower (child) layer to the higher (parent) layer
bool push_event_up(const xvevent_t & event ,xionode_t* from_child,int32_t cur_thread_id,uint64_t timenow_ms);
//push_event_down: pushes an event from the higher (parent) layer to the lower (child) layer
bool push_event_down(const xvevent_t & event ,xionode_t* from_parent,int32_t cur_thread_id,uint64_t timenow_ms);
protected: //guaranteed to be called at the object's thread, triggered by push_event_up or push_event_down
//note: returning false may call the parent's push_event_up; returning true stops further routing
virtual bool on_event_up(const xvevent_t & event,xionode_t* from_child,const int32_t cur_thread_id,const uint64_t timenow_ms);
//note: returning false may call the child's push_event_down; returning true stops further routing
virtual bool on_event_down(const xvevent_t & event,xionode_t* from_parent,const int32_t cur_thread_id,const uint64_t timenow_ms);
protected: //notifies this node about child nodes joining & leaving, at its own thread (so multi-thread safe)
//notifies that a child node has joined this node; for error_code refer to enum_error_code; return true when the event is handled
virtual bool on_child_node_join(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms,xionode_t* childnode);
//notifies that a child node has left this node
virtual bool on_child_node_leave(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms,xionode_t* childnode);
protected://called back from the parent node to report the result of attach & detach (note: called from the parent's running thread)
//notifies this node that it has joined the parent node
virtual bool on_join_parent_node(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms,const xvip2_t & alloc_address,const std::string & extra_data,xionode_t* from_parent);
//notifies this node that it has left the parent node
virtual bool on_leave_parent_node(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms,const std::string & extra_data,xionode_t* from_parent);
protected:
virtual bool on_object_close() override; //notifies the subclass that the object is closed
virtual bool reset_xip_addr(const xvip2_t & new_addr); //reserved for the future, to replace this xip2 address
private:
xionode_t* m_parent_node;
xionode_t* m_child_node;
xvip2_t m_xip2_addr; //see the detailed definition of xip2 in xbase.h
};
//for xfd_events_t
// Bit masks describing the events that can be watched/reported for a file descriptor.
// Each basic event occupies one bit; _close and _all are combinations of them.
enum enum_xfd_event_type
{
    // poll events (the first four are compatible with libuv):
    /*
     enum uv_poll_event {
         UV_READABLE = 1,
         UV_WRITABLE = 2,
         UV_DISCONNECT = 4,
         UV_PRIORITIZED = 8
     };
     */
    enum_xio_event_poll_in          = 0x01, // bit 0: data can be read
    enum_xio_event_poll_out         = 0x02, // bit 1: data can be sent out/written
    enum_xio_event_poll_disconnect  = 0x04, // bit 2: the socket/handle is disconnected
    enum_xio_event_poll_prioritized = 0x08, // bit 3: used to watch for sysfs interrupts or TCP out-of-band messages
    enum_xio_event_poll_idle        = 0x10, // bit 4: want on_sock_idle
    enum_xio_event_poll_err         = 0x20, // bit 5: error on the file descriptor
    enum_xio_event_poll_hup         = 0x40, // bit 6: file descriptor hang-up
    enum_xio_event_poll_invalid     = 0x80, // bit 7: invalid file descriptor

    // any of the conditions that end the life of the descriptor
    enum_xio_event_poll_close       = enum_xio_event_poll_err
                                    | enum_xio_event_poll_hup
                                    | enum_xio_event_poll_invalid,

    // every event bit defined above
    enum_xio_event_poll_all         = enum_xio_event_poll_in
                                    | enum_xio_event_poll_out
                                    | enum_xio_event_poll_disconnect
                                    | enum_xio_event_poll_prioritized
                                    | enum_xio_event_poll_idle
                                    | enum_xio_event_poll_close,
};
//xiosink_t: callback interface for the receiver of io-handle events (xiohandle_t keeps a xiosink_t* as its event receiver).
//every callback returns true when the event is handled
class xiosink_t : virtual public xrefcount_t
{
protected:
    xiosink_t() {}
    virtual ~xiosink_t() {}
private:
    //not copyable
    xiosink_t(const xiosink_t &) = delete;
    xiosink_t & operator = (const xiosink_t &) = delete;
public: //return true when the event is handled
    //xiohandle_t attached into io-thread of the target thread(host)
    virtual bool on_iohandle_attach(xfd_handle_t handle,const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0;

    //xiohandle_t detached from io-thread of the target(host) thread;
    //detach means it left the io-thread, but the fdhandle(socket) may still be valid
    virtual bool on_iohandle_detach(xfd_handle_t handle,const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0;

    //handle is closed; the close was initiated by the caller if error_code is 0
    //note: unlike the attach/detach callbacks, error_code comes before handle here
    virtual bool on_iohandle_close(const int32_t error_code,xfd_handle_t handle,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0;

    //readable event; watchEvents is in/out: update it to change the listened events,
    //and set it to 0 when the event is no longer needed, which removes it from the loop
    virtual bool on_iohandle_read(xfd_handle_t handle,xfd_events_t & watchEvents,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0;

    //writeable event; watchEvents is in/out: update it to change the listened events,
    //and set it to 0 when the event is no longer needed, which removes it from the loop
    virtual bool on_iohandle_write(xfd_handle_t handle,xfd_events_t & watchEvents,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0;
};
//xiohandle_t wraps a posix descriptor with reactor mode
class xiohandle_t : public xiobject_t
{
public:
    static bool set_nonblock(xfd_handle_t socket,bool non_block_or_not);
protected:
    xiohandle_t(xcontext_t & _context,int32_t host_thread_id,xfd_handle_t rawHandle,xiosink_t * event_receiver);
    virtual ~xiohandle_t();
private:
    //no default construction and not copyable
    xiohandle_t() = delete;
    xiohandle_t(const xiohandle_t &) = delete;
    xiohandle_t & operator = (const xiohandle_t &) = delete;
public: //thread safe to call from any thread
    xfd_handle_t get_handle() const {return m_raw_handle;}   //query the raw handle
    xfd_events_t get_events() const {return m_watch_events;} //refer enum_xfd_event_type
public: //thread safe to call from any thread
    virtual void* query_interface(const int32_t type) override; //caller is responsible to cast (void*) to the related interface ptr

    //multiple threads safe; returns an error code (refer enum_error_code); queue_up indicates whether to force asynchronous execution
    virtual int32_t attach(bool queue_up = false); //really create the native handle and start to attach to the host thread
    virtual int32_t detach(bool queue_up = false); //stop receiving events; caller may call attach() to start receiving signals again

    //start/stop receiving on_iohandle_read(), multiple threads safe
    virtual int32_t start_read(int32_t cur_thread_id) = 0;
    virtual int32_t stop_read(int32_t cur_thread_id) = 0;

    //start/stop receiving on_iohandle_write(), multiple threads safe
    virtual int32_t start_write(int32_t cur_thread_id) = 0;
    virtual int32_t stop_write(int32_t cur_thread_id) = 0;

    //combined function
    virtual int32_t start_write_read(int32_t cur_thread_id) = 0;
protected: //can only be called from host thread
    //xiohandle_t attached into io-thread of the target thread(host), return true when the event is handled
    virtual bool on_iohandle_attach(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms);

    //xiohandle_t detached from io-thread of the target(host) thread, return true when the event is handled;
    //detach means it left the io-thread, but the fdhandle(socket) may still be valid
    virtual bool on_iohandle_detach(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms);

    //readable event; watchEvents is in/out: update it to change the listened events,
    //and set it to 0 when the event is no longer needed, which removes it from the loop.
    //return true when the event is handled
    virtual bool on_iohandle_read(xfd_events_t & watchEvents,const int32_t cur_thread_id,const uint64_t timenow_ms);

    //writeable event; watchEvents is in/out: update it to change the listened events,
    //and set it to 0 when the event is no longer needed, which removes it from the loop.
    //return true when the event is handled
    virtual bool on_iohandle_write(xfd_events_t & watchEvents,const int32_t cur_thread_id,const uint64_t timenow_ms);

    //return true when the event is handled
    virtual bool on_iohandle_close(const int32_t error_code,const int32_t cur_thread_id,const uint64_t timenow_ms);

    //can only be called from host thread
    virtual bool on_object_close() override; //notify the subclass the object is closed
private:
    virtual bool process_iohandle_attach_cmd(xcall_t & cmd,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0; //must implement
    virtual bool process_iohandle_detach_cmd(xcall_t & cmd,const int32_t cur_thread_id,const uint64_t timenow_ms) = 0; //must implement
public:
    //advanced use case: a provider (like libcurl) may manage the real socket handle itself and share it across
    //multiple sessions, so xiohandle_t must not close it; for that case call reset_handle() before xiohandle_t.close().
    //in the other case xiohandle_t and its subclass manage this handle and close it before the object is destroyed
    //NOTE(review): presumably returns the previously held raw handle - confirm against the implementation
    xfd_handle_t reset_handle();
protected:
    void set_events(xfd_events_t new_event) {m_watch_events = new_event;} //refer enum_xfd_event_type
    xfd_handle_t close_handle(); //return the closed handle for reference
protected:
    xiosink_t * m_ptr_event_receiver;       //receiver passed in at construction (see constructor's event_receiver)
    void* m_padding_sizeof_pointer;   //NOTE(review): looks like pointer-sized padding/reserved space - confirm
private:
    xfd_handle_t m_raw_handle;   //must be valid and unique system wide
    xfd_events_t m_watch_events; //refer enum_xfd_event_type, the events the socket is currently watching/monitoring
};
//use case: xauto_ptr is a mix of std::unique_ptr and std::shared_ptr, but with limited functions.
//usually it receives the ptr of a created object, and calls release_ref() on it when going out of scope.
//note: constructing from a raw pointer/reference stores the address WITHOUT calling add_ref(),
//so the wrapper takes over a reference the caller already holds
template<typename T> //T must be a subclass of xrefcount_t
class xauto_ptr
{
    static_assert(std::is_base_of<xrefcount_t, T>::value, "T must be type from xrefcount_t");
public:
    //empty wrapper
    xauto_ptr(std::nullptr_t)
        :m_raw_ptr(nullptr)
    {
    }
    //take over obj_ptr (may be nullptr); constness is dropped on purpose since the ref-count must be modifiable
    xauto_ptr(const T * obj_ptr)
        :m_raw_ptr(const_cast<T*>(obj_ptr))
    {
    }
    //take over the address of obj_ref
    xauto_ptr(T & obj_ref)
        :m_raw_ptr(&obj_ref)
    {
    }
    //steal the pointer from moved and leave it empty; only copies a pointer, so it can not throw
    xauto_ptr(xauto_ptr && moved) noexcept
        :m_raw_ptr(moved.m_raw_ptr)
    {
        moved.m_raw_ptr = nullptr;
    }
    //give back the held reference, if any
    ~xauto_ptr()
    {
        if(m_raw_ptr != nullptr)
            m_raw_ptr->release_ref();
    }

    //test whether it is valid or not
    inline operator bool () const noexcept {return (m_raw_ptr != nullptr);}
    inline bool operator == (std::nullptr_t) const noexcept {return (m_raw_ptr == nullptr);}
    inline bool operator != (std::nullptr_t) const noexcept {return (m_raw_ptr != nullptr);}

    //get raw ptr
    inline T * operator ()() const noexcept {return m_raw_ptr;}
    inline T * get() const noexcept {return m_raw_ptr;}
    inline T* operator ->() const noexcept {return m_raw_ptr;}

    //test first before calling it: dereferences the raw ptr without a null check
    inline T& operator *() const noexcept {return *m_raw_ptr;}

    //this convenient conversion stays disabled as the compiler might do copy-elision that might cause issue,e.g. T * ptr = xauto_ptr<T> get_xxx()
    //inline operator T* () const noexcept {return m_raw_ptr;}
    //inline operator const T* ()const noexcept {return m_raw_ptr;}
protected:
    //copying is reserved for subclasses: shares the object and gains one more reference
    xauto_ptr(const xauto_ptr & other)
        :m_raw_ptr(other.m_raw_ptr)
    {
        if(m_raw_ptr != nullptr)
            m_raw_ptr->add_ref();//gain reference
    }
private:
    //no default construction and no copy assignment
    xauto_ptr() = delete;
    xauto_ptr & operator = (const xauto_ptr &) = delete;
protected:
    //move assignment is reserved for subclasses: adopt moved's pointer, then release the one held before
    xauto_ptr & operator = (xauto_ptr && moved)
    {
        if (this != &moved)
        {
            T * old_ptr = m_raw_ptr;
            m_raw_ptr = moved.m_raw_ptr;
            moved.m_raw_ptr = nullptr;
            if(old_ptr != nullptr)
                old_ptr->release_ref(); //release last, after this object already points at the new target
        }
        return *this;
    }
protected:
    T * m_raw_ptr;
};
//allows writing "nullptr != ptr": true when ptr currently holds an object
template<class T>
bool operator!=(std::nullptr_t, xauto_ptr<T> const & ptr)
{
    return (ptr.get() != nullptr);
}
//allows writing "nullptr == ptr": true when ptr holds nothing
template<class T>
bool operator==(std::nullptr_t, xauto_ptr<T> const & ptr)
{
    return (ptr.get() == nullptr);
}
//create a T with operator new from the perfectly-forwarded arguments and hand the fresh pointer to a xauto_ptr<T>
template<typename T, typename ... ArgsT>
xauto_ptr<T>
make_auto_ptr(ArgsT && ... args)
{
    T * const fresh_obj = new T(std::forward<ArgsT>(args)...);
    return xauto_ptr<T>(fresh_obj);
}
//calls add_ref() on construction; the matching release_ref() happens in xauto_ptr's destructor when going out of scope
template<typename T>
class auto_reference : public xauto_ptr<T>
{
    using base = xauto_ptr<T>;
public:
    //hold obj_ptr (may be nullptr) and gain one reference on it
    auto_reference(T * obj_ptr)
        :base(obj_ptr)
    {
        if(nullptr == obj_ptr)
            return;

        obj_ptr->add_ref();
    }
    //hold obj_ref and gain one reference on it
    auto_reference(T & obj_ref)
        :base(obj_ref)
    {
        obj_ref.add_ref();
    }
    ~auto_reference() {} //nothing to do here: xauto_ptr::~xauto_ptr will do the release
private:
    //declared but not implemented: no default construction, no copying
    auto_reference();
    auto_reference(const auto_reference &);
    auto_reference & operator = (const auto_reference &);
};
};//end of namespace of base
}; //end of namespace top