Changeset 5256
- Timestamp:
- Mar 11, 2016 4:17:32 AM (9 years ago)
- Location:
- pjproject/branches/projects/uwp
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_common_abs.c
r5210 r5256 117 117 #if !PJ_IOQUEUE_HAS_SAFE_UNREG 118 118 rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); 119 #endif120 119 if (rc != PJ_SUCCESS) 121 120 return rc; 121 #endif 122 122 123 123 /* Group lock */ -
pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_common_abs.h
r4359 r5256 81 81 union operation_key 82 82 { 83 struct generic_operation generic ;83 struct generic_operation generic_op; 84 84 struct read_operation read; 85 85 struct write_operation write; -
pjproject/branches/projects/uwp/pjlib/src/pj/ioqueue_uwp.cpp
r5254 r5256 17 17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 18 18 */ 19 /*20 19 #include <pj/ioqueue.h> 21 #include <pj/assert.h>22 20 #include <pj/errno.h> 23 #include <pj/list.h>24 #include <pj/lock.h>25 #include <pj/pool.h>26 #include <pj/string.h>27 28 #include <pj/ioqueue.h>29 #include <pj/os.h>30 21 #include <pj/lock.h> 31 22 #include <pj/log.h> 32 #include <pj/list.h>33 #include <pj/pool.h>34 #include <pj/string.h>35 #include <pj/assert.h>36 #include <pj/sock.h>37 #include <pj/compat/socket.h>38 #include <pj/sock_select.h>39 #include <pj/sock_qos.h>40 #include <pj/errno.h>41 #include <pj/rand.h>42 */43 44 #include <pj/ioqueue.h>45 #include <pj/sock.h>46 #include <pj/addr_resolv.h>47 #include <pj/assert.h>48 #include <pj/errno.h>49 #include <pj/lock.h>50 #include <pj/math.h>51 23 #include <pj/os.h> 52 24 #include <pj/pool.h> 53 #include <pj/string.h>54 #include <pj/unicode.h>55 #include <pj/compat/socket.h>56 25 57 26 #include <ppltasks.h> 58 27 #include <string> 59 28 29 #define THIS_FILE "ioq_uwp" 30 60 31 #include "sock_uwp.h" 61 62 63 /* 64 * IO Queue structure. 65 */ 32 #include "ioqueue_common_abs.h" 33 34 /* 35 * This describes each key. 36 */ 37 struct pj_ioqueue_key_t 38 { 39 DECLARE_COMMON_KEY 40 }; 41 42 /* 43 * This describes the I/O queue itself. 44 */ 66 45 struct pj_ioqueue_t 67 46 { 68 int dummy; 47 DECLARE_COMMON_IOQUEUE 48 pj_thread_desc thread_desc[16]; 49 unsigned thread_cnt; 69 50 }; 70 51 71 52 72 /* 73 * IO Queue key structure. 74 */ 75 struct pj_ioqueue_key_t 76 { 77 pj_sock_t sock; 78 void *user_data; 79 pj_ioqueue_callback cb; 80 }; 81 53 #include "ioqueue_common_abs.c" 54 55 static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, 56 pj_ioqueue_key_t *key, 57 enum ioqueue_event_type event_type) 58 { 59 PJ_UNUSED_ARG(ioqueue); 60 PJ_UNUSED_ARG(key); 61 PJ_UNUSED_ARG(event_type); 62 } 63 64 65 static void start_next_read(pj_ioqueue_key_t *key) 66 { 67 if (key_has_pending_read(key)) { 68 PjUwpSocket *s = (PjUwpSocket*)key->fd; 69 struct read_operation *op; 70 op = (struct read_operation*)key->read_list.next; 71 72 if (op->op == PJ_IOQUEUE_OP_RECV) 73 s->Recv(NULL, (pj_ssize_t*)&op->size); 74 else 75 s->RecvFrom(NULL, (pj_ssize_t*)&op->size, NULL); 76 } 77 } 78 79 80 static void start_next_write(pj_ioqueue_key_t *key) 81 { 82 if (key_has_pending_write(key)) { 83 PjUwpSocket *s = (PjUwpSocket*)key->fd; 84 struct write_operation *op; 85 op = (struct write_operation*)key->write_list.next; 86 87 if (op->op == PJ_IOQUEUE_OP_SEND) 88 s->Send(op->buf, (pj_ssize_t*)&op->size); 89 else 90 s->SendTo(op->buf, (pj_ssize_t*)&op->size, &op->rmt_addr); 91 } 92 } 93 94 95 static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, 96 pj_ioqueue_key_t *key, 97 enum ioqueue_event_type event_type ) 98 { 99 PJ_UNUSED_ARG(ioqueue); 100 101 if (event_type == READABLE_EVENT) { 102 /* This is either recv, recvfrom, or accept, do nothing on accept */ 103 start_next_read(key); 104 } else if (event_type == WRITEABLE_EVENT) { 105 /* This is either send, sendto, or connect, do nothing on connect */ 106 //start_next_write(key); 107 } 108 } 109 110 111 static void check_thread(pj_ioqueue_t *ioq) { 112 if (pj_thread_is_registered()) 113 return; 114 115 pj_thread_t *t; 116 char tmp[16]; 117 pj_ansi_snprintf(tmp, sizeof(tmp), "UwpThread%02d", ioq->thread_cnt); 118 pj_thread_register(tmp, ioq->thread_desc[ioq->thread_cnt++], &t); 119 pj_assert(ioq->thread_cnt < PJ_ARRAY_SIZE(ioq->thread_desc)); 120 ioq->thread_cnt %= PJ_ARRAY_SIZE(ioq->thread_desc); 121 } 82 122 83 123 static void on_read(PjUwpSocket *s, int bytes_read) 84 124 { 85 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 86 if (key->cb.on_read_complete) { 87 (*key->cb.on_read_complete)(key, (pj_ioqueue_op_key_t*)s->read_userdata, bytes_read); 88 } 89 s->read_userdata = NULL; 125 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 126 pj_ioqueue_t *ioq = key->ioqueue; 127 check_thread(ioq); 128 129 ioqueue_dispatch_read_event(key->ioqueue, key); 130 131 if (bytes_read > 0) 132 start_next_read(key); 90 133 } 91 134 92 135 static void on_write(PjUwpSocket *s, int bytes_sent) 93 136 { 94 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 95 if (key->cb.on_write_complete) { 96 (*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)s->write_userdata, bytes_sent); 97 } 98 s->write_userdata = NULL; 99 } 100 101 static void on_accept(PjUwpSocket *s, pj_status_t status) 102 { 103 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 104 if (key->cb.on_accept_complete) { 105 if (status == PJ_SUCCESS) { 106 pj_sock_t new_sock; 107 pj_sockaddr addr; 108 int addrlen; 109 status = pj_sock_accept(key->sock, &new_sock, &addr, &addrlen); 110 (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, new_sock, status); 111 } else { 112 (*key->cb.on_accept_complete)(key, (pj_ioqueue_op_key_t*)s->accept_userdata, NULL, status); 113 } 114 } 115 s->accept_userdata = NULL; 137 PJ_UNUSED_ARG(bytes_sent); 138 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 139 pj_ioqueue_t *ioq = key->ioqueue; 140 check_thread(ioq); 141 142 ioqueue_dispatch_write_event(key->ioqueue, key); 143 144 //start_next_write(key); 145 } 146 147 static void on_accept(PjUwpSocket *s) 148 { 149 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 150 pj_ioqueue_t *ioq = key->ioqueue; 151 check_thread(ioq); 152 153 ioqueue_dispatch_read_event(key->ioqueue, key); 116 154 } 117 155 118 156 static void on_connect(PjUwpSocket *s, pj_status_t status) 119 157 { 120 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->user_data; 121 if (key->cb.on_connect_complete) { 122 (*key->cb.on_connect_complete)(key, status); 123 } 158 PJ_UNUSED_ARG(status); 159 pj_ioqueue_key_t *key = (pj_ioqueue_key_t*)s->GetUserData(); 160 pj_ioqueue_t *ioq = key->ioqueue; 161 check_thread(ioq); 162 163 ioqueue_dispatch_write_event(key->ioqueue, key); 124 164 } 125 165 … … 142 182 { 143 183 pj_ioqueue_t *ioq; 184 pj_lock_t *lock; 185 pj_status_t rc; 144 186 145 187 PJ_UNUSED_ARG(max_fd); 146 188 147 189 ioq = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_t); 190 191 /* Create and init ioqueue mutex */ 192 rc = pj_lock_create_null_mutex(pool, "ioq%p", &lock); 193 if (rc != PJ_SUCCESS) 194 return rc; 195 196 rc = pj_ioqueue_set_lock(ioq, lock, PJ_TRUE); 197 if (rc != PJ_SUCCESS) 198 return rc; 199 200 PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioq)); 201 148 202 *p_ioqueue = ioq; 149 203 return PJ_SUCCESS; … … 156 210 PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq ) 157 211 { 158 PJ_UNUSED_ARG(ioq); 159 return PJ_SUCCESS; 160 } 161 162 163 /* 164 * Set the lock object to be used by the I/O Queue. 165 */ 166 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq, 167 pj_lock_t *lock, 168 pj_bool_t auto_delete ) 169 { 170 /* Don't really need lock for now */ 171 PJ_UNUSED_ARG(ioq); 172 173 if (auto_delete) { 174 pj_lock_destroy(lock); 175 } 176 177 return PJ_SUCCESS; 178 } 179 180 PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, 181 pj_bool_t allow) 182 { 183 /* Not supported, just return PJ_SUCCESS silently */ 184 PJ_UNUSED_ARG(ioqueue); 185 PJ_UNUSED_ARG(allow); 186 return PJ_SUCCESS; 187 } 212 return ioqueue_destroy(ioq); 213 } 214 188 215 189 216 /* … … 191 218 */ 192 219 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 193 pj_ioqueue_t *ioq ,220 pj_ioqueue_t *ioqueue, 194 221 pj_sock_t sock, 195 222 void *user_data, … … 197 224 pj_ioqueue_key_t **p_key ) 198 225 { 199 PJ_UNUSED_ARG(ioq); 200 201 pj_ioqueue_key_t *key; 202 203 key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); 204 key->sock = sock; 205 key->user_data = user_data; 206 pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); 207 208 PjUwpSocket *s = (PjUwpSocket*)sock; 209 s->is_blocking = PJ_FALSE; 210 s->user_data = key; 211 s->on_read = &on_read; 212 s->on_write = &on_write; 213 s->on_accept = &on_accept; 214 s->on_connect = &on_connect; 215 216 *p_key = key; 217 return PJ_SUCCESS; 226 return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, 227 cb, p_key); 218 228 } 219 229 … … 226 236 pj_ioqueue_key_t **p_key) 227 237 { 228 PJ_UNUSED_ARG(grp_lock); 229 230 return pj_ioqueue_register_sock(pool, ioqueue, sock, user_data, cb, p_key); 238 PjUwpSocketCallback uwp_cb = 239 { &on_read, &on_write, &on_accept, &on_connect }; 240 pj_ioqueue_key_t *key; 241 pj_status_t rc; 242 243 pj_lock_acquire(ioqueue->lock); 244 245 key = PJ_POOL_ZALLOC_T(pool, pj_ioqueue_key_t); 246 rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); 247 if (rc != PJ_SUCCESS) { 248 key = NULL; 249 goto on_return; 250 } 251 252 /* Create ioqueue key lock, if not yet */ 253 if (!key->lock) { 254 rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock); 255 if (rc != PJ_SUCCESS) { 256 key = NULL; 257 goto on_return; 258 } 259 } 260 261 PjUwpSocket *s = (PjUwpSocket*)sock; 262 s->SetNonBlocking(&uwp_cb, key); 263 264 on_return: 265 if (rc != PJ_SUCCESS) { 266 if (key && key->grp_lock) 267 pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); 268 } 269 *p_key = key; 270 pj_lock_release(ioqueue->lock); 271 272 return rc; 273 231 274 } 232 275 … … 236 279 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 237 280 { 238 if (key == NULL || key->sock == NULL) 239 return PJ_SUCCESS; 240 241 if (key->sock) 242 pj_sock_close(key->sock); 243 key->sock = NULL; 281 pj_ioqueue_t *ioqueue; 282 283 PJ_ASSERT_RETURN(key, PJ_EINVAL); 284 285 ioqueue = key->ioqueue; 286 287 /* Lock the key to make sure no callback is simultaneously modifying 288 * the key. We need to lock the key before ioqueue here to prevent 289 * deadlock. 290 */ 291 pj_ioqueue_lock_key(key); 292 293 /* Also lock ioqueue */ 294 pj_lock_acquire(ioqueue->lock); 295 296 /* Close socket. */ 297 pj_sock_close(key->fd); 298 299 /* Clear callback */ 300 key->cb.on_accept_complete = NULL; 301 key->cb.on_connect_complete = NULL; 302 key->cb.on_read_complete = NULL; 303 key->cb.on_write_complete = NULL; 304 305 pj_lock_release(ioqueue->lock); 306 307 if (key->grp_lock) { 308 pj_grp_lock_t *grp_lock = key->grp_lock; 309 pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); 310 pj_grp_lock_release(grp_lock); 311 } else { 312 pj_ioqueue_unlock_key(key); 313 } 314 315 pj_lock_destroy(key->lock); 244 316 245 317 return PJ_SUCCESS; 246 318 } 247 319 248 249 /*250 * Get user data associated with an ioqueue key.251 */252 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )253 {254 return key->user_data;255 }256 257 258 /*259 * Set or change the user data to be associated with the file descriptor or260 * handle or socket descriptor.261 */262 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,263 void *user_data,264 void **old_data)265 {266 if (old_data)267 *old_data = key->user_data;268 key->user_data= user_data;269 270 return PJ_SUCCESS;271 }272 273 274 /*275 * Initialize operation key.276 */277 PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,278 pj_size_t size )279 {280 pj_bzero(op_key, size);281 }282 283 284 /*285 * Check if operation is pending on the specified operation key.286 */287 PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,288 pj_ioqueue_op_key_t *op_key )289 {290 PJ_UNUSED_ARG(key);291 PJ_UNUSED_ARG(op_key);292 return PJ_FALSE;293 }294 295 296 /*297 * Post completion status to the specified operation key and call the298 * appropriate callback.299 */300 PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,301 pj_ioqueue_op_key_t *op_key,302 pj_ssize_t bytes_status )303 {304 PJ_UNUSED_ARG(key);305 PJ_UNUSED_ARG(op_key);306 PJ_UNUSED_ARG(bytes_status);307 return PJ_ENOTSUP;308 }309 310 311 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0312 /**313 * Instruct I/O Queue to accept incoming connection on the specified314 * listening socket.315 */316 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,317 pj_ioqueue_op_key_t *op_key,318 pj_sock_t *new_sock,319 pj_sockaddr_t *local,320 pj_sockaddr_t *remote,321 int *addrlen )322 {323 PJ_UNUSED_ARG(new_sock);324 PJ_UNUSED_ARG(local);325 PJ_UNUSED_ARG(remote);326 PJ_UNUSED_ARG(addrlen);327 328 PjUwpSocket *s = (PjUwpSocket*)key->sock;329 s->accept_userdata = op_key;330 return pj_sock_listen(key->sock, 0);331 }332 333 334 /*335 * Initiate non-blocking socket connect.336 */337 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,338 const pj_sockaddr_t *addr,339 int addrlen )340 {341 return pj_sock_connect(key->sock, addr, addrlen);342 }343 344 345 #endif /* PJ_HAS_TCP */346 320 347 321 /* … … 351 325 const pj_time_val *timeout) 352 326 { 353 /* Polling is not necessary on uwp, since all async activities354 * are registered to active scheduler.327 /* Polling is not necessary on UWP, since each socket handles 328 * its events already. 355 329 */ 356 330 PJ_UNUSED_ARG(ioq); 357 PJ_UNUSED_ARG(timeout); 331 332 pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); 333 358 334 return 0; 359 335 } 360 336 361 362 /*363 * Instruct the I/O Queue to read from the specified handle.364 */365 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,366 pj_ioqueue_op_key_t *op_key,367 void *buffer,368 pj_ssize_t *length,369 pj_uint32_t flags )370 {371 PjUwpSocket *s = (PjUwpSocket*)key->sock;372 s->read_userdata = op_key;373 return pj_sock_recv(key->sock, buffer, length, flags);374 }375 376 377 /*378 * This function behaves similarly as #pj_ioqueue_recv(), except that it is379 * normally called for socket, and the remote address will also be returned380 * along with the data.381 */382 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,383 pj_ioqueue_op_key_t *op_key,384 void *buffer,385 pj_ssize_t *length,386 pj_uint32_t flags,387 pj_sockaddr_t *addr,388 int *addrlen)389 {390 PjUwpSocket *s = (PjUwpSocket*)key->sock;391 s->read_userdata = op_key;392 return pj_sock_recvfrom(key->sock, buffer, length, flags, addr, addrlen);393 }394 395 396 /*397 * Instruct the I/O Queue to write to the handle.398 */399 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,400 pj_ioqueue_op_key_t *op_key,401 const void *data,402 pj_ssize_t *length,403 pj_uint32_t flags )404 {405 PjUwpSocket *s = (PjUwpSocket*)key->sock;406 s->write_userdata = op_key;407 return pj_sock_send(key->sock, data, length, flags);408 }409 410 411 /*412 * Instruct the I/O Queue to write to the handle.413 */414 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,415 pj_ioqueue_op_key_t *op_key,416 const void *data,417 pj_ssize_t *length,418 pj_uint32_t flags,419 const pj_sockaddr_t *addr,420 int addrlen)421 {422 PjUwpSocket *s = (PjUwpSocket*)key->sock;423 s->write_userdata = op_key;424 return pj_sock_sendto(key->sock, data, length, flags, addr, addrlen);425 }426 427 PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,428 pj_bool_t allow)429 {430 /* Not supported, just return PJ_SUCCESS silently */431 PJ_UNUSED_ARG(key);432 PJ_UNUSED_ARG(allow);433 return PJ_SUCCESS;434 }435 436 PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)437 {438 /* Not supported, just return PJ_SUCCESS silently */439 PJ_UNUSED_ARG(key);440 return PJ_SUCCESS;441 }442 443 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)444 {445 /* Not supported, just return PJ_SUCCESS silently */446 PJ_UNUSED_ARG(key);447 return PJ_SUCCESS;448 } -
pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.cpp
r5254 r5256 17 17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 18 18 */ 19 #include <pj/sock.h>20 #include <pj/addr_resolv.h>21 19 #include <pj/assert.h> 22 20 #include <pj/errno.h> 23 21 #include <pj/math.h> 24 22 #include <pj/os.h> 25 #include <pj/string.h>26 #include <pj/unicode.h>27 23 #include <pj/compat/socket.h> 28 24 … … 30 26 #include <string> 31 27 28 #define THIS_FILE "sock_uwp.cpp" 29 32 30 #include "sock_uwp.h" 33 34 #define THIS_FILE "sock_uwp.cpp"35 31 36 32 /* … … 201 197 { 202 198 try { 199 if (uwp_sock->sock_state >= SOCKSTATE_DISCONNECTED) 200 return; 201 203 202 recv_args = args; 204 203 avail_data_len = args->GetDataReader()->UnconsumedBufferLength; 205 204 206 // Notify application asynchronously 207 concurrency::create_task([this]() 208 { 209 if (uwp_sock->on_read) { 210 if (!pj_thread_is_registered()) 211 pj_thread_register("MsgReceive", thread_desc, 212 &rec_thread); 213 214 (tp)(*uwp_sock->read_userdata) 215 (*uwp_sock->on_read)(uwp_sock, avail_data_len); 216 } 217 }); 205 if (uwp_sock->cb.on_read) { 206 (*uwp_sock->cb.on_read)(uwp_sock, avail_data_len); 207 } 218 208 219 209 WaitForSingleObjectEx(recv_wait, INFINITE, false); 220 } catch ( Exception^ e) {}210 } catch (...) {} 221 211 } 222 212 … … 265 255 HANDLE recv_wait; 266 256 int avail_data_len; 267 pj_thread_desc thread_desc;268 pj_thread_t *rec_thread;269 257 }; 270 258 … … 288 276 conn_args = args; 289 277 290 // Notify application asynchronously 291 concurrency::create_task([this]() 292 { 293 if (uwp_sock->on_accept) { 294 if (!pj_thread_is_registered()) 295 pj_thread_register("ConnReceive", thread_desc, 296 &listener_thread); 297 298 (*uwp_sock->on_accept)(uwp_sock, PJ_SUCCESS); 299 } 300 }); 278 if (uwp_sock->cb.on_accept) { 279 (*uwp_sock->cb.on_accept)(uwp_sock); 280 } 301 281 302 282 WaitForSingleObjectEx(conn_wait, INFINITE, false); … … 304 284 } 305 285 306 pj_status_t GetAcceptedSocket(StreamSocket^ stream_sock)286 pj_status_t GetAcceptedSocket(StreamSocket^& stream_sock) 307 287 { 308 288 if (conn_args == nullptr) … … 333 313 EventRegistrationToken event_token; 334 314 HANDLE conn_wait; 335 336 pj_thread_desc thread_desc;337 pj_thread_t *listener_thread;338 315 }; 339 316 … … 341 318 PjUwpSocket::PjUwpSocket(int af_, int type_, int proto_) : 342 319 af(af_), type(type_), proto(proto_), 343 sock_type(SOCKTYPE_UNKNOWN), sock_state(SOCKSTATE_NULL), 344 is_blocking(PJ_TRUE), is_busy_sending(PJ_FALSE) 320 sock_type(SOCKTYPE_UNKNOWN), 321 sock_state(SOCKSTATE_NULL), 322 is_blocking(PJ_TRUE), 323 has_pending_bind(PJ_FALSE), 324 has_pending_send(PJ_FALSE), 325 has_pending_recv(PJ_FALSE) 345 326 { 346 327 pj_sockaddr_init(pj_AF_INET(), &local_addr, NULL, 0); … … 349 330 350 331 PjUwpSocket::~PjUwpSocket() 351 {} 332 { 333 DeinitSocket(); 334 } 352 335 353 336 PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_) … … 359 342 new_sock->socket_reader = ref new DataReader(new_sock->stream_sock->InputStream); 360 343 new_sock->socket_writer = ref new DataWriter(new_sock->stream_sock->OutputStream); 344 new_sock->socket_reader->InputStreamOptions = InputStreamOptions::Partial; 361 345 new_sock->send_buffer = ref new Buffer(SEND_BUFFER_SIZE); 362 346 new_sock->is_blocking = is_blocking; … … 399 383 } 400 384 385 386 void PjUwpSocket::DeinitSocket() 387 { 388 if (stream_sock) { 389 concurrency::create_task(stream_sock->CancelIOAsync()).wait(); 390 } 391 if (datagram_sock) { 392 concurrency::create_task(datagram_sock->CancelIOAsync()).wait(); 393 } 394 if (listener_sock) { 395 concurrency::create_task(listener_sock->CancelIOAsync()).wait(); 396 } 397 stream_sock = nullptr; 398 datagram_sock = nullptr; 399 dgram_recv_helper = nullptr; 400 listener_sock = nullptr; 401 listener_helper = nullptr; 402 socket_writer = nullptr; 403 socket_reader = nullptr; 404 sock_state = SOCKSTATE_NULL; 405 } 406 407 pj_status_t PjUwpSocket::Bind(const pj_sockaddr_t *addr) 408 { 409 /* Not initialized yet, socket type is perhaps TCP, just not decided yet 410 * whether it is a stream or a listener. 411 */ 412 if (sock_state < SOCKSTATE_INITIALIZED) { 413 pj_sockaddr_cp(&local_addr, addr); 414 has_pending_bind = PJ_TRUE; 415 return PJ_SUCCESS; 416 } 417 418 PJ_ASSERT_RETURN(sock_state == SOCKSTATE_INITIALIZED, PJ_EINVALIDOP); 419 if (sock_type != SOCKTYPE_DATAGRAM && sock_type != SOCKTYPE_LISTENER) 420 return PJ_EINVALIDOP; 421 422 if (has_pending_bind) { 423 has_pending_bind = PJ_FALSE; 424 if (!addr) 425 addr = &local_addr; 426 } 427 428 /* If no bound address is set, just return */ 429 if (!pj_sockaddr_has_addr(addr) && !pj_sockaddr_get_port(addr)) 430 return PJ_SUCCESS; 431 432 if (addr != &local_addr) 433 pj_sockaddr_cp(&local_addr, addr); 434 435 HRESULT err = 0; 436 try { 437 concurrency::create_task([this, addr]() { 438 HostName ^host; 439 int port; 440 sockaddr_to_hostname_port(addr, host, &port); 441 if (pj_sockaddr_has_addr(addr)) { 442 if (sock_type == SOCKTYPE_DATAGRAM) 443 return datagram_sock->BindEndpointAsync(host, port.ToString()); 444 else 445 return listener_sock->BindEndpointAsync(host, port.ToString()); 446 } else /* if (pj_sockaddr_get_port(addr) != 0) */ { 447 if (sock_type == SOCKTYPE_DATAGRAM) 448 return datagram_sock->BindServiceNameAsync(port.ToString()); 449 else 450 return listener_sock->BindServiceNameAsync(port.ToString()); 451 } 452 }).then([this, &err](concurrency::task<void> t) 453 { 454 try { 455 t.get(); 456 } catch (Exception^ e) { 457 err = e->HResult; 458 } 459 }).get(); 460 } catch (Exception^ e) { 461 err = e->HResult; 462 } 463 464 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 465 } 466 467 468 pj_status_t PjUwpSocket::SendImp(const void *buf, pj_ssize_t *len) 469 { 470 if (has_pending_send) 471 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 472 473 if (*len > (pj_ssize_t)send_buffer->Capacity) 474 return PJ_ETOOBIG; 475 476 CopyToIBuffer((unsigned char*)buf, *len, send_buffer); 477 send_buffer->Length = *len; 478 socket_writer->WriteBuffer(send_buffer); 479 480 /* Blocking version */ 481 if (is_blocking) { 482 pj_status_t status = PJ_SUCCESS; 483 concurrency::cancellation_token_source cts; 484 auto cts_token = cts.get_token(); 485 auto t = concurrency::create_task(socket_writer->StoreAsync(), 486 cts_token); 487 *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 488 then([cts_token, &status](concurrency::task<unsigned int> t_) 489 { 490 int sent = 0; 491 try { 492 if (cts_token.is_canceled()) 493 status = PJ_ETIMEDOUT; 494 else 495 sent = t_.get(); 496 } catch (Exception^ e) { 497 status = PJ_RETURN_OS_ERROR(e->HResult); 498 } 499 return sent; 500 }).get(); 501 502 return status; 503 } 504 505 /* Non-blocking version */ 506 has_pending_send = PJ_TRUE; 507 concurrency::create_task(socket_writer->StoreAsync()). 508 then([this](concurrency::task<unsigned int> t_) 509 { 510 try { 511 unsigned int l = t_.get(); 512 has_pending_send = PJ_FALSE; 513 514 // invoke callback 515 if (cb.on_write) { 516 (*cb.on_write)(this, l); 517 } 518 } catch (...) { 519 has_pending_send = PJ_FALSE; 520 sock_state = SOCKSTATE_ERROR; 521 DeinitSocket(); 522 523 // invoke callback 524 if (cb.on_write) { 525 (*cb.on_write)(this, -PJ_EUNKNOWN); 526 } 527 } 528 }); 529 530 return PJ_SUCCESS; 531 } 532 533 534 pj_status_t PjUwpSocket::Send(const void *buf, pj_ssize_t *len) 535 { 536 if ((sock_type!=SOCKTYPE_STREAM && sock_type!=SOCKTYPE_DATAGRAM) || 537 (sock_state!=SOCKSTATE_CONNECTED)) 538 { 539 return PJ_EINVALIDOP; 540 } 541 542 /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 543 if (sock_type == SOCKTYPE_DATAGRAM) { 544 return SendTo(buf, len, &remote_addr); 545 } 546 547 return SendImp(buf, len); 548 } 549 550 551 pj_status_t PjUwpSocket::SendTo(const void *buf, pj_ssize_t *len, 552 const pj_sockaddr_t *to) 553 { 554 if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 555 || sock_state >= SOCKSTATE_DISCONNECTED) 556 { 557 return PJ_EINVALIDOP; 558 } 559 560 if (has_pending_send) 561 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 562 563 if (*len > (pj_ssize_t)send_buffer->Capacity) 564 return PJ_ETOOBIG; 565 566 HostName ^hostname; 567 int port; 568 sockaddr_to_hostname_port(to, hostname, &port); 569 570 concurrency::cancellation_token_source cts; 571 auto cts_token = cts.get_token(); 572 auto t = concurrency::create_task(datagram_sock->GetOutputStreamAsync( 573 hostname, port.ToString()), cts_token); 574 pj_status_t status = PJ_SUCCESS; 575 576 cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 577 then([this, cts_token, &status](concurrency::task<IOutputStream^> t_) 578 { 579 try { 580 if (cts_token.is_canceled()) { 581 status = PJ_ETIMEDOUT; 582 } else { 583 IOutputStream^ outstream = t_.get(); 584 socket_writer = ref new DataWriter(outstream); 585 } 586 } catch (Exception^ e) { 587 status = PJ_RETURN_OS_ERROR(e->HResult); 588 } 589 }).get(); 590 591 if (status != PJ_SUCCESS) 592 return status; 593 594 status = SendImp(buf, len); 595 if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 596 sock_state < SOCKSTATE_CONNECTED) 597 { 598 sock_state = SOCKSTATE_CONNECTED; 599 } 600 601 return status; 602 } 603 604 605 int PjUwpSocket::ConsumeReadBuffer(void *buf, int max_len) 606 { 607 if (socket_reader->UnconsumedBufferLength == 0) 608 return 0; 609 610 int read_len = PJ_MIN((int)socket_reader->UnconsumedBufferLength,max_len); 611 IBuffer^ buffer = socket_reader->ReadBuffer(read_len); 612 read_len = buffer->Length; 613 CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 614 return read_len; 615 } 616 617 618 pj_status_t PjUwpSocket::Recv(void *buf, pj_ssize_t *len) 619 { 620 /* Only for TCP, at least for now! */ 621 if (sock_type == SOCKTYPE_DATAGRAM) 622 return PJ_ENOTSUP; 623 624 if (sock_type != SOCKTYPE_STREAM || sock_state != SOCKSTATE_CONNECTED) 625 return PJ_EINVALIDOP; 626 627 if (has_pending_recv) 628 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 629 630 /* First check if there is already some data in the read buffer */ 631 if (buf) { 632 int avail_len = ConsumeReadBuffer(buf, *len); 633 if (avail_len > 0) { 634 *len = avail_len; 635 return PJ_SUCCESS; 636 } 637 } 638 639 /* Blocking version */ 640 if (is_blocking) { 641 pj_status_t status = PJ_SUCCESS; 642 concurrency::cancellation_token_source cts; 643 auto cts_token = cts.get_token(); 644 auto t = concurrency::create_task(socket_reader->LoadAsync(*len), 645 cts_token); 646 *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 647 .then([this, len, buf, cts_token, &status] 648 (concurrency::task<unsigned int> t_) 649 { 650 try { 651 if (cts_token.is_canceled()) { 652 status = PJ_ETIMEDOUT; 653 return 0; 654 } 655 t_.get(); 656 } catch (Exception^) { 657 status = PJ_ETIMEDOUT; 658 return 0; 659 } 660 661 *len = ConsumeReadBuffer(buf, *len); 662 return (int)*len; 663 }).get(); 664 665 return status; 666 } 667 668 /* Non-blocking version */ 669 670 has_pending_recv = PJ_TRUE; 671 concurrency::create_task(socket_reader->LoadAsync(*len)) 672 .then([this](concurrency::task<unsigned int> t_) 673 { 674 try { 675 // catch any exception 676 t_.get(); 677 has_pending_recv = PJ_FALSE; 678 679 // invoke callback 680 int read_len = socket_reader->UnconsumedBufferLength; 681 if (read_len > 0 && cb.on_read) { 682 (*cb.on_read)(this, read_len); 683 } 684 } catch (Exception^ e) { 685 has_pending_recv = PJ_FALSE; 686 687 // invoke callback 688 if (cb.on_read) { 689 (*cb.on_read)(this, -PJ_EUNKNOWN); 690 } 691 } 692 }); 693 694 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 695 } 696 697 698 pj_status_t PjUwpSocket::RecvFrom(void *buf, pj_ssize_t *len, 699 pj_sockaddr_t *from) 700 { 701 if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 702 || sock_state >= SOCKSTATE_DISCONNECTED) 703 { 704 return PJ_EINVALIDOP; 705 } 706 707 /* Start receive, if not yet */ 708 if (dgram_recv_helper == nullptr) { 709 dgram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(this); 710 } 711 712 /* Try to read any available data first */ 713 if (buf || is_blocking) { 714 pj_status_t status; 715 status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 716 if (status != PJ_ENOTFOUND) 717 return status; 718 } 719 720 /* Blocking version */ 721 if (is_blocking) { 722 int max_loop = 0; 723 pj_status_t status = PJ_ENOTFOUND; 724 while (status == PJ_ENOTFOUND && sock_state <= SOCKSTATE_CONNECTED) 725 { 726 status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 727 if (status != PJ_SUCCESS) 728 pj_thread_sleep(100); 729 730 if (++max_loop > 10) 731 return PJ_ETIMEDOUT; 732 } 733 return status; 734 } 735 736 /* For non-blocking version, just return PJ_EPENDING */ 737 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 738 } 739 740 741 pj_status_t PjUwpSocket::Connect(const pj_sockaddr_t *addr) 742 { 743 pj_status_t status; 744 745 PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN && sock_state == SOCKSTATE_NULL) || 746 (sock_type == SOCKTYPE_DATAGRAM && sock_state == SOCKSTATE_INITIALIZED), 747 PJ_EINVALIDOP); 748 749 if (sock_type == SOCKTYPE_UNKNOWN) { 750 InitSocket(SOCKTYPE_STREAM); 751 // No need to check pending bind, no bind for TCP client socket 752 } 753 754 pj_sockaddr_cp(&remote_addr, addr); 755 756 auto t = concurrency::create_task([this, addr]() 757 { 758 HostName ^hostname; 759 int port; 760 sockaddr_to_hostname_port(&remote_addr, hostname, &port); 761 if (sock_type == SOCKTYPE_STREAM) 762 return stream_sock->ConnectAsync(hostname, port.ToString(), 763 SocketProtectionLevel::PlainSocket); 764 else 765 return datagram_sock->ConnectAsync(hostname, port.ToString()); 766 }).then([=](concurrency::task<void> t_) 767 { 768 try { 769 t_.get(); 770 771 sock_state = SOCKSTATE_CONNECTED; 772 773 // Update local & remote address 774 HostName^ local_address; 775 String^ local_port; 776 777 if (sock_type == SOCKTYPE_STREAM) { 778 local_address = stream_sock->Information->LocalAddress; 779 local_port = stream_sock->Information->LocalPort; 780 781 socket_reader = ref new DataReader(stream_sock->InputStream); 782 socket_writer = ref new DataWriter(stream_sock->OutputStream); 783 socket_reader->InputStreamOptions = InputStreamOptions::Partial; 784 } else { 785 local_address = datagram_sock->Information->LocalAddress; 786 local_port = datagram_sock->Information->LocalPort; 787 } 788 if (local_address && local_port) { 789 wstr_addr_to_sockaddr(local_address->CanonicalName->Data(), 790 local_port->Data(), 791 &local_addr); 792 } 793 794 if (!is_blocking && cb.on_connect) { 795 (*cb.on_connect)(this, PJ_SUCCESS); 796 } 797 return (pj_status_t)PJ_SUCCESS; 798 799 } catch (Exception^ ex) { 800 801 SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 802 803 switch (status) 804 { 805 case SocketErrorStatus::UnreachableHost: 806 break; 807 case SocketErrorStatus::ConnectionTimedOut: 808 break; 809 case SocketErrorStatus::ConnectionRefused: 810 break; 811 default: 812 break; 813 } 814 815 if (!is_blocking && cb.on_connect) { 816 (*cb.on_connect)(this, PJ_EUNKNOWN); 817 } 818 819 return (pj_status_t)PJ_EUNKNOWN; 820 } 821 }); 822 823 if (!is_blocking) 824 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_CONNECT_ERROR_VAL); 825 826 try { 827 status = t.get(); 828 } catch (Exception^) { 829 return PJ_EUNKNOWN; 830 } 831 return status; 832 } 833 834 pj_status_t PjUwpSocket::Listen() 835 { 836 PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN) || 837 (sock_type == SOCKTYPE_LISTENER && 838 sock_state == SOCKSTATE_INITIALIZED), 839 PJ_EINVALIDOP); 840 841 if (sock_type == SOCKTYPE_UNKNOWN) 842 InitSocket(SOCKTYPE_LISTENER); 843 844 if (has_pending_bind) 845 Bind(); 846 847 /* Start listen */ 848 if (listener_helper == nullptr) { 849 listener_helper = ref new PjUwpSocketListenerHelper(this); 850 } 851 852 return PJ_SUCCESS; 853 } 854 855 pj_status_t PjUwpSocket::Accept(PjUwpSocket **new_sock) 856 { 857 if (sock_type != SOCKTYPE_LISTENER || sock_state != SOCKSTATE_INITIALIZED) 858 return PJ_EINVALIDOP; 859 860 StreamSocket^ accepted_sock; 861 pj_status_t status = listener_helper->GetAcceptedSocket(accepted_sock); 862 if (status == PJ_ENOTFOUND) 863 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 864 865 if (status != PJ_SUCCESS) 866 return status; 867 868 *new_sock = CreateAcceptSocket(accepted_sock); 869 return PJ_SUCCESS; 870 } 401 871 402 872 … … 718 1188 PJ_ASSERT_RETURN(sock, PJ_EINVAL); 719 1189 PJ_ASSERT_RETURN(addr && len>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); 720 721 1190 PjUwpSocket *s = (PjUwpSocket*)sock; 722 723 if (s->sock_state > SOCKSTATE_INITIALIZED) 724 return PJ_EINVALIDOP; 725 726 pj_sockaddr_cp(&s->local_addr, addr); 727 728 /* Bind now if this is UDP. But if it is TCP, unfortunately we don't 729 * know yet whether it is SocketStream or Listener! 730 */ 731 if (s->type == pj_SOCK_DGRAM()) { 732 HRESULT err = 0; 733 734 try { 735 concurrency::create_task([s, addr]() { 736 HostName ^hostname; 737 int port; 738 sockaddr_to_hostname_port(addr, hostname, &port); 739 if (pj_sockaddr_has_addr(addr)) { 740 s->datagram_sock->BindEndpointAsync(hostname, 741 port.ToString()); 742 } else if (pj_sockaddr_get_port(addr) != 0) { 743 s->datagram_sock->BindServiceNameAsync(port.ToString()); 744 } 745 }).then([s, &err](concurrency::task<void> t) 746 { 747 try { 748 t.get(); 749 s->sock_state = SOCKSTATE_CONNECTED; 750 } catch (Exception^ e) { 751 err = e->HResult; 752 } 753 }).get(); 754 } catch (Exception^ e) { 755 err = e->HResult; 756 } 757 758 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 759 } 760 761 return PJ_SUCCESS; 1191 return s->Bind(addr); 762 1192 } 763 1193 … … 812 1242 813 1243 PjUwpSocket *s = (PjUwpSocket*)sock; 814 815 pj_sockaddr_cp(addr, &s->remote_addr); 816 *namelen = pj_sockaddr_get_len(&s->remote_addr); 1244 pj_sockaddr_cp(addr, s->GetRemoteAddr()); 1245 *namelen = pj_sockaddr_get_len(addr); 817 1246 818 1247 return PJ_SUCCESS; … … 831 1260 832 1261 PjUwpSocket *s = (PjUwpSocket*)sock; 833 834 pj_sockaddr_cp(addr, &s->local_addr); 835 *namelen = pj_sockaddr_get_len(&s->local_addr); 1262 pj_sockaddr_cp(addr, s->GetLocalAddr()); 1263 *namelen = pj_sockaddr_get_len(addr); 836 1264 837 1265 return PJ_SUCCESS; 838 1266 } 839 1267 840 841 static pj_status_t sock_send_imp(PjUwpSocket *s, const void *buf,842 pj_ssize_t *len)843 {844 if (s->is_busy_sending)845 return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK);846 847 if (*len > (pj_ssize_t)s->send_buffer->Capacity)848 return PJ_ETOOBIG;849 850 CopyToIBuffer((unsigned char*)buf, *len, s->send_buffer);851 s->send_buffer->Length = *len;852 s->socket_writer->WriteBuffer(s->send_buffer);853 854 if (s->is_blocking) {855 pj_status_t status = PJ_SUCCESS;856 concurrency::cancellation_token_source cts;857 auto cts_token = cts.get_token();858 auto t = concurrency::create_task(s->socket_writer->StoreAsync(),859 cts_token);860 *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT).861 then([cts_token, &status](concurrency::task<unsigned int> t_)862 {863 int sent = 0;864 try {865 if (cts_token.is_canceled())866 status = PJ_ETIMEDOUT;867 else868 sent = t_.get();869 } catch (Exception^ e) {870 status = PJ_RETURN_OS_ERROR(e->HResult);871 }872 return sent;873 }).get();874 875 return status;876 }877 878 s->is_busy_sending = true;879 concurrency::create_task(s->socket_writer->StoreAsync()).880 then([s](concurrency::task<unsigned int> t_)881 {882 try {883 unsigned int l = t_.get();884 s->is_busy_sending = false;885 886 // invoke callback887 if (s->on_write) {888 (*s->on_write)(s, l);889 }890 } catch (Exception^ e) {891 s->is_busy_sending = false;892 if (s->sock_type == SOCKTYPE_STREAM)893 s->sock_state = SOCKSTATE_DISCONNECTED;894 895 // invoke callback896 if (s->on_write) {897 (*s->on_write)(s, -PJ_RETURN_OS_ERROR(e->HResult));898 }899 }900 });901 902 return PJ_EPENDING;903 }904 1268 905 1269 /* … … 916 1280 917 1281 PjUwpSocket *s = (PjUwpSocket*)sock; 918 919 if ((s->sock_type!=SOCKTYPE_STREAM && s->sock_type!=SOCKTYPE_DATAGRAM) || 920 (s->sock_state!=SOCKSTATE_CONNECTED)) 921 { 922 return PJ_EINVALIDOP; 923 } 924 925 /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 926 if (s->sock_type == SOCKTYPE_DATAGRAM) { 927 return pj_sock_sendto(sock, buf, len, flags, &s->remote_addr, 928 pj_sockaddr_get_len(&s->remote_addr)); 929 } 930 931 return sock_send_imp(s, buf, len); 1282 return s->Send(buf, len); 932 1283 } 933 1284 … … 949 1300 950 1301 PjUwpSocket *s = (PjUwpSocket*)sock; 951 952 if (s->sock_type != SOCKTYPE_DATAGRAM || 953 s->sock_state < SOCKSTATE_INITIALIZED) 954 { 955 return PJ_EINVALIDOP; 956 } 957 958 if (s->is_busy_sending) 959 return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 960 961 if (*len > (pj_ssize_t)s->send_buffer->Capacity) 962 return PJ_ETOOBIG; 963 964 HostName ^hostname; 965 int port; 966 sockaddr_to_hostname_port(to, hostname, &port); 967 968 concurrency::cancellation_token_source cts; 969 auto cts_token = cts.get_token(); 970 auto t = concurrency::create_task( 971 s->datagram_sock->GetOutputStreamAsync( 972 hostname, port.ToString()), cts_token); 973 pj_status_t status = PJ_SUCCESS; 974 975 cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 976 then([s, cts_token, &status](concurrency::task<IOutputStream^> t_) 977 { 978 try { 979 if (cts_token.is_canceled()) { 980 status = PJ_ETIMEDOUT; 981 } else { 982 IOutputStream^ outstream = t_.get(); 983 s->socket_writer = ref new DataWriter(outstream); 984 } 985 } catch (Exception^ e) { 986 status = PJ_RETURN_OS_ERROR(e->HResult); 987 } 988 }).get(); 989 990 if (status != PJ_SUCCESS) 991 return status; 992 993 status = sock_send_imp(s, buf, len); 994 995 if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 996 s->sock_state < SOCKSTATE_CONNECTED) 997 { 998 s->sock_state = SOCKSTATE_CONNECTED; 999 } 1000 1001 return status; 1002 } 1003 1004 1005 static int consume_read_buffer(PjUwpSocket *s, void *buf, int max_len) 1006 { 1007 if (s->socket_reader->UnconsumedBufferLength == 0) 1008 return 0; 1009 1010 int read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 1011 max_len); 1012 IBuffer^ buffer = s->socket_reader->ReadBuffer(read_len); 1013 read_len = buffer->Length; 1014 CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 1015 return read_len; 1302 return s->SendTo(buf, len, to); 1016 1303 } 1017 1304 … … 1031 1318 1032 1319 PjUwpSocket *s = (PjUwpSocket*)sock; 1033 1034 /* Only for TCP, at least for now! */ 1035 if (s->sock_type == SOCKTYPE_DATAGRAM) 1036 return PJ_ENOTSUP; 1037 1038 if (s->sock_type != SOCKTYPE_STREAM || 1039 s->sock_state != SOCKSTATE_CONNECTED) 1040 { 1041 return PJ_EINVALIDOP; 1042 } 1043 1044 /* First check if there is already some data in the read buffer */ 1045 int avail_len = consume_read_buffer(s, buf, *len); 1046 if (avail_len > 0) { 1047 *len = avail_len; 1048 return PJ_SUCCESS; 1049 } 1050 1051 /* Start sync read */ 1052 if (s->is_blocking) { 1053 pj_status_t status = PJ_SUCCESS; 1054 concurrency::cancellation_token_source cts; 1055 auto cts_token = cts.get_token(); 1056 auto t = concurrency::create_task(s->socket_reader->LoadAsync(*len), cts_token); 1057 *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 1058 .then([s, len, buf, cts_token, &status](concurrency::task<unsigned int> t_) 1059 { 1060 try { 1061 if (cts_token.is_canceled()) { 1062 status = PJ_ETIMEDOUT; 1063 return 0; 1064 } 1065 t_.get(); 1066 } catch (Exception^) { 1067 status = PJ_ETIMEDOUT; 1068 return 0; 1069 } 1070 1071 *len = consume_read_buffer(s, buf, *len); 1072 return (int)*len; 1073 }).get(); 1074 1075 return status; 1076 } 1077 1078 /* Start async read */ 1079 int read_len = *len; 1080 concurrency::create_task(s->socket_reader->LoadAsync(read_len)) 1081 .then([s, &read_len](concurrency::task<unsigned int> t_) 1082 { 1083 try { 1084 // catch any exception 1085 t_.get(); 1086 1087 // invoke callback 1088 read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 1089 read_len); 1090 if (read_len > 0 && s->on_read) { 1091 (*s->on_read)(s, read_len); 1092 } 1093 } catch (Exception^ e) { 1094 // invoke callback 1095 if (s->on_read) { 1096 (*s->on_read)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 1097 } 1098 return 0; 1099 } 1100 1101 return (int)read_len; 1102 }); 1103 1104 return PJ_EPENDING; 1320 return s->Recv(buf, len); 1105 1321 } 1106 1322 … … 1123 1339 1124 1340 PjUwpSocket *s = (PjUwpSocket*)sock; 1125 1126 if (s->sock_type != SOCKTYPE_DATAGRAM || 1127 s->sock_state < SOCKSTATE_INITIALIZED) 1128 { 1129 return PJ_EINVALIDOP; 1130 } 1131 1132 /* Start receive, if not yet */ 1133 if (s->datagram_recv_helper == nullptr) { 1134 s->datagram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(s); 1135 } 1136 1137 /* Try to read any available data first */ 1138 pj_status_t status = s->datagram_recv_helper-> 1139 ReadDataIfAvailable(buf, len, from); 1140 if (status != PJ_ENOTFOUND) 1141 return status; 1142 1143 /* Start sync read */ 1144 if (s->is_blocking) { 1145 while (status == PJ_ENOTFOUND && s->sock_state <= SOCKSTATE_CONNECTED) 1146 { 1147 status = s->datagram_recv_helper-> 1148 ReadDataIfAvailable(buf, len, from); 1149 pj_thread_sleep(100); 1150 } 1151 return PJ_SUCCESS; 1152 } 1153 1154 /* For async read, just return PJ_EPENDING */ 1155 return PJ_EPENDING; 1341 pj_status_t status = s->RecvFrom(buf, len, from); 1342 if (status == PJ_SUCCESS) 1343 *fromlen = pj_sockaddr_get_len(from); 1344 return status; 1156 1345 } 1157 1346 … … 1222 1411 } 1223 1412 1224 static pj_status_t tcp_bind(PjUwpSocket *s)1225 {1226 /* If no bound address is set, just return */1227 if (!pj_sockaddr_has_addr(&s->local_addr) &&1228 pj_sockaddr_get_port(&s->local_addr)==0)1229 {1230 return PJ_SUCCESS;1231 }1232 1233 HRESULT err = 0;1234 1235 try {1236 concurrency::create_task([s]() {1237 HostName ^hostname;1238 int port;1239 sockaddr_to_hostname_port(&s->local_addr, hostname, &port);1240 1241 s->listener_sock->BindEndpointAsync(hostname,1242 port.ToString());1243 }).then([s, &err](concurrency::task<void> t)1244 {1245 try {1246 t.get();1247 s->sock_state = SOCKSTATE_CONNECTED;1248 } catch (Exception^ e) {1249 err = e->HResult;1250 }1251 }).get();1252 } catch (Exception^ e) {1253 err = e->HResult;1254 }1255 1256 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS);1257 }1258 1259 1413 1260 1414 /* … … 1267 1421 PJ_CHECK_STACK(); 1268 1422 PJ_ASSERT_RETURN(sock && addr, PJ_EINVAL); 1269 1270 1423 PJ_UNUSED_ARG(namelen); 1271 1424 1272 1425 PjUwpSocket *s = (PjUwpSocket*)sock; 1273 pj_status_t status; 1274 1275 pj_sockaddr_cp(&s->remote_addr, addr); 1276 1277 /* UDP */ 1278 1279 if (s->sock_type == SOCKTYPE_DATAGRAM) { 1280 if (s->sock_state != SOCKSTATE_INITIALIZED) 1281 return PJ_EINVALIDOP; 1282 1283 HostName ^hostname; 1284 int port; 1285 sockaddr_to_hostname_port(addr, hostname, &port); 1286 1287 try { 1288 concurrency::create_task(s->datagram_sock->ConnectAsync 1289 (hostname, port.ToString())) 1290 .then([s](concurrency::task<void> t_) 1291 { 1292 try { 1293 t_.get(); 1294 } catch (Exception^ ex) 1295 { 1296 1297 } 1298 }).get(); 1299 } catch (Exception^) { 1300 return PJ_EUNKNOWN; 1301 } 1302 1303 // Update local & remote address 1304 wstr_addr_to_sockaddr(s->datagram_sock->Information->RemoteAddress->CanonicalName->Data(), 1305 s->datagram_sock->Information->RemotePort->Data(), 1306 &s->remote_addr); 1307 wstr_addr_to_sockaddr(s->datagram_sock->Information->LocalAddress->CanonicalName->Data(), 1308 s->datagram_sock->Information->LocalPort->Data(), 1309 &s->local_addr); 1310 1311 s->sock_state = SOCKSTATE_CONNECTED; 1312 1313 return PJ_SUCCESS; 1314 } 1315 1316 /* TCP */ 1317 1318 /* Init stream socket now */ 1319 s->InitSocket(SOCKTYPE_STREAM); 1320 1321 pj_sockaddr_cp(&s->remote_addr, addr); 1322 wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 1323 s->stream_sock->Information->LocalPort->Data(), 1324 &s->local_addr); 1325 1326 /* Perform any pending bind */ 1327 status = tcp_bind(s); 1328 if (status != PJ_SUCCESS) 1329 return status; 1330 1331 char tmp[PJ_INET6_ADDRSTRLEN]; 1332 wchar_t wtmp[PJ_INET6_ADDRSTRLEN]; 1333 pj_sockaddr_print(addr, tmp, PJ_INET6_ADDRSTRLEN, 0); 1334 pj_ansi_to_unicode(tmp, pj_ansi_strlen(tmp), wtmp, 1335 PJ_INET6_ADDRSTRLEN); 1336 auto host = ref new HostName(ref new String(wtmp)); 1337 int port = pj_sockaddr_get_port(addr); 1338 1339 auto t = concurrency::create_task(s->stream_sock->ConnectAsync 1340 (host, port.ToString(), SocketProtectionLevel::PlainSocket)) 1341 .then([=](concurrency::task<void> t_) 1342 { 1343 try { 1344 t_.get(); 1345 s->socket_reader = ref new DataReader(s->stream_sock->InputStream); 1346 s->socket_writer = ref new DataWriter(s->stream_sock->OutputStream); 1347 1348 // Update local & remote address 1349 wstr_addr_to_sockaddr(s->stream_sock->Information->RemoteAddress->CanonicalName->Data(), 1350 s->stream_sock->Information->RemotePort->Data(), 1351 &s->remote_addr); 1352 wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 1353 s->stream_sock->Information->LocalPort->Data(), 1354 &s->local_addr); 1355 1356 s->sock_state = SOCKSTATE_CONNECTED; 1357 1358 if (!s->is_blocking && s->on_connect) { 1359 (*s->on_connect)(s, PJ_SUCCESS); 1360 } 1361 return (pj_status_t)PJ_SUCCESS; 1362 } catch (Exception^ ex) { 1363 SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 1364 1365 switch (status) 1366 { 1367 case SocketErrorStatus::UnreachableHost: 1368 break; 1369 case SocketErrorStatus::ConnectionTimedOut: 1370 break; 1371 case SocketErrorStatus::ConnectionRefused: 1372 break; 1373 default: 1374 break; 1375 } 1376 1377 if (!s->is_blocking && s->on_connect) { 1378 (*s->on_connect)(s, PJ_EUNKNOWN); 1379 } 1380 1381 return (pj_status_t)PJ_EUNKNOWN; 1382 } 1383 }); 1384 1385 if (!s->is_blocking) 1386 return PJ_EPENDING; 1387 1388 try { 1389 status = t.get(); 1390 } catch (Exception^) { 1391 return PJ_EUNKNOWN; 1392 } 1393 return status; 1426 return s->Connect(addr); 1394 1427 } 1395 1428 … … 1420 1453 1421 1454 PjUwpSocket *s = (PjUwpSocket*)sock; 1422 pj_status_t status; 1423 1424 /* Init listener socket now */ 1425 s->InitSocket(SOCKTYPE_LISTENER); 1426 1427 /* Perform any pending bind */ 1428 status = tcp_bind(s); 1429 if (status != PJ_SUCCESS) 1430 return status; 1431 1432 /* Start listen */ 1433 if (s->listener_helper == nullptr) { 1434 s->listener_helper = ref new PjUwpSocketListenerHelper(s); 1435 } 1436 1437 return PJ_SUCCESS; 1455 return s->Listen(); 1438 1456 } 1439 1457 … … 1446 1464 int *addrlen) 1447 1465 { 1448 PJ_CHECK_STACK(); 1449 1466 pj_status_t status; 1467 1468 PJ_CHECK_STACK(); 1450 1469 PJ_ASSERT_RETURN(serverfd && newsock, PJ_EINVAL); 1451 1470 1452 1471 PjUwpSocket *s = (PjUwpSocket*)serverfd; 1453 1454 if (s->sock_type != SOCKTYPE_LISTENER || 1455 s->sock_state != SOCKSTATE_INITIALIZED) 1456 { 1457 return PJ_EINVALIDOP; 1458 } 1459 1460 StreamSocket^ accepted_sock; 1461 pj_status_t status = s->listener_helper->GetAcceptedSocket(accepted_sock); 1462 if (status == PJ_ENOTFOUND) 1463 return PJ_EPENDING; 1464 1472 PjUwpSocket *new_uwp_sock; 1473 1474 status = s->Accept(&new_uwp_sock); 1465 1475 if (status != PJ_SUCCESS) 1466 1476 return status; 1467 1468 PjUwpSocket *new_sock = s->CreateAcceptSocket(accepted_sock);1469 1470 pj_sockaddr_cp(addr, &new_sock->remote_addr);1471 *addrlen = pj_sockaddr_get_len(&new_sock->remote_addr);1472 * newsock = (pj_sock_t)new_sock;1477 if (newsock == NULL) 1478 return PJ_ENOTFOUND; 1479 1480 *newsock = (pj_sock_t)new_uwp_sock; 1481 pj_sockaddr_cp(addr, new_uwp_sock->GetRemoteAddr()); 1482 *addrlen = pj_sockaddr_get_len(addr); 1473 1483 1474 1484 return PJ_SUCCESS; -
pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.h
r5254 r5256 19 19 #pragma once 20 20 21 22 #include <pj/assert.h> 23 #include <pj/sock.h> 24 #include <pj/string.h> 25 #include <pj/unicode.h> 26 27 21 28 enum { 22 29 READ_TIMEOUT = 60 * 1000, … … 37 44 ref class PjUwpSocketDatagramRecvHelper; 38 45 ref class PjUwpSocketListenerHelper; 46 class PjUwpSocket; 47 48 49 typedef struct PjUwpSocketCallback 50 { 51 void (*on_read)(PjUwpSocket *s, int bytes_read); 52 void (*on_write)(PjUwpSocket *s, int bytes_sent); 53 void (*on_accept)(PjUwpSocket *s); 54 void (*on_connect)(PjUwpSocket *s, pj_status_t status); 55 } PjUwpSocketCallback; 56 39 57 40 58 /* … … 45 63 public: 46 64 PjUwpSocket(int af_, int type_, int proto_); 65 virtual ~PjUwpSocket(); 66 pj_status_t InitSocket(enum PjUwpSocketType sock_type_); 67 void DeinitSocket(); 68 69 void* GetUserData() { return user_data; } 70 void SetNonBlocking(const PjUwpSocketCallback *cb_, void *user_data_) 71 { 72 is_blocking = PJ_FALSE; 73 cb=*cb_; 74 user_data = user_data_; 75 } 76 77 enum PjUwpSocketType GetType() { return sock_type; } 78 enum PjUwpSocketState GetState() { return sock_state; } 79 80 pj_sockaddr* GetLocalAddr() { return &local_addr; } 81 pj_sockaddr* GetRemoteAddr() { return &remote_addr; } 82 83 84 pj_status_t Bind(const pj_sockaddr_t *addr = NULL); 85 pj_status_t Send(const void *buf, pj_ssize_t *len); 86 pj_status_t SendTo(const void *buf, pj_ssize_t *len, const pj_sockaddr_t *to); 87 pj_status_t Recv(void *buf, pj_ssize_t *len); 88 pj_status_t RecvFrom(void *buf, pj_ssize_t *len, pj_sockaddr_t *from); 89 pj_status_t Connect(const pj_sockaddr_t *addr); 90 pj_status_t Listen(); 91 pj_status_t Accept(PjUwpSocket **new_sock); 92 93 void (*on_read)(PjUwpSocket *s, int bytes_read); 94 void (*on_write)(PjUwpSocket *s, int bytes_sent); 95 void (*on_accept)(PjUwpSocket *s, pj_status_t status); 96 void (*on_connect)(PjUwpSocket *s, pj_status_t status); 97 98 private: 47 99 PjUwpSocket* CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_); 48 virtual ~PjUwpSocket(); 49 50 pj_status_t InitSocket(enum PjUwpSocketType sock_type_); 51 52 public: 100 pj_status_t SendImp(const void *buf, pj_ssize_t *len); 101 int ConsumeReadBuffer(void *buf, int max_len); 102 53 103 int af; 54 104 int type; … … 57 107 pj_sockaddr remote_addr; 58 108 pj_bool_t is_blocking; 109 pj_bool_t has_pending_bind; 110 pj_bool_t has_pending_send; 111 pj_bool_t has_pending_recv; 59 112 void *user_data; 113 PjUwpSocketCallback cb; 60 114 61 115 enum PjUwpSocketType sock_type; … … 66 120 67 121 /* Helper objects */ 68 PjUwpSocketDatagramRecvHelper^ d atagram_recv_helper;122 PjUwpSocketDatagramRecvHelper^ dgram_recv_helper; 69 123 PjUwpSocketListenerHelper^ listener_helper; 70 124 … … 72 126 Windows::Storage::Streams::DataWriter^ socket_writer; 73 127 Windows::Storage::Streams::IBuffer^ send_buffer; 74 pj_bool_t is_busy_sending; 75 76 void *read_userdata; 77 void *write_userdata; 78 void *accept_userdata; 79 80 void (*on_read)(PjUwpSocket *s, int bytes_read); 81 void (*on_write)(PjUwpSocket *s, int bytes_sent); 82 void (*on_accept)(PjUwpSocket *s, pj_status_t status); 83 void (*on_connect)(PjUwpSocket *s, pj_status_t status); 128 129 friend PjUwpSocketDatagramRecvHelper; 130 friend PjUwpSocketListenerHelper; 84 131 }; 85 132 … … 93 140 pj_sockaddr_t *sockaddr) 94 141 { 142 #if 0 95 143 char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; 96 144 pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); … … 102 150 103 151 return PJ_SUCCESS; 152 #endif 153 char tmp_str_buf[PJ_INET6_ADDRSTRLEN+1]; 154 pj_assert(wcslen(waddr) < sizeof(tmp_str_buf)); 155 pj_unicode_to_ansi(waddr, wcslen(waddr), tmp_str_buf, sizeof(tmp_str_buf)); 156 pj_str_t remote_host; 157 pj_strset(&remote_host, tmp_str_buf, pj_ansi_strlen(tmp_str_buf)); 158 pj_sockaddr *addr = (pj_sockaddr*)sockaddr; 159 pj_bool_t got_addr = PJ_FALSE; 160 161 if (pj_inet_pton(PJ_AF_INET, &remote_host, &addr->ipv4.sin_addr) 162 == PJ_SUCCESS) 163 { 164 addr->addr.sa_family = PJ_AF_INET; 165 got_addr = PJ_TRUE; 166 } else if (pj_inet_pton(PJ_AF_INET6, &remote_host, &addr->ipv6.sin6_addr) 167 == PJ_SUCCESS) 168 { 169 addr->addr.sa_family = PJ_AF_INET6; 170 got_addr = PJ_TRUE; 171 } 172 if (!got_addr) 173 return PJ_EINVAL; 174 175 pj_sockaddr_set_port(addr, (pj_uint16_t)_wtoi(wport)); 176 return PJ_SUCCESS; 104 177 } 105 178 -
pjproject/branches/projects/uwp/pjmedia/src/pjmedia/transport_udp.c
r4538 r5256 753 753 754 754 if (udp->attached) { 755 int i; 756 755 757 /* Lock the ioqueue keys to make sure that callbacks are 756 758 * not executed. See ticket #460 for details. … … 772 774 udp->rtcp_cb = NULL; 773 775 udp->user_data = NULL; 776 777 /* Cancel any outstanding send */ 778 for (i=0; i<PJ_ARRAY_SIZE(udp->rtp_pending_write); ++i) { 779 pj_ioqueue_post_completion(udp->rtp_key, 780 &udp->rtp_pending_write[i].op_key, 0); 781 } 782 pj_ioqueue_post_completion(udp->rtcp_key, &udp->rtcp_write_op, 0); 774 783 775 784 /* Unlock keys */ -
pjproject/branches/projects/uwp/pjsip-apps/src/pjsua/winrt/gui/uwp/VoipBackEnd/MyApp.cpp
r5254 r5256 471 471 472 472 try { 473 sipTpConfig->port = 5060; 473 474 ep.transportCreate(::pjsip_transport_type_e::PJSIP_TRANSPORT_TCP, 474 475 *sipTpConfig); … … 583 584 bool MyAppRT::isThreadRegistered() 584 585 { 585 return ep.libIsThreadRegistered(); 586 // Some threads are registered using PJLIB API, ep.libIsThreadRegistered() will return false on those threads. 587 //return ep.libIsThreadRegistered(); 588 return pj_thread_is_registered() != PJ_FALSE; 586 589 } 587 590
Note: See TracChangeset
for help on using the changeset viewer.