Changeset 36 for pjproject/main/pjlib/include/pj++/proactor.hpp
- Timestamp:
- Nov 9, 2005 3:37:19 PM (19 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/main/pjlib/include/pj++/proactor.hpp
r29 r36 1 1 /* $Id$ 2 *3 2 */ 4 #ifndef __PJPP_ EVENT_HANDLER_H__5 #define __PJPP_ EVENT_HANDLER_H__3 #ifndef __PJPP_PROACTOR_H__ 4 #define __PJPP_PROACTOR_H__ 6 5 7 6 #include <pj/ioqueue.h> … … 9 8 #include <pj++/sock.hpp> 10 9 #include <pj++/timer.hpp> 11 12 class PJ_Proactor; 13 14 15 class PJ_Event_Handler 10 #include <pj/errno.h> 11 12 class Pj_Proactor; 13 class Pj_Event_Handler; 14 15 16 ////////////////////////////////////////////////////////////////////////////// 17 // Asynchronous operation key. 18 // 19 // Applications may inheric this class to put their application 20 // specific data. 21 // 22 class Pj_Async_Op : public pj_ioqueue_op_key_t 16 23 { 17 friend class PJ_Proactor;18 24 public: 19 PJ_Event_Handler(); 20 virtual ~PJ_Event_Handler(); 21 22 virtual pj_oshandle_t get_handle() = 0; 23 24 bool read(void *buf, pj_size_t len); 25 bool recvfrom(void *buf, pj_size_t len, PJ_INET_Addr *addr); 26 bool write(const void *data, pj_size_t len); 27 bool sendto(const void *data, pj_size_t len, const PJ_INET_Addr &addr); 25 // 26 // Constructor. 27 // 28 explicit Pj_Async_Op(Pj_Event_Handler *handler) 29 : handler_(handler) 30 { 31 pj_memset(this, 0, sizeof(pj_ioqueue_op_key_t)); 32 } 33 34 // 35 // Check whether operation is still pending for this key. 36 // 37 bool is_pending(); 38 39 // 40 // Cancel the operation. 41 // 42 bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED); 43 44 protected: 45 Pj_Event_Handler *handler_; 46 }; 47 48 49 ////////////////////////////////////////////////////////////////////////////// 50 // Event handler. 51 // 52 // Applications should inherit this class to receive various event 53 // notifications. 54 // 55 // Applications should implement get_socket_handle(). 56 // 57 class Pj_Event_Handler : public Pj_Object 58 { 59 friend class Pj_Proactor; 60 public: 61 // 62 // Default constructor. 63 // 64 Pj_Event_Handler() 65 : key_(NULL) 66 { 67 pj_memset(&timer_, 0, sizeof(timer_)); 68 timer_.user_data = this; 69 timer_.cb = &timer_callback; 70 } 71 72 // 73 // Destroy. 74 // 75 virtual ~Pj_Event_Handler() 76 { 77 unregister(); 78 } 79 80 // 81 // Unregister this handler from the ioqueue. 82 // 83 void unregister() 84 { 85 if (key_) { 86 pj_ioqueue_unregister(key_); 87 key_ = NULL; 88 } 89 } 90 91 // 92 // Get socket handle associated with this. 93 // 94 virtual pj_sock_t get_socket_handle() 95 { 96 return PJ_INVALID_SOCKET; 97 } 98 99 // 100 // Receive data. 101 // 102 pj_status_t recv( Pj_Async_Op *op_key, 103 void *buf, pj_ssize_t *len, 104 unsigned flags) 105 { 106 return pj_ioqueue_recv( key_, op_key, 107 buf, len, flags); 108 } 109 110 // 111 // Recvfrom() 112 // 113 pj_status_t recvfrom( Pj_Async_Op *op_key, 114 void *buf, pj_ssize_t *len, unsigned flags, 115 Pj_Inet_Addr *addr) 116 { 117 addr->addrlen_ = sizeof(Pj_Inet_Addr); 118 return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags, 119 addr, &addr->addrlen_ ); 120 } 121 122 // 123 // send() 124 // 125 pj_status_t send( Pj_Async_Op *op_key, 126 const void *data, pj_ssize_t *len, 127 unsigned flags) 128 { 129 return pj_ioqueue_send( key_, op_key, data, len, flags); 130 } 131 132 // 133 // sendto() 134 // 135 pj_status_t sendto( Pj_Async_Op *op_key, 136 const void *data, pj_ssize_t *len, unsigned flags, 137 const Pj_Inet_Addr &addr) 138 { 139 return pj_ioqueue_sendto(key_, op_key, data, len, flags, 140 &addr, sizeof(addr)); 141 } 142 28 143 #if PJ_HAS_TCP 29 bool connect(const PJ_INET_Addr &addr); 30 bool accept(PJ_Socket *sock, PJ_INET_Addr *local=NULL, PJ_INET_Addr *remote=NULL); 144 // 145 // connect() 146 // 147 pj_status_t connect(const Pj_Inet_Addr &addr) 148 { 149 return pj_ioqueue_connect(key_, &addr, sizeof(addr)); 150 } 151 152 // 153 // accept. 154 // 155 pj_status_t accept( Pj_Async_Op *op_key, 156 Pj_Socket *sock, 157 Pj_Inet_Addr *local = NULL, 158 Pj_Inet_Addr *remote = NULL) 159 { 160 int *addrlen = local ? &local->addrlen_ : NULL; 161 return pj_ioqueue_accept( key_, op_key, &sock->sock_, 162 local, remote, addrlen ); 163 } 164 31 165 #endif 32 166 33 167 protected: 34 // 168 ////////////////// 35 169 // Overridables 36 // 37 virtual void on_timeout(int data) {} 38 virtual void on_read_complete(pj_ssize_t bytes_read) {} 39 virtual void on_write_complete(pj_ssize_t bytes_sent) {} 170 ////////////////// 171 172 // 173 // Timeout callback. 174 // 175 virtual void on_timeout(int data) 176 { 177 } 178 179 // 180 // On read complete callback. 181 // 182 virtual void on_read_complete( Pj_Async_Op *op_key, 183 pj_ssize_t bytes_read) 184 { 185 } 186 187 // 188 // On write complete callback. 189 // 190 virtual void on_write_complete( Pj_Async_Op *op_key, 191 pj_ssize_t bytes_sent) 192 { 193 } 194 40 195 #if PJ_HAS_TCP 41 virtual void on_connect_complete(int status) {} 42 virtual void on_accept_complete(int status) {} 196 // 197 // On connect complete callback. 198 // 199 virtual void on_connect_complete(pj_status_t status) 200 { 201 } 202 203 // 204 // On new connection callback. 205 // 206 virtual void on_accept_complete( Pj_Async_Op *op_key, 207 pj_sock_t new_sock, 208 pj_status_t status) 209 { 210 } 211 43 212 #endif 44 213 214 45 215 private: 46 PJ_Proactor *proactor_;47 216 pj_ioqueue_key_t *key_; 48 217 pj_timer_entry timer_; 49 int tmp_recvfrom_addr_len; 50 51 public: 52 // Internal IO Queue/timer callback. 53 static void timer_callback( pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry); 54 static void read_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); 55 static void write_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); 56 static void accept_complete_cb(pj_ioqueue_key_t *key, int status); 57 static void connect_complete_cb(pj_ioqueue_key_t *key, int status); 218 219 friend class Pj_Proactor; 220 friend class Pj_Async_Op; 221 222 // 223 // Static timer callback. 224 // 225 static void timer_callback( pj_timer_heap_t *timer_heap, 226 struct pj_timer_entry *entry) 227 { 228 Pj_Event_Handler *handler = 229 (Pj_Event_Handler*) entry->user_data; 230 231 handler->on_timeout(entry->id); 232 } 58 233 }; 59 234 60 class PJ_Proactor 235 inline bool Pj_Async_Op::is_pending() 236 { 237 return pj_ioqueue_is_pending(handler_->key_, this) != 0; 238 } 239 240 inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status) 241 { 242 return pj_ioqueue_post_completion(handler_->key_, this, 243 bytes_status) == PJ_SUCCESS; 244 } 245 246 ////////////////////////////////////////////////////////////////////////////// 247 // Proactor 248 // 249 class Pj_Proactor : public Pj_Object 61 250 { 62 251 public: 63 static PJ_Proactor *create(PJ_Pool *pool, pj_size_t max_fd, 64 pj_size_t timer_entry_count, unsigned timer_flags=0); 65 66 void destroy(); 67 68 bool register_handler(PJ_Pool *pool, PJ_Event_Handler *handler); 69 void unregister_handler(PJ_Event_Handler *handler); 70 71 static bool schedule_timer( pj_timer_heap_t *timer, PJ_Event_Handler *handler, 72 const PJ_Time_Val &delay, int id=-1); 73 bool schedule_timer(PJ_Event_Handler *handler, const PJ_Time_Val &delay, int id=-1); 74 bool cancel_timer(PJ_Event_Handler *handler); 75 76 bool handle_events(PJ_Time_Val *timeout); 77 78 pj_ioqueue_t *get_io_queue(); 79 pj_timer_heap_t *get_timer_heap(); 252 // 253 // Default constructor, initializes to NULL. 254 // 255 Pj_Proactor() 256 : ioq_(NULL), th_(NULL) 257 { 258 cb_.on_read_complete = &read_complete_cb; 259 cb_.on_write_complete = &write_complete_cb; 260 cb_.on_accept_complete = &accept_complete_cb; 261 cb_.on_connect_complete = &connect_complete_cb; 262 } 263 264 // 265 // Construct proactor. 266 // 267 Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd, 268 pj_size_t max_timer_entries ) 269 : ioq_(NULL), th_(NULL) 270 { 271 cb_.on_read_complete = &read_complete_cb; 272 cb_.on_write_complete = &write_complete_cb; 273 cb_.on_accept_complete = &accept_complete_cb; 274 cb_.on_connect_complete = &connect_complete_cb; 275 } 276 277 // 278 // Destructor. 279 // 280 ~Pj_Proactor() 281 { 282 destroy(); 283 } 284 285 // 286 // Create proactor. 287 // 288 pj_status_t create( Pj_Pool *pool, pj_size_t max_fd, 289 pj_size_t timer_entry_count) 290 { 291 pj_status_t status; 292 293 destroy(); 294 295 status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_); 296 if (status != PJ_SUCCESS) 297 return status; 298 299 status = pj_timer_heap_create(pool->pool_(), 300 timer_entry_count, &th_); 301 if (status != PJ_SUCCESS) { 302 pj_ioqueue_destroy(ioq_); 303 ioq_ = NULL; 304 return NULL; 305 } 306 307 status; 308 } 309 310 // 311 // Destroy proactor. 312 // 313 void destroy() 314 { 315 if (ioq_) { 316 pj_ioqueue_destroy(ioq_); 317 ioq_ = NULL; 318 } 319 if (th_) { 320 pj_timer_heap_destroy(th_); 321 th_ = NULL; 322 } 323 } 324 325 // 326 // Register handler. 327 // This will call handler->get_socket_handle() 328 // 329 pj_status_t register_socket_handler(Pj_Pool *pool, 330 Pj_Event_Handler *handler) 331 { 332 return pj_ioqueue_register_sock( pool->pool_(), ioq_, 333 handler->get_socket_handle(), 334 handler, &cb_, &handler->key_ ); 335 } 336 337 // 338 // Unregister handler. 339 // 340 static void unregister_handler(Pj_Event_Handler *handler) 341 { 342 if (handler->key_) { 343 pj_ioqueue_unregister( handler->key_ ); 344 handler->key_ = NULL; 345 } 346 } 347 348 // 349 // Scheduler timer. 350 // 351 bool schedule_timer( Pj_Event_Handler *handler, 352 const Pj_Time_Val &delay, 353 int id=-1) 354 { 355 return schedule_timer(th_, handler, delay, id); 356 } 357 358 // 359 // Cancel timer. 360 // 361 bool cancel_timer(Pj_Event_Handler *handler) 362 { 363 return pj_timer_heap_cancel(th_, &handler->timer_) == 1; 364 } 365 366 // 367 // Handle events. 368 // 369 int handle_events(Pj_Time_Val *max_timeout) 370 { 371 Pj_Time_Val timeout(0, 0); 372 int timer_count; 373 374 timer_count = pj_timer_heap_poll( th_, &timeout ); 375 376 if (timeout.get_sec() < 0) 377 timeout.sec = PJ_MAXINT32; 378 379 /* If caller specifies maximum time to wait, then compare the value 380 * with the timeout to wait from timer, and use the minimum value. 381 */ 382 if (max_timeout && timeout >= *max_timeout) { 383 timeout = *max_timeout; 384 } 385 386 /* Poll events in ioqueue. */ 387 int ioqueue_count; 388 389 ioqueue_count = pj_ioqueue_poll(ioq_, &timeout); 390 if (ioqueue_count < 0) 391 return ioqueue_count; 392 393 return ioqueue_count + timer_count; 394 } 395 396 // 397 // Get the internal ioqueue object. 398 // 399 pj_ioqueue_t *get_io_queue() 400 { 401 return ioq_; 402 } 403 404 // 405 // Get the internal timer heap object. 406 // 407 pj_timer_heap_t *get_timer_heap() 408 { 409 return th_; 410 } 80 411 81 412 private: 82 413 pj_ioqueue_t *ioq_; 83 414 pj_timer_heap_t *th_; 84 85 PJ_Proactor() {} 415 pj_ioqueue_callback cb_; 416 417 static bool schedule_timer( pj_timer_heap_t *timer, 418 Pj_Event_Handler *handler, 419 const Pj_Time_Val &delay, 420 int id=-1) 421 { 422 handler->timer_.id = id; 423 return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0; 424 } 425 426 427 // 428 // Static read completion callback. 429 // 430 static void read_complete_cb( pj_ioqueue_key_t *key, 431 pj_ioqueue_op_key_t *op_key, 432 pj_ssize_t bytes_read) 433 { 434 Pj_Event_Handler *handler = 435 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 436 437 handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read); 438 } 439 440 // 441 // Static write completion callback. 442 // 443 static void write_complete_cb(pj_ioqueue_key_t *key, 444 pj_ioqueue_op_key_t *op_key, 445 pj_ssize_t bytes_sent) 446 { 447 Pj_Event_Handler *handler = 448 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 449 450 handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent); 451 } 452 453 // 454 // Static accept completion callback. 455 // 456 static void accept_complete_cb(pj_ioqueue_key_t *key, 457 pj_ioqueue_op_key_t *op_key, 458 pj_sock_t new_sock, 459 pj_status_t status) 460 { 461 Pj_Event_Handler *handler = 462 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 463 464 handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status); 465 } 466 467 // 468 // Static connect completion callback. 469 // 470 static void connect_complete_cb(pj_ioqueue_key_t *key, 471 pj_status_t status) 472 { 473 Pj_Event_Handler *handler = 474 (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 475 476 handler->on_connect_complete(status); 477 } 478 86 479 }; 87 480 88 #endif /* __PJPP_ EVENT_HANDLER_H__ */481 #endif /* __PJPP_PROACTOR_H__ */
Note: See TracChangeset
for help on using the changeset viewer.