Changeset 5256


Ignore:
Timestamp:
Mar 11, 2016 4:17:32 AM (9 years ago)
Author:
nanang
Message:

Re #1900:

  • Works on UWP socket & ioqueue.
  • Media transport UDP: cancel any pending send on detach, otherwise there is possibility that send buffer is already freed by application (stream) when the send op starts.
  • Ioqueue common abs: rename 'generic' as it seems to be a keyword in C++/CX, fixed #if/#endif possition in ioqueue_init_key().
  • pjsua GUI app: fixed thread registration status check.
Location:
pjproject/branches/projects/uwp
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_common_abs.c

    r5210 r5256  
    117117#if !PJ_IOQUEUE_HAS_SAFE_UNREG 
    118118    rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); 
    119 #endif 
    120119    if (rc != PJ_SUCCESS) 
    121120        return rc; 
     121#endif 
    122122 
    123123    /* Group lock */ 
  • pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_common_abs.h

    r4359 r5256  
    8181union operation_key 
    8282{ 
    83     struct generic_operation generic; 
     83    struct generic_operation generic_op; 
    8484    struct read_operation    read; 
    8585    struct write_operation   write; 
  • pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_uwp.cpp

    r5254 r5256  
    1717 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
    1818 */ 
    19 /* 
    2019#include <pj/ioqueue.h> 
    21 #include <pj/assert.h> 
    2220#include <pj/errno.h> 
    23 #include <pj/list.h> 
    24 #include <pj/lock.h> 
    25 #include <pj/pool.h> 
    26 #include <pj/string.h> 
    27  
    28 #include <pj/ioqueue.h> 
    29 #include <pj/os.h> 
    3021#include <pj/lock.h> 
    3122#include <pj/log.h> 
    32 #include <pj/list.h> 
    33 #include <pj/pool.h> 
    34 #include <pj/string.h> 
    35 #include <pj/assert.h> 
    36 #include <pj/sock.h> 
    37 #include <pj/compat/socket.h> 
    38 #include <pj/sock_select.h> 
    39 #include <pj/sock_qos.h> 
    40 #include <pj/errno.h> 
    41 #include <pj/rand.h> 
    42 */ 
    43  
    44 #include <pj/ioqueue.h> 
    45 #include <pj/sock.h> 
    46 #include <pj/addr_resolv.h> 
    47 #include <pj/assert.h> 
    48 #include <pj/errno.h> 
    49 #include <pj/lock.h> 
    50 #include <pj/math.h> 
    5123#include <pj/os.h> 
    5224#include <pj/pool.h> 
    53 #include <pj/string.h> 
    54 #include <pj/unicode.h> 
    55 #include <pj/compat/socket.h> 
    5625 
    5726#include <ppltasks.h> 
    5827#include <string> 
    5928 
     29#define THIS_FILE   "ioq_uwp" 
     30 
    6031#include "sock_uwp.h" 
    61  
    62  
    63 /* 
    64  * IO Queue structure. 
    65  */ 
     32#include "ioqueue_common_abs.h" 
     33 
     34 /* 
     35 * This describes each key. 
     36 */ 
     37struct pj_ioqueue_key_t 
     38{ 
     39    DECLARE_COMMON_KEY 
     40}; 
     41 
     42/* 
     43* This describes the I/O queue itself. 
     44*/ 
    6645struct pj_ioqueue_t 
    6746{ 
    68     int              dummy; 
     47    DECLARE_COMMON_IOQUEUE 
     48    pj_thread_desc   thread_desc[16]; 
     49    unsigned         thread_cnt; 
    6950}; 
    7051 
    7152 
    72 /* 
    73  * IO Queue key structure. 
    74  */ 
    75 struct pj_ioqueue_key_t 
    76 { 
    77     pj_sock_t                sock; 
    78     void                    *user_data; 
    79     pj_ioqueue_callback      cb; 
    80 }; 
    81  
     53#include "ioqueue_common_abs.c" 
     54 
     55static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 
     56                                     pj_ioqueue_key_t *key,  
     57                                     enum ioqueue_event_type event_type) 
     58{ 
     59    PJ_UNUSED_ARG(ioqueue); 
     60    PJ_UNUSED_ARG(key); 
     61    PJ_UNUSED_ARG(event_type); 
     62} 
     63 
     64 
     65static void start_next_read(pj_ioqueue_key_t *key) 
     66{ 
     67    if (key_has_pending_read(key)) { 
     68        PjUwpSocket *s = (PjUwpSocket*)key->fd; 
     69        struct read_operation *op; 
     70        op = (struct read_operation*)key->read_list.next; 
     71 
     72        if (op->op == PJ_IOQUEUE_OP_RECV) 
     73            s->Recv(NULL, (pj_ssize_t*)&op->size); 
     74        else 
     75            s->RecvFrom(NULL, (pj_ssize_t*)&op->size, NULL); 
     76    } 
     77} 
     78 
     79 
     80static void start_next_write(pj_ioqueue_key_t *key) 
     81{ 
     82    if (key_has_pending_write(key)) { 
     83        PjUwpSocket *s = (PjUwpSocket*)key->fd; 
     84        struct write_operation *op; 
     85        op = (struct write_operation*)key->write_list.next; 
     86 
     87        if (op->op == PJ_IOQUEUE_OP_SEND) 
     88            s->Send(op->buf, (pj_ssize_t*)&op->size); 
     89        else 
     90            s->SendTo(op->buf, (pj_ssize_t*)&op->size, &op->rmt_addr); 
     91    } 
     92} 
     93 
     94 
     95static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 
     96                                pj_ioqueue_key_t *key, 
     97                                enum ioqueue_event_type event_type ) 
     98{ 
     99    PJ_UNUSED_ARG(ioqueue); 
     100 
     101    if (event_type == READABLE_EVENT) { 
     102        /* This is either recv, recvfrom, or accept, do nothing on accept */ 
     103        start_next_read(key); 
     104    } else if (event_type == WRITEABLE_EVENT) { 
     105        /* This is either send, sendto, or connect, do nothing on connect */ 
     106        //start_next_write(key); 
     107    } 
     108} 
     109 
     110 
     111static void check_thread(pj_ioqueue_t *ioq) { 
     112    if (pj_thread_is_registered()) 
     113        return; 
     114 
     115    pj_thread_t *t; 
     116    char tmp[16]; 
     117    pj_ansi_snprintf(tmp, sizeof(tmp), "UwpThread%02d", ioq->thread_cnt); 
     118    pj_thread_register(tmp, ioq->thread_desc[ioq->thread_cnt++], &t); 
     119    pj_assert(ioq->thread_cnt < PJ_ARRAY_SIZE(ioq->thread_desc)); 
     120    ioq->thread_cnt %= PJ_ARRAY_SIZE(ioq->thread_desc); 
     121} 
    82122 
    83123static void on_read(PjUwpSocket *s, int bytes_read) 
    84124{ 
    85     pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 
    86     if (key->cb.on_read_complete) { 
    87         (*key->cb.on_read_complete)(key, (pj_ioqueue_op_key_t*)s->read_userdata, bytes_read); 
    88     } 
    89     s->read_userdata = NULL; 
     125    pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 
     126    pj_ioqueue_t *ioq = key->ioqueue; 
     127    check_thread(ioq); 
     128 
     129    ioqueue_dispatch_read_event(key->ioqueue, key); 
     130     
     131    if (bytes_read > 0) 
     132        start_next_read(key); 
    90133} 
    91134 
    92135static void on_write(PjUwpSocket *s, int bytes_sent) 
    93136{ 
    94     pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 
    95     if (key->cb.on_write_complete) { 
    96         (*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)s->write_userdata, bytes_sent); 
    97     } 
    98     s->write_userdata = NULL; 
    99 } 
    100  
    101 static void on_accept(PjUwpSocket *s, pj_status_t status) 
    102 { 
    103     pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 
    104     if (key->cb.on_accept_complete) { 
    105         if (status == PJ_SUCCESS) { 
    106             pj_sock_t new_sock; 
    107             pj_sockaddr addr; 
    108             int addrlen; 
    109             status = pj_sock_accept(key->sock, &new_sock, &addr, &addrlen); 
    110             (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, new_sock, status); 
    111         } else { 
    112             (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, NULL, status); 
    113         } 
    114     } 
    115     s->accept_userdata = NULL; 
     137    PJ_UNUSED_ARG(bytes_sent); 
     138    pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 
     139    pj_ioqueue_t *ioq = key->ioqueue; 
     140    check_thread(ioq); 
     141 
     142    ioqueue_dispatch_write_event(key->ioqueue, key); 
     143 
     144    //start_next_write(key); 
     145} 
     146 
     147static void on_accept(PjUwpSocket *s) 
     148{ 
     149    pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 
     150    pj_ioqueue_t *ioq = key->ioqueue; 
     151    check_thread(ioq); 
     152 
     153    ioqueue_dispatch_read_event(key->ioqueue, key); 
    116154} 
    117155 
    118156static void on_connect(PjUwpSocket *s, pj_status_t status) 
    119157{ 
    120     pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 
    121     if (key->cb.on_connect_complete) { 
    122         (*key->cb.on_connect_complete)(key, status); 
    123     } 
     158    PJ_UNUSED_ARG(status); 
     159    pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 
     160    pj_ioqueue_t *ioq = key->ioqueue; 
     161    check_thread(ioq); 
     162 
     163    ioqueue_dispatch_write_event(key->ioqueue, key); 
    124164} 
    125165 
     
    142182{ 
    143183    pj_ioqueue_t *ioq; 
     184    pj_lock_t *lock; 
     185    pj_status_t rc; 
    144186 
    145187    PJ_UNUSED_ARG(max_fd); 
    146188 
    147189    ioq = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_t); 
     190 
     191    /* Create and init ioqueue mutex */ 
     192    rc = pj_lock_create_null_mutex(pool, "ioq%p", &lock); 
     193    if (rc != PJ_SUCCESS) 
     194        return rc; 
     195 
     196    rc = pj_ioqueue_set_lock(ioq, lock, PJ_TRUE); 
     197    if (rc != PJ_SUCCESS) 
     198        return rc; 
     199 
     200    PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioq)); 
     201 
    148202    *p_ioqueue = ioq; 
    149203    return PJ_SUCCESS; 
     
    156210PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq ) 
    157211{ 
    158     PJ_UNUSED_ARG(ioq); 
    159     return PJ_SUCCESS; 
    160 } 
    161  
    162  
    163 /* 
    164  * Set the lock object to be used by the I/O Queue.  
    165  */ 
    166 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq,  
    167                                          pj_lock_t *lock, 
    168                                          pj_bool_t auto_delete ) 
    169 { 
    170     /* Don't really need lock for now */ 
    171     PJ_UNUSED_ARG(ioq); 
    172      
    173     if (auto_delete) { 
    174         pj_lock_destroy(lock); 
    175     } 
    176  
    177     return PJ_SUCCESS; 
    178 } 
    179  
    180 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 
    181                                                        pj_bool_t allow) 
    182 { 
    183     /* Not supported, just return PJ_SUCCESS silently */ 
    184     PJ_UNUSED_ARG(ioqueue); 
    185     PJ_UNUSED_ARG(allow); 
    186     return PJ_SUCCESS; 
    187 } 
     212    return ioqueue_destroy(ioq); 
     213} 
     214 
    188215 
    189216/* 
     
    191218 */ 
    192219PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 
    193                                               pj_ioqueue_t *ioq, 
     220                                              pj_ioqueue_t *ioqueue, 
    194221                                              pj_sock_t sock, 
    195222                                              void *user_data, 
     
    197224                                              pj_ioqueue_key_t **p_key ) 
    198225{ 
    199     PJ_UNUSED_ARG(ioq); 
    200  
    201     pj_ioqueue_key_t *key; 
    202  
    203     key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); 
    204     key->sock = sock; 
    205     key->user_data = user_data;     
    206     pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 
    207  
    208     PjUwpSocket *s = (PjUwpSocket*)sock; 
    209     s->is_blocking = PJ_FALSE; 
    210     s->user_data = key; 
    211     s->on_read = &on_read; 
    212     s->on_write = &on_write; 
    213     s->on_accept = &on_accept; 
    214     s->on_connect = &on_connect; 
    215  
    216     *p_key = key; 
    217     return PJ_SUCCESS; 
     226    return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, 
     227                                     cb, p_key); 
    218228} 
    219229 
     
    226236                                              pj_ioqueue_key_t **p_key) 
    227237{ 
    228     PJ_UNUSED_ARG(grp_lock); 
    229  
    230     return pj_ioqueue_register_sock(pool, ioqueue, sock, user_data, cb, p_key); 
     238    PjUwpSocketCallback uwp_cb = 
     239                            { &on_read, &on_write, &on_accept, &on_connect }; 
     240    pj_ioqueue_key_t *key; 
     241    pj_status_t rc; 
     242 
     243    pj_lock_acquire(ioqueue->lock); 
     244 
     245    key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); 
     246    rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); 
     247    if (rc != PJ_SUCCESS) { 
     248        key = NULL; 
     249        goto on_return; 
     250    } 
     251 
     252    /* Create ioqueue key lock, if not yet */ 
     253    if (!key->lock) { 
     254        rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); 
     255        if (rc != PJ_SUCCESS) { 
     256            key = NULL; 
     257            goto on_return; 
     258        } 
     259    } 
     260 
     261    PjUwpSocket *s = (PjUwpSocket*)sock; 
     262    s->SetNonBlocking(&uwp_cb, key); 
     263 
     264on_return: 
     265    if (rc != PJ_SUCCESS) { 
     266        if (key && key->grp_lock) 
     267            pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); 
     268    } 
     269    *p_key = key; 
     270    pj_lock_release(ioqueue->lock); 
     271 
     272    return rc; 
     273 
    231274} 
    232275 
     
    236279PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 
    237280{ 
    238     if (key == NULL || key->sock == NULL) 
    239         return PJ_SUCCESS; 
    240  
    241     if (key->sock) 
    242         pj_sock_close(key->sock); 
    243     key->sock = NULL; 
     281    pj_ioqueue_t *ioqueue; 
     282 
     283    PJ_ASSERT_RETURN(key, PJ_EINVAL); 
     284 
     285    ioqueue = key->ioqueue; 
     286 
     287    /* Lock the key to make sure no callback is simultaneously modifying 
     288     * the key. We need to lock the key before ioqueue here to prevent 
     289     * deadlock. 
     290     */ 
     291    pj_ioqueue_lock_key(key); 
     292 
     293    /* Also lock ioqueue */ 
     294    pj_lock_acquire(ioqueue->lock); 
     295 
     296    /* Close socket. */ 
     297    pj_sock_close(key->fd); 
     298 
     299    /* Clear callback */ 
     300    key->cb.on_accept_complete = NULL; 
     301    key->cb.on_connect_complete = NULL; 
     302    key->cb.on_read_complete = NULL; 
     303    key->cb.on_write_complete = NULL; 
     304 
     305    pj_lock_release(ioqueue->lock); 
     306 
     307    if (key->grp_lock) { 
     308        pj_grp_lock_t *grp_lock = key->grp_lock; 
     309        pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 
     310        pj_grp_lock_release(grp_lock); 
     311    } else { 
     312        pj_ioqueue_unlock_key(key); 
     313    } 
     314 
     315    pj_lock_destroy(key->lock); 
    244316 
    245317    return PJ_SUCCESS; 
    246318} 
    247319 
    248  
    249 /* 
    250  * Get user data associated with an ioqueue key. 
    251  */ 
    252 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 
    253 { 
    254     return key->user_data; 
    255 } 
    256  
    257  
    258 /* 
    259  * Set or change the user data to be associated with the file descriptor or 
    260  * handle or socket descriptor. 
    261  */ 
    262 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 
    263                                               void *user_data, 
    264                                               void **old_data) 
    265 { 
    266     if (old_data) 
    267         *old_data = key->user_data; 
    268     key->user_data= user_data; 
    269  
    270     return PJ_SUCCESS; 
    271 } 
    272  
    273  
    274 /* 
    275  * Initialize operation key. 
    276  */ 
    277 PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, 
    278                                      pj_size_t size ) 
    279 { 
    280     pj_bzero(op_key, size); 
    281 } 
    282  
    283  
    284 /* 
    285  * Check if operation is pending on the specified operation key. 
    286  */ 
    287 PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, 
    288                                          pj_ioqueue_op_key_t *op_key ) 
    289 { 
    290     PJ_UNUSED_ARG(key); 
    291     PJ_UNUSED_ARG(op_key); 
    292     return PJ_FALSE; 
    293 } 
    294  
    295  
    296 /* 
    297  * Post completion status to the specified operation key and call the 
    298  * appropriate callback.  
    299  */ 
    300 PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, 
    301                                                 pj_ioqueue_op_key_t *op_key, 
    302                                                 pj_ssize_t bytes_status ) 
    303 { 
    304     PJ_UNUSED_ARG(key); 
    305     PJ_UNUSED_ARG(op_key); 
    306     PJ_UNUSED_ARG(bytes_status); 
    307     return PJ_ENOTSUP; 
    308 } 
    309  
    310  
    311 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 
    312 /** 
    313  * Instruct I/O Queue to accept incoming connection on the specified  
    314  * listening socket. 
    315  */ 
    316 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 
    317                                        pj_ioqueue_op_key_t *op_key, 
    318                                        pj_sock_t *new_sock, 
    319                                        pj_sockaddr_t *local, 
    320                                        pj_sockaddr_t *remote, 
    321                                        int *addrlen ) 
    322 { 
    323     PJ_UNUSED_ARG(new_sock); 
    324     PJ_UNUSED_ARG(local); 
    325     PJ_UNUSED_ARG(remote); 
    326     PJ_UNUSED_ARG(addrlen); 
    327  
    328     PjUwpSocket *s = (PjUwpSocket*)key->sock; 
    329     s->accept_userdata = op_key; 
    330     return pj_sock_listen(key->sock, 0); 
    331 } 
    332  
    333  
    334 /* 
    335  * Initiate non-blocking socket connect. 
    336  */ 
    337 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 
    338                                         const pj_sockaddr_t *addr, 
    339                                         int addrlen ) 
    340 { 
    341     return pj_sock_connect(key->sock, addr, addrlen); 
    342 } 
    343  
    344  
    345 #endif  /* PJ_HAS_TCP */ 
    346320 
    347321/* 
     
    351325                             const pj_time_val *timeout) 
    352326{ 
    353     /* Polling is not necessary on uwp, since all async activities 
    354      * are registered to active scheduler. 
     327    /* Polling is not necessary on UWP, since each socket handles 
     328     * its events already. 
    355329     */ 
    356330    PJ_UNUSED_ARG(ioq); 
    357     PJ_UNUSED_ARG(timeout); 
     331 
     332    pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); 
     333 
    358334    return 0; 
    359335} 
    360336 
    361  
    362 /* 
    363  * Instruct the I/O Queue to read from the specified handle. 
    364  */ 
    365 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 
    366                                      pj_ioqueue_op_key_t *op_key, 
    367                                      void *buffer, 
    368                                      pj_ssize_t *length, 
    369                                      pj_uint32_t flags ) 
    370 { 
    371     PjUwpSocket *s = (PjUwpSocket*)key->sock; 
    372     s->read_userdata = op_key; 
    373     return pj_sock_recv(key->sock, buffer, length, flags); 
    374 } 
    375  
    376  
    377 /* 
    378  * This function behaves similarly as #pj_ioqueue_recv(), except that it is 
    379  * normally called for socket, and the remote address will also be returned 
    380  * along with the data. 
    381  */ 
    382 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 
    383                                          pj_ioqueue_op_key_t *op_key, 
    384                                          void *buffer, 
    385                                          pj_ssize_t *length, 
    386                                          pj_uint32_t flags, 
    387                                          pj_sockaddr_t *addr, 
    388                                          int *addrlen) 
    389 { 
    390     PjUwpSocket *s = (PjUwpSocket*)key->sock; 
    391     s->read_userdata = op_key; 
    392     return pj_sock_recvfrom(key->sock, buffer, length, flags, addr, addrlen); 
    393 } 
    394  
    395  
    396 /* 
    397  * Instruct the I/O Queue to write to the handle. 
    398  */ 
    399 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 
    400                                      pj_ioqueue_op_key_t *op_key, 
    401                                      const void *data, 
    402                                      pj_ssize_t *length, 
    403                                      pj_uint32_t flags ) 
    404 { 
    405     PjUwpSocket *s = (PjUwpSocket*)key->sock; 
    406     s->write_userdata = op_key; 
    407     return pj_sock_send(key->sock, data, length, flags); 
    408 } 
    409  
    410  
    411 /* 
    412  * Instruct the I/O Queue to write to the handle. 
    413  */ 
    414 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 
    415                                        pj_ioqueue_op_key_t *op_key, 
    416                                        const void *data, 
    417                                        pj_ssize_t *length, 
    418                                        pj_uint32_t flags, 
    419                                        const pj_sockaddr_t *addr, 
    420                                        int addrlen) 
    421 { 
    422     PjUwpSocket *s = (PjUwpSocket*)key->sock; 
    423     s->write_userdata = op_key; 
    424     return pj_sock_sendto(key->sock, data, length, flags, addr, addrlen); 
    425 } 
    426  
    427 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, 
    428                                                pj_bool_t allow) 
    429 { 
    430         /* Not supported, just return PJ_SUCCESS silently */ 
    431         PJ_UNUSED_ARG(key); 
    432         PJ_UNUSED_ARG(allow); 
    433         return PJ_SUCCESS; 
    434 } 
    435  
    436 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) 
    437 { 
    438         /* Not supported, just return PJ_SUCCESS silently */ 
    439         PJ_UNUSED_ARG(key); 
    440         return PJ_SUCCESS; 
    441 } 
    442  
    443 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) 
    444 { 
    445         /* Not supported, just return PJ_SUCCESS silently */ 
    446         PJ_UNUSED_ARG(key); 
    447         return PJ_SUCCESS; 
    448 } 
  • pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.cpp

    r5254 r5256  
    1717 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  
    1818 */ 
    19 #include <pj/sock.h> 
    20 #include <pj/addr_resolv.h> 
    2119#include <pj/assert.h> 
    2220#include <pj/errno.h> 
    2321#include <pj/math.h> 
    2422#include <pj/os.h> 
    25 #include <pj/string.h> 
    26 #include <pj/unicode.h> 
    2723#include <pj/compat/socket.h> 
    2824 
     
    3026#include <string> 
    3127 
     28#define THIS_FILE       "sock_uwp.cpp" 
     29 
    3230#include "sock_uwp.h" 
    33  
    34 #define THIS_FILE       "sock_uwp.cpp" 
    3531 
    3632 /* 
     
    201197    { 
    202198        try { 
     199            if (uwp_sock->sock_state >= SOCKSTATE_DISCONNECTED) 
     200                return; 
     201 
    203202            recv_args = args; 
    204203            avail_data_len = args->GetDataReader()->UnconsumedBufferLength; 
    205204 
    206             // Notify application asynchronously 
    207             concurrency::create_task([this]() 
    208             { 
    209                 if (uwp_sock->on_read) { 
    210                     if (!pj_thread_is_registered()) 
    211                         pj_thread_register("MsgReceive", thread_desc,  
    212                                            &rec_thread); 
    213  
    214                     (tp)(*uwp_sock->read_userdata) 
    215                     (*uwp_sock->on_read)(uwp_sock, avail_data_len); 
    216                 } 
    217             }); 
     205            if (uwp_sock->cb.on_read) { 
     206                (*uwp_sock->cb.on_read)(uwp_sock, avail_data_len); 
     207            } 
    218208 
    219209            WaitForSingleObjectEx(recv_wait, INFINITE, false); 
    220         } catch (Exception^ e) {} 
     210        } catch (...) {} 
    221211    } 
    222212 
     
    265255    HANDLE recv_wait; 
    266256    int avail_data_len; 
    267     pj_thread_desc thread_desc; 
    268     pj_thread_t *rec_thread; 
    269257}; 
    270258 
     
    288276            conn_args = args; 
    289277 
    290             // Notify application asynchronously 
    291             concurrency::create_task([this]() 
    292             { 
    293                 if (uwp_sock->on_accept) { 
    294                     if (!pj_thread_is_registered()) 
    295                         pj_thread_register("ConnReceive", thread_desc,  
    296                                            &listener_thread); 
    297  
    298                     (*uwp_sock->on_accept)(uwp_sock, PJ_SUCCESS); 
    299                 } 
    300             }); 
     278            if (uwp_sock->cb.on_accept) { 
     279                (*uwp_sock->cb.on_accept)(uwp_sock); 
     280            } 
    301281 
    302282            WaitForSingleObjectEx(conn_wait, INFINITE, false); 
     
    304284    } 
    305285 
    306     pj_status_t GetAcceptedSocket(StreamSocket^ stream_sock) 
     286    pj_status_t GetAcceptedSocket(StreamSocket^& stream_sock) 
    307287    { 
    308288        if (conn_args == nullptr) 
     
    333313    EventRegistrationToken event_token; 
    334314    HANDLE conn_wait; 
    335  
    336     pj_thread_desc thread_desc; 
    337     pj_thread_t *listener_thread; 
    338315}; 
    339316 
     
    341318PjUwpSocket::PjUwpSocket(int af_, int type_, int proto_) : 
    342319    af(af_), type(type_), proto(proto_), 
    343     sock_type(SOCKTYPE_UNKNOWN), sock_state(SOCKSTATE_NULL), 
    344     is_blocking(PJ_TRUE), is_busy_sending(PJ_FALSE) 
     320    sock_type(SOCKTYPE_UNKNOWN), 
     321    sock_state(SOCKSTATE_NULL), 
     322    is_blocking(PJ_TRUE), 
     323    has_pending_bind(PJ_FALSE), 
     324    has_pending_send(PJ_FALSE), 
     325    has_pending_recv(PJ_FALSE) 
    345326{ 
    346327    pj_sockaddr_init(pj_AF_INET(), &local_addr, NULL, 0); 
     
    349330 
    350331PjUwpSocket::~PjUwpSocket() 
    351 {} 
     332{ 
     333    DeinitSocket(); 
     334} 
    352335 
    353336PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_) 
     
    359342    new_sock->socket_reader = ref new DataReader(new_sock->stream_sock->InputStream); 
    360343    new_sock->socket_writer = ref new DataWriter(new_sock->stream_sock->OutputStream); 
     344    new_sock->socket_reader->InputStreamOptions = InputStreamOptions::Partial; 
    361345    new_sock->send_buffer = ref new Buffer(SEND_BUFFER_SIZE); 
    362346    new_sock->is_blocking = is_blocking; 
     
    399383} 
    400384 
     385 
     386void PjUwpSocket::DeinitSocket() 
     387{ 
     388    if (stream_sock) { 
     389        concurrency::create_task(stream_sock->CancelIOAsync()).wait(); 
     390    } 
     391    if (datagram_sock) { 
     392        concurrency::create_task(datagram_sock->CancelIOAsync()).wait(); 
     393    } 
     394    if (listener_sock) { 
     395        concurrency::create_task(listener_sock->CancelIOAsync()).wait(); 
     396    } 
     397    stream_sock = nullptr; 
     398    datagram_sock = nullptr; 
     399    dgram_recv_helper = nullptr; 
     400    listener_sock = nullptr; 
     401    listener_helper = nullptr; 
     402    socket_writer = nullptr; 
     403    socket_reader = nullptr; 
     404    sock_state = SOCKSTATE_NULL; 
     405} 
     406 
     407pj_status_t PjUwpSocket::Bind(const pj_sockaddr_t *addr) 
     408{ 
     409    /* Not initialized yet, socket type is perhaps TCP, just not decided yet 
     410     * whether it is a stream or a listener. 
     411     */ 
     412    if (sock_state < SOCKSTATE_INITIALIZED) { 
     413        pj_sockaddr_cp(&local_addr, addr); 
     414        has_pending_bind = PJ_TRUE; 
     415        return PJ_SUCCESS; 
     416    } 
     417     
     418    PJ_ASSERT_RETURN(sock_state == SOCKSTATE_INITIALIZED, PJ_EINVALIDOP); 
     419    if (sock_type != SOCKTYPE_DATAGRAM && sock_type != SOCKTYPE_LISTENER) 
     420        return PJ_EINVALIDOP; 
     421 
     422    if (has_pending_bind) { 
     423        has_pending_bind = PJ_FALSE; 
     424        if (!addr) 
     425            addr = &local_addr; 
     426    } 
     427 
     428    /* If no bound address is set, just return */ 
     429    if (!pj_sockaddr_has_addr(addr) && !pj_sockaddr_get_port(addr)) 
     430        return PJ_SUCCESS; 
     431 
     432    if (addr != &local_addr) 
     433        pj_sockaddr_cp(&local_addr, addr); 
     434 
     435    HRESULT err = 0; 
     436    try { 
     437        concurrency::create_task([this, addr]() { 
     438            HostName ^host; 
     439            int port; 
     440            sockaddr_to_hostname_port(addr, host, &port); 
     441            if (pj_sockaddr_has_addr(addr)) { 
     442                if (sock_type == SOCKTYPE_DATAGRAM) 
     443                    return datagram_sock->BindEndpointAsync(host, port.ToString()); 
     444                else 
     445                    return listener_sock->BindEndpointAsync(host, port.ToString()); 
     446            } else /* if (pj_sockaddr_get_port(addr) != 0) */ { 
     447                if (sock_type == SOCKTYPE_DATAGRAM) 
     448                    return datagram_sock->BindServiceNameAsync(port.ToString()); 
     449                else 
     450                    return listener_sock->BindServiceNameAsync(port.ToString()); 
     451            } 
     452        }).then([this, &err](concurrency::task<void> t) 
     453        { 
     454            try { 
     455                t.get(); 
     456            } catch (Exception^ e) { 
     457                err = e->HResult; 
     458            } 
     459        }).get(); 
     460    } catch (Exception^ e) { 
     461        err = e->HResult; 
     462    } 
     463 
     464    return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
     465} 
     466 
     467 
     468pj_status_t PjUwpSocket::SendImp(const void *buf, pj_ssize_t *len) 
     469{ 
     470    if (has_pending_send) 
     471        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     472 
     473    if (*len > (pj_ssize_t)send_buffer->Capacity) 
     474        return PJ_ETOOBIG; 
     475 
     476    CopyToIBuffer((unsigned char*)buf, *len, send_buffer); 
     477    send_buffer->Length = *len; 
     478    socket_writer->WriteBuffer(send_buffer); 
     479 
     480    /* Blocking version */ 
     481    if (is_blocking) { 
     482        pj_status_t status = PJ_SUCCESS; 
     483        concurrency::cancellation_token_source cts; 
     484        auto cts_token = cts.get_token(); 
     485        auto t = concurrency::create_task(socket_writer->StoreAsync(), 
     486                                          cts_token); 
     487        *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
     488            then([cts_token, &status](concurrency::task<unsigned int> t_) 
     489        { 
     490            int sent = 0; 
     491            try { 
     492                if (cts_token.is_canceled()) 
     493                    status = PJ_ETIMEDOUT; 
     494                else 
     495                    sent = t_.get(); 
     496            } catch (Exception^ e) { 
     497                status = PJ_RETURN_OS_ERROR(e->HResult); 
     498            } 
     499            return sent; 
     500        }).get(); 
     501 
     502        return status; 
     503    }  
     504 
     505    /* Non-blocking version */ 
     506    has_pending_send = PJ_TRUE; 
     507    concurrency::create_task(socket_writer->StoreAsync()). 
     508        then([this](concurrency::task<unsigned int> t_) 
     509    { 
     510        try { 
     511            unsigned int l = t_.get(); 
     512            has_pending_send = PJ_FALSE; 
     513 
     514            // invoke callback 
     515            if (cb.on_write) { 
     516                (*cb.on_write)(this, l); 
     517            } 
     518        } catch (...) { 
     519            has_pending_send = PJ_FALSE; 
     520            sock_state = SOCKSTATE_ERROR; 
     521            DeinitSocket(); 
     522 
     523            // invoke callback 
     524            if (cb.on_write) { 
     525                (*cb.on_write)(this, -PJ_EUNKNOWN); 
     526            } 
     527        } 
     528    }); 
     529 
     530    return PJ_SUCCESS; 
     531} 
     532 
     533 
     534pj_status_t PjUwpSocket::Send(const void *buf, pj_ssize_t *len) 
     535{ 
     536    if ((sock_type!=SOCKTYPE_STREAM && sock_type!=SOCKTYPE_DATAGRAM) || 
     537        (sock_state!=SOCKSTATE_CONNECTED)) 
     538    { 
     539        return PJ_EINVALIDOP; 
     540    } 
     541 
     542    /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 
     543    if (sock_type == SOCKTYPE_DATAGRAM) { 
     544        return SendTo(buf, len, &remote_addr); 
     545    } 
     546 
     547    return SendImp(buf, len); 
     548} 
     549 
     550 
     551pj_status_t PjUwpSocket::SendTo(const void *buf, pj_ssize_t *len, 
     552                                const pj_sockaddr_t *to) 
     553{ 
     554    if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 
     555        || sock_state >= SOCKSTATE_DISCONNECTED) 
     556    { 
     557        return PJ_EINVALIDOP; 
     558    } 
     559 
     560    if (has_pending_send) 
     561        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     562 
     563    if (*len > (pj_ssize_t)send_buffer->Capacity) 
     564        return PJ_ETOOBIG; 
     565 
     566    HostName ^hostname; 
     567    int port; 
     568    sockaddr_to_hostname_port(to, hostname, &port); 
     569 
     570    concurrency::cancellation_token_source cts; 
     571    auto cts_token = cts.get_token(); 
     572    auto t = concurrency::create_task(datagram_sock->GetOutputStreamAsync( 
     573                                      hostname, port.ToString()), cts_token); 
     574    pj_status_t status = PJ_SUCCESS; 
     575 
     576    cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
     577        then([this, cts_token, &status](concurrency::task<IOutputStream^> t_) 
     578    { 
     579        try { 
     580            if (cts_token.is_canceled()) { 
     581                status = PJ_ETIMEDOUT; 
     582            } else { 
     583                IOutputStream^ outstream = t_.get(); 
     584                socket_writer = ref new DataWriter(outstream); 
     585            } 
     586        } catch (Exception^ e) { 
     587            status = PJ_RETURN_OS_ERROR(e->HResult); 
     588        } 
     589    }).get(); 
     590 
     591    if (status != PJ_SUCCESS) 
     592        return status; 
     593 
     594    status = SendImp(buf, len); 
     595    if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 
     596        sock_state < SOCKSTATE_CONNECTED) 
     597    { 
     598        sock_state = SOCKSTATE_CONNECTED; 
     599    } 
     600 
     601    return status; 
     602} 
     603 
     604 
     605int PjUwpSocket::ConsumeReadBuffer(void *buf, int max_len) 
     606{ 
     607    if (socket_reader->UnconsumedBufferLength == 0) 
     608        return 0; 
     609 
     610    int read_len = PJ_MIN((int)socket_reader->UnconsumedBufferLength,max_len); 
     611    IBuffer^ buffer = socket_reader->ReadBuffer(read_len); 
     612    read_len = buffer->Length; 
     613    CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 
     614    return read_len; 
     615} 
     616 
     617 
     618pj_status_t PjUwpSocket::Recv(void *buf, pj_ssize_t *len) 
     619{ 
     620    /* Only for TCP, at least for now! */ 
     621    if (sock_type == SOCKTYPE_DATAGRAM) 
     622        return PJ_ENOTSUP; 
     623 
     624    if (sock_type != SOCKTYPE_STREAM || sock_state != SOCKSTATE_CONNECTED) 
     625        return PJ_EINVALIDOP; 
     626 
     627    if (has_pending_recv) 
     628        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     629 
     630    /* First check if there is already some data in the read buffer */ 
     631    if (buf) { 
     632        int avail_len = ConsumeReadBuffer(buf, *len); 
     633        if (avail_len > 0) { 
     634            *len = avail_len; 
     635            return PJ_SUCCESS; 
     636        } 
     637    } 
     638 
     639    /* Blocking version */ 
     640    if (is_blocking) { 
     641        pj_status_t status = PJ_SUCCESS; 
     642        concurrency::cancellation_token_source cts; 
     643        auto cts_token = cts.get_token(); 
     644        auto t = concurrency::create_task(socket_reader->LoadAsync(*len), 
     645                                            cts_token); 
     646        *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 
     647            .then([this, len, buf, cts_token, &status] 
     648                                    (concurrency::task<unsigned int> t_) 
     649        { 
     650            try { 
     651                if (cts_token.is_canceled()) { 
     652                    status = PJ_ETIMEDOUT; 
     653                    return 0; 
     654                } 
     655                t_.get(); 
     656            } catch (Exception^) { 
     657                status = PJ_ETIMEDOUT; 
     658                return 0; 
     659            } 
     660 
     661            *len = ConsumeReadBuffer(buf, *len); 
     662            return (int)*len; 
     663        }).get(); 
     664 
     665        return status; 
     666    } 
     667 
     668    /* Non-blocking version */ 
     669 
     670    has_pending_recv = PJ_TRUE; 
     671    concurrency::create_task(socket_reader->LoadAsync(*len)) 
     672        .then([this](concurrency::task<unsigned int> t_) 
     673    { 
     674        try { 
     675            // catch any exception 
     676            t_.get(); 
     677            has_pending_recv = PJ_FALSE; 
     678 
     679            // invoke callback 
     680            int read_len = socket_reader->UnconsumedBufferLength; 
     681            if (read_len > 0 && cb.on_read) { 
     682                (*cb.on_read)(this, read_len); 
     683            } 
     684        } catch (Exception^ e) { 
     685            has_pending_recv = PJ_FALSE; 
     686 
     687            // invoke callback 
     688            if (cb.on_read) { 
     689                (*cb.on_read)(this, -PJ_EUNKNOWN); 
     690            } 
     691        } 
     692    }); 
     693 
     694    return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     695} 
     696 
     697 
     698pj_status_t PjUwpSocket::RecvFrom(void *buf, pj_ssize_t *len, 
     699                                  pj_sockaddr_t *from) 
     700{ 
     701    if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 
     702        || sock_state >= SOCKSTATE_DISCONNECTED) 
     703    { 
     704        return PJ_EINVALIDOP; 
     705    } 
     706 
     707    /* Start receive, if not yet */ 
     708    if (dgram_recv_helper == nullptr) { 
     709        dgram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(this); 
     710    } 
     711 
     712    /* Try to read any available data first */ 
     713    if (buf || is_blocking) { 
     714        pj_status_t status; 
     715        status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 
     716        if (status != PJ_ENOTFOUND) 
     717            return status; 
     718    } 
     719 
     720    /* Blocking version */ 
     721    if (is_blocking) { 
     722        int max_loop = 0; 
     723        pj_status_t status = PJ_ENOTFOUND; 
     724        while (status == PJ_ENOTFOUND && sock_state <= SOCKSTATE_CONNECTED) 
     725        { 
     726            status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 
     727            if (status != PJ_SUCCESS) 
     728                pj_thread_sleep(100); 
     729 
     730            if (++max_loop > 10) 
     731                return PJ_ETIMEDOUT; 
     732        } 
     733        return status; 
     734    } 
     735 
     736    /* For non-blocking version, just return PJ_EPENDING */ 
     737    return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     738} 
     739 
     740 
     741pj_status_t PjUwpSocket::Connect(const pj_sockaddr_t *addr) 
     742{ 
     743    pj_status_t status; 
     744 
     745    PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN && sock_state == SOCKSTATE_NULL) || 
     746                     (sock_type == SOCKTYPE_DATAGRAM && sock_state == SOCKSTATE_INITIALIZED), 
     747                     PJ_EINVALIDOP); 
     748 
     749    if (sock_type == SOCKTYPE_UNKNOWN) { 
     750        InitSocket(SOCKTYPE_STREAM); 
     751        // No need to check pending bind, no bind for TCP client socket 
     752    } 
     753 
     754    pj_sockaddr_cp(&remote_addr, addr); 
     755 
     756    auto t = concurrency::create_task([this, addr]() 
     757    { 
     758        HostName ^hostname; 
     759        int port; 
     760        sockaddr_to_hostname_port(&remote_addr, hostname, &port); 
     761        if (sock_type == SOCKTYPE_STREAM) 
     762            return stream_sock->ConnectAsync(hostname, port.ToString(), 
     763                                      SocketProtectionLevel::PlainSocket); 
     764        else 
     765            return datagram_sock->ConnectAsync(hostname, port.ToString()); 
     766    }).then([=](concurrency::task<void> t_) 
     767    { 
     768        try { 
     769            t_.get(); 
     770 
     771            sock_state = SOCKSTATE_CONNECTED; 
     772 
     773            // Update local & remote address 
     774            HostName^ local_address; 
     775            String^ local_port; 
     776 
     777            if (sock_type == SOCKTYPE_STREAM) { 
     778                local_address = stream_sock->Information->LocalAddress; 
     779                local_port = stream_sock->Information->LocalPort; 
     780 
     781                socket_reader = ref new DataReader(stream_sock->InputStream); 
     782                socket_writer = ref new DataWriter(stream_sock->OutputStream); 
     783                socket_reader->InputStreamOptions = InputStreamOptions::Partial; 
     784            } else { 
     785                local_address = datagram_sock->Information->LocalAddress; 
     786                local_port = datagram_sock->Information->LocalPort; 
     787            } 
     788            if (local_address && local_port) { 
     789                wstr_addr_to_sockaddr(local_address->CanonicalName->Data(), 
     790                    local_port->Data(), 
     791                    &local_addr); 
     792            } 
     793 
     794            if (!is_blocking && cb.on_connect) { 
     795                (*cb.on_connect)(this, PJ_SUCCESS); 
     796            } 
     797            return (pj_status_t)PJ_SUCCESS; 
     798 
     799        } catch (Exception^ ex) { 
     800 
     801            SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 
     802 
     803            switch (status) 
     804            { 
     805            case SocketErrorStatus::UnreachableHost: 
     806                break; 
     807            case SocketErrorStatus::ConnectionTimedOut: 
     808                break; 
     809            case SocketErrorStatus::ConnectionRefused: 
     810                break; 
     811            default: 
     812                break; 
     813            } 
     814 
     815            if (!is_blocking && cb.on_connect) { 
     816                (*cb.on_connect)(this, PJ_EUNKNOWN); 
     817            } 
     818 
     819            return (pj_status_t)PJ_EUNKNOWN; 
     820        } 
     821    }); 
     822 
     823    if (!is_blocking) 
     824        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_CONNECT_ERROR_VAL); 
     825 
     826    try { 
     827        status = t.get(); 
     828    } catch (Exception^) { 
     829        return PJ_EUNKNOWN; 
     830    } 
     831    return status; 
     832} 
     833 
     834pj_status_t PjUwpSocket::Listen() 
     835{ 
     836    PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN) || 
     837                     (sock_type == SOCKTYPE_LISTENER && 
     838                      sock_state == SOCKSTATE_INITIALIZED), 
     839                     PJ_EINVALIDOP); 
     840 
     841    if (sock_type == SOCKTYPE_UNKNOWN) 
     842        InitSocket(SOCKTYPE_LISTENER); 
     843 
     844    if (has_pending_bind) 
     845        Bind(); 
     846 
     847    /* Start listen */ 
     848    if (listener_helper == nullptr) { 
     849        listener_helper = ref new PjUwpSocketListenerHelper(this); 
     850    } 
     851 
     852    return PJ_SUCCESS; 
     853} 
     854 
     855pj_status_t PjUwpSocket::Accept(PjUwpSocket **new_sock) 
     856{ 
     857    if (sock_type != SOCKTYPE_LISTENER || sock_state != SOCKSTATE_INITIALIZED) 
     858        return PJ_EINVALIDOP; 
     859 
     860    StreamSocket^ accepted_sock; 
     861    pj_status_t status = listener_helper->GetAcceptedSocket(accepted_sock); 
     862    if (status == PJ_ENOTFOUND) 
     863        return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 
     864 
     865    if (status != PJ_SUCCESS) 
     866        return status; 
     867 
     868    *new_sock = CreateAcceptSocket(accepted_sock); 
     869    return PJ_SUCCESS; 
     870} 
    401871 
    402872 
     
    7181188    PJ_ASSERT_RETURN(sock, PJ_EINVAL); 
    7191189    PJ_ASSERT_RETURN(addr && len>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); 
    720  
    7211190    PjUwpSocket *s = (PjUwpSocket*)sock; 
    722  
    723     if (s->sock_state > SOCKSTATE_INITIALIZED) 
    724         return PJ_EINVALIDOP; 
    725  
    726     pj_sockaddr_cp(&s->local_addr, addr); 
    727  
    728     /* Bind now if this is UDP. But if it is TCP, unfortunately we don't 
    729      * know yet whether it is SocketStream or Listener! 
    730      */ 
    731     if (s->type == pj_SOCK_DGRAM()) { 
    732         HRESULT err = 0; 
    733  
    734         try { 
    735             concurrency::create_task([s, addr]() { 
    736                 HostName ^hostname; 
    737                 int port; 
    738                 sockaddr_to_hostname_port(addr, hostname, &port); 
    739                 if (pj_sockaddr_has_addr(addr)) { 
    740                     s->datagram_sock->BindEndpointAsync(hostname,  
    741                                                         port.ToString()); 
    742                 } else if (pj_sockaddr_get_port(addr) != 0) { 
    743                     s->datagram_sock->BindServiceNameAsync(port.ToString()); 
    744                 } 
    745             }).then([s, &err](concurrency::task<void> t) 
    746             { 
    747                 try { 
    748                     t.get(); 
    749                     s->sock_state = SOCKSTATE_CONNECTED; 
    750                 } catch (Exception^ e) { 
    751                     err = e->HResult; 
    752                 } 
    753             }).get(); 
    754         } catch (Exception^ e) { 
    755             err = e->HResult; 
    756         } 
    757  
    758         return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
    759     } 
    760  
    761     return PJ_SUCCESS; 
     1191    return s->Bind(addr); 
    7621192} 
    7631193 
     
    8121242 
    8131243    PjUwpSocket *s = (PjUwpSocket*)sock; 
    814  
    815     pj_sockaddr_cp(addr, &s->remote_addr); 
    816     *namelen = pj_sockaddr_get_len(&s->remote_addr); 
     1244    pj_sockaddr_cp(addr, s->GetRemoteAddr()); 
     1245    *namelen = pj_sockaddr_get_len(addr); 
    8171246 
    8181247    return PJ_SUCCESS; 
     
    8311260 
    8321261    PjUwpSocket *s = (PjUwpSocket*)sock; 
    833  
    834     pj_sockaddr_cp(addr, &s->local_addr); 
    835     *namelen = pj_sockaddr_get_len(&s->local_addr); 
     1262    pj_sockaddr_cp(addr, s->GetLocalAddr()); 
     1263    *namelen = pj_sockaddr_get_len(addr); 
    8361264 
    8371265    return PJ_SUCCESS; 
    8381266} 
    8391267 
    840  
    841 static pj_status_t sock_send_imp(PjUwpSocket *s, const void *buf, 
    842                                  pj_ssize_t *len) 
    843 { 
    844     if (s->is_busy_sending) 
    845         return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 
    846  
    847     if (*len > (pj_ssize_t)s->send_buffer->Capacity) 
    848         return PJ_ETOOBIG; 
    849  
    850     CopyToIBuffer((unsigned char*)buf, *len, s->send_buffer); 
    851     s->send_buffer->Length = *len; 
    852     s->socket_writer->WriteBuffer(s->send_buffer); 
    853  
    854     if (s->is_blocking) { 
    855         pj_status_t status = PJ_SUCCESS; 
    856         concurrency::cancellation_token_source cts; 
    857         auto cts_token = cts.get_token(); 
    858         auto t = concurrency::create_task(s->socket_writer->StoreAsync(), 
    859                                           cts_token); 
    860         *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
    861             then([cts_token, &status](concurrency::task<unsigned int> t_) 
    862         { 
    863             int sent = 0; 
    864             try { 
    865                 if (cts_token.is_canceled()) 
    866                     status = PJ_ETIMEDOUT; 
    867                 else 
    868                     sent = t_.get(); 
    869             } catch (Exception^ e) { 
    870                 status = PJ_RETURN_OS_ERROR(e->HResult); 
    871             } 
    872             return sent; 
    873         }).get(); 
    874  
    875         return status; 
    876     }  
    877  
    878     s->is_busy_sending = true; 
    879     concurrency::create_task(s->socket_writer->StoreAsync()). 
    880         then([s](concurrency::task<unsigned int> t_) 
    881     { 
    882         try { 
    883             unsigned int l = t_.get(); 
    884             s->is_busy_sending = false; 
    885  
    886             // invoke callback 
    887             if (s->on_write) { 
    888                 (*s->on_write)(s, l); 
    889             } 
    890         } catch (Exception^ e) { 
    891             s->is_busy_sending = false; 
    892             if (s->sock_type == SOCKTYPE_STREAM) 
    893                 s->sock_state = SOCKSTATE_DISCONNECTED; 
    894  
    895             // invoke callback 
    896             if (s->on_write) { 
    897                 (*s->on_write)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 
    898             } 
    899         } 
    900     }); 
    901  
    902     return PJ_EPENDING; 
    903 } 
    9041268 
    9051269/* 
     
    9161280 
    9171281    PjUwpSocket *s = (PjUwpSocket*)sock; 
    918  
    919     if ((s->sock_type!=SOCKTYPE_STREAM && s->sock_type!=SOCKTYPE_DATAGRAM) || 
    920         (s->sock_state!=SOCKSTATE_CONNECTED)) 
    921     { 
    922         return PJ_EINVALIDOP; 
    923     } 
    924   
    925     /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 
    926     if (s->sock_type == SOCKTYPE_DATAGRAM) { 
    927         return pj_sock_sendto(sock, buf, len, flags, &s->remote_addr, 
    928                               pj_sockaddr_get_len(&s->remote_addr)); 
    929     } 
    930  
    931     return sock_send_imp(s, buf, len); 
     1282    return s->Send(buf, len); 
    9321283} 
    9331284 
     
    9491300     
    9501301    PjUwpSocket *s = (PjUwpSocket*)sock; 
    951  
    952     if (s->sock_type != SOCKTYPE_DATAGRAM || 
    953         s->sock_state < SOCKSTATE_INITIALIZED) 
    954     { 
    955         return PJ_EINVALIDOP; 
    956     } 
    957  
    958     if (s->is_busy_sending) 
    959         return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 
    960  
    961     if (*len > (pj_ssize_t)s->send_buffer->Capacity) 
    962         return PJ_ETOOBIG; 
    963  
    964     HostName ^hostname; 
    965     int port; 
    966     sockaddr_to_hostname_port(to, hostname, &port); 
    967  
    968     concurrency::cancellation_token_source cts; 
    969     auto cts_token = cts.get_token(); 
    970     auto t = concurrency::create_task( 
    971                 s->datagram_sock->GetOutputStreamAsync( 
    972                     hostname, port.ToString()), cts_token); 
    973     pj_status_t status = PJ_SUCCESS; 
    974  
    975     cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 
    976         then([s, cts_token, &status](concurrency::task<IOutputStream^> t_) 
    977     { 
    978         try { 
    979             if (cts_token.is_canceled()) { 
    980                 status = PJ_ETIMEDOUT; 
    981             } else { 
    982                 IOutputStream^ outstream = t_.get(); 
    983                 s->socket_writer = ref new DataWriter(outstream); 
    984             } 
    985         } catch (Exception^ e) { 
    986             status = PJ_RETURN_OS_ERROR(e->HResult); 
    987         } 
    988     }).get(); 
    989  
    990     if (status != PJ_SUCCESS) 
    991         return status; 
    992  
    993     status = sock_send_imp(s, buf, len); 
    994  
    995     if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 
    996         s->sock_state < SOCKSTATE_CONNECTED) 
    997     { 
    998         s->sock_state = SOCKSTATE_CONNECTED; 
    999     } 
    1000  
    1001     return status; 
    1002 } 
    1003  
    1004  
    1005 static int consume_read_buffer(PjUwpSocket *s, void *buf, int max_len) 
    1006 { 
    1007     if (s->socket_reader->UnconsumedBufferLength == 0) 
    1008         return 0; 
    1009  
    1010     int read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 
    1011                           max_len); 
    1012     IBuffer^ buffer = s->socket_reader->ReadBuffer(read_len); 
    1013     read_len = buffer->Length; 
    1014     CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 
    1015     return read_len; 
     1302    return s->SendTo(buf, len, to); 
    10161303} 
    10171304 
     
    10311318 
    10321319    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1033  
    1034     /* Only for TCP, at least for now! */ 
    1035     if (s->sock_type == SOCKTYPE_DATAGRAM) 
    1036         return PJ_ENOTSUP; 
    1037  
    1038     if (s->sock_type != SOCKTYPE_STREAM || 
    1039         s->sock_state != SOCKSTATE_CONNECTED) 
    1040     { 
    1041         return PJ_EINVALIDOP; 
    1042     } 
    1043  
    1044     /* First check if there is already some data in the read buffer */ 
    1045     int avail_len = consume_read_buffer(s, buf, *len); 
    1046     if (avail_len > 0) { 
    1047         *len = avail_len; 
    1048         return PJ_SUCCESS; 
    1049     } 
    1050  
    1051     /* Start sync read */ 
    1052     if (s->is_blocking) { 
    1053         pj_status_t status = PJ_SUCCESS; 
    1054         concurrency::cancellation_token_source cts; 
    1055         auto cts_token = cts.get_token(); 
    1056         auto t = concurrency::create_task(s->socket_reader->LoadAsync(*len), cts_token); 
    1057         *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 
    1058                     .then([s, len, buf, cts_token, &status](concurrency::task<unsigned int> t_) 
    1059         { 
    1060             try { 
    1061                 if (cts_token.is_canceled()) { 
    1062                     status = PJ_ETIMEDOUT; 
    1063                     return 0; 
    1064                 } 
    1065                 t_.get(); 
    1066             } catch (Exception^) { 
    1067                 status = PJ_ETIMEDOUT; 
    1068                 return 0; 
    1069             } 
    1070  
    1071             *len = consume_read_buffer(s, buf, *len); 
    1072             return (int)*len; 
    1073         }).get(); 
    1074  
    1075         return status; 
    1076     } 
    1077  
    1078     /* Start async read */ 
    1079     int read_len = *len; 
    1080     concurrency::create_task(s->socket_reader->LoadAsync(read_len)) 
    1081         .then([s, &read_len](concurrency::task<unsigned int> t_) 
    1082     { 
    1083         try { 
    1084             // catch any exception 
    1085             t_.get(); 
    1086  
    1087             // invoke callback 
    1088             read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 
    1089                               read_len); 
    1090             if (read_len > 0 && s->on_read) { 
    1091                 (*s->on_read)(s, read_len); 
    1092             } 
    1093         } catch (Exception^ e) { 
    1094             // invoke callback 
    1095             if (s->on_read) { 
    1096                 (*s->on_read)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 
    1097             } 
    1098             return 0; 
    1099         } 
    1100  
    1101         return (int)read_len; 
    1102     }); 
    1103  
    1104     return PJ_EPENDING; 
     1320    return s->Recv(buf, len); 
    11051321} 
    11061322 
     
    11231339 
    11241340    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1125  
    1126     if (s->sock_type != SOCKTYPE_DATAGRAM || 
    1127         s->sock_state < SOCKSTATE_INITIALIZED) 
    1128     { 
    1129         return PJ_EINVALIDOP; 
    1130     } 
    1131  
    1132     /* Start receive, if not yet */ 
    1133     if (s->datagram_recv_helper == nullptr) { 
    1134         s->datagram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(s); 
    1135     } 
    1136  
    1137     /* Try to read any available data first */ 
    1138     pj_status_t status = s->datagram_recv_helper-> 
    1139                                         ReadDataIfAvailable(buf, len, from); 
    1140     if (status != PJ_ENOTFOUND) 
    1141         return status; 
    1142  
    1143     /* Start sync read */ 
    1144     if (s->is_blocking) { 
    1145         while (status == PJ_ENOTFOUND && s->sock_state <= SOCKSTATE_CONNECTED) 
    1146         { 
    1147             status = s->datagram_recv_helper-> 
    1148                                         ReadDataIfAvailable(buf, len, from); 
    1149             pj_thread_sleep(100); 
    1150         } 
    1151         return PJ_SUCCESS; 
    1152     } 
    1153  
    1154     /* For async read, just return PJ_EPENDING */ 
    1155     return PJ_EPENDING; 
     1341    pj_status_t status = s->RecvFrom(buf, len, from); 
     1342    if (status == PJ_SUCCESS) 
     1343        *fromlen = pj_sockaddr_get_len(from); 
     1344    return status; 
    11561345} 
    11571346 
     
    12221411} 
    12231412 
    1224 static pj_status_t tcp_bind(PjUwpSocket *s) 
    1225 { 
    1226     /* If no bound address is set, just return */ 
    1227     if (!pj_sockaddr_has_addr(&s->local_addr) && 
    1228         pj_sockaddr_get_port(&s->local_addr)==0) 
    1229     { 
    1230         return PJ_SUCCESS; 
    1231     } 
    1232  
    1233     HRESULT err = 0;     
    1234  
    1235     try { 
    1236         concurrency::create_task([s]() { 
    1237             HostName ^hostname; 
    1238             int port; 
    1239             sockaddr_to_hostname_port(&s->local_addr, hostname, &port); 
    1240  
    1241             s->listener_sock->BindEndpointAsync(hostname,  
    1242                                                 port.ToString()); 
    1243         }).then([s, &err](concurrency::task<void> t) 
    1244         { 
    1245             try { 
    1246                 t.get(); 
    1247                 s->sock_state = SOCKSTATE_CONNECTED; 
    1248             } catch (Exception^ e) { 
    1249                 err = e->HResult; 
    1250             } 
    1251         }).get(); 
    1252     } catch (Exception^ e) { 
    1253         err = e->HResult; 
    1254     } 
    1255  
    1256     return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 
    1257 } 
    1258  
    12591413 
    12601414/* 
     
    12671421    PJ_CHECK_STACK(); 
    12681422    PJ_ASSERT_RETURN(sock && addr, PJ_EINVAL); 
    1269  
    12701423    PJ_UNUSED_ARG(namelen); 
    12711424 
    12721425    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1273     pj_status_t status; 
    1274  
    1275     pj_sockaddr_cp(&s->remote_addr, addr); 
    1276  
    1277     /* UDP */ 
    1278  
    1279     if (s->sock_type == SOCKTYPE_DATAGRAM) { 
    1280         if (s->sock_state != SOCKSTATE_INITIALIZED) 
    1281             return PJ_EINVALIDOP; 
    1282          
    1283         HostName ^hostname; 
    1284         int port; 
    1285         sockaddr_to_hostname_port(addr, hostname, &port); 
    1286  
    1287         try { 
    1288             concurrency::create_task(s->datagram_sock->ConnectAsync 
    1289                                                    (hostname, port.ToString())) 
    1290                 .then([s](concurrency::task<void> t_) 
    1291             { 
    1292                 try { 
    1293                     t_.get(); 
    1294                 } catch (Exception^ ex)  
    1295                 { 
    1296                  
    1297                 } 
    1298             }).get(); 
    1299         } catch (Exception^) { 
    1300             return PJ_EUNKNOWN; 
    1301         } 
    1302  
    1303         // Update local & remote address 
    1304         wstr_addr_to_sockaddr(s->datagram_sock->Information->RemoteAddress->CanonicalName->Data(), 
    1305                               s->datagram_sock->Information->RemotePort->Data(), 
    1306                               &s->remote_addr); 
    1307         wstr_addr_to_sockaddr(s->datagram_sock->Information->LocalAddress->CanonicalName->Data(), 
    1308                               s->datagram_sock->Information->LocalPort->Data(), 
    1309                               &s->local_addr); 
    1310  
    1311         s->sock_state = SOCKSTATE_CONNECTED; 
    1312          
    1313         return PJ_SUCCESS; 
    1314     } 
    1315  
    1316     /* TCP */ 
    1317  
    1318     /* Init stream socket now */ 
    1319     s->InitSocket(SOCKTYPE_STREAM); 
    1320  
    1321     pj_sockaddr_cp(&s->remote_addr, addr); 
    1322     wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 
    1323                           s->stream_sock->Information->LocalPort->Data(), 
    1324                           &s->local_addr); 
    1325  
    1326     /* Perform any pending bind */ 
    1327     status = tcp_bind(s); 
    1328     if (status != PJ_SUCCESS) 
    1329         return status; 
    1330  
    1331     char tmp[PJ_INET6_ADDRSTRLEN]; 
    1332     wchar_t wtmp[PJ_INET6_ADDRSTRLEN]; 
    1333     pj_sockaddr_print(addr, tmp, PJ_INET6_ADDRSTRLEN, 0); 
    1334     pj_ansi_to_unicode(tmp, pj_ansi_strlen(tmp), wtmp, 
    1335                        PJ_INET6_ADDRSTRLEN); 
    1336     auto host = ref new HostName(ref new String(wtmp)); 
    1337     int port = pj_sockaddr_get_port(addr); 
    1338  
    1339     auto t = concurrency::create_task(s->stream_sock->ConnectAsync 
    1340              (host, port.ToString(), SocketProtectionLevel::PlainSocket)) 
    1341              .then([=](concurrency::task<void> t_) 
    1342     { 
    1343         try { 
    1344             t_.get(); 
    1345             s->socket_reader = ref new DataReader(s->stream_sock->InputStream); 
    1346             s->socket_writer = ref new DataWriter(s->stream_sock->OutputStream); 
    1347  
    1348             // Update local & remote address 
    1349             wstr_addr_to_sockaddr(s->stream_sock->Information->RemoteAddress->CanonicalName->Data(), 
    1350                                   s->stream_sock->Information->RemotePort->Data(), 
    1351                                   &s->remote_addr); 
    1352             wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 
    1353                                   s->stream_sock->Information->LocalPort->Data(), 
    1354                                   &s->local_addr); 
    1355  
    1356             s->sock_state = SOCKSTATE_CONNECTED; 
    1357  
    1358             if (!s->is_blocking && s->on_connect) { 
    1359                 (*s->on_connect)(s, PJ_SUCCESS); 
    1360             } 
    1361             return (pj_status_t)PJ_SUCCESS; 
    1362         } catch (Exception^ ex) { 
    1363             SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 
    1364  
    1365             switch (status) 
    1366             { 
    1367             case SocketErrorStatus::UnreachableHost: 
    1368                 break; 
    1369             case SocketErrorStatus::ConnectionTimedOut: 
    1370                 break; 
    1371             case SocketErrorStatus::ConnectionRefused: 
    1372                 break; 
    1373             default: 
    1374                 break; 
    1375             } 
    1376  
    1377             if (!s->is_blocking && s->on_connect) { 
    1378                 (*s->on_connect)(s, PJ_EUNKNOWN); 
    1379             } 
    1380  
    1381             return (pj_status_t)PJ_EUNKNOWN; 
    1382         } 
    1383     }); 
    1384  
    1385     if (!s->is_blocking) 
    1386         return PJ_EPENDING; 
    1387  
    1388     try { 
    1389         status = t.get(); 
    1390     } catch (Exception^) { 
    1391         return PJ_EUNKNOWN; 
    1392     } 
    1393     return status; 
     1426    return s->Connect(addr); 
    13941427} 
    13951428 
     
    14201453 
    14211454    PjUwpSocket *s = (PjUwpSocket*)sock; 
    1422     pj_status_t status; 
    1423  
    1424     /* Init listener socket now */ 
    1425     s->InitSocket(SOCKTYPE_LISTENER); 
    1426  
    1427     /* Perform any pending bind */ 
    1428     status = tcp_bind(s); 
    1429     if (status != PJ_SUCCESS) 
    1430         return status; 
    1431  
    1432     /* Start listen */ 
    1433     if (s->listener_helper == nullptr) { 
    1434         s->listener_helper = ref new PjUwpSocketListenerHelper(s); 
    1435     } 
    1436  
    1437     return PJ_SUCCESS; 
     1455    return s->Listen(); 
    14381456} 
    14391457 
     
    14461464                                    int *addrlen) 
    14471465{ 
    1448     PJ_CHECK_STACK(); 
    1449  
     1466    pj_status_t status; 
     1467 
     1468    PJ_CHECK_STACK(); 
    14501469    PJ_ASSERT_RETURN(serverfd && newsock, PJ_EINVAL); 
    14511470 
    14521471    PjUwpSocket *s = (PjUwpSocket*)serverfd; 
    1453  
    1454     if (s->sock_type != SOCKTYPE_LISTENER || 
    1455         s->sock_state != SOCKSTATE_INITIALIZED) 
    1456     { 
    1457         return PJ_EINVALIDOP; 
    1458     } 
    1459  
    1460     StreamSocket^ accepted_sock; 
    1461     pj_status_t status = s->listener_helper->GetAcceptedSocket(accepted_sock); 
    1462     if (status == PJ_ENOTFOUND) 
    1463         return PJ_EPENDING; 
    1464  
     1472    PjUwpSocket *new_uwp_sock; 
     1473 
     1474    status = s->Accept(&new_uwp_sock); 
    14651475    if (status != PJ_SUCCESS) 
    14661476        return status; 
    1467  
    1468     PjUwpSocket *new_sock = s->CreateAcceptSocket(accepted_sock); 
    1469  
    1470     pj_sockaddr_cp(addr, &new_sock->remote_addr); 
    1471     *addrlen = pj_sockaddr_get_len(&new_sock->remote_addr); 
    1472     *newsock = (pj_sock_t)new_sock; 
     1477    if (newsock == NULL) 
     1478        return PJ_ENOTFOUND; 
     1479 
     1480    *newsock = (pj_sock_t)new_uwp_sock; 
     1481    pj_sockaddr_cp(addr, new_uwp_sock->GetRemoteAddr()); 
     1482    *addrlen = pj_sockaddr_get_len(addr); 
    14731483 
    14741484    return PJ_SUCCESS; 
  • pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.h

    r5254 r5256  
    1919#pragma once 
    2020 
     21 
     22#include <pj/assert.h> 
     23#include <pj/sock.h> 
     24#include <pj/string.h> 
     25#include <pj/unicode.h> 
     26 
     27 
    2128enum { 
    2229    READ_TIMEOUT        = 60 * 1000, 
     
    3744ref class PjUwpSocketDatagramRecvHelper; 
    3845ref class PjUwpSocketListenerHelper; 
     46class PjUwpSocket; 
     47 
     48 
     49typedef struct PjUwpSocketCallback 
     50{ 
     51    void (*on_read)(PjUwpSocket *s, int bytes_read); 
     52    void (*on_write)(PjUwpSocket *s, int bytes_sent); 
     53    void (*on_accept)(PjUwpSocket *s); 
     54    void (*on_connect)(PjUwpSocket *s, pj_status_t status); 
     55} PjUwpSocketCallback; 
     56 
    3957 
    4058/* 
     
    4563public: 
    4664    PjUwpSocket(int af_, int type_, int proto_); 
     65    virtual ~PjUwpSocket(); 
     66    pj_status_t InitSocket(enum PjUwpSocketType sock_type_); 
     67    void DeinitSocket(); 
     68 
     69    void* GetUserData() { return user_data; } 
     70    void SetNonBlocking(const PjUwpSocketCallback *cb_, void *user_data_) 
     71    { 
     72        is_blocking = PJ_FALSE; 
     73        cb=*cb_; 
     74        user_data = user_data_; 
     75    } 
     76 
     77    enum PjUwpSocketType GetType() { return sock_type; } 
     78    enum PjUwpSocketState GetState() { return sock_state; } 
     79 
     80    pj_sockaddr* GetLocalAddr() { return &local_addr; } 
     81    pj_sockaddr* GetRemoteAddr() { return &remote_addr; } 
     82 
     83 
     84    pj_status_t Bind(const pj_sockaddr_t *addr = NULL); 
     85    pj_status_t Send(const void *buf, pj_ssize_t *len); 
     86    pj_status_t SendTo(const void *buf, pj_ssize_t *len, const pj_sockaddr_t *to); 
     87    pj_status_t Recv(void *buf, pj_ssize_t *len); 
     88    pj_status_t RecvFrom(void *buf, pj_ssize_t *len, pj_sockaddr_t *from); 
     89    pj_status_t Connect(const pj_sockaddr_t *addr); 
     90    pj_status_t Listen(); 
     91    pj_status_t Accept(PjUwpSocket **new_sock); 
     92 
     93    void (*on_read)(PjUwpSocket *s, int bytes_read); 
     94    void (*on_write)(PjUwpSocket *s, int bytes_sent); 
     95    void (*on_accept)(PjUwpSocket *s, pj_status_t status); 
     96    void (*on_connect)(PjUwpSocket *s, pj_status_t status); 
     97 
     98private: 
    4799    PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_); 
    48     virtual ~PjUwpSocket(); 
    49  
    50     pj_status_t InitSocket(enum PjUwpSocketType sock_type_); 
    51  
    52 public: 
     100    pj_status_t SendImp(const void *buf, pj_ssize_t *len); 
     101    int ConsumeReadBuffer(void *buf, int max_len); 
     102 
    53103    int af; 
    54104    int type; 
     
    57107    pj_sockaddr remote_addr; 
    58108    pj_bool_t is_blocking; 
     109    pj_bool_t has_pending_bind; 
     110    pj_bool_t has_pending_send; 
     111    pj_bool_t has_pending_recv; 
    59112    void *user_data; 
     113    PjUwpSocketCallback cb; 
    60114 
    61115    enum PjUwpSocketType sock_type; 
     
    66120     
    67121    /* Helper objects */ 
    68     PjUwpSocketDatagramRecvHelper^ datagram_recv_helper; 
     122    PjUwpSocketDatagramRecvHelper^ dgram_recv_helper; 
    69123    PjUwpSocketListenerHelper^ listener_helper; 
    70124 
     
    72126    Windows::Storage::Streams::DataWriter^ socket_writer; 
    73127    Windows::Storage::Streams::IBuffer^ send_buffer; 
    74     pj_bool_t is_busy_sending; 
    75  
    76     void *read_userdata; 
    77     void *write_userdata; 
    78     void *accept_userdata; 
    79  
    80     void (*on_read)(PjUwpSocket *s, int bytes_read); 
    81     void (*on_write)(PjUwpSocket *s, int bytes_sent); 
    82     void (*on_accept)(PjUwpSocket *s, pj_status_t status); 
    83     void (*on_connect)(PjUwpSocket *s, pj_status_t status); 
     128 
     129    friend PjUwpSocketDatagramRecvHelper; 
     130    friend PjUwpSocketListenerHelper; 
    84131}; 
    85132 
     
    93140                                         pj_sockaddr_t *sockaddr) 
    94141{ 
     142#if 0 
    95143    char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; 
    96144    pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); 
     
    102150 
    103151    return PJ_SUCCESS; 
     152#endif 
     153    char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; 
     154    pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); 
     155    pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf)); 
     156    pj_str_t remote_host; 
     157    pj_strset(&remote_host, tmp_str_buf, pj_ansi_strlen(tmp_str_buf)); 
     158    pj_sockaddr *addr = (pj_sockaddr*)sockaddr; 
     159    pj_bool_t got_addr = PJ_FALSE; 
     160 
     161    if (pj_inet_pton(PJ_AF_INET, &remote_host, &addr->ipv4.sin_addr) 
     162        == PJ_SUCCESS) 
     163    { 
     164        addr->addr.sa_family = PJ_AF_INET; 
     165        got_addr = PJ_TRUE; 
     166    } else if (pj_inet_pton(PJ_AF_INET6, &remote_host, &addr->ipv6.sin6_addr) 
     167        == PJ_SUCCESS) 
     168    { 
     169        addr->addr.sa_family = PJ_AF_INET6; 
     170        got_addr = PJ_TRUE; 
     171    } 
     172    if (!got_addr) 
     173        return PJ_EINVAL; 
     174 
     175    pj_sockaddr_set_port(addr, (pj_uint16_t)_wtoi(wport)); 
     176    return PJ_SUCCESS; 
    104177} 
    105178 
  • pjproject/branches/projects/uwp/pjmedia/src/pjmedia/transport_udp.c

    r4538 r5256  
    753753 
    754754    if (udp->attached) { 
     755        int i; 
     756 
    755757        /* Lock the ioqueue keys to make sure that callbacks are 
    756758         * not executed. See ticket #460 for details. 
     
    772774        udp->rtcp_cb = NULL; 
    773775        udp->user_data = NULL; 
     776 
     777        /* Cancel any outstanding send */ 
     778        for (i=0; i<PJ_ARRAY_SIZE(udp->rtp_pending_write); ++i) { 
     779            pj_ioqueue_post_completion(udp->rtp_key, 
     780                                       &udp->rtp_pending_write[i].op_key, 0); 
     781        } 
     782        pj_ioqueue_post_completion(udp->rtcp_key, &udp->rtcp_write_op, 0); 
    774783 
    775784        /* Unlock keys */ 
  • pjproject/branches/projects/uwp/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp

    r5254 r5256  
    471471 
    472472    try { 
     473        sipTpConfig->port = 5060; 
    473474        ep.transportCreate(::pjsip_transport_type_e::PJSIP_TRANSPORT_TCP, 
    474475                           *sipTpConfig); 
     
    583584bool MyAppRT::isThreadRegistered() 
    584585{ 
    585     return ep.libIsThreadRegistered(); 
     586    // Some threads are registered using PJLIB API, ep.libIsThreadRegistered() will return false on those threads. 
     587    //return ep.libIsThreadRegistered(); 
     588    return pj_thread_is_registered() != PJ_FALSE; 
    586589} 
    587590 
Note: See TracChangeset for help on using the changeset viewer.