Ignore:
Timestamp:
Nov 9, 2005 3:37:19 PM (19 years ago)
Author:
bennylp
Message:

Rework pjlib++

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/main/pjlib/include/pj++/proactor.hpp

    r29 r36  
    11/* $Id$ 
    2  * 
    32 */ 
    4 #ifndef __PJPP_EVENT_HANDLER_H__ 
    5 #define __PJPP_EVENT_HANDLER_H__ 
     3#ifndef __PJPP_PROACTOR_H__ 
     4#define __PJPP_PROACTOR_H__ 
    65 
    76#include <pj/ioqueue.h> 
     
    98#include <pj++/sock.hpp> 
    109#include <pj++/timer.hpp> 
    11  
    12 class PJ_Proactor; 
    13  
    14  
    15 class PJ_Event_Handler 
     10#include <pj/errno.h> 
     11 
     12class Pj_Proactor; 
     13class Pj_Event_Handler; 
     14 
     15 
     16////////////////////////////////////////////////////////////////////////////// 
     17// Asynchronous operation key. 
     18// 
     19// Applications may inheric this class to put their application 
     20// specific data. 
     21// 
     22class Pj_Async_Op : public pj_ioqueue_op_key_t 
    1623{ 
    17     friend class PJ_Proactor; 
    1824public: 
    19     PJ_Event_Handler(); 
    20     virtual ~PJ_Event_Handler(); 
    21  
    22     virtual pj_oshandle_t get_handle() = 0; 
    23  
    24     bool read(void *buf, pj_size_t len); 
    25     bool recvfrom(void *buf, pj_size_t len, PJ_INET_Addr *addr); 
    26     bool write(const void *data, pj_size_t len); 
    27     bool sendto(const void *data, pj_size_t len, const PJ_INET_Addr &addr); 
     25    // 
     26    // Constructor. 
     27    // 
     28    explicit Pj_Async_Op(Pj_Event_Handler *handler) 
     29        : handler_(handler) 
     30    { 
     31        pj_memset(this, 0, sizeof(pj_ioqueue_op_key_t)); 
     32    } 
     33 
     34    // 
     35    // Check whether operation is still pending for this key. 
     36    // 
     37    bool is_pending(); 
     38 
     39    // 
     40    // Cancel the operation. 
     41    // 
     42    bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED); 
     43 
     44protected: 
     45    Pj_Event_Handler *handler_; 
     46}; 
     47 
     48 
     49////////////////////////////////////////////////////////////////////////////// 
     50// Event handler. 
     51// 
     52// Applications should inherit this class to receive various event 
     53// notifications. 
     54// 
     55// Applications should implement get_socket_handle(). 
     56// 
     57class Pj_Event_Handler : public Pj_Object 
     58{ 
     59    friend class Pj_Proactor; 
     60public: 
     61    // 
     62    // Default constructor. 
     63    // 
     64    Pj_Event_Handler() 
     65        : key_(NULL) 
     66    { 
     67        pj_memset(&timer_, 0, sizeof(timer_)); 
     68        timer_.user_data = this; 
     69        timer_.cb = &timer_callback; 
     70    } 
     71     
     72    // 
     73    // Destroy. 
     74    // 
     75    virtual ~Pj_Event_Handler() 
     76    { 
     77        unregister(); 
     78    } 
     79 
     80    // 
     81    // Unregister this handler from the ioqueue. 
     82    // 
     83    void unregister() 
     84    { 
     85        if (key_) { 
     86            pj_ioqueue_unregister(key_); 
     87            key_ = NULL; 
     88        } 
     89    } 
     90 
     91    // 
     92    // Get socket handle associated with this. 
     93    // 
     94    virtual pj_sock_t get_socket_handle() 
     95    { 
     96        return PJ_INVALID_SOCKET; 
     97    } 
     98 
     99    // 
     100    // Receive data. 
     101    // 
     102    pj_status_t recv( Pj_Async_Op *op_key,  
     103                      void *buf, pj_ssize_t *len,  
     104                      unsigned flags) 
     105    { 
     106        return pj_ioqueue_recv( key_, op_key, 
     107                                buf, len, flags); 
     108    } 
     109 
     110    // 
     111    // Recvfrom() 
     112    // 
     113    pj_status_t recvfrom( Pj_Async_Op *op_key,  
     114                          void *buf, pj_ssize_t *len, unsigned flags, 
     115                          Pj_Inet_Addr *addr) 
     116    { 
     117        addr->addrlen_ = sizeof(Pj_Inet_Addr); 
     118        return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags, 
     119                                    addr, &addr->addrlen_ ); 
     120    } 
     121 
     122    // 
     123    // send() 
     124    // 
     125    pj_status_t send( Pj_Async_Op *op_key,  
     126                      const void *data, pj_ssize_t *len,  
     127                      unsigned flags) 
     128    { 
     129        return pj_ioqueue_send( key_, op_key, data, len, flags); 
     130    } 
     131 
     132    // 
     133    // sendto() 
     134    // 
     135    pj_status_t sendto( Pj_Async_Op *op_key, 
     136                        const void *data, pj_ssize_t *len, unsigned flags, 
     137                        const Pj_Inet_Addr &addr) 
     138    { 
     139        return pj_ioqueue_sendto(key_, op_key, data, len, flags, 
     140                                 &addr, sizeof(addr)); 
     141    } 
     142 
    28143#if PJ_HAS_TCP 
    29     bool connect(const PJ_INET_Addr &addr); 
    30     bool accept(PJ_Socket *sock, PJ_INET_Addr *local=NULL, PJ_INET_Addr *remote=NULL); 
     144    // 
     145    // connect() 
     146    // 
     147    pj_status_t connect(const Pj_Inet_Addr &addr) 
     148    { 
     149        return pj_ioqueue_connect(key_, &addr, sizeof(addr)); 
     150    } 
     151 
     152    // 
     153    // accept. 
     154    // 
     155    pj_status_t accept( Pj_Async_Op *op_key, 
     156                        Pj_Socket *sock,  
     157                        Pj_Inet_Addr *local = NULL,  
     158                        Pj_Inet_Addr *remote = NULL) 
     159    { 
     160        int *addrlen = local ? &local->addrlen_ : NULL; 
     161        return pj_ioqueue_accept( key_, op_key, &sock->sock_, 
     162                                  local, remote, addrlen ); 
     163    } 
     164 
    31165#endif 
    32166 
    33167protected: 
    34     // 
     168    ////////////////// 
    35169    // Overridables 
    36     // 
    37     virtual void on_timeout(int data) {} 
    38     virtual void on_read_complete(pj_ssize_t bytes_read) {} 
    39     virtual void on_write_complete(pj_ssize_t bytes_sent) {} 
     170    ////////////////// 
     171 
     172    // 
     173    // Timeout callback. 
     174    // 
     175    virtual void on_timeout(int data)  
     176    { 
     177    } 
     178 
     179    // 
     180    // On read complete callback. 
     181    // 
     182    virtual void on_read_complete( Pj_Async_Op *op_key, 
     183                                   pj_ssize_t bytes_read)  
     184    { 
     185    } 
     186 
     187    // 
     188    // On write complete callback. 
     189    // 
     190    virtual void on_write_complete( Pj_Async_Op *op_key,  
     191                                    pj_ssize_t bytes_sent)  
     192    { 
     193    } 
     194 
    40195#if PJ_HAS_TCP 
    41     virtual void on_connect_complete(int status) {} 
    42     virtual void on_accept_complete(int status) {} 
     196    // 
     197    // On connect complete callback. 
     198    // 
     199    virtual void on_connect_complete(pj_status_t status)  
     200    { 
     201    } 
     202 
     203    // 
     204    // On new connection callback. 
     205    // 
     206    virtual void on_accept_complete( Pj_Async_Op *op_key, 
     207                                     pj_sock_t new_sock, 
     208                                     pj_status_t status)  
     209    { 
     210    } 
     211 
    43212#endif 
    44213 
     214 
    45215private: 
    46     PJ_Proactor      *proactor_; 
    47216    pj_ioqueue_key_t *key_; 
    48217    pj_timer_entry    timer_; 
    49     int               tmp_recvfrom_addr_len; 
    50  
    51 public: 
    52     // Internal IO Queue/timer callback. 
    53     static void timer_callback( pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry); 
    54     static void read_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_read); 
    55     static void write_complete_cb(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent); 
    56     static void accept_complete_cb(pj_ioqueue_key_t *key, int status); 
    57     static void connect_complete_cb(pj_ioqueue_key_t *key, int status); 
     218 
     219    friend class Pj_Proactor; 
     220    friend class Pj_Async_Op; 
     221 
     222    // 
     223    // Static timer callback. 
     224    // 
     225    static void timer_callback( pj_timer_heap_t *timer_heap,  
     226                                struct pj_timer_entry *entry) 
     227    { 
     228        Pj_Event_Handler *handler =  
     229            (Pj_Event_Handler*) entry->user_data; 
     230 
     231        handler->on_timeout(entry->id); 
     232    } 
    58233}; 
    59234 
    60 class PJ_Proactor 
     235inline bool Pj_Async_Op::is_pending() 
     236{ 
     237    return pj_ioqueue_is_pending(handler_->key_, this) != 0; 
     238} 
     239 
     240inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status) 
     241{ 
     242    return pj_ioqueue_post_completion(handler_->key_, this,  
     243                                      bytes_status) == PJ_SUCCESS; 
     244} 
     245 
     246////////////////////////////////////////////////////////////////////////////// 
     247// Proactor 
     248// 
     249class Pj_Proactor : public Pj_Object 
    61250{ 
    62251public: 
    63     static PJ_Proactor *create(PJ_Pool *pool, pj_size_t max_fd,  
    64                                pj_size_t timer_entry_count, unsigned timer_flags=0); 
    65  
    66     void destroy(); 
    67  
    68     bool register_handler(PJ_Pool *pool, PJ_Event_Handler *handler); 
    69     void unregister_handler(PJ_Event_Handler *handler); 
    70  
    71     static bool schedule_timer( pj_timer_heap_t *timer, PJ_Event_Handler *handler, 
    72                                 const PJ_Time_Val &delay, int id=-1); 
    73     bool schedule_timer(PJ_Event_Handler *handler, const PJ_Time_Val &delay, int id=-1); 
    74     bool cancel_timer(PJ_Event_Handler *handler); 
    75  
    76     bool handle_events(PJ_Time_Val *timeout); 
    77  
    78     pj_ioqueue_t *get_io_queue(); 
    79     pj_timer_heap_t *get_timer_heap(); 
     252    // 
     253    // Default constructor, initializes to NULL. 
     254    // 
     255    Pj_Proactor() 
     256        : ioq_(NULL), th_(NULL) 
     257    { 
     258        cb_.on_read_complete    = &read_complete_cb; 
     259        cb_.on_write_complete   = &write_complete_cb; 
     260        cb_.on_accept_complete  = &accept_complete_cb; 
     261        cb_.on_connect_complete = &connect_complete_cb; 
     262    } 
     263 
     264    // 
     265    // Construct proactor. 
     266    // 
     267    Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd, 
     268                 pj_size_t max_timer_entries ) 
     269    : ioq_(NULL), th_(NULL) 
     270    { 
     271        cb_.on_read_complete    = &read_complete_cb; 
     272        cb_.on_write_complete   = &write_complete_cb; 
     273        cb_.on_accept_complete  = &accept_complete_cb; 
     274        cb_.on_connect_complete = &connect_complete_cb; 
     275    } 
     276 
     277    // 
     278    // Destructor. 
     279    // 
     280    ~Pj_Proactor() 
     281    { 
     282        destroy(); 
     283    } 
     284 
     285    // 
     286    // Create proactor. 
     287    // 
     288    pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,  
     289                        pj_size_t timer_entry_count) 
     290    { 
     291        pj_status_t status; 
     292 
     293        destroy(); 
     294 
     295        status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_); 
     296        if (status != PJ_SUCCESS)  
     297            return status; 
     298         
     299        status = pj_timer_heap_create(pool->pool_(),  
     300                                      timer_entry_count, &th_); 
     301        if (status != PJ_SUCCESS) { 
     302            pj_ioqueue_destroy(ioq_); 
     303            ioq_ = NULL; 
     304            return NULL; 
     305        } 
     306         
     307        status; 
     308    } 
     309 
     310    // 
     311    // Destroy proactor. 
     312    // 
     313    void destroy() 
     314    { 
     315        if (ioq_) { 
     316            pj_ioqueue_destroy(ioq_); 
     317            ioq_ = NULL; 
     318        } 
     319        if (th_) { 
     320            pj_timer_heap_destroy(th_); 
     321            th_ = NULL; 
     322        } 
     323    } 
     324 
     325    // 
     326    // Register handler. 
     327    // This will call handler->get_socket_handle() 
     328    // 
     329    pj_status_t register_socket_handler(Pj_Pool *pool,  
     330                                        Pj_Event_Handler *handler) 
     331    { 
     332        return   pj_ioqueue_register_sock( pool->pool_(), ioq_, 
     333                                           handler->get_socket_handle(), 
     334                                           handler, &cb_, &handler->key_ ); 
     335    } 
     336 
     337    // 
     338    // Unregister handler. 
     339    // 
     340    static void unregister_handler(Pj_Event_Handler *handler) 
     341    { 
     342        if (handler->key_) { 
     343            pj_ioqueue_unregister( handler->key_ ); 
     344            handler->key_ = NULL; 
     345        } 
     346    } 
     347 
     348    // 
     349    // Scheduler timer. 
     350    // 
     351    bool schedule_timer( Pj_Event_Handler *handler,  
     352                         const Pj_Time_Val &delay,  
     353                         int id=-1) 
     354    { 
     355        return schedule_timer(th_, handler, delay, id); 
     356    } 
     357 
     358    // 
     359    // Cancel timer. 
     360    // 
     361    bool cancel_timer(Pj_Event_Handler *handler) 
     362    { 
     363        return pj_timer_heap_cancel(th_, &handler->timer_) == 1; 
     364    } 
     365 
     366    // 
     367    // Handle events. 
     368    // 
     369    int handle_events(Pj_Time_Val *max_timeout) 
     370    { 
     371        Pj_Time_Val timeout(0, 0); 
     372        int timer_count; 
     373 
     374        timer_count = pj_timer_heap_poll( th_, &timeout ); 
     375 
     376        if (timeout.get_sec() < 0)  
     377            timeout.sec = PJ_MAXINT32; 
     378 
     379        /* If caller specifies maximum time to wait, then compare the value  
     380         * with the timeout to wait from timer, and use the minimum value. 
     381         */ 
     382        if (max_timeout && timeout >= *max_timeout) { 
     383            timeout = *max_timeout; 
     384        } 
     385 
     386        /* Poll events in ioqueue. */ 
     387        int ioqueue_count; 
     388 
     389        ioqueue_count = pj_ioqueue_poll(ioq_, &timeout); 
     390        if (ioqueue_count < 0) 
     391            return ioqueue_count; 
     392 
     393        return ioqueue_count + timer_count; 
     394    } 
     395 
     396    // 
     397    // Get the internal ioqueue object. 
     398    // 
     399    pj_ioqueue_t *get_io_queue() 
     400    { 
     401        return ioq_; 
     402    } 
     403 
     404    // 
     405    // Get the internal timer heap object. 
     406    // 
     407    pj_timer_heap_t *get_timer_heap() 
     408    { 
     409        return th_; 
     410    } 
    80411 
    81412private: 
    82413    pj_ioqueue_t *ioq_; 
    83414    pj_timer_heap_t *th_; 
    84  
    85     PJ_Proactor() {} 
     415    pj_ioqueue_callback cb_; 
     416 
     417    static bool schedule_timer( pj_timer_heap_t *timer,  
     418                                Pj_Event_Handler *handler, 
     419                                const Pj_Time_Val &delay,  
     420                                int id=-1) 
     421    { 
     422        handler->timer_.id = id; 
     423        return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0; 
     424    } 
     425 
     426 
     427    // 
     428    // Static read completion callback. 
     429    // 
     430    static void read_complete_cb( pj_ioqueue_key_t *key,  
     431                                  pj_ioqueue_op_key_t *op_key,  
     432                                  pj_ssize_t bytes_read) 
     433    { 
     434        Pj_Event_Handler *handler =  
     435            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 
     436 
     437        handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read); 
     438    } 
     439 
     440    // 
     441    // Static write completion callback. 
     442    // 
     443    static void write_complete_cb(pj_ioqueue_key_t *key,  
     444                                  pj_ioqueue_op_key_t *op_key, 
     445                                  pj_ssize_t bytes_sent) 
     446    { 
     447        Pj_Event_Handler *handler =  
     448            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 
     449 
     450        handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent); 
     451    } 
     452 
     453    // 
     454    // Static accept completion callback. 
     455    // 
     456    static void accept_complete_cb(pj_ioqueue_key_t *key,  
     457                                   pj_ioqueue_op_key_t *op_key, 
     458                                   pj_sock_t new_sock, 
     459                                   pj_status_t status) 
     460    { 
     461        Pj_Event_Handler *handler =  
     462            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 
     463 
     464        handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status); 
     465    } 
     466 
     467    // 
     468    // Static connect completion callback. 
     469    // 
     470    static void connect_complete_cb(pj_ioqueue_key_t *key,  
     471                                    pj_status_t status) 
     472    { 
     473        Pj_Event_Handler *handler =  
     474            (Pj_Event_Handler*) pj_ioqueue_get_user_data(key); 
     475 
     476        handler->on_connect_complete(status); 
     477    } 
     478 
    86479}; 
    87480 
    88 #endif  /* __PJPP_EVENT_HANDLER_H__ */ 
     481#endif  /* __PJPP_PROACTOR_H__ */ 
Note: See TracChangeset for help on using the changeset viewer.