- Timestamp:
- Mar 11, 2016 4:17:32 AM (9 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/branches/projects/uwp/pjlib/src/pj/sock_uwp.cpp
r5254 r5256 17 17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 18 18 */ 19 #include <pj/sock.h>20 #include <pj/addr_resolv.h>21 19 #include <pj/assert.h> 22 20 #include <pj/errno.h> 23 21 #include <pj/math.h> 24 22 #include <pj/os.h> 25 #include <pj/string.h>26 #include <pj/unicode.h>27 23 #include <pj/compat/socket.h> 28 24 … … 30 26 #include <string> 31 27 28 #define THIS_FILE "sock_uwp.cpp" 29 32 30 #include "sock_uwp.h" 33 34 #define THIS_FILE "sock_uwp.cpp"35 31 36 32 /* … … 201 197 { 202 198 try { 199 if (uwp_sock->sock_state >= SOCKSTATE_DISCONNECTED) 200 return; 201 203 202 recv_args = args; 204 203 avail_data_len = args->GetDataReader()->UnconsumedBufferLength; 205 204 206 // Notify application asynchronously 207 concurrency::create_task([this]() 208 { 209 if (uwp_sock->on_read) { 210 if (!pj_thread_is_registered()) 211 pj_thread_register("MsgReceive", thread_desc, 212 &rec_thread); 213 214 (tp)(*uwp_sock->read_userdata) 215 (*uwp_sock->on_read)(uwp_sock, avail_data_len); 216 } 217 }); 205 if (uwp_sock->cb.on_read) { 206 (*uwp_sock->cb.on_read)(uwp_sock, avail_data_len); 207 } 218 208 219 209 WaitForSingleObjectEx(recv_wait, INFINITE, false); 220 } catch ( Exception^ e) {}210 } catch (...) {} 221 211 } 222 212 … … 265 255 HANDLE recv_wait; 266 256 int avail_data_len; 267 pj_thread_desc thread_desc;268 pj_thread_t *rec_thread;269 257 }; 270 258 … … 288 276 conn_args = args; 289 277 290 // Notify application asynchronously 291 concurrency::create_task([this]() 292 { 293 if (uwp_sock->on_accept) { 294 if (!pj_thread_is_registered()) 295 pj_thread_register("ConnReceive", thread_desc, 296 &listener_thread); 297 298 (*uwp_sock->on_accept)(uwp_sock, PJ_SUCCESS); 299 } 300 }); 278 if (uwp_sock->cb.on_accept) { 279 (*uwp_sock->cb.on_accept)(uwp_sock); 280 } 301 281 302 282 WaitForSingleObjectEx(conn_wait, INFINITE, false); … … 304 284 } 305 285 306 pj_status_t GetAcceptedSocket(StreamSocket^ stream_sock)286 pj_status_t GetAcceptedSocket(StreamSocket^& stream_sock) 307 287 { 308 288 if (conn_args == nullptr) … … 333 313 EventRegistrationToken event_token; 334 314 HANDLE conn_wait; 335 336 pj_thread_desc thread_desc;337 pj_thread_t *listener_thread;338 315 }; 339 316 … … 341 318 PjUwpSocket::PjUwpSocket(int af_, int type_, int proto_) : 342 319 af(af_), type(type_), proto(proto_), 343 sock_type(SOCKTYPE_UNKNOWN), sock_state(SOCKSTATE_NULL), 344 is_blocking(PJ_TRUE), is_busy_sending(PJ_FALSE) 320 sock_type(SOCKTYPE_UNKNOWN), 321 sock_state(SOCKSTATE_NULL), 322 is_blocking(PJ_TRUE), 323 has_pending_bind(PJ_FALSE), 324 has_pending_send(PJ_FALSE), 325 has_pending_recv(PJ_FALSE) 345 326 { 346 327 pj_sockaddr_init(pj_AF_INET(), &local_addr, NULL, 0); … … 349 330 350 331 PjUwpSocket::~PjUwpSocket() 351 {} 332 { 333 DeinitSocket(); 334 } 352 335 353 336 PjUwpSocket* PjUwpSocket::CreateAcceptSocket(Windows::Networking::Sockets::StreamSocket^ stream_sock_) … … 359 342 new_sock->socket_reader = ref new DataReader(new_sock->stream_sock->InputStream); 360 343 new_sock->socket_writer = ref new DataWriter(new_sock->stream_sock->OutputStream); 344 new_sock->socket_reader->InputStreamOptions = InputStreamOptions::Partial; 361 345 new_sock->send_buffer = ref new Buffer(SEND_BUFFER_SIZE); 362 346 new_sock->is_blocking = is_blocking; … … 399 383 } 400 384 385 386 void PjUwpSocket::DeinitSocket() 387 { 388 if (stream_sock) { 389 concurrency::create_task(stream_sock->CancelIOAsync()).wait(); 390 } 391 if (datagram_sock) { 392 concurrency::create_task(datagram_sock->CancelIOAsync()).wait(); 393 } 394 if (listener_sock) { 395 concurrency::create_task(listener_sock->CancelIOAsync()).wait(); 396 } 397 stream_sock = nullptr; 398 datagram_sock = nullptr; 399 dgram_recv_helper = nullptr; 400 listener_sock = nullptr; 401 listener_helper = nullptr; 402 socket_writer = nullptr; 403 socket_reader = nullptr; 404 sock_state = SOCKSTATE_NULL; 405 } 406 407 pj_status_t PjUwpSocket::Bind(const pj_sockaddr_t *addr) 408 { 409 /* Not initialized yet, socket type is perhaps TCP, just not decided yet 410 * whether it is a stream or a listener. 411 */ 412 if (sock_state < SOCKSTATE_INITIALIZED) { 413 pj_sockaddr_cp(&local_addr, addr); 414 has_pending_bind = PJ_TRUE; 415 return PJ_SUCCESS; 416 } 417 418 PJ_ASSERT_RETURN(sock_state == SOCKSTATE_INITIALIZED, PJ_EINVALIDOP); 419 if (sock_type != SOCKTYPE_DATAGRAM && sock_type != SOCKTYPE_LISTENER) 420 return PJ_EINVALIDOP; 421 422 if (has_pending_bind) { 423 has_pending_bind = PJ_FALSE; 424 if (!addr) 425 addr = &local_addr; 426 } 427 428 /* If no bound address is set, just return */ 429 if (!pj_sockaddr_has_addr(addr) && !pj_sockaddr_get_port(addr)) 430 return PJ_SUCCESS; 431 432 if (addr != &local_addr) 433 pj_sockaddr_cp(&local_addr, addr); 434 435 HRESULT err = 0; 436 try { 437 concurrency::create_task([this, addr]() { 438 HostName ^host; 439 int port; 440 sockaddr_to_hostname_port(addr, host, &port); 441 if (pj_sockaddr_has_addr(addr)) { 442 if (sock_type == SOCKTYPE_DATAGRAM) 443 return datagram_sock->BindEndpointAsync(host, port.ToString()); 444 else 445 return listener_sock->BindEndpointAsync(host, port.ToString()); 446 } else /* if (pj_sockaddr_get_port(addr) != 0) */ { 447 if (sock_type == SOCKTYPE_DATAGRAM) 448 return datagram_sock->BindServiceNameAsync(port.ToString()); 449 else 450 return listener_sock->BindServiceNameAsync(port.ToString()); 451 } 452 }).then([this, &err](concurrency::task<void> t) 453 { 454 try { 455 t.get(); 456 } catch (Exception^ e) { 457 err = e->HResult; 458 } 459 }).get(); 460 } catch (Exception^ e) { 461 err = e->HResult; 462 } 463 464 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 465 } 466 467 468 pj_status_t PjUwpSocket::SendImp(const void *buf, pj_ssize_t *len) 469 { 470 if (has_pending_send) 471 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 472 473 if (*len > (pj_ssize_t)send_buffer->Capacity) 474 return PJ_ETOOBIG; 475 476 CopyToIBuffer((unsigned char*)buf, *len, send_buffer); 477 send_buffer->Length = *len; 478 socket_writer->WriteBuffer(send_buffer); 479 480 /* Blocking version */ 481 if (is_blocking) { 482 pj_status_t status = PJ_SUCCESS; 483 concurrency::cancellation_token_source cts; 484 auto cts_token = cts.get_token(); 485 auto t = concurrency::create_task(socket_writer->StoreAsync(), 486 cts_token); 487 *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 488 then([cts_token, &status](concurrency::task<unsigned int> t_) 489 { 490 int sent = 0; 491 try { 492 if (cts_token.is_canceled()) 493 status = PJ_ETIMEDOUT; 494 else 495 sent = t_.get(); 496 } catch (Exception^ e) { 497 status = PJ_RETURN_OS_ERROR(e->HResult); 498 } 499 return sent; 500 }).get(); 501 502 return status; 503 } 504 505 /* Non-blocking version */ 506 has_pending_send = PJ_TRUE; 507 concurrency::create_task(socket_writer->StoreAsync()). 508 then([this](concurrency::task<unsigned int> t_) 509 { 510 try { 511 unsigned int l = t_.get(); 512 has_pending_send = PJ_FALSE; 513 514 // invoke callback 515 if (cb.on_write) { 516 (*cb.on_write)(this, l); 517 } 518 } catch (...) { 519 has_pending_send = PJ_FALSE; 520 sock_state = SOCKSTATE_ERROR; 521 DeinitSocket(); 522 523 // invoke callback 524 if (cb.on_write) { 525 (*cb.on_write)(this, -PJ_EUNKNOWN); 526 } 527 } 528 }); 529 530 return PJ_SUCCESS; 531 } 532 533 534 pj_status_t PjUwpSocket::Send(const void *buf, pj_ssize_t *len) 535 { 536 if ((sock_type!=SOCKTYPE_STREAM && sock_type!=SOCKTYPE_DATAGRAM) || 537 (sock_state!=SOCKSTATE_CONNECTED)) 538 { 539 return PJ_EINVALIDOP; 540 } 541 542 /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 543 if (sock_type == SOCKTYPE_DATAGRAM) { 544 return SendTo(buf, len, &remote_addr); 545 } 546 547 return SendImp(buf, len); 548 } 549 550 551 pj_status_t PjUwpSocket::SendTo(const void *buf, pj_ssize_t *len, 552 const pj_sockaddr_t *to) 553 { 554 if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 555 || sock_state >= SOCKSTATE_DISCONNECTED) 556 { 557 return PJ_EINVALIDOP; 558 } 559 560 if (has_pending_send) 561 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 562 563 if (*len > (pj_ssize_t)send_buffer->Capacity) 564 return PJ_ETOOBIG; 565 566 HostName ^hostname; 567 int port; 568 sockaddr_to_hostname_port(to, hostname, &port); 569 570 concurrency::cancellation_token_source cts; 571 auto cts_token = cts.get_token(); 572 auto t = concurrency::create_task(datagram_sock->GetOutputStreamAsync( 573 hostname, port.ToString()), cts_token); 574 pj_status_t status = PJ_SUCCESS; 575 576 cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 577 then([this, cts_token, &status](concurrency::task<IOutputStream^> t_) 578 { 579 try { 580 if (cts_token.is_canceled()) { 581 status = PJ_ETIMEDOUT; 582 } else { 583 IOutputStream^ outstream = t_.get(); 584 socket_writer = ref new DataWriter(outstream); 585 } 586 } catch (Exception^ e) { 587 status = PJ_RETURN_OS_ERROR(e->HResult); 588 } 589 }).get(); 590 591 if (status != PJ_SUCCESS) 592 return status; 593 594 status = SendImp(buf, len); 595 if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 596 sock_state < SOCKSTATE_CONNECTED) 597 { 598 sock_state = SOCKSTATE_CONNECTED; 599 } 600 601 return status; 602 } 603 604 605 int PjUwpSocket::ConsumeReadBuffer(void *buf, int max_len) 606 { 607 if (socket_reader->UnconsumedBufferLength == 0) 608 return 0; 609 610 int read_len = PJ_MIN((int)socket_reader->UnconsumedBufferLength,max_len); 611 IBuffer^ buffer = socket_reader->ReadBuffer(read_len); 612 read_len = buffer->Length; 613 CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 614 return read_len; 615 } 616 617 618 pj_status_t PjUwpSocket::Recv(void *buf, pj_ssize_t *len) 619 { 620 /* Only for TCP, at least for now! */ 621 if (sock_type == SOCKTYPE_DATAGRAM) 622 return PJ_ENOTSUP; 623 624 if (sock_type != SOCKTYPE_STREAM || sock_state != SOCKSTATE_CONNECTED) 625 return PJ_EINVALIDOP; 626 627 if (has_pending_recv) 628 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 629 630 /* First check if there is already some data in the read buffer */ 631 if (buf) { 632 int avail_len = ConsumeReadBuffer(buf, *len); 633 if (avail_len > 0) { 634 *len = avail_len; 635 return PJ_SUCCESS; 636 } 637 } 638 639 /* Blocking version */ 640 if (is_blocking) { 641 pj_status_t status = PJ_SUCCESS; 642 concurrency::cancellation_token_source cts; 643 auto cts_token = cts.get_token(); 644 auto t = concurrency::create_task(socket_reader->LoadAsync(*len), 645 cts_token); 646 *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 647 .then([this, len, buf, cts_token, &status] 648 (concurrency::task<unsigned int> t_) 649 { 650 try { 651 if (cts_token.is_canceled()) { 652 status = PJ_ETIMEDOUT; 653 return 0; 654 } 655 t_.get(); 656 } catch (Exception^) { 657 status = PJ_ETIMEDOUT; 658 return 0; 659 } 660 661 *len = ConsumeReadBuffer(buf, *len); 662 return (int)*len; 663 }).get(); 664 665 return status; 666 } 667 668 /* Non-blocking version */ 669 670 has_pending_recv = PJ_TRUE; 671 concurrency::create_task(socket_reader->LoadAsync(*len)) 672 .then([this](concurrency::task<unsigned int> t_) 673 { 674 try { 675 // catch any exception 676 t_.get(); 677 has_pending_recv = PJ_FALSE; 678 679 // invoke callback 680 int read_len = socket_reader->UnconsumedBufferLength; 681 if (read_len > 0 && cb.on_read) { 682 (*cb.on_read)(this, read_len); 683 } 684 } catch (Exception^ e) { 685 has_pending_recv = PJ_FALSE; 686 687 // invoke callback 688 if (cb.on_read) { 689 (*cb.on_read)(this, -PJ_EUNKNOWN); 690 } 691 } 692 }); 693 694 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 695 } 696 697 698 pj_status_t PjUwpSocket::RecvFrom(void *buf, pj_ssize_t *len, 699 pj_sockaddr_t *from) 700 { 701 if (sock_type != SOCKTYPE_DATAGRAM || sock_state < SOCKSTATE_INITIALIZED 702 || sock_state >= SOCKSTATE_DISCONNECTED) 703 { 704 return PJ_EINVALIDOP; 705 } 706 707 /* Start receive, if not yet */ 708 if (dgram_recv_helper == nullptr) { 709 dgram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(this); 710 } 711 712 /* Try to read any available data first */ 713 if (buf || is_blocking) { 714 pj_status_t status; 715 status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 716 if (status != PJ_ENOTFOUND) 717 return status; 718 } 719 720 /* Blocking version */ 721 if (is_blocking) { 722 int max_loop = 0; 723 pj_status_t status = PJ_ENOTFOUND; 724 while (status == PJ_ENOTFOUND && sock_state <= SOCKSTATE_CONNECTED) 725 { 726 status = dgram_recv_helper->ReadDataIfAvailable(buf, len, from); 727 if (status != PJ_SUCCESS) 728 pj_thread_sleep(100); 729 730 if (++max_loop > 10) 731 return PJ_ETIMEDOUT; 732 } 733 return status; 734 } 735 736 /* For non-blocking version, just return PJ_EPENDING */ 737 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 738 } 739 740 741 pj_status_t PjUwpSocket::Connect(const pj_sockaddr_t *addr) 742 { 743 pj_status_t status; 744 745 PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN && sock_state == SOCKSTATE_NULL) || 746 (sock_type == SOCKTYPE_DATAGRAM && sock_state == SOCKSTATE_INITIALIZED), 747 PJ_EINVALIDOP); 748 749 if (sock_type == SOCKTYPE_UNKNOWN) { 750 InitSocket(SOCKTYPE_STREAM); 751 // No need to check pending bind, no bind for TCP client socket 752 } 753 754 pj_sockaddr_cp(&remote_addr, addr); 755 756 auto t = concurrency::create_task([this, addr]() 757 { 758 HostName ^hostname; 759 int port; 760 sockaddr_to_hostname_port(&remote_addr, hostname, &port); 761 if (sock_type == SOCKTYPE_STREAM) 762 return stream_sock->ConnectAsync(hostname, port.ToString(), 763 SocketProtectionLevel::PlainSocket); 764 else 765 return datagram_sock->ConnectAsync(hostname, port.ToString()); 766 }).then([=](concurrency::task<void> t_) 767 { 768 try { 769 t_.get(); 770 771 sock_state = SOCKSTATE_CONNECTED; 772 773 // Update local & remote address 774 HostName^ local_address; 775 String^ local_port; 776 777 if (sock_type == SOCKTYPE_STREAM) { 778 local_address = stream_sock->Information->LocalAddress; 779 local_port = stream_sock->Information->LocalPort; 780 781 socket_reader = ref new DataReader(stream_sock->InputStream); 782 socket_writer = ref new DataWriter(stream_sock->OutputStream); 783 socket_reader->InputStreamOptions = InputStreamOptions::Partial; 784 } else { 785 local_address = datagram_sock->Information->LocalAddress; 786 local_port = datagram_sock->Information->LocalPort; 787 } 788 if (local_address && local_port) { 789 wstr_addr_to_sockaddr(local_address->CanonicalName->Data(), 790 local_port->Data(), 791 &local_addr); 792 } 793 794 if (!is_blocking && cb.on_connect) { 795 (*cb.on_connect)(this, PJ_SUCCESS); 796 } 797 return (pj_status_t)PJ_SUCCESS; 798 799 } catch (Exception^ ex) { 800 801 SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 802 803 switch (status) 804 { 805 case SocketErrorStatus::UnreachableHost: 806 break; 807 case SocketErrorStatus::ConnectionTimedOut: 808 break; 809 case SocketErrorStatus::ConnectionRefused: 810 break; 811 default: 812 break; 813 } 814 815 if (!is_blocking && cb.on_connect) { 816 (*cb.on_connect)(this, PJ_EUNKNOWN); 817 } 818 819 return (pj_status_t)PJ_EUNKNOWN; 820 } 821 }); 822 823 if (!is_blocking) 824 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_CONNECT_ERROR_VAL); 825 826 try { 827 status = t.get(); 828 } catch (Exception^) { 829 return PJ_EUNKNOWN; 830 } 831 return status; 832 } 833 834 pj_status_t PjUwpSocket::Listen() 835 { 836 PJ_ASSERT_RETURN((sock_type == SOCKTYPE_UNKNOWN) || 837 (sock_type == SOCKTYPE_LISTENER && 838 sock_state == SOCKSTATE_INITIALIZED), 839 PJ_EINVALIDOP); 840 841 if (sock_type == SOCKTYPE_UNKNOWN) 842 InitSocket(SOCKTYPE_LISTENER); 843 844 if (has_pending_bind) 845 Bind(); 846 847 /* Start listen */ 848 if (listener_helper == nullptr) { 849 listener_helper = ref new PjUwpSocketListenerHelper(this); 850 } 851 852 return PJ_SUCCESS; 853 } 854 855 pj_status_t PjUwpSocket::Accept(PjUwpSocket **new_sock) 856 { 857 if (sock_type != SOCKTYPE_LISTENER || sock_state != SOCKSTATE_INITIALIZED) 858 return PJ_EINVALIDOP; 859 860 StreamSocket^ accepted_sock; 861 pj_status_t status = listener_helper->GetAcceptedSocket(accepted_sock); 862 if (status == PJ_ENOTFOUND) 863 return PJ_RETURN_OS_ERROR(PJ_BLOCKING_ERROR_VAL); 864 865 if (status != PJ_SUCCESS) 866 return status; 867 868 *new_sock = CreateAcceptSocket(accepted_sock); 869 return PJ_SUCCESS; 870 } 401 871 402 872 … … 718 1188 PJ_ASSERT_RETURN(sock, PJ_EINVAL); 719 1189 PJ_ASSERT_RETURN(addr && len>=(int)sizeof(pj_sockaddr_in), PJ_EINVAL); 720 721 1190 PjUwpSocket *s = (PjUwpSocket*)sock; 722 723 if (s->sock_state > SOCKSTATE_INITIALIZED) 724 return PJ_EINVALIDOP; 725 726 pj_sockaddr_cp(&s->local_addr, addr); 727 728 /* Bind now if this is UDP. But if it is TCP, unfortunately we don't 729 * know yet whether it is SocketStream or Listener! 730 */ 731 if (s->type == pj_SOCK_DGRAM()) { 732 HRESULT err = 0; 733 734 try { 735 concurrency::create_task([s, addr]() { 736 HostName ^hostname; 737 int port; 738 sockaddr_to_hostname_port(addr, hostname, &port); 739 if (pj_sockaddr_has_addr(addr)) { 740 s->datagram_sock->BindEndpointAsync(hostname, 741 port.ToString()); 742 } else if (pj_sockaddr_get_port(addr) != 0) { 743 s->datagram_sock->BindServiceNameAsync(port.ToString()); 744 } 745 }).then([s, &err](concurrency::task<void> t) 746 { 747 try { 748 t.get(); 749 s->sock_state = SOCKSTATE_CONNECTED; 750 } catch (Exception^ e) { 751 err = e->HResult; 752 } 753 }).get(); 754 } catch (Exception^ e) { 755 err = e->HResult; 756 } 757 758 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS); 759 } 760 761 return PJ_SUCCESS; 1191 return s->Bind(addr); 762 1192 } 763 1193 … … 812 1242 813 1243 PjUwpSocket *s = (PjUwpSocket*)sock; 814 815 pj_sockaddr_cp(addr, &s->remote_addr); 816 *namelen = pj_sockaddr_get_len(&s->remote_addr); 1244 pj_sockaddr_cp(addr, s->GetRemoteAddr()); 1245 *namelen = pj_sockaddr_get_len(addr); 817 1246 818 1247 return PJ_SUCCESS; … … 831 1260 832 1261 PjUwpSocket *s = (PjUwpSocket*)sock; 833 834 pj_sockaddr_cp(addr, &s->local_addr); 835 *namelen = pj_sockaddr_get_len(&s->local_addr); 1262 pj_sockaddr_cp(addr, s->GetLocalAddr()); 1263 *namelen = pj_sockaddr_get_len(addr); 836 1264 837 1265 return PJ_SUCCESS; 838 1266 } 839 1267 840 841 static pj_status_t sock_send_imp(PjUwpSocket *s, const void *buf,842 pj_ssize_t *len)843 {844 if (s->is_busy_sending)845 return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK);846 847 if (*len > (pj_ssize_t)s->send_buffer->Capacity)848 return PJ_ETOOBIG;849 850 CopyToIBuffer((unsigned char*)buf, *len, s->send_buffer);851 s->send_buffer->Length = *len;852 s->socket_writer->WriteBuffer(s->send_buffer);853 854 if (s->is_blocking) {855 pj_status_t status = PJ_SUCCESS;856 concurrency::cancellation_token_source cts;857 auto cts_token = cts.get_token();858 auto t = concurrency::create_task(s->socket_writer->StoreAsync(),859 cts_token);860 *len = cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT).861 then([cts_token, &status](concurrency::task<unsigned int> t_)862 {863 int sent = 0;864 try {865 if (cts_token.is_canceled())866 status = PJ_ETIMEDOUT;867 else868 sent = t_.get();869 } catch (Exception^ e) {870 status = PJ_RETURN_OS_ERROR(e->HResult);871 }872 return sent;873 }).get();874 875 return status;876 }877 878 s->is_busy_sending = true;879 concurrency::create_task(s->socket_writer->StoreAsync()).880 then([s](concurrency::task<unsigned int> t_)881 {882 try {883 unsigned int l = t_.get();884 s->is_busy_sending = false;885 886 // invoke callback887 if (s->on_write) {888 (*s->on_write)(s, l);889 }890 } catch (Exception^ e) {891 s->is_busy_sending = false;892 if (s->sock_type == SOCKTYPE_STREAM)893 s->sock_state = SOCKSTATE_DISCONNECTED;894 895 // invoke callback896 if (s->on_write) {897 (*s->on_write)(s, -PJ_RETURN_OS_ERROR(e->HResult));898 }899 }900 });901 902 return PJ_EPENDING;903 }904 1268 905 1269 /* … … 916 1280 917 1281 PjUwpSocket *s = (PjUwpSocket*)sock; 918 919 if ((s->sock_type!=SOCKTYPE_STREAM && s->sock_type!=SOCKTYPE_DATAGRAM) || 920 (s->sock_state!=SOCKSTATE_CONNECTED)) 921 { 922 return PJ_EINVALIDOP; 923 } 924 925 /* Sending for SOCKTYPE_DATAGRAM is implemented in pj_sock_sendto() */ 926 if (s->sock_type == SOCKTYPE_DATAGRAM) { 927 return pj_sock_sendto(sock, buf, len, flags, &s->remote_addr, 928 pj_sockaddr_get_len(&s->remote_addr)); 929 } 930 931 return sock_send_imp(s, buf, len); 1282 return s->Send(buf, len); 932 1283 } 933 1284 … … 949 1300 950 1301 PjUwpSocket *s = (PjUwpSocket*)sock; 951 952 if (s->sock_type != SOCKTYPE_DATAGRAM || 953 s->sock_state < SOCKSTATE_INITIALIZED) 954 { 955 return PJ_EINVALIDOP; 956 } 957 958 if (s->is_busy_sending) 959 return PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK); 960 961 if (*len > (pj_ssize_t)s->send_buffer->Capacity) 962 return PJ_ETOOBIG; 963 964 HostName ^hostname; 965 int port; 966 sockaddr_to_hostname_port(to, hostname, &port); 967 968 concurrency::cancellation_token_source cts; 969 auto cts_token = cts.get_token(); 970 auto t = concurrency::create_task( 971 s->datagram_sock->GetOutputStreamAsync( 972 hostname, port.ToString()), cts_token); 973 pj_status_t status = PJ_SUCCESS; 974 975 cancel_after_timeout(t, cts, (unsigned int)WRITE_TIMEOUT). 976 then([s, cts_token, &status](concurrency::task<IOutputStream^> t_) 977 { 978 try { 979 if (cts_token.is_canceled()) { 980 status = PJ_ETIMEDOUT; 981 } else { 982 IOutputStream^ outstream = t_.get(); 983 s->socket_writer = ref new DataWriter(outstream); 984 } 985 } catch (Exception^ e) { 986 status = PJ_RETURN_OS_ERROR(e->HResult); 987 } 988 }).get(); 989 990 if (status != PJ_SUCCESS) 991 return status; 992 993 status = sock_send_imp(s, buf, len); 994 995 if ((status == PJ_SUCCESS || status == PJ_EPENDING) && 996 s->sock_state < SOCKSTATE_CONNECTED) 997 { 998 s->sock_state = SOCKSTATE_CONNECTED; 999 } 1000 1001 return status; 1002 } 1003 1004 1005 static int consume_read_buffer(PjUwpSocket *s, void *buf, int max_len) 1006 { 1007 if (s->socket_reader->UnconsumedBufferLength == 0) 1008 return 0; 1009 1010 int read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 1011 max_len); 1012 IBuffer^ buffer = s->socket_reader->ReadBuffer(read_len); 1013 read_len = buffer->Length; 1014 CopyFromIBuffer((unsigned char*)buf, read_len, buffer); 1015 return read_len; 1302 return s->SendTo(buf, len, to); 1016 1303 } 1017 1304 … … 1031 1318 1032 1319 PjUwpSocket *s = (PjUwpSocket*)sock; 1033 1034 /* Only for TCP, at least for now! */ 1035 if (s->sock_type == SOCKTYPE_DATAGRAM) 1036 return PJ_ENOTSUP; 1037 1038 if (s->sock_type != SOCKTYPE_STREAM || 1039 s->sock_state != SOCKSTATE_CONNECTED) 1040 { 1041 return PJ_EINVALIDOP; 1042 } 1043 1044 /* First check if there is already some data in the read buffer */ 1045 int avail_len = consume_read_buffer(s, buf, *len); 1046 if (avail_len > 0) { 1047 *len = avail_len; 1048 return PJ_SUCCESS; 1049 } 1050 1051 /* Start sync read */ 1052 if (s->is_blocking) { 1053 pj_status_t status = PJ_SUCCESS; 1054 concurrency::cancellation_token_source cts; 1055 auto cts_token = cts.get_token(); 1056 auto t = concurrency::create_task(s->socket_reader->LoadAsync(*len), cts_token); 1057 *len = cancel_after_timeout(t, cts, READ_TIMEOUT) 1058 .then([s, len, buf, cts_token, &status](concurrency::task<unsigned int> t_) 1059 { 1060 try { 1061 if (cts_token.is_canceled()) { 1062 status = PJ_ETIMEDOUT; 1063 return 0; 1064 } 1065 t_.get(); 1066 } catch (Exception^) { 1067 status = PJ_ETIMEDOUT; 1068 return 0; 1069 } 1070 1071 *len = consume_read_buffer(s, buf, *len); 1072 return (int)*len; 1073 }).get(); 1074 1075 return status; 1076 } 1077 1078 /* Start async read */ 1079 int read_len = *len; 1080 concurrency::create_task(s->socket_reader->LoadAsync(read_len)) 1081 .then([s, &read_len](concurrency::task<unsigned int> t_) 1082 { 1083 try { 1084 // catch any exception 1085 t_.get(); 1086 1087 // invoke callback 1088 read_len = PJ_MIN((int)s->socket_reader->UnconsumedBufferLength, 1089 read_len); 1090 if (read_len > 0 && s->on_read) { 1091 (*s->on_read)(s, read_len); 1092 } 1093 } catch (Exception^ e) { 1094 // invoke callback 1095 if (s->on_read) { 1096 (*s->on_read)(s, -PJ_RETURN_OS_ERROR(e->HResult)); 1097 } 1098 return 0; 1099 } 1100 1101 return (int)read_len; 1102 }); 1103 1104 return PJ_EPENDING; 1320 return s->Recv(buf, len); 1105 1321 } 1106 1322 … … 1123 1339 1124 1340 PjUwpSocket *s = (PjUwpSocket*)sock; 1125 1126 if (s->sock_type != SOCKTYPE_DATAGRAM || 1127 s->sock_state < SOCKSTATE_INITIALIZED) 1128 { 1129 return PJ_EINVALIDOP; 1130 } 1131 1132 /* Start receive, if not yet */ 1133 if (s->datagram_recv_helper == nullptr) { 1134 s->datagram_recv_helper = ref new PjUwpSocketDatagramRecvHelper(s); 1135 } 1136 1137 /* Try to read any available data first */ 1138 pj_status_t status = s->datagram_recv_helper-> 1139 ReadDataIfAvailable(buf, len, from); 1140 if (status != PJ_ENOTFOUND) 1141 return status; 1142 1143 /* Start sync read */ 1144 if (s->is_blocking) { 1145 while (status == PJ_ENOTFOUND && s->sock_state <= SOCKSTATE_CONNECTED) 1146 { 1147 status = s->datagram_recv_helper-> 1148 ReadDataIfAvailable(buf, len, from); 1149 pj_thread_sleep(100); 1150 } 1151 return PJ_SUCCESS; 1152 } 1153 1154 /* For async read, just return PJ_EPENDING */ 1155 return PJ_EPENDING; 1341 pj_status_t status = s->RecvFrom(buf, len, from); 1342 if (status == PJ_SUCCESS) 1343 *fromlen = pj_sockaddr_get_len(from); 1344 return status; 1156 1345 } 1157 1346 … … 1222 1411 } 1223 1412 1224 static pj_status_t tcp_bind(PjUwpSocket *s)1225 {1226 /* If no bound address is set, just return */1227 if (!pj_sockaddr_has_addr(&s->local_addr) &&1228 pj_sockaddr_get_port(&s->local_addr)==0)1229 {1230 return PJ_SUCCESS;1231 }1232 1233 HRESULT err = 0;1234 1235 try {1236 concurrency::create_task([s]() {1237 HostName ^hostname;1238 int port;1239 sockaddr_to_hostname_port(&s->local_addr, hostname, &port);1240 1241 s->listener_sock->BindEndpointAsync(hostname,1242 port.ToString());1243 }).then([s, &err](concurrency::task<void> t)1244 {1245 try {1246 t.get();1247 s->sock_state = SOCKSTATE_CONNECTED;1248 } catch (Exception^ e) {1249 err = e->HResult;1250 }1251 }).get();1252 } catch (Exception^ e) {1253 err = e->HResult;1254 }1255 1256 return (err? PJ_RETURN_OS_ERROR(err) : PJ_SUCCESS);1257 }1258 1259 1413 1260 1414 /* … … 1267 1421 PJ_CHECK_STACK(); 1268 1422 PJ_ASSERT_RETURN(sock && addr, PJ_EINVAL); 1269 1270 1423 PJ_UNUSED_ARG(namelen); 1271 1424 1272 1425 PjUwpSocket *s = (PjUwpSocket*)sock; 1273 pj_status_t status; 1274 1275 pj_sockaddr_cp(&s->remote_addr, addr); 1276 1277 /* UDP */ 1278 1279 if (s->sock_type == SOCKTYPE_DATAGRAM) { 1280 if (s->sock_state != SOCKSTATE_INITIALIZED) 1281 return PJ_EINVALIDOP; 1282 1283 HostName ^hostname; 1284 int port; 1285 sockaddr_to_hostname_port(addr, hostname, &port); 1286 1287 try { 1288 concurrency::create_task(s->datagram_sock->ConnectAsync 1289 (hostname, port.ToString())) 1290 .then([s](concurrency::task<void> t_) 1291 { 1292 try { 1293 t_.get(); 1294 } catch (Exception^ ex) 1295 { 1296 1297 } 1298 }).get(); 1299 } catch (Exception^) { 1300 return PJ_EUNKNOWN; 1301 } 1302 1303 // Update local & remote address 1304 wstr_addr_to_sockaddr(s->datagram_sock->Information->RemoteAddress->CanonicalName->Data(), 1305 s->datagram_sock->Information->RemotePort->Data(), 1306 &s->remote_addr); 1307 wstr_addr_to_sockaddr(s->datagram_sock->Information->LocalAddress->CanonicalName->Data(), 1308 s->datagram_sock->Information->LocalPort->Data(), 1309 &s->local_addr); 1310 1311 s->sock_state = SOCKSTATE_CONNECTED; 1312 1313 return PJ_SUCCESS; 1314 } 1315 1316 /* TCP */ 1317 1318 /* Init stream socket now */ 1319 s->InitSocket(SOCKTYPE_STREAM); 1320 1321 pj_sockaddr_cp(&s->remote_addr, addr); 1322 wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 1323 s->stream_sock->Information->LocalPort->Data(), 1324 &s->local_addr); 1325 1326 /* Perform any pending bind */ 1327 status = tcp_bind(s); 1328 if (status != PJ_SUCCESS) 1329 return status; 1330 1331 char tmp[PJ_INET6_ADDRSTRLEN]; 1332 wchar_t wtmp[PJ_INET6_ADDRSTRLEN]; 1333 pj_sockaddr_print(addr, tmp, PJ_INET6_ADDRSTRLEN, 0); 1334 pj_ansi_to_unicode(tmp, pj_ansi_strlen(tmp), wtmp, 1335 PJ_INET6_ADDRSTRLEN); 1336 auto host = ref new HostName(ref new String(wtmp)); 1337 int port = pj_sockaddr_get_port(addr); 1338 1339 auto t = concurrency::create_task(s->stream_sock->ConnectAsync 1340 (host, port.ToString(), SocketProtectionLevel::PlainSocket)) 1341 .then([=](concurrency::task<void> t_) 1342 { 1343 try { 1344 t_.get(); 1345 s->socket_reader = ref new DataReader(s->stream_sock->InputStream); 1346 s->socket_writer = ref new DataWriter(s->stream_sock->OutputStream); 1347 1348 // Update local & remote address 1349 wstr_addr_to_sockaddr(s->stream_sock->Information->RemoteAddress->CanonicalName->Data(), 1350 s->stream_sock->Information->RemotePort->Data(), 1351 &s->remote_addr); 1352 wstr_addr_to_sockaddr(s->stream_sock->Information->LocalAddress->CanonicalName->Data(), 1353 s->stream_sock->Information->LocalPort->Data(), 1354 &s->local_addr); 1355 1356 s->sock_state = SOCKSTATE_CONNECTED; 1357 1358 if (!s->is_blocking && s->on_connect) { 1359 (*s->on_connect)(s, PJ_SUCCESS); 1360 } 1361 return (pj_status_t)PJ_SUCCESS; 1362 } catch (Exception^ ex) { 1363 SocketErrorStatus status = SocketError::GetStatus(ex->HResult); 1364 1365 switch (status) 1366 { 1367 case SocketErrorStatus::UnreachableHost: 1368 break; 1369 case SocketErrorStatus::ConnectionTimedOut: 1370 break; 1371 case SocketErrorStatus::ConnectionRefused: 1372 break; 1373 default: 1374 break; 1375 } 1376 1377 if (!s->is_blocking && s->on_connect) { 1378 (*s->on_connect)(s, PJ_EUNKNOWN); 1379 } 1380 1381 return (pj_status_t)PJ_EUNKNOWN; 1382 } 1383 }); 1384 1385 if (!s->is_blocking) 1386 return PJ_EPENDING; 1387 1388 try { 1389 status = t.get(); 1390 } catch (Exception^) { 1391 return PJ_EUNKNOWN; 1392 } 1393 return status; 1426 return s->Connect(addr); 1394 1427 } 1395 1428 … … 1420 1453 1421 1454 PjUwpSocket *s = (PjUwpSocket*)sock; 1422 pj_status_t status; 1423 1424 /* Init listener socket now */ 1425 s->InitSocket(SOCKTYPE_LISTENER); 1426 1427 /* Perform any pending bind */ 1428 status = tcp_bind(s); 1429 if (status != PJ_SUCCESS) 1430 return status; 1431 1432 /* Start listen */ 1433 if (s->listener_helper == nullptr) { 1434 s->listener_helper = ref new PjUwpSocketListenerHelper(s); 1435 } 1436 1437 return PJ_SUCCESS; 1455 return s->Listen(); 1438 1456 } 1439 1457 … … 1446 1464 int *addrlen) 1447 1465 { 1448 PJ_CHECK_STACK(); 1449 1466 pj_status_t status; 1467 1468 PJ_CHECK_STACK(); 1450 1469 PJ_ASSERT_RETURN(serverfd && newsock, PJ_EINVAL); 1451 1470 1452 1471 PjUwpSocket *s = (PjUwpSocket*)serverfd; 1453 1454 if (s->sock_type != SOCKTYPE_LISTENER || 1455 s->sock_state != SOCKSTATE_INITIALIZED) 1456 { 1457 return PJ_EINVALIDOP; 1458 } 1459 1460 StreamSocket^ accepted_sock; 1461 pj_status_t status = s->listener_helper->GetAcceptedSocket(accepted_sock); 1462 if (status == PJ_ENOTFOUND) 1463 return PJ_EPENDING; 1464 1472 PjUwpSocket *new_uwp_sock; 1473 1474 status = s->Accept(&new_uwp_sock); 1465 1475 if (status != PJ_SUCCESS) 1466 1476 return status; 1467 1468 PjUwpSocket *new_sock = s->CreateAcceptSocket(accepted_sock);1469 1470 pj_sockaddr_cp(addr, &new_sock->remote_addr);1471 *addrlen = pj_sockaddr_get_len(&new_sock->remote_addr);1472 * newsock = (pj_sock_t)new_sock;1477 if (newsock == NULL) 1478 return PJ_ENOTFOUND; 1479 1480 *newsock = (pj_sock_t)new_uwp_sock; 1481 pj_sockaddr_cp(addr, new_uwp_sock->GetRemoteAddr()); 1482 *addrlen = pj_sockaddr_get_len(addr); 1473 1483 1474 1484 return PJ_SUCCESS;
Note: See TracChangeset
for help on using the changeset viewer.