Ignore:
Timestamp:
Oct 24, 2006 5:13:30 PM (18 years ago)
Author:
bennylp
Message:

Bulk of PJLIB implementations on Symbian: exception framework, errno, OS core, Unicode, pool backend, log (to console), socket, address resolution, select, etc. IOQueue still doesn't work.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • pjproject/branches/symbian/pjlib/src/pj/ioqueue_symbian.cpp

    r686 r788  
    1818 */ 
    1919#include <pj/ioqueue.h> 
     20#include <pj/assert.h> 
     21#include <pj/errno.h> 
     22#include <pj/list.h> 
     23#include <pj/lock.h> 
     24#include <pj/pool.h> 
     25#include <pj/string.h> 
     26 
     27#include "os_symbian.h" 
     28 
     29class CIoqueueCallback; 
     30 
     31/* 
     32 * IO Queue structure. 
     33 */ 
     34struct pj_ioqueue_t 
     35{ 
     36    int              eventCount; 
     37    CPjTimeoutTimer *timeoutTimer; 
     38}; 
     39 
     40 
     41///////////////////////////////////////////////////////////////////////////// 
     42// Class to encapsulate asynchronous socket operation. 
     43// 
     44class CIoqueueCallback : public CActive 
     45{ 
     46public: 
     47    CIoqueueCallback(pj_ioqueue_t *ioqueue, 
     48                     pj_ioqueue_key_t *key, pj_sock_t sock,  
     49                     const pj_ioqueue_callback *cb, void *user_data) 
     50    : CActive(CActive::EPriorityStandard), 
     51          ioqueue_(ioqueue), key_(key), sock_((CPjSocket*)sock), cb_(cb),  
     52          user_data_(user_data), aBufferPtr_(NULL, 0), type_(TYPE_NONE) 
     53    { 
     54        CActiveScheduler::Add(this); 
     55    } 
     56 
     57    // 
     58    // Start asynchronous recv() operation 
     59    // 
     60    pj_status_t StartRead(pj_ioqueue_op_key_t *op_key,  
     61                          void *buf, pj_ssize_t *size, unsigned flags, 
     62                          pj_sockaddr_t *addr, int *addrlen) 
     63    { 
     64        PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY); 
     65        PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY); 
     66 
     67        pending_data_.read_.op_key_ = op_key; 
     68        pending_data_.read_.addr_ = addr; 
     69        pending_data_.read_.addrlen_ = addrlen; 
     70 
     71        aBufferPtr_.Set((TUint8*)buf, 0, (TInt)*size); 
     72 
     73        type_ = TYPE_READ; 
     74        SetActive(); 
     75        sock_->Socket().RecvFrom(aBufferPtr_, aAddress_, flags, iStatus); 
     76 
     77        return PJ_EPENDING; 
     78    } 
     79 
     80 
     81    // 
     82    // Start asynchronous accept() operation. 
     83    // 
     84    pj_status_t StartAccept(pj_ioqueue_op_key_t *op_key, 
     85                            pj_sock_t *new_sock, 
     86                            pj_sockaddr_t *local, 
     87                            pj_sockaddr_t *remote, 
     88                            int *addrlen ) 
     89    { 
     90        PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY); 
     91        PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY); 
     92 
     93        pending_data_.accept_.op_key_ = op_key; 
     94        pending_data_.accept_.new_sock_ = new_sock; 
     95        pending_data_.accept_.local_ = local; 
     96        pending_data_.accept_.remote_ = remote; 
     97        pending_data_.accept_.addrlen_ = addrlen; 
     98 
     99        // Create blank socket 
     100        blank_sock_.Open(PjSymbianOS::Instance()->SocketServ()); 
     101 
     102        type_ = TYPE_ACCEPT; 
     103        SetActive(); 
     104        sock_->Socket().Accept(blank_sock_, iStatus); 
     105 
     106        return PJ_EPENDING; 
     107    } 
     108 
     109 
     110    // 
     111    // Completion callback. 
     112    // 
     113    void RunL() 
     114    { 
     115        Type cur_type = type_; 
     116 
     117        type_ = TYPE_NONE; 
     118 
     119        if (cur_type == TYPE_READ) { 
     120            // 
     121            // Completion of asynchronous RecvFrom() 
     122            // 
     123 
     124            /* Clear op_key (save it to temp variable first!) */ 
     125            pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_; 
     126            pending_data_.read_.op_key_ = NULL; 
     127 
     128            // Handle failure condition 
     129            if (iStatus != KErrNone) { 
     130                cb_->on_read_complete(key_, op_key, -PJ_RETURN_OS_ERROR(iStatus.Int())); 
     131                return; 
     132            } 
     133 
     134            if (pending_data_.read_.addr_) { 
     135                PjSymbianOS::Addr2pj(aAddress_,  
     136                                     *(pj_sockaddr_in*)pending_data_.read_.addr_); 
     137                pending_data_.read_.addr_ = NULL; 
     138            } 
     139            if (pending_data_.read_.addrlen_) { 
     140                *pending_data_.read_.addrlen_ = sizeof(pj_sockaddr_in); 
     141                pending_data_.read_.addrlen_ = NULL; 
     142            } 
     143 
     144            /* Call callback */ 
     145            cb_->on_read_complete(key_, op_key, aBufferPtr_.Length()); 
     146 
     147        } else if (cur_type == TYPE_ACCEPT) { 
     148            // 
     149            // Completion of asynchronous Accept() 
     150            // 
     151             
     152            /* Clear op_key (save it to temp variable first!) */ 
     153            pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_; 
     154            pending_data_.read_.op_key_ = NULL; 
     155 
     156            // Handle failure condition 
     157            if (iStatus != KErrNone) { 
     158                if (pending_data_.accept_.new_sock_) 
     159                    *pending_data_.accept_.new_sock_ = PJ_INVALID_SOCKET; 
     160 
     161                cb_->on_accept_complete(key_, op_key, PJ_INVALID_SOCKET, 
     162                                        -PJ_RETURN_OS_ERROR(iStatus.Int())); 
     163                return; 
     164            } 
     165 
     166            CPjSocket *pjNewSock = new CPjSocket(blank_sock_); 
     167 
     168            if (pending_data_.accept_.new_sock_) { 
     169                *pending_data_.accept_.new_sock_ = (pj_sock_t)pjNewSock; 
     170                pending_data_.accept_.new_sock_ = NULL; 
     171            } 
     172 
     173            if (pending_data_.accept_.local_) { 
     174                TInetAddr aAddr; 
     175                blank_sock_.LocalName(aAddr); 
     176                PjSymbianOS::Addr2pj(aAddr, *(pj_sockaddr_in*)pending_data_.accept_.local_); 
     177                pending_data_.accept_.local_ = NULL; 
     178            } 
     179 
     180            if (pending_data_.accept_.remote_) { 
     181                TInetAddr aAddr; 
     182                blank_sock_.RemoteName(aAddr); 
     183                PjSymbianOS::Addr2pj(aAddr, *(pj_sockaddr_in*)pending_data_.accept_.remote_); 
     184                pending_data_.accept_.remote_ = NULL; 
     185            } 
     186 
     187            if (pending_data_.accept_.addrlen_) { 
     188                *pending_data_.accept_.addrlen_ = sizeof(pj_sockaddr_in); 
     189                pending_data_.accept_.addrlen_ = NULL; 
     190            } 
     191 
     192            // Call callback. 
     193            cb_->on_accept_complete(key_, op_key, (pj_sock_t)pjNewSock, PJ_SUCCESS); 
     194        } 
     195 
     196        ioqueue_->eventCount++; 
     197    } 
     198 
     199    // 
     200    // CActive's DoCancel() 
     201    // 
     202    void DoCancel() 
     203    { 
     204        if (type_ == TYPE_READ) 
     205            sock_->Socket().CancelRecv(); 
     206        else if (type_ == TYPE_ACCEPT) 
     207            sock_->Socket().CancelAccept(); 
     208 
     209        type_ = TYPE_NONE; 
     210    } 
     211 
     212    // 
     213    // Cancel operation and call callback. 
     214    // 
     215    void CancelOperation(pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_status) 
     216    { 
     217        Type cur_type = type_; 
     218 
     219        Cancel(); 
     220 
     221        if (cur_type == TYPE_READ) 
     222            cb_->on_read_complete(key_, op_key, bytes_status); 
     223        else if (cur_type == TYPE_ACCEPT) 
     224            ; 
     225    } 
     226 
     227    // 
     228    // Accessors 
     229    // 
     230    void* get_user_data() const 
     231    { 
     232        return user_data_; 
     233    } 
     234    void set_user_data(void *user_data) 
     235    { 
     236        user_data_ = user_data; 
     237    } 
     238    pj_ioqueue_op_key_t *get_op_key() const 
     239    { 
     240        return pending_data_.common_.op_key_; 
     241    } 
     242    CPjSocket* get_pj_socket() 
     243    { 
     244        return sock_; 
     245    } 
     246 
     247private: 
     248    // Type of pending operation. 
     249    enum Type { 
     250        TYPE_NONE, 
     251        TYPE_READ, 
     252        TYPE_ACCEPT, 
     253    }; 
     254 
     255    // Static data. 
     256    pj_ioqueue_t                *ioqueue_; 
     257    pj_ioqueue_key_t            *key_; 
     258    CPjSocket                   *sock_; 
     259    const pj_ioqueue_callback   *cb_; 
     260    void                        *user_data_; 
     261 
     262    // Symbian data. 
     263    TPtr8                        aBufferPtr_; 
     264    TInetAddr                    aAddress_; 
     265 
     266    // Application data. 
     267    Type                         type_; 
     268 
     269    union Pending_Data 
     270    { 
     271        struct Common 
     272        { 
     273            pj_ioqueue_op_key_t *op_key_; 
     274        } common_; 
     275 
     276        struct Pending_Read 
     277        { 
     278            pj_ioqueue_op_key_t *op_key_; 
     279            pj_sockaddr_t       *addr_; 
     280            int                 *addrlen_; 
     281        } read_; 
     282 
     283        struct Pending_Accept 
     284        { 
     285            pj_ioqueue_op_key_t *op_key_; 
     286            pj_sock_t           *new_sock_; 
     287            pj_sockaddr_t       *local_; 
     288            pj_sockaddr_t       *remote_; 
     289            int                 *addrlen_; 
     290        } accept_; 
     291    }; 
     292 
     293    union Pending_Data           pending_data_; 
     294    RSocket                     blank_sock_; 
     295}; 
     296 
     297 
     298 
     299 
     300/* 
     301 * IO Queue key structure. 
     302 */ 
     303struct pj_ioqueue_key_t 
     304{ 
     305    CIoqueueCallback    *cbObj; 
     306}; 
    20307 
    21308 
     
    25312PJ_DEF(const char*) pj_ioqueue_name(void) 
    26313{ 
    27     return "symbian"; 
    28 } 
    29  
     314    return "ioqueue-symbian"; 
     315} 
     316 
     317 
     318/* 
     319 * Create a new I/O Queue framework. 
     320 */ 
     321PJ_DEF(pj_status_t) pj_ioqueue_create(  pj_pool_t *pool,  
     322                                        pj_size_t max_fd, 
     323                                        pj_ioqueue_t **p_ioqueue) 
     324{ 
     325    pj_ioqueue_t *ioq; 
     326 
     327    PJ_UNUSED_ARG(max_fd); 
     328 
     329    ioq = (pj_ioqueue_t*) pj_pool_zalloc(pool, sizeof(pj_ioqueue_t)); 
     330    ioq->timeoutTimer = CPjTimeoutTimer::NewL(); 
     331    *p_ioqueue = ioq; 
     332    return PJ_SUCCESS; 
     333} 
     334 
     335 
     336/* 
     337 * Destroy the I/O queue. 
     338 */ 
     339PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq ) 
     340{ 
     341    delete ioq->timeoutTimer; 
     342    ioq->timeoutTimer = NULL; 
     343 
     344    return PJ_SUCCESS; 
     345} 
     346 
     347 
     348/* 
     349 * Set the lock object to be used by the I/O Queue.  
     350 */ 
     351PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq,  
     352                                         pj_lock_t *lock, 
     353                                         pj_bool_t auto_delete ) 
     354{ 
     355    /* Don't really need lock for now */ 
     356    PJ_UNUSED_ARG(ioq); 
     357     
     358    if (auto_delete) { 
     359        pj_lock_destroy(lock); 
     360    } 
     361 
     362    return PJ_SUCCESS; 
     363} 
     364 
     365 
     366/* 
     367 * Register a socket to the I/O queue framework.  
     368 */ 
     369PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
     370                                              pj_ioqueue_t *ioq, 
     371                                              pj_sock_t sock, 
     372                                              void *user_data, 
     373                                              const pj_ioqueue_callback *cb, 
     374                                              pj_ioqueue_key_t **p_key ) 
     375{ 
     376    pj_ioqueue_key_t *key; 
     377 
     378    key = (pj_ioqueue_key_t*) pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 
     379    key->cbObj = new CIoqueueCallback(ioq, key, sock, cb, user_data); 
     380 
     381    *p_key = key; 
     382    return PJ_SUCCESS; 
     383} 
     384 
     385/* 
     386 * Unregister from the I/O Queue framework.  
     387 */ 
     388PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
     389{ 
     390    if (key == NULL || key->cbObj == NULL) 
     391        return PJ_SUCCESS; 
     392 
     393    // Close socket. 
     394    key->cbObj->get_pj_socket()->Socket().Close(); 
     395    delete key->cbObj->get_pj_socket(); 
     396 
     397    // Delete async object 
     398    delete key->cbObj; 
     399    key->cbObj = NULL; 
     400 
     401    return PJ_SUCCESS; 
     402} 
     403 
     404 
     405/* 
     406 * Get user data associated with an ioqueue key. 
     407 */ 
     408PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 
     409{ 
     410    return key->cbObj->get_user_data(); 
     411} 
     412 
     413 
     414/* 
     415 * Set or change the user data to be associated with the file descriptor or 
     416 * handle or socket descriptor. 
     417 */ 
     418PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
     419                                              void *user_data, 
     420                                              void **old_data) 
     421{ 
     422    if (old_data) 
     423        *old_data = key->cbObj->get_user_data(); 
     424    key->cbObj->set_user_data(user_data); 
     425 
     426    return PJ_SUCCESS; 
     427} 
     428 
     429 
     430/* 
     431 * Initialize operation key. 
     432 */ 
     433PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, 
     434                                     pj_size_t size ) 
     435{ 
     436    pj_memset(op_key, 0, size); 
     437} 
     438 
     439 
     440/* 
     441 * Check if operation is pending on the specified operation key. 
     442 */ 
     443PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, 
     444                                         pj_ioqueue_op_key_t *op_key ) 
     445{ 
     446    return key->cbObj->get_op_key()==op_key && 
     447           key->cbObj->IsActive(); 
     448} 
     449 
     450 
     451/* 
     452 * Post completion status to the specified operation key and call the 
     453 * appropriate callback.  
     454 */ 
     455PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, 
     456                                                pj_ioqueue_op_key_t *op_key, 
     457                                                pj_ssize_t bytes_status ) 
     458{ 
     459    if (pj_ioqueue_is_pending(key, op_key)) { 
     460        key->cbObj->CancelOperation(op_key, bytes_status); 
     461    } 
     462    return PJ_SUCCESS; 
     463} 
     464 
     465 
     466#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
     467/** 
     468 * Instruct I/O Queue to accept incoming connection on the specified  
     469 * listening socket. 
     470 */ 
     471PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
     472                                       pj_ioqueue_op_key_t *op_key, 
     473                                       pj_sock_t *new_sock, 
     474                                       pj_sockaddr_t *local, 
     475                                       pj_sockaddr_t *remote, 
     476                                       int *addrlen ) 
     477{ 
     478     
     479    return key->cbObj->StartAccept(op_key, new_sock, local, remote, addrlen); 
     480} 
     481 
     482 
     483/* 
     484 * Initiate non-blocking socket connect. 
     485 */ 
     486PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
     487                                        const pj_sockaddr_t *addr, 
     488                                        int addrlen ) 
     489{ 
     490    PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL); 
     491 
     492    RSocket &rSock = key->cbObj->get_pj_socket()->Socket(); 
     493    TInetAddr inetAddr; 
     494    PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr); 
     495    TRequestStatus reqStatus; 
     496 
     497    // We don't support async connect for now. 
     498    PJ_TODO(IOQUEUE_SUPPORT_ASYNC_CONNECT); 
     499 
     500    rSock.Connect(inetAddr, reqStatus); 
     501    User::WaitForRequest(reqStatus); 
     502 
     503    if (reqStatus == KErrNone) 
     504        return PJ_SUCCESS; 
     505 
     506    return PJ_RETURN_OS_ERROR(reqStatus.Int()); 
     507} 
     508 
     509 
     510#endif  /* PJ_HAS_TCP */ 
     511 
     512/* 
     513 * Poll the I/O Queue for completed events. 
     514 */ 
     515PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioq, 
     516                             const pj_time_val *timeout) 
     517{ 
     518    CPjTimeoutTimer *timer; 
     519 
     520    if (timeout) { 
     521        if (!ioq->timeoutTimer->IsActive()) 
     522            timer = ioq->timeoutTimer; 
     523        else 
     524            timer = CPjTimeoutTimer::NewL(); 
     525 
     526        timer->StartTimer(timeout->sec*1000 + timeout->msec); 
     527 
     528    } else { 
     529        timer = NULL; 
     530    } 
     531 
     532    ioq->eventCount = 0; 
     533 
     534    do { 
     535        PjSymbianOS::Instance()->WaitForActiveObjects(); 
     536    } while (ioq->eventCount == 0 && !timer->HasTimedOut()); 
     537 
     538    if (!timer->HasTimedOut()) 
     539        timer->Cancel(); 
     540 
     541    if (timer != ioq->timeoutTimer) 
     542        delete timer; 
     543 
     544    return ioq->eventCount; 
     545} 
     546 
     547 
     548/* 
     549 * Instruct the I/O Queue to read from the specified handle. 
     550 */ 
     551PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 
     552                                     pj_ioqueue_op_key_t *op_key, 
     553                                     void *buffer, 
     554                                     pj_ssize_t *length, 
     555                                     pj_uint32_t flags ) 
     556{ 
     557    return pj_ioqueue_recvfrom(key, op_key, buffer, length, flags, NULL, NULL); 
     558} 
     559 
     560 
     561/* 
     562 * This function behaves similarly as #pj_ioqueue_recv(), except that it is 
     563 * normally called for socket, and the remote address will also be returned 
     564 * along with the data. 
     565 */ 
     566PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
     567                                         pj_ioqueue_op_key_t *op_key, 
     568                                         void *buffer, 
     569                                         pj_ssize_t *length, 
     570                                         pj_uint32_t flags, 
     571                                         pj_sockaddr_t *addr, 
     572                                         int *addrlen) 
     573{ 
     574    if (key->cbObj->IsActive()) 
     575        return PJ_EBUSY; 
     576 
     577    return key->cbObj->StartRead(op_key, buffer, length, flags, addr, addrlen); 
     578} 
     579 
     580 
     581/* 
     582 * Instruct the I/O Queue to write to the handle. 
     583 */ 
     584PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 
     585                                     pj_ioqueue_op_key_t *op_key, 
     586                                     const void *data, 
     587                                     pj_ssize_t *length, 
     588                                     pj_uint32_t flags ) 
     589{ 
     590    TRequestStatus reqStatus; 
     591    TPtrC8 aBuffer((const TUint8*)data, (TInt)*length); 
     592    TSockXfrLength aLen; 
     593     
     594    PJ_UNUSED_ARG(op_key); 
     595 
     596    // Forcing pending operation is not supported. 
     597    PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL); 
     598 
     599    key->cbObj->get_pj_socket()->Socket().Send(aBuffer, flags, reqStatus, aLen); 
     600    User::WaitForRequest(reqStatus); 
     601 
     602    if (reqStatus.Int() != KErrNone) 
     603        return PJ_RETURN_OS_ERROR(reqStatus.Int()); 
     604 
     605    *length = aLen.Length(); 
     606    return PJ_SUCCESS; 
     607} 
     608 
     609 
     610/* 
     611 * Instruct the I/O Queue to write to the handle. 
     612 */ 
     613PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
     614                                       pj_ioqueue_op_key_t *op_key, 
     615                                       const void *data, 
     616                                       pj_ssize_t *length, 
     617                                       pj_uint32_t flags, 
     618                                       const pj_sockaddr_t *addr, 
     619                                       int addrlen) 
     620{ 
     621    TRequestStatus reqStatus; 
     622    TPtrC8 aBuffer; 
     623    TInetAddr inetAddr; 
     624    TSockXfrLength aLen; 
     625     
     626    PJ_UNUSED_ARG(op_key); 
     627 
     628    // Forcing pending operation is not supported. 
     629    PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL); 
     630 
     631    // Must be pj_sockaddr_in for now. 
     632    PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL); 
     633 
     634    aBuffer.Set((const TUint8*)data, (TInt)*length); 
     635    PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr); 
     636    CPjSocket *pjSock = key->cbObj->get_pj_socket(); 
     637 
     638    pjSock->Socket().Send(aBuffer, flags, reqStatus, aLen); 
     639    User::WaitForRequest(reqStatus); 
     640 
     641    if (reqStatus.Int() != KErrNone) 
     642        return PJ_RETURN_OS_ERROR(reqStatus.Int()); 
     643 
     644    *length = aLen.Length(); 
     645    return PJ_SUCCESS; 
     646} 
     647 
Note: See TracChangeset for help on using the changeset viewer.