- Timestamp:
- Oct 24, 2006 5:13:30 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
pjproject/branches/symbian/pjlib/src/pj/ioqueue_symbian.cpp
r686 r788 18 18 */ 19 19 #include <pj/ioqueue.h> 20 #include <pj/assert.h> 21 #include <pj/errno.h> 22 #include <pj/list.h> 23 #include <pj/lock.h> 24 #include <pj/pool.h> 25 #include <pj/string.h> 26 27 #include "os_symbian.h" 28 29 class CIoqueueCallback; 30 31 /* 32 * IO Queue structure. 33 */ 34 struct pj_ioqueue_t 35 { 36 int eventCount; 37 CPjTimeoutTimer *timeoutTimer; 38 }; 39 40 41 ///////////////////////////////////////////////////////////////////////////// 42 // Class to encapsulate asynchronous socket operation. 43 // 44 class CIoqueueCallback : public CActive 45 { 46 public: 47 CIoqueueCallback(pj_ioqueue_t *ioqueue, 48 pj_ioqueue_key_t *key, pj_sock_t sock, 49 const pj_ioqueue_callback *cb, void *user_data) 50 : CActive(CActive::EPriorityStandard), 51 ioqueue_(ioqueue), key_(key), sock_((CPjSocket*)sock), cb_(cb), 52 user_data_(user_data), aBufferPtr_(NULL, 0), type_(TYPE_NONE) 53 { 54 CActiveScheduler::Add(this); 55 } 56 57 // 58 // Start asynchronous recv() operation 59 // 60 pj_status_t StartRead(pj_ioqueue_op_key_t *op_key, 61 void *buf, pj_ssize_t *size, unsigned flags, 62 pj_sockaddr_t *addr, int *addrlen) 63 { 64 PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY); 65 PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY); 66 67 pending_data_.read_.op_key_ = op_key; 68 pending_data_.read_.addr_ = addr; 69 pending_data_.read_.addrlen_ = addrlen; 70 71 aBufferPtr_.Set((TUint8*)buf, 0, (TInt)*size); 72 73 type_ = TYPE_READ; 74 SetActive(); 75 sock_->Socket().RecvFrom(aBufferPtr_, aAddress_, flags, iStatus); 76 77 return PJ_EPENDING; 78 } 79 80 81 // 82 // Start asynchronous accept() operation. 83 // 84 pj_status_t StartAccept(pj_ioqueue_op_key_t *op_key, 85 pj_sock_t *new_sock, 86 pj_sockaddr_t *local, 87 pj_sockaddr_t *remote, 88 int *addrlen ) 89 { 90 PJ_ASSERT_RETURN(IsActive()==false, PJ_EBUSY); 91 PJ_ASSERT_RETURN(pending_data_.common_.op_key_==NULL, PJ_EBUSY); 92 93 pending_data_.accept_.op_key_ = op_key; 94 pending_data_.accept_.new_sock_ = new_sock; 95 pending_data_.accept_.local_ = local; 96 pending_data_.accept_.remote_ = remote; 97 pending_data_.accept_.addrlen_ = addrlen; 98 99 // Create blank socket 100 blank_sock_.Open(PjSymbianOS::Instance()->SocketServ()); 101 102 type_ = TYPE_ACCEPT; 103 SetActive(); 104 sock_->Socket().Accept(blank_sock_, iStatus); 105 106 return PJ_EPENDING; 107 } 108 109 110 // 111 // Completion callback. 112 // 113 void RunL() 114 { 115 Type cur_type = type_; 116 117 type_ = TYPE_NONE; 118 119 if (cur_type == TYPE_READ) { 120 // 121 // Completion of asynchronous RecvFrom() 122 // 123 124 /* Clear op_key (save it to temp variable first!) */ 125 pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_; 126 pending_data_.read_.op_key_ = NULL; 127 128 // Handle failure condition 129 if (iStatus != KErrNone) { 130 cb_->on_read_complete(key_, op_key, -PJ_RETURN_OS_ERROR(iStatus.Int())); 131 return; 132 } 133 134 if (pending_data_.read_.addr_) { 135 PjSymbianOS::Addr2pj(aAddress_, 136 *(pj_sockaddr_in*)pending_data_.read_.addr_); 137 pending_data_.read_.addr_ = NULL; 138 } 139 if (pending_data_.read_.addrlen_) { 140 *pending_data_.read_.addrlen_ = sizeof(pj_sockaddr_in); 141 pending_data_.read_.addrlen_ = NULL; 142 } 143 144 /* Call callback */ 145 cb_->on_read_complete(key_, op_key, aBufferPtr_.Length()); 146 147 } else if (cur_type == TYPE_ACCEPT) { 148 // 149 // Completion of asynchronous Accept() 150 // 151 152 /* Clear op_key (save it to temp variable first!) */ 153 pj_ioqueue_op_key_t *op_key = pending_data_.read_.op_key_; 154 pending_data_.read_.op_key_ = NULL; 155 156 // Handle failure condition 157 if (iStatus != KErrNone) { 158 if (pending_data_.accept_.new_sock_) 159 *pending_data_.accept_.new_sock_ = PJ_INVALID_SOCKET; 160 161 cb_->on_accept_complete(key_, op_key, PJ_INVALID_SOCKET, 162 -PJ_RETURN_OS_ERROR(iStatus.Int())); 163 return; 164 } 165 166 CPjSocket *pjNewSock = new CPjSocket(blank_sock_); 167 168 if (pending_data_.accept_.new_sock_) { 169 *pending_data_.accept_.new_sock_ = (pj_sock_t)pjNewSock; 170 pending_data_.accept_.new_sock_ = NULL; 171 } 172 173 if (pending_data_.accept_.local_) { 174 TInetAddr aAddr; 175 blank_sock_.LocalName(aAddr); 176 PjSymbianOS::Addr2pj(aAddr, *(pj_sockaddr_in*)pending_data_.accept_.local_); 177 pending_data_.accept_.local_ = NULL; 178 } 179 180 if (pending_data_.accept_.remote_) { 181 TInetAddr aAddr; 182 blank_sock_.RemoteName(aAddr); 183 PjSymbianOS::Addr2pj(aAddr, *(pj_sockaddr_in*)pending_data_.accept_.remote_); 184 pending_data_.accept_.remote_ = NULL; 185 } 186 187 if (pending_data_.accept_.addrlen_) { 188 *pending_data_.accept_.addrlen_ = sizeof(pj_sockaddr_in); 189 pending_data_.accept_.addrlen_ = NULL; 190 } 191 192 // Call callback. 193 cb_->on_accept_complete(key_, op_key, (pj_sock_t)pjNewSock, PJ_SUCCESS); 194 } 195 196 ioqueue_->eventCount++; 197 } 198 199 // 200 // CActive's DoCancel() 201 // 202 void DoCancel() 203 { 204 if (type_ == TYPE_READ) 205 sock_->Socket().CancelRecv(); 206 else if (type_ == TYPE_ACCEPT) 207 sock_->Socket().CancelAccept(); 208 209 type_ = TYPE_NONE; 210 } 211 212 // 213 // Cancel operation and call callback. 214 // 215 void CancelOperation(pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_status) 216 { 217 Type cur_type = type_; 218 219 Cancel(); 220 221 if (cur_type == TYPE_READ) 222 cb_->on_read_complete(key_, op_key, bytes_status); 223 else if (cur_type == TYPE_ACCEPT) 224 ; 225 } 226 227 // 228 // Accessors 229 // 230 void* get_user_data() const 231 { 232 return user_data_; 233 } 234 void set_user_data(void *user_data) 235 { 236 user_data_ = user_data; 237 } 238 pj_ioqueue_op_key_t *get_op_key() const 239 { 240 return pending_data_.common_.op_key_; 241 } 242 CPjSocket* get_pj_socket() 243 { 244 return sock_; 245 } 246 247 private: 248 // Type of pending operation. 249 enum Type { 250 TYPE_NONE, 251 TYPE_READ, 252 TYPE_ACCEPT, 253 }; 254 255 // Static data. 256 pj_ioqueue_t *ioqueue_; 257 pj_ioqueue_key_t *key_; 258 CPjSocket *sock_; 259 const pj_ioqueue_callback *cb_; 260 void *user_data_; 261 262 // Symbian data. 263 TPtr8 aBufferPtr_; 264 TInetAddr aAddress_; 265 266 // Application data. 267 Type type_; 268 269 union Pending_Data 270 { 271 struct Common 272 { 273 pj_ioqueue_op_key_t *op_key_; 274 } common_; 275 276 struct Pending_Read 277 { 278 pj_ioqueue_op_key_t *op_key_; 279 pj_sockaddr_t *addr_; 280 int *addrlen_; 281 } read_; 282 283 struct Pending_Accept 284 { 285 pj_ioqueue_op_key_t *op_key_; 286 pj_sock_t *new_sock_; 287 pj_sockaddr_t *local_; 288 pj_sockaddr_t *remote_; 289 int *addrlen_; 290 } accept_; 291 }; 292 293 union Pending_Data pending_data_; 294 RSocket blank_sock_; 295 }; 296 297 298 299 300 /* 301 * IO Queue key structure. 302 */ 303 struct pj_ioqueue_key_t 304 { 305 CIoqueueCallback *cbObj; 306 }; 20 307 21 308 … … 25 312 PJ_DEF(const char*) pj_ioqueue_name(void) 26 313 { 27 return "symbian"; 28 } 29 314 return "ioqueue-symbian"; 315 } 316 317 318 /* 319 * Create a new I/O Queue framework. 320 */ 321 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 322 pj_size_t max_fd, 323 pj_ioqueue_t **p_ioqueue) 324 { 325 pj_ioqueue_t *ioq; 326 327 PJ_UNUSED_ARG(max_fd); 328 329 ioq = (pj_ioqueue_t*) pj_pool_zalloc(pool, sizeof(pj_ioqueue_t)); 330 ioq->timeoutTimer = CPjTimeoutTimer::NewL(); 331 *p_ioqueue = ioq; 332 return PJ_SUCCESS; 333 } 334 335 336 /* 337 * Destroy the I/O queue. 338 */ 339 PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioq ) 340 { 341 delete ioq->timeoutTimer; 342 ioq->timeoutTimer = NULL; 343 344 return PJ_SUCCESS; 345 } 346 347 348 /* 349 * Set the lock object to be used by the I/O Queue. 350 */ 351 PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioq, 352 pj_lock_t *lock, 353 pj_bool_t auto_delete ) 354 { 355 /* Don't really need lock for now */ 356 PJ_UNUSED_ARG(ioq); 357 358 if (auto_delete) { 359 pj_lock_destroy(lock); 360 } 361 362 return PJ_SUCCESS; 363 } 364 365 366 /* 367 * Register a socket to the I/O queue framework. 368 */ 369 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, 370 pj_ioqueue_t *ioq, 371 pj_sock_t sock, 372 void *user_data, 373 const pj_ioqueue_callback *cb, 374 pj_ioqueue_key_t **p_key ) 375 { 376 pj_ioqueue_key_t *key; 377 378 key = (pj_ioqueue_key_t*) pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); 379 key->cbObj = new CIoqueueCallback(ioq, key, sock, cb, user_data); 380 381 *p_key = key; 382 return PJ_SUCCESS; 383 } 384 385 /* 386 * Unregister from the I/O Queue framework. 387 */ 388 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) 389 { 390 if (key == NULL || key->cbObj == NULL) 391 return PJ_SUCCESS; 392 393 // Close socket. 394 key->cbObj->get_pj_socket()->Socket().Close(); 395 delete key->cbObj->get_pj_socket(); 396 397 // Delete async object 398 delete key->cbObj; 399 key->cbObj = NULL; 400 401 return PJ_SUCCESS; 402 } 403 404 405 /* 406 * Get user data associated with an ioqueue key. 407 */ 408 PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) 409 { 410 return key->cbObj->get_user_data(); 411 } 412 413 414 /* 415 * Set or change the user data to be associated with the file descriptor or 416 * handle or socket descriptor. 417 */ 418 PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, 419 void *user_data, 420 void **old_data) 421 { 422 if (old_data) 423 *old_data = key->cbObj->get_user_data(); 424 key->cbObj->set_user_data(user_data); 425 426 return PJ_SUCCESS; 427 } 428 429 430 /* 431 * Initialize operation key. 432 */ 433 PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, 434 pj_size_t size ) 435 { 436 pj_memset(op_key, 0, size); 437 } 438 439 440 /* 441 * Check if operation is pending on the specified operation key. 442 */ 443 PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key, 444 pj_ioqueue_op_key_t *op_key ) 445 { 446 return key->cbObj->get_op_key()==op_key && 447 key->cbObj->IsActive(); 448 } 449 450 451 /* 452 * Post completion status to the specified operation key and call the 453 * appropriate callback. 454 */ 455 PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, 456 pj_ioqueue_op_key_t *op_key, 457 pj_ssize_t bytes_status ) 458 { 459 if (pj_ioqueue_is_pending(key, op_key)) { 460 key->cbObj->CancelOperation(op_key, bytes_status); 461 } 462 return PJ_SUCCESS; 463 } 464 465 466 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 467 /** 468 * Instruct I/O Queue to accept incoming connection on the specified 469 * listening socket. 470 */ 471 PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, 472 pj_ioqueue_op_key_t *op_key, 473 pj_sock_t *new_sock, 474 pj_sockaddr_t *local, 475 pj_sockaddr_t *remote, 476 int *addrlen ) 477 { 478 479 return key->cbObj->StartAccept(op_key, new_sock, local, remote, addrlen); 480 } 481 482 483 /* 484 * Initiate non-blocking socket connect. 485 */ 486 PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, 487 const pj_sockaddr_t *addr, 488 int addrlen ) 489 { 490 PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL); 491 492 RSocket &rSock = key->cbObj->get_pj_socket()->Socket(); 493 TInetAddr inetAddr; 494 PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr); 495 TRequestStatus reqStatus; 496 497 // We don't support async connect for now. 498 PJ_TODO(IOQUEUE_SUPPORT_ASYNC_CONNECT); 499 500 rSock.Connect(inetAddr, reqStatus); 501 User::WaitForRequest(reqStatus); 502 503 if (reqStatus == KErrNone) 504 return PJ_SUCCESS; 505 506 return PJ_RETURN_OS_ERROR(reqStatus.Int()); 507 } 508 509 510 #endif /* PJ_HAS_TCP */ 511 512 /* 513 * Poll the I/O Queue for completed events. 514 */ 515 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioq, 516 const pj_time_val *timeout) 517 { 518 CPjTimeoutTimer *timer; 519 520 if (timeout) { 521 if (!ioq->timeoutTimer->IsActive()) 522 timer = ioq->timeoutTimer; 523 else 524 timer = CPjTimeoutTimer::NewL(); 525 526 timer->StartTimer(timeout->sec*1000 + timeout->msec); 527 528 } else { 529 timer = NULL; 530 } 531 532 ioq->eventCount = 0; 533 534 do { 535 PjSymbianOS::Instance()->WaitForActiveObjects(); 536 } while (ioq->eventCount == 0 && !timer->HasTimedOut()); 537 538 if (!timer->HasTimedOut()) 539 timer->Cancel(); 540 541 if (timer != ioq->timeoutTimer) 542 delete timer; 543 544 return ioq->eventCount; 545 } 546 547 548 /* 549 * Instruct the I/O Queue to read from the specified handle. 550 */ 551 PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, 552 pj_ioqueue_op_key_t *op_key, 553 void *buffer, 554 pj_ssize_t *length, 555 pj_uint32_t flags ) 556 { 557 return pj_ioqueue_recvfrom(key, op_key, buffer, length, flags, NULL, NULL); 558 } 559 560 561 /* 562 * This function behaves similarly as #pj_ioqueue_recv(), except that it is 563 * normally called for socket, and the remote address will also be returned 564 * along with the data. 565 */ 566 PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, 567 pj_ioqueue_op_key_t *op_key, 568 void *buffer, 569 pj_ssize_t *length, 570 pj_uint32_t flags, 571 pj_sockaddr_t *addr, 572 int *addrlen) 573 { 574 if (key->cbObj->IsActive()) 575 return PJ_EBUSY; 576 577 return key->cbObj->StartRead(op_key, buffer, length, flags, addr, addrlen); 578 } 579 580 581 /* 582 * Instruct the I/O Queue to write to the handle. 583 */ 584 PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, 585 pj_ioqueue_op_key_t *op_key, 586 const void *data, 587 pj_ssize_t *length, 588 pj_uint32_t flags ) 589 { 590 TRequestStatus reqStatus; 591 TPtrC8 aBuffer((const TUint8*)data, (TInt)*length); 592 TSockXfrLength aLen; 593 594 PJ_UNUSED_ARG(op_key); 595 596 // Forcing pending operation is not supported. 597 PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL); 598 599 key->cbObj->get_pj_socket()->Socket().Send(aBuffer, flags, reqStatus, aLen); 600 User::WaitForRequest(reqStatus); 601 602 if (reqStatus.Int() != KErrNone) 603 return PJ_RETURN_OS_ERROR(reqStatus.Int()); 604 605 *length = aLen.Length(); 606 return PJ_SUCCESS; 607 } 608 609 610 /* 611 * Instruct the I/O Queue to write to the handle. 612 */ 613 PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, 614 pj_ioqueue_op_key_t *op_key, 615 const void *data, 616 pj_ssize_t *length, 617 pj_uint32_t flags, 618 const pj_sockaddr_t *addr, 619 int addrlen) 620 { 621 TRequestStatus reqStatus; 622 TPtrC8 aBuffer; 623 TInetAddr inetAddr; 624 TSockXfrLength aLen; 625 626 PJ_UNUSED_ARG(op_key); 627 628 // Forcing pending operation is not supported. 629 PJ_ASSERT_RETURN((flags & PJ_IOQUEUE_ALWAYS_ASYNC)==0, PJ_EINVAL); 630 631 // Must be pj_sockaddr_in for now. 632 PJ_ASSERT_RETURN(addrlen == sizeof(pj_sockaddr_in), PJ_EINVAL); 633 634 aBuffer.Set((const TUint8*)data, (TInt)*length); 635 PjSymbianOS::pj2Addr(*(const pj_sockaddr_in*)addr, inetAddr); 636 CPjSocket *pjSock = key->cbObj->get_pj_socket(); 637 638 pjSock->Socket().Send(aBuffer, flags, reqStatus, aLen); 639 User::WaitForRequest(reqStatus); 640 641 if (reqStatus.Int() != KErrNone) 642 return PJ_RETURN_OS_ERROR(reqStatus.Int()); 643 644 *length = aLen.Length(); 645 return PJ_SUCCESS; 646 } 647
Note: See TracChangeset
for help on using the changeset viewer.